# File Structure Total files: 3 src/ lib.rs src/bin/ indexer.rs web.rs === src/bin/indexer.rs ===

use ircstore::{IrcEvent, S2Store};
use irc::client::prelude::*;
use futures::StreamExt;
use chrono::Utc;
use anyhow::Result;
use clap::Parser;

/// Command-line / environment configuration for the indexer binary.
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// IRC nickname to connect with.
    #[arg(short, long, env = "IRC_NICK", default_value = "ircstore")]
    nick: String,
    /// IRC server host.
    #[arg(short, long, env = "IRC_SERVER", default_value = "irc.rotko.net")]
    server: String,
    /// IRC server port.
    #[arg(short, long, env = "IRC_PORT", default_value = "6697")]
    port: u16,
    /// Whether to connect over TLS.
    #[arg(short = 't', long, env = "IRC_USE_TLS", default_value = "true")]
    use_tls: bool,
    /// Channels to join; falls back to #support,#dev when unset.
    // NOTE(review): generic arguments were stripped in transit; restored as
    // Option<Vec<String>> based on `value_delimiter = ','` and the
    // `unwrap_or_else(|| vec![...])` in `IrcIndexer::new`.
    #[arg(short, long, env = "IRC_CHANNELS", value_delimiter = ',')]
    channels: Option<Vec<String>>,
    /// S2 auth token; exported into the environment for `S2Store::new`.
    #[arg(short = 'a', long, env = "S2_AUTH_TOKEN")]
    s2_auth_token: Option<String>,
}

/// Bridges one IRC connection to the S2-backed event store.
struct IrcIndexer {
    store: S2Store,
    irc_client: irc::client::Client,
    server: String,
}

impl IrcIndexer {
    /// Connects to S2 and the IRC server, creating one stream per channel
    /// plus a `_global` stream for channel-less events (quits etc.).
    async fn new(args: Args) -> Result<Self> {
        if let Some(token) = args.s2_auth_token {
            // S2Store::new reads the token back out of the environment.
            std::env::set_var("S2_AUTH_TOKEN", token);
        }
        let store = S2Store::new().await?;
        let channels = args
            .channels
            .unwrap_or_else(|| vec!["#support".to_string(), "#dev".to_string()]);
        for channel in &channels {
            store.ensure_stream(channel).await?;
            println!("Created stream for {}", channel);
        }
        store.ensure_stream("_global").await?;
        let irc_config = Config {
            server: Some(args.server.clone()),
            port: Some(args.port),
            use_tls: Some(args.use_tls),
            channels,
            nickname: Some(args.nick),
            ..Default::default()
        };
        println!("Connecting to {}:{} (TLS: {})", args.server, args.port, args.use_tls);
        let irc_client = irc::client::Client::from_config(irc_config).await?;
        irc_client.identify()?;
        Ok(Self {
            store,
            irc_client,
            server: args.server,
        })
    }

