=== .env.example === IRC_NICK=ircstore IRC_SERVER=irc.rotko.net IRC_CHANNELS=#support,#dev IRC_PORT=6697 IRC_USE_TLS=true S2_AUTH_TOKEN=GMsAAAAAAABos/7Qhash+qBnxxxxxxxxxx+rZr+ehcXoSPH BASE_PATH=./ircstore-data === Cargo.toml === [package] name = "ircstore" version = "0.1.2" edition = "2021" [lib] name = "ircstore" path = "src/lib.rs" [[bin]] name = "ircstore" path = "src/bin/indexer.rs" [[bin]] name = "ircstore-web" path = "src/bin/web.rs" [dependencies] clap = { version = "4", features = ["derive", "env"] } streamstore = "0.19" tokio = { version = "1", features = ["full"] } irc = { version = "0.15", features = ["tls-native"] } futures = "0.3" serde = { version = "1", features = ["derive"] } serde_json = "1" chrono = "0.4" anyhow = "1" notify = "6" base64 = "0.21" async-stream = "0.3" dotenv = "0.15" axum = { version = "0.7", features = ["ws"] } tower = "0.4" tower-http = { version = "0.5", features = ["compression-gzip", "cors"] } http = "1" === Dockerfile === FROM rust:latest AS builder WORKDIR /app COPY Cargo.toml Cargo.lock ./ COPY src ./src COPY static ./static/ RUN cargo build --release FROM debian:trixie-slim RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* COPY --from=builder /app/target/release/ircstore /usr/local/bin/ircstore COPY --from=builder /app/target/release/ircstore-web /usr/local/bin/ircstore-web COPY static /app/static WORKDIR /app === src/bin/indexer.rs === use ircstore::{IrcEvent, S2Store}; use irc::client::prelude::*; use futures::StreamExt; use chrono::Utc; use anyhow::Result; use clap::Parser; #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Args { #[arg(short, long, env = "IRC_NICK", default_value = "ircstore")] nick: String, #[arg(short, long, env = "IRC_SERVER", default_value = "irc.rotko.net")] server: String, #[arg(short, long, env = "IRC_PORT", default_value = "6697")] port: u16, #[arg(short = 't', long, env = "IRC_USE_TLS", default_value = "true")] use_tls: bool, #[arg(short, long, env = "IRC_CHANNELS", value_delimiter = ',')] channels: Option>, #[arg(short = 'a', long, env = "S2_AUTH_TOKEN")] s2_auth_token: Option, } struct IrcIndexer { store: S2Store, irc_client: irc::client::Client, server: String, } impl IrcIndexer { async fn new(args: Args) -> Result { if let Some(token) = args.s2_auth_token { std::env::set_var("S2_AUTH_TOKEN", token); } let store = S2Store::new().await?; let channels = args.channels.unwrap_or_else(|| vec!["#support".to_string(), "#dev".to_string()]); for channel in &channels { store.ensure_stream(channel).await?; println!("Created stream for {}", channel); } store.ensure_stream("_global").await?; let irc_config = Config { server: Some(args.server.clone()), port: Some(args.port), use_tls: Some(args.use_tls), channels, nickname: Some(args.nick), ..Default::default() }; println!("Connecting to {}:{} (TLS: {})", args.server, args.port, args.use_tls); let irc_client = irc::client::Client::from_config(irc_config).await?; irc_client.identify()?; Ok(Self { store, irc_client, server: args.server, }) } async fn run(&mut self) -> Result<()> { let mut stream = self.irc_client.stream()?; println!("Connected! Waiting for messages..."); while let Some(msg) = stream.next().await.transpose()? { let event = self.process_message(&msg)?; let stream_name = event.channel.clone().unwrap_or_else(|| "_global".to_string()); if let Err(e) = self.store.append(&stream_name, &event).await { eprintln!("Failed to append: {}", e); } if let Command::PRIVMSG(target, text) = &msg.command { println!("[{}] {}: {}", target, event.nick.as_ref().unwrap_or(&"*".to_string()), text); } } Ok(()) } fn process_message(&self, msg: &Message) -> Result { let (channel, nick, message) = match &msg.command { Command::PRIVMSG(target, text) => { (Some(target.clone()), msg.source_nickname().map(|s| s.to_string()), text.clone()) } Command::JOIN(chan, _, _) => { (Some(chan.clone()), msg.source_nickname().map(|s| s.to_string()), format!("*** {} joined", msg.source_nickname().unwrap_or("*"))) } Command::PART(chan, reason) => { let part_msg = if let Some(r) = reason { format!("*** {} left ({})", msg.source_nickname().unwrap_or("*"), r) } else { format!("*** {} left", msg.source_nickname().unwrap_or("*")) }; (Some(chan.clone()), msg.source_nickname().map(|s| s.to_string()), part_msg) } Command::QUIT(reason) => { let quit_msg = if let Some(r) = reason { format!("*** {} quit ({})", msg.source_nickname().unwrap_or("*"), r) } else { format!("*** {} quit", msg.source_nickname().unwrap_or("*")) }; (None, msg.source_nickname().map(|s| s.to_string()), quit_msg) } Command::TOPIC(chan, topic) => { let topic_msg = if let Some(t) = topic { format!("*** Topic set to: {}", t) } else { "*** Topic cleared".to_string() }; (Some(chan.clone()), msg.source_nickname().map(|s| s.to_string()), topic_msg) } } _ => (None, None, format!("{:?}", msg.command)) }; Ok(IrcEvent { timestamp: Utc::now().timestamp(), server: self.server.clone(), channel, nick, message, raw: format!("{}", msg), }) } } #[tokio::main] async fn main() -> Result<()> { dotenv::dotenv().ok(); let args = Args::parse(); let default_channels = vec!["#support".to_string(), "#dev".to_string()]; let channels = args.channels.as_ref().unwrap_or(&default_channels); println!("Starting IRC indexer for channels: {:?}", channels); let mut indexer = IrcIndexer::new(args).await?; indexer.run().await?; Ok(()) } === src/bin/web.rs === use ircstore::{IrcEvent, S2Store}; use axum::{ extract::{Path, Query, State, WebSocketUpgrade, ws::Message}, response::{Html, IntoResponse}, routing::get, Router, Json, }; use serde::Deserialize; use std::sync::Arc; use tower_http::compression::CompressionLayer; use tower_http::cors::CorsLayer; use anyhow::Result; use clap::Parser; #[derive(Parser)] struct Args { #[arg(short, long, env = "WEB_PORT", default_value = "8080")] port: u16, #[arg(short, long, env = "WEB_BIND", default_value = "127.0.0.1")] bind: String, #[arg(long, env = "WEB_ALLOW_CHANNELS", value_delimiter = ',')] allow: Option>, #[arg(long, env = "WEB_DENY_CHANNELS", value_delimiter = ',')] deny: Option>, } #[derive(Deserialize)] struct LogQuery { limit: Option, offset: Option, } #[derive(Clone)] struct AppState { store: Arc, allowed: Arc bool + Send + Sync>, } async fn index() -> Html<&'static str> { Html(include_str!("../../static/index.html")) } async fn help() -> Html<&'static str> { Html(include_str!("../../static/help.html")) } async fn channel_logs( Path(channel): Path, Query(params): Query, State(state): State, ) -> Result>, String> { if !(state.allowed)(&channel) { return Err("Forbidden".to_string()); } let limit = params.limit.unwrap_or(100).min(1000); let offset = params.offset.unwrap_or(0); state.store.read(&channel, offset, limit).await .map(Json) .map_err(|e| e.to_string()) } async fn list_channels(State(state): State) -> Result>, String> { let channels = state.store.list_streams().await.map_err(|e| e.to_string())?; Ok(Json(channels.into_iter() .filter(|c| c != "_global" && (state.allowed)(c)) .collect())) } async fn websocket_handler(ws: WebSocketUpgrade, State(state): State) -> impl IntoResponse { ws.on_upgrade(|socket| handle_socket(socket, state)) } async fn handle_socket(mut socket: axum::extract::ws::WebSocket, state: AppState) { let mut rx = state.store.subscribe(); loop { tokio::select! { msg = rx.recv() => { match msg { Ok(event) => { if let Some(ch) = &event.channel { if !(state.allowed)(ch) { continue; } } if let Ok(json) = serde_json::to_string(&event) { if socket.send(Message::Text(json)).await.is_err() { break; } } } Err(_) => break, } } Some(msg) = socket.recv() => { if msg.is_err() { break; } } } } } async fn tail_all_streams(store: Arc) { loop { if let Ok(streams) = store.list_streams().await { for stream in streams { let store = store.clone(); tokio::spawn(async move { let _ = store.tail(&stream).await; }); } } tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; } } #[tokio::main] async fn main() -> Result<()> { dotenv::dotenv().ok(); let args = Args::parse(); let filter: Arc bool + Send + Sync> = match (args.allow, args.deny) { (Some(allow), _) => Arc::new(move |c| allow.contains(&c.to_string())), (_, Some(deny)) => Arc::new(move |c| !deny.contains(&c.to_string())), _ => Arc::new(|_| true), }; let store = Arc::new(S2Store::new().await?); let store_clone = store.clone(); tokio::spawn(tail_all_streams(store_clone)); let state = AppState { store, allowed: filter, }; let app = Router::new() .route("/", get(index)) .route("/help", get(help)) .route("/api/channels", get(list_channels)) .route("/api/logs/:channel", get(channel_logs)) .route("/ws", get(websocket_handler)) .layer(CorsLayer::new() .allow_origin(tower_http::cors::Any) .allow_methods(vec![http::Method::GET])) .layer(CompressionLayer::new()) .with_state(state); let listener = tokio::net::TcpListener::bind(format!("{}:{}", args.bind, args.port)).await?; println!("Listening on http://{}:{}", args.bind, args.port); axum::serve(listener, app).await?; Ok(()) } === src/lib.rs === use serde::{Deserialize, Serialize}; use s2::{Client, ClientConfig}; use s2::types::*; use anyhow::Result; use futures::StreamExt; use tokio::sync::broadcast; use std::sync::Arc; use std::collections::HashMap; use tokio::sync::RwLock; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct IrcEvent { pub timestamp: i64, pub server: String, pub channel: Option, pub nick: Option, pub message: String, pub raw: String, } pub struct S2Store { client: Client, tx: Arc>, last_seq: Arc>>, } impl S2Store { pub async fn new() -> Result { let token = std::env::var("S2_AUTH_TOKEN")?; let config = ClientConfig::new(token); let client = Client::new(config); let (tx, _) = broadcast::channel(1000); Ok(Self { client, tx: Arc::new(tx), last_seq: Arc::new(RwLock::new(HashMap::new())), }) } pub fn subscribe(&self) -> broadcast::Receiver { self.tx.subscribe() } pub async fn ensure_stream(&self, name: &str) -> Result<()> { let server = std::env::var("IRC_SERVER").unwrap_or_else(|_| "irc-rotko-net".to_string()); let basin_name = server.replace('.', "-").replace(':', "-"); let basin: BasinName = basin_name.parse()?; let _ = self.client.create_basin( CreateBasinRequest::new(basin.clone()) ).await; let basin_client = self.client.basin_client(basin); let _ = basin_client.create_stream( CreateStreamRequest::new(name) ).await; Ok(()) } pub async fn append(&self, stream: &str, event: &IrcEvent) -> Result<()> { let server = std::env::var("IRC_SERVER").unwrap_or_else(|_| "irc-rotko-net".to_string()); let basin_name = server.replace('.', "-").replace(':', "-"); let basin: BasinName = basin_name.parse()?; let stream_client = self.client.basin_client(basin).stream_client(stream); let data = serde_json::to_vec(event)?; let record = AppendRecord::new(data)?; let batch = AppendRecordBatch::try_from_iter([record]) .map_err(|(_, _)| anyhow::anyhow!("Failed to create batch"))?; let input = AppendInput::new(batch); stream_client.append(input).await?; // Broadcast the event let _ = self.tx.send(event.clone()); Ok(()) } pub async fn read(&self, stream: &str, offset: u64, limit: usize) -> Result> { let server = std::env::var("IRC_SERVER").unwrap_or_else(|_| "irc-rotko-net".to_string()); let basin_name = server.replace('.', "-").replace(':', "-"); let basin: BasinName = basin_name.parse()?; let stream_client = self.client.basin_client(basin).stream_client(stream); let req = ReadRequest::new( ReadStart::SeqNum(offset) ).with_limit( ReadLimit::new().with_count(limit as u64) ); let output = stream_client.read(req).await?; let mut events = Vec::new(); if let ReadOutput::Batch(batch) = output { for record in batch.records { if let Ok(event) = serde_json::from_slice::(&record.body) { events.push(event); } } } Ok(events) } pub async fn list_streams(&self) -> Result> { let server = std::env::var("IRC_SERVER").unwrap_or_else(|_| "irc-rotko-net".to_string()); let basin_name = server.replace('.', "-").replace(':', "-"); let basin: BasinName = basin_name.parse()?; let basin_client = self.client.basin_client(basin); let resp = basin_client.list_streams( ListStreamsRequest::new() ).await?; Ok(resp.streams.into_iter().map(|s| s.name).collect()) } pub async fn get_stream_info(&self, stream: &str) -> Result { let server = std::env::var("IRC_SERVER").unwrap_or_else(|_| "irc-rotko-net".to_string()); let basin_name = server.replace('.', "-").replace(':', "-"); let basin: BasinName = basin_name.parse()?; let stream_client = self.client.basin_client(basin).stream_client(stream); let info = stream_client.check_tail().await?; Ok(info.seq_num) } pub async fn tail(&self, stream: &str) -> Result<()> { let server = std::env::var("IRC_SERVER").unwrap_or_else(|_| "irc-rotko-net".to_string()); let basin_name = server.replace('.', "-").replace(':', "-"); let basin: BasinName = basin_name.parse()?; let stream_client = self.client.basin_client(basin).stream_client(stream); // Get current position let start_seq = { let last_seqs = self.last_seq.read().await; last_seqs.get(stream).copied() }.unwrap_or_else(|| { // If no position stored, get current tail 0 // Could use get_stream_info here to start from end }); let req = ReadSessionRequest::new( ReadStart::SeqNum(start_seq) ); let mut stream_reader = stream_client.read_session(req).await?; while let Some(result) = stream_reader.next().await { if let Ok(output) = result { if let ReadOutput::Batch(batch) = output { let mut last_seq = 0u64; for record in batch.records { last_seq = record.seq_num; // Only broadcast if this is a new message let should_broadcast = { let last_seqs = self.last_seq.read().await; last_seqs.get(stream).map_or(true, |&s| record.seq_num > s) }; if should_broadcast { if let Ok(event) = serde_json::from_slice::(&record.body) { let _ = self.tx.send(event); } } } // Update last seen sequence if last_seq > 0 { let mut last_seqs = self.last_seq.write().await; last_seqs.insert(stream.to_string(), last_seq); } } } } Ok(()) } } === static/help.html === ircstore - API Documentation