    /// Main loop: reads IRC messages and appends them to the store until the
    /// connection closes or the message stream yields an error.
    async fn run(&mut self) -> Result<()> {
        let mut stream = self.irc_client.stream()?;
        println!("Connected! Waiting for messages...");
        while let Some(msg) = stream.next().await.transpose()?
{ let event = self.process_message(&msg)?; let stream_name = event.channel.clone().unwrap_or_else(|| "_global".to_string()); if let Err(e) = self.store.append(&stream_name, &event).await { eprintln!("Failed to append: {}", e); } if let Command::PRIVMSG(target, text) = &msg.command { println!("[{}] {}: {}", target, event.nick.as_ref().unwrap_or(&"*".to_string()), text); } } Ok(()) } fn process_message(&self, msg: &Message) -> Result { let (channel, nick, message) = match &msg.command { Command::PRIVMSG(target, text) => { (Some(target.clone()), msg.source_nickname().map(|s| s.to_string()), text.clone()) } Command::JOIN(chan, _, _) => { (Some(chan.clone()), msg.source_nickname().map(|s| s.to_string()), format!("*** {} joined", msg.source_nickname().unwrap_or("*"))) } Command::PART(chan, reason) => { let part_msg = if let Some(r) = reason { format!("*** {} left ({})", msg.source_nickname().unwrap_or("*"), r) } else { format!("*** {} left", msg.source_nickname().unwrap_or("*")) }; (Some(chan.clone()), msg.source_nickname().map(|s| s.to_string()), part_msg) } Command::QUIT(reason) => { let quit_msg = if let Some(r) = reason { format!("*** {} quit ({})", msg.source_nickname().unwrap_or("*"), r) } else { format!("*** {} quit", msg.source_nickname().unwrap_or("*")) }; (None, msg.source_nickname().map(|s| s.to_string()), quit_msg) } Command::TOPIC(chan, topic) => { let topic_msg = if let Some(t) = topic { format!("*** Topic set to: {}", t) } else { "*** Topic cleared".to_string() }; (Some(chan.clone()), msg.source_nickname().map(|s| s.to_string()), topic_msg) } _ => (None, None, format!("{:?}", msg.command)) }; Ok(IrcEvent { timestamp: Utc::now().timestamp(), server: self.server.clone(), channel, nick, message, raw: format!("{}", msg), }) } } #[tokio::main] async fn main() -> Result<()> { dotenv::dotenv().ok(); let args = Args::parse(); let default_channels = vec!["#support".to_string(), "#dev".to_string()]; let channels = args.channels.as_ref().unwrap_or(&default_channels); 
println!("Starting IRC indexer for channels: {:?}", channels); let mut indexer = IrcIndexer::new(args).await?; indexer.run().await?; Ok(()) } === src/bin/web.rs === use ircstore::{IrcEvent, S2Store}; use axum::{ extract::{Path, Query, State, WebSocketUpgrade, ws::Message}, response::{Html, IntoResponse}, routing::get, Router, Json, }; use serde::Deserialize; use std::sync::Arc; use tower_http::compression::CompressionLayer; use tower_http::cors::CorsLayer; use anyhow::Result; use clap::Parser; #[derive(Parser)] struct Args { #[arg(short, long, env = "WEB_PORT", default_value = "8080")] port: u16, #[arg(short, long, env = "WEB_BIND", default_value = "127.0.0.1")] bind: String, #[arg(long, env = "WEB_ALLOW_CHANNELS", value_delimiter = ',')] allow: Option>, #[arg(long, env = "WEB_DENY_CHANNELS", value_delimiter = ',')] deny: Option>, } #[derive(Deserialize)] struct LogQuery { limit: Option, offset: Option, } #[derive(Clone)] struct AppState { store: Arc, allowed: Arc bool + Send + Sync>, } async fn index() -> Html<&'static str> { Html(include_str!("../../static/index.html")) } async fn help() -> Html<&'static str> { Html(include_str!("../../static/help.html")) } async fn channel_logs( Path(channel): Path, Query(params): Query, State(state): State, ) -> Result>, String> { if !(state.allowed)(&channel) { return Err("forbidden".to_string()); } let limit = params.limit.unwrap_or(100).min(1000); let offset = params.offset.unwrap_or(0); state.store.read(&channel, offset, limit).await .map(Json) .map_err(|e| e.to_string()) } async fn list_channels(State(state): State) -> Result>, String> { let channels = state.store.list_streams().await.map_err(|e| e.to_string())?; Ok(Json(channels.into_iter() .filter(|c| c != "_global" && (state.allowed)(c)) .collect())) } async fn websocket_handler(ws: WebSocketUpgrade, State(state): State) -> impl IntoResponse { ws.on_upgrade(|socket| handle_socket(socket, state)) } async fn handle_socket(mut socket: axum::extract::ws::WebSocket, state: 
AppState) {
    // Make sure every stream has a background tailer feeding the broadcast
    // channel before we subscribe.
    let Ok(streams) = state.store.list_streams().await else { return };
    for stream in streams {
        state.store.ensure_tail(&stream).await;
    }
    let mut rx = state.store.subscribe();
    loop {
        tokio::select! {
            // Forward store events the filter permits; drop the client on
            // send failure.
            Ok(event) = rx.recv() => {
                if let Some(ch) = &event.channel {
                    if !(state.allowed)(ch) {
                        continue;
                    }
                }
                let Ok(json) = serde_json::to_string(&event) else { continue };
                if socket.send(Message::Text(json)).await.is_err() {
                    break;
                }
            }
            // Drain inbound frames so ping/pong keeps working; exit on
            // close or error.
            msg = socket.recv() => {
                match msg {
                    Some(Ok(_)) => {}
                    Some(Err(_)) | None => break,
                }
            }
            else => break,
        }
    }
    // Last client out stops the tail tasks.
    state.store.stop_tails_if_idle().await;
}

#[tokio::main]
async fn main() -> Result<()> {
    dotenv::dotenv().ok();
    let args = Args::parse();
    // Allow-list wins over deny-list when both are configured.
    // NOTE(review): the trait-object type was stripped in transit; restored
    // as Arc<dyn Fn(&str) -> bool + Send + Sync> to match AppState.allowed.
    let filter: Arc<dyn Fn(&str) -> bool + Send + Sync> = match (args.allow, args.deny) {
        (Some(allow), _) => Arc::new(move |c| allow.contains(&c.to_string())),
        (_, Some(deny)) => Arc::new(move |c| !deny.contains(&c.to_string())),
        _ => Arc::new(|_| true),
    };
    let store = Arc::new(S2Store::new().await?);
    let state = AppState { store, allowed: filter };
    let app = Router::new()
        .route("/", get(index))
        .route("/help", get(help))
        .route("/api/channels", get(list_channels))
        .route("/api/logs/:channel", get(channel_logs))
        .route("/ws", get(websocket_handler))
        .layer(
            CorsLayer::new()
                .allow_origin(tower_http::cors::Any)
                .allow_methods(vec![http::Method::GET]),
        )
        .layer(CompressionLayer::new())
        .with_state(state);
    let addr = format!("{}:{}", args.bind, args.port);
    let listener = tokio::net::TcpListener::bind(&addr).await?;
    println!("listening on {}", addr);
    axum::serve(listener, app).await?;
    Ok(())
}

=== src/lib.rs ===

use serde::{Deserialize, Serialize};
use s2::{Client, ClientConfig};
use s2::types::*;
use anyhow::Result;
use futures::StreamExt;
use tokio::sync::broadcast;
use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio::time::{timeout, Duration};

/// One logged IRC event, serialized as JSON into S2 stream records.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IrcEvent {
    /// Unix timestamp (seconds) when the indexer observed the event.
    pub timestamp: i64,
    pub server: String,
pub channel: Option, pub nick: Option, pub message: String, pub raw: String, } pub struct S2Store { client: Client, basin: BasinName, tx: Arc>, last_seq: Arc>>, tail_tasks: Arc>>>, } impl S2Store { pub async fn new() -> Result { let token = std::env::var("S2_AUTH_TOKEN")?; let server = std::env::var("IRC_SERVER").unwrap_or_else(|_| "irc-rotko-net".to_string()); let basin_name = server.replace('.', "-").replace(':', "-"); let basin: BasinName = basin_name.parse()?; let config = ClientConfig::new(token); let client = Client::new(config); let (tx, _) = broadcast::channel(1000); Ok(Self { client, basin, tx: Arc::new(tx), last_seq: Arc::new(RwLock::new(HashMap::new())), tail_tasks: Arc::new(RwLock::new(HashMap::new())), }) } pub fn subscribe(&self) -> broadcast::Receiver { self.tx.subscribe() } pub fn client_count(&self) -> usize { self.tx.receiver_count() } pub async fn stop_tails_if_idle(&self) { if self.client_count() == 0 { let mut tasks = self.tail_tasks.write().await; for (_, handle) in tasks.drain() { handle.abort(); } } } pub async fn ensure_stream(&self, name: &str) -> Result<()> { let _ = self.client.create_basin(CreateBasinRequest::new(self.basin.clone())).await; let _ = self.client.basin_client(self.basin.clone()).create_stream(CreateStreamRequest::new(name)).await; Ok(()) } pub async fn append(&self, stream: &str, event: &IrcEvent) -> Result<()> { let stream_client = self.client.basin_client(self.basin.clone()).stream_client(stream); let data = serde_json::to_vec(event)?; let record = AppendRecord::new(data)?; let batch = AppendRecordBatch::try_from_iter([record]) .map_err(|(_, _)| anyhow::anyhow!("batch creation failed"))?; stream_client.append(AppendInput::new(batch)).await?; let _ = self.tx.send(event.clone()); Ok(()) } pub async fn read(&self, stream: &str, offset: u64, limit: usize) -> Result> { let stream_client = self.client.basin_client(self.basin.clone()).stream_client(stream); let req = ReadRequest::new(ReadStart::SeqNum(offset)) 
.with_limit(ReadLimit::new().with_count(limit as u64)); let output = stream_client.read(req).await?; let ReadOutput::Batch(batch) = output else { return Ok(vec![]) }; Ok(batch.records.iter() .filter_map(|r| serde_json::from_slice::(&r.body).ok()) .collect()) } pub async fn list_streams(&self) -> Result> { let resp = self.client.basin_client(self.basin.clone()) .list_streams(ListStreamsRequest::new()).await?; Ok(resp.streams.into_iter().map(|s| s.name).collect()) } pub async fn ensure_tail(&self, stream: &str) -> bool { let mut tasks = self.tail_tasks.write().await; tasks.retain(|_, h| !h.is_finished()); if tasks.contains_key(stream) { return false; } let stream_name = stream.to_string(); let store_clone = Self { client: self.client.clone(), basin: self.basin.clone(), tx: self.tx.clone(), last_seq: self.last_seq.clone(), tail_tasks: self.tail_tasks.clone(), }; let handle = tokio::spawn(async move { loop { if let Err(e) = store_clone.tail_impl(&stream_name).await { eprintln!("tail {}: {}", stream_name, e); tokio::time::sleep(Duration::from_secs(5)).await; } } }); tasks.insert(stream.to_string(), handle); true } async fn tail_impl(&self, stream: &str) -> Result<()> { let stream_client = self.client.basin_client(self.basin.clone()).stream_client(stream); let start_seq = self.last_seq.read().await.get(stream).copied().unwrap_or(0); let mut stream_reader = stream_client.read_session(ReadSessionRequest::new(ReadStart::SeqNum(start_seq))).await?; loop { let result = match timeout(Duration::from_secs(5), stream_reader.next()).await { Ok(Some(r)) => r, Ok(None) => return Ok(()), Err(_) => { tokio::time::sleep(Duration::from_millis(100)).await; continue; } }; let batch = match result? 
{ ReadOutput::Batch(b) if !b.records.is_empty() => b, _ => { tokio::time::sleep(Duration::from_millis(100)).await; continue; } }; let mut last_seq = 0u64; let last_known = self.last_seq.read().await.get(stream).copied().unwrap_or(0); for record in batch.records { if record.seq_num <= last_known { continue; } last_seq = record.seq_num; if let Ok(event) = serde_json::from_slice::(&record.body) { let _ = self.tx.send(event); } } if last_seq > 0 { self.last_seq.write().await.insert(stream.to_string(), last_seq); } } } }