ircstore

Persistent IRC log storage with S2 compression and real-time WebSocket streaming.

What is ircstore?

ircstore is a high-performance IRC logging system that:

  • Stores IRC messages using S2 compression for efficient disk usage
  • Provides REST API for querying historical logs
  • Streams real-time messages via WebSocket
  • Maintains separate streams for each IRC channel

REST API

GET /api/channels

List all available IRC channels

curl https://ircstore.rotko.net/api/channels

Response:

["#rust", "#linux", "#networking"]
GET /api/logs/:channel

Retrieve historical logs for a specific channel

Query parameters:

  • limit - Number of messages (default: 100, max: 1000)
  • offset - Starting offset for pagination (default: 0)
curl "https://ircstore.rotko.net/api/logs/%23rust?limit=50&offset=0"

Response:

[
  {
    "timestamp": 1698765432,
    "channel": "#rust",
    "nick": "alice",
    "message": "anyone familiar with async traits?"
  }
]

WebSocket API

WS /ws

Subscribe to real-time IRC messages across all channels

const ws = new WebSocket('wss://ircstore.rotko.net/ws');

ws.onmessage = (event) => {
    const msg = JSON.parse(event.data);
    console.log(`[${msg.channel}] ${msg.nick}: ${msg.message}`);
};

Message format:

{
  "timestamp": 1698765432,
  "channel": "#rust",
  "nick": "alice",
  "message": "anyone familiar with async traits?"
}

Example: Python Client

import requests
import websocket
import json

# Get available channels
channels = requests.get('https://ircstore.rotko.net/api/channels').json()
print(f"Available channels: {channels}")

# Get recent logs
logs = requests.get('https://ircstore.rotko.net/api/logs/%23rust?limit=10').json()
for msg in logs:
    print(f"[{msg['nick']}] {msg['message']}")

# Subscribe to real-time updates
def on_message(ws, message):
    msg = json.loads(message)
    print(f"[{msg['channel']}] {msg['nick']}: {msg['message']}")

ws = websocket.WebSocketApp('wss://ircstore.rotko.net/ws', on_message=on_message)
ws.run_forever()

Example: JavaScript Client

// Fetch historical logs
async function getLogs(channel, limit = 100) {
    const res = await fetch(`/api/logs/${encodeURIComponent(channel)}?limit=${limit}`);
    return await res.json();
}

// Real-time subscription
const ws = new WebSocket('wss://ircstore.rotko.net/ws');
const messageHandlers = new Map();

ws.onmessage = (event) => {
    const msg = JSON.parse(event.data);
    const handler = messageHandlers.get(msg.channel);
    if (handler) handler(msg);
};

// Subscribe to specific channel
function subscribeToChannel(channel, callback) {
    messageHandlers.set(channel, callback);
}

subscribeToChannel('#rust', (msg) => {
    console.log(`${msg.nick}: ${msg.message}`);
});

Data Format

IrcEvent Structure

interface IrcEvent {
    timestamp: number;  // Unix timestamp
    channel: string;    // IRC channel name
    nick: string;       // Sender nickname
    message: string;    // Message content
}

Implementation Details

  • Built with Rust using Axum web framework
  • S2 compression for efficient storage
  • Append-only log structure for each channel
  • Tokio async runtime for high concurrency
  • CORS enabled for browser-based clients

Source Code

The project is open source and available at github.com/rotkonetworks/ircstore

=== static/index.html === ircstore

Channels