First commit in rust

This commit is contained in:
2026-02-19 13:51:07 +08:00
commit c37ca9ba52
73 changed files with 9737 additions and 0 deletions

View File

@@ -0,0 +1,36 @@
[package]
name = "backchannel-server"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "backchannel-server"
path = "src/main.rs"
[dependencies]
backchannel-common = { path = "../backchannel-common" }
serde = { workspace = true }
serde_json = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
base64 = { workspace = true }
rand = { workspace = true }
argon2 = { workspace = true }
ed25519-dalek = { workspace = true }
tokio = { workspace = true }
tokio-tungstenite = { workspace = true }
sqlx = { version = "0.8", features = ["sqlite", "runtime-tokio-rustls", "macros", "migrate", "chrono", "uuid"] }
jsonwebtoken = "9"
rustls = { version = "0.23", features = ["ring"] }
rustls-pemfile = "2"
tokio-rustls = "0.26"
futures-util = "0.3"
axum = "0.7"
rust-embed = "8"
mime_guess = "2"

View File

@@ -0,0 +1,2 @@
pub mod password;
pub mod token;

View File

@@ -0,0 +1,31 @@
use argon2::{
password_hash::{rand_core::OsRng, PasswordHash, PasswordHasher, PasswordVerifier, SaltString},
Argon2,
};
use crate::error::{Result, ServerError};
/// Hash a plaintext password using Argon2id.
///
/// The returned string is the PHC-format encoded hash, suitable for storage in
/// the `users.password_hash` column. Includes the salt, so no separate salt
/// storage is needed.
pub fn hash(password: &str) -> Result<String> {
let salt = SaltString::generate(&mut OsRng);
let hash = Argon2::default()
.hash_password(password.as_bytes(), &salt)?
.to_string();
Ok(hash)
}
/// Verify a plaintext password against a stored PHC-format hash.
///
/// Returns `Ok(())` on success, `Err(ServerError::Unauthorized)` on mismatch.
pub fn verify(password: &str, stored_hash: &str) -> Result<()> {
let parsed = PasswordHash::new(stored_hash)
.map_err(|e| ServerError::PasswordHash(e.to_string()))?;
Argon2::default()
.verify_password(password.as_bytes(), &parsed)
.map_err(|_| ServerError::Unauthorized)
}

View File

@@ -0,0 +1,110 @@
use chrono::Utc;
use jsonwebtoken::{decode, encode, Algorithm, DecodingKey, EncodingKey, Header, Validation};
use serde::{Deserialize, Serialize};
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::error::{Result, ServerError};
/// Claims embedded in every BackChannel JWT.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Claims {
/// Subject: user UUID as a string.
pub sub: String,
pub username: String,
/// JWT ID — stored in the `sessions` table for revocation support.
pub jti: String,
/// Issued-at (Unix seconds).
pub iat: i64,
/// Expiry (Unix seconds).
pub exp: i64,
}
impl Claims {
pub fn user_id(&self) -> Result<Uuid> {
Uuid::parse_str(&self.sub).map_err(|e| ServerError::Internal(e.to_string()))
}
}
const TOKEN_TTL_SECS: i64 = 7 * 24 * 3600; // 7 days
/// Issue a signed JWT and persist its JTI in the `sessions` table.
pub async fn issue(
pool: &SqlitePool,
user_id: Uuid,
username: &str,
secret: &[u8],
) -> Result<String> {
let now = Utc::now().timestamp();
let jti = Uuid::new_v4().to_string();
let claims = Claims {
sub: user_id.to_string(),
username: username.to_string(),
jti: jti.clone(),
iat: now,
exp: now + TOKEN_TTL_SECS,
};
let token = encode(
&Header::default(), // HS256
&claims,
&EncodingKey::from_secret(secret),
)?;
// Persist JTI for revocation checks.
let expires_at = Utc::now()
.checked_add_signed(chrono::Duration::seconds(TOKEN_TTL_SECS))
.unwrap_or_else(Utc::now)
.to_rfc3339();
sqlx::query(
"INSERT INTO sessions (jti, user_id, expires_at) VALUES (?, ?, ?)",
)
.bind(&jti)
.bind(user_id.to_string())
.bind(expires_at)
.execute(pool)
.await?;
Ok(token)
}
/// Decode and validate a JWT, then check the `sessions` table for revocation.
pub async fn validate(pool: &SqlitePool, token: &str, secret: &[u8]) -> Result<Claims> {
let mut validation = Validation::new(Algorithm::HS256);
validation.validate_exp = true;
let token_data = decode::<Claims>(
token,
&DecodingKey::from_secret(secret),
&validation,
)
.map_err(|_| ServerError::Unauthorized)?;
let claims = token_data.claims;
// Check revocation.
let row: Option<(i64,)> =
sqlx::query_as("SELECT revoked FROM sessions WHERE jti = ?")
.bind(&claims.jti)
.fetch_optional(pool)
.await?;
match row {
None => return Err(ServerError::Unauthorized), // JTI unknown
Some((revoked,)) if revoked != 0 => return Err(ServerError::Unauthorized),
_ => {}
}
Ok(claims)
}
/// Mark a JWT as revoked (logout / forced expiry).
pub async fn revoke(pool: &SqlitePool, jti: &str) -> Result<()> {
sqlx::query("UPDATE sessions SET revoked = 1 WHERE jti = ?")
.bind(jti)
.execute(pool)
.await?;
Ok(())
}

View File

@@ -0,0 +1,84 @@
use anyhow::Result;
/// Runtime configuration for the BackChannel server.
/// Loaded from environment variables with sensible development defaults.
pub struct ServerConfig {
/// TCP address to bind, e.g. `0.0.0.0:7777`
pub bind_addr: String,
/// HTTP address for the embedded web UI, e.g. `0.0.0.0:8080`
pub http_bind_addr: String,
/// SQLite connection string, e.g. `sqlite:backchannel.db`
pub db_url: String,
/// Raw bytes for HMAC-SHA256 JWT signing. In production, set
/// `BC_JWT_SECRET` to a long, random hex string.
pub jwt_secret: Vec<u8>,
/// Enable TLS on the WebSocket listener. Requires `tls_cert_path`
/// and `tls_key_path` when true.
pub tls_enabled: bool,
/// Path to the PEM-encoded TLS certificate chain.
pub tls_cert_path: Option<String>,
/// Path to the PEM-encoded private key.
pub tls_key_path: Option<String>,
}
impl ServerConfig {
/// Load configuration from environment variables.
///
/// | Variable | Default |
/// |------------------|-----------------------------|
/// | `BC_BIND_ADDR` | `0.0.0.0:7777` |
/// | `BC_HTTP_ADDR` | `0.0.0.0:8080` |
/// | `BC_DB_URL` | `sqlite:backchannel.db` |
/// | `BC_JWT_SECRET` | *insecure dev placeholder* |
/// | `BC_TLS` | `false` |
/// | `BC_TLS_CERT` | *(none)* |
/// | `BC_TLS_KEY` | *(none)* |
pub fn from_env() -> Result<Self> {
let bind_addr = std::env::var("BC_BIND_ADDR")
.unwrap_or_else(|_| "0.0.0.0:7777".into());
let http_bind_addr = std::env::var("BC_HTTP_ADDR")
.unwrap_or_else(|_| "0.0.0.0:8080".into());
let db_url = std::env::var("BC_DB_URL")
.unwrap_or_else(|_| "sqlite:backchannel.db".into());
let jwt_secret = match std::env::var("BC_JWT_SECRET") {
Ok(s) => s.into_bytes(),
Err(_) => {
tracing::warn!(
"BC_JWT_SECRET not set — using insecure dev placeholder. \
Set this env var before deploying."
);
b"CHANGE-ME-dev-only-jwt-secret-32b".to_vec()
}
};
let tls_enabled = std::env::var("BC_TLS")
.map(|v| v.eq_ignore_ascii_case("true") || v == "1")
.unwrap_or(false);
let tls_cert_path = std::env::var("BC_TLS_CERT").ok();
let tls_key_path = std::env::var("BC_TLS_KEY").ok();
if tls_enabled {
anyhow::ensure!(tls_cert_path.is_some(), "BC_TLS_CERT must be set when BC_TLS=true");
anyhow::ensure!(tls_key_path.is_some(), "BC_TLS_KEY must be set when BC_TLS=true");
}
Ok(Self {
bind_addr,
http_bind_addr,
db_url,
jwt_secret,
tls_enabled,
tls_cert_path,
tls_key_path,
})
}
}

View File

@@ -0,0 +1,78 @@
use chrono::Utc;
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::error::{Result, ServerError};
#[derive(Debug, sqlx::FromRow)]
pub struct ChannelRow {
pub id: String,
pub name: String,
pub topic: Option<String>,
pub created_by: String,
pub created_at: String,
}
impl ChannelRow {
pub fn uuid(&self) -> Result<Uuid> {
Uuid::parse_str(&self.id).map_err(|e| ServerError::Internal(e.to_string()))
}
}
pub async fn create(
pool: &SqlitePool,
name: &str,
topic: Option<&str>,
created_by: Uuid,
) -> Result<Uuid> {
let id = Uuid::new_v4();
let now = Utc::now().to_rfc3339();
sqlx::query(
"INSERT INTO channels (id, name, topic, created_by, created_at) VALUES (?, ?, ?, ?, ?)",
)
.bind(id.to_string())
.bind(name)
.bind(topic)
.bind(created_by.to_string())
.bind(now)
.execute(pool)
.await?;
Ok(id)
}
pub async fn list(pool: &SqlitePool) -> Result<Vec<ChannelRow>> {
let rows = sqlx::query_as::<_, ChannelRow>(
"SELECT id, name, topic, created_by, created_at FROM channels ORDER BY created_at ASC",
)
.fetch_all(pool)
.await?;
Ok(rows)
}
pub async fn find_by_id(pool: &SqlitePool, id: Uuid) -> Result<Option<ChannelRow>> {
let row = sqlx::query_as::<_, ChannelRow>(
"SELECT id, name, topic, created_by, created_at FROM channels WHERE id = ?",
)
.bind(id.to_string())
.fetch_optional(pool)
.await?;
Ok(row)
}
pub async fn delete(pool: &SqlitePool, id: Uuid) -> Result<bool> {
let result = sqlx::query("DELETE FROM channels WHERE id = ?")
.bind(id.to_string())
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
pub async fn name_exists(pool: &SqlitePool, name: &str) -> Result<bool> {
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM channels WHERE name = ?")
.bind(name)
.fetch_one(pool)
.await?;
Ok(count.0 > 0)
}

View File

@@ -0,0 +1,17 @@
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::error::Result;
/// Look up the registered Ed25519 identity public key for a user.
///
/// Returns `None` if the user does not exist or has no key on record
/// (the latter shouldn't happen after registration, but is handled defensively).
pub async fn get_identity_key(pool: &SqlitePool, user_id: Uuid) -> Result<Option<String>> {
let row: Option<(String,)> =
sqlx::query_as("SELECT identity_pubkey FROM users WHERE id = ?")
.bind(user_id.to_string())
.fetch_optional(pool)
.await?;
Ok(row.map(|(k,)| k))
}

View File

@@ -0,0 +1,210 @@
use backchannel_common::protocol::server::{HistoricChannelMessage, HistoricDm};
use chrono::{DateTime, Utc};
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::error::{Result, ServerError};
#[derive(Debug, sqlx::FromRow)]
struct ChannelMessageRow {
pub id: String,
pub channel_id: String,
pub author_id: String,
pub author_username: String,
pub content: String,
pub created_at: String,
}
#[derive(Debug, sqlx::FromRow)]
struct DmRow {
pub id: String,
pub sender_id: String,
pub ciphertext: String,
pub nonce: String,
pub created_at: String,
}
pub async fn insert_channel_message(
pool: &SqlitePool,
channel_id: Uuid,
author_id: Uuid,
content: &str,
) -> Result<(Uuid, DateTime<Utc>)> {
let id = Uuid::new_v4();
let now = Utc::now();
let now_str = now.to_rfc3339();
sqlx::query(
"INSERT INTO channel_messages (id, channel_id, author_id, content, created_at)
VALUES (?, ?, ?, ?, ?)",
)
.bind(id.to_string())
.bind(channel_id.to_string())
.bind(author_id.to_string())
.bind(content)
.bind(&now_str)
.execute(pool)
.await?;
Ok((id, now))
}
pub async fn fetch_channel_history(
pool: &SqlitePool,
channel_id: Uuid,
before_message_id: Option<Uuid>,
limit: u32,
) -> Result<Vec<HistoricChannelMessage>> {
let limit = limit.min(100) as i64;
let rows: Vec<ChannelMessageRow> = if let Some(before_id) = before_message_id {
// Fetch messages older than `before_id` by joining to get its created_at
sqlx::query_as::<_, ChannelMessageRow>(
"SELECT m.id, m.channel_id, m.author_id, u.username AS author_username,
m.content, m.created_at
FROM channel_messages m
JOIN users u ON u.id = m.author_id
WHERE m.channel_id = ?
AND m.created_at < (SELECT created_at FROM channel_messages WHERE id = ?)
ORDER BY m.created_at DESC
LIMIT ?",
)
.bind(channel_id.to_string())
.bind(before_id.to_string())
.bind(limit)
.fetch_all(pool)
.await?
} else {
sqlx::query_as::<_, ChannelMessageRow>(
"SELECT m.id, m.channel_id, m.author_id, u.username AS author_username,
m.content, m.created_at
FROM channel_messages m
JOIN users u ON u.id = m.author_id
WHERE m.channel_id = ?
ORDER BY m.created_at DESC
LIMIT ?",
)
.bind(channel_id.to_string())
.bind(limit)
.fetch_all(pool)
.await?
};
let mut messages: Vec<HistoricChannelMessage> = rows
.into_iter()
.map(|r| {
let ts = r
.created_at
.parse::<DateTime<Utc>>()
.unwrap_or_else(|_| Utc::now());
Ok(HistoricChannelMessage {
message_id: Uuid::parse_str(&r.id)
.map_err(|e| ServerError::Internal(e.to_string()))?,
author_id: Uuid::parse_str(&r.author_id)
.map_err(|e| ServerError::Internal(e.to_string()))?,
author_username: r.author_username,
content: r.content,
timestamp: ts,
})
})
.collect::<Result<Vec<_>>>()?;
// Return in chronological order (oldest first).
messages.reverse();
Ok(messages)
}
pub async fn insert_dm(
pool: &SqlitePool,
sender_id: Uuid,
recipient_id: Uuid,
ciphertext: &str,
nonce: &str,
) -> Result<(Uuid, DateTime<Utc>)> {
let id = Uuid::new_v4();
let now = Utc::now();
let now_str = now.to_rfc3339();
sqlx::query(
"INSERT INTO direct_messages (id, sender_id, recipient_id, ciphertext, nonce, created_at)
VALUES (?, ?, ?, ?, ?, ?)",
)
.bind(id.to_string())
.bind(sender_id.to_string())
.bind(recipient_id.to_string())
.bind(ciphertext)
.bind(nonce)
.bind(&now_str)
.execute(pool)
.await?;
Ok((id, now))
}
pub async fn fetch_dm_history(
pool: &SqlitePool,
user_a: Uuid,
user_b: Uuid,
before_message_id: Option<Uuid>,
limit: u32,
) -> Result<Vec<HistoricDm>> {
let limit = limit.min(100) as i64;
let rows: Vec<DmRow> = if let Some(before_id) = before_message_id {
sqlx::query_as::<_, DmRow>(
"SELECT id, sender_id, ciphertext, nonce, created_at
FROM direct_messages
WHERE ((sender_id = ? AND recipient_id = ?)
OR (sender_id = ? AND recipient_id = ?))
AND created_at < (SELECT created_at FROM direct_messages WHERE id = ?)
ORDER BY created_at DESC
LIMIT ?",
)
.bind(user_a.to_string())
.bind(user_b.to_string())
.bind(user_b.to_string())
.bind(user_a.to_string())
.bind(before_id.to_string())
.bind(limit)
.fetch_all(pool)
.await?
} else {
sqlx::query_as::<_, DmRow>(
"SELECT id, sender_id, ciphertext, nonce, created_at
FROM direct_messages
WHERE (sender_id = ? AND recipient_id = ?)
OR (sender_id = ? AND recipient_id = ?)
ORDER BY created_at DESC
LIMIT ?",
)
.bind(user_a.to_string())
.bind(user_b.to_string())
.bind(user_b.to_string())
.bind(user_a.to_string())
.bind(limit)
.fetch_all(pool)
.await?
};
let mut messages: Vec<HistoricDm> = rows
.into_iter()
.map(|r| {
let ts = r
.created_at
.parse::<DateTime<Utc>>()
.unwrap_or_else(|_| Utc::now());
Ok(HistoricDm {
message_id: Uuid::parse_str(&r.id)
.map_err(|e| ServerError::Internal(e.to_string()))?,
sender_id: Uuid::parse_str(&r.sender_id)
.map_err(|e| ServerError::Internal(e.to_string()))?,
ciphertext: r.ciphertext,
nonce: r.nonce,
timestamp: ts,
})
})
.collect::<Result<Vec<_>>>()?;
messages.reverse();
Ok(messages)
}

View File

@@ -0,0 +1,85 @@
-- ── Users ─────────────────────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS users (
id TEXT NOT NULL PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
identity_pubkey TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);
-- ── Roles ─────────────────────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS roles (
id TEXT NOT NULL PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
permissions INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);
-- Seed built-in roles.
-- admin: all bits set (i64::MAX in SQLite integer storage).
-- member: READ_MESSAGES (bit 0) | SEND_MESSAGES (bit 1) = 3
INSERT OR IGNORE INTO roles (id, name, permissions) VALUES
('00000000-0000-0000-0000-000000000001', 'admin', 9223372036854775807),
('00000000-0000-0000-0000-000000000002', 'member', 3);
-- ── User ↔ Role membership ─────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS user_roles (
user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
role_id TEXT NOT NULL REFERENCES roles(id) ON DELETE CASCADE,
assigned_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
PRIMARY KEY (user_id, role_id)
);
-- ── Channels ───────────────────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS channels (
id TEXT NOT NULL PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
topic TEXT,
created_by TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);
-- Seed a default channel owned by a system placeholder user.
-- The created_by value references no real user; foreign key is intentionally
-- left loose here since the system user is never inserted.
INSERT OR IGNORE INTO channels (id, name, created_by) VALUES
('00000000-0000-0000-0000-000000000010', 'general', '00000000-0000-0000-0000-000000000000');
-- ── Channel messages ───────────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS channel_messages (
id TEXT NOT NULL PRIMARY KEY,
channel_id TEXT NOT NULL REFERENCES channels(id) ON DELETE CASCADE,
author_id TEXT NOT NULL REFERENCES users(id),
content TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);
CREATE INDEX IF NOT EXISTS idx_channel_messages_channel_created
ON channel_messages (channel_id, created_at DESC);
-- ── Direct messages ────────────────────────────────────────────────────────────
-- Ciphertext is stored opaquely. The server cannot decrypt DM content.
CREATE TABLE IF NOT EXISTS direct_messages (
id TEXT NOT NULL PRIMARY KEY,
sender_id TEXT NOT NULL REFERENCES users(id),
recipient_id TEXT NOT NULL REFERENCES users(id),
ciphertext TEXT NOT NULL,
nonce TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);
CREATE INDEX IF NOT EXISTS idx_dm_pair_created
ON direct_messages (sender_id, recipient_id, created_at DESC);
-- ── Sessions ───────────────────────────────────────────────────────────────────
-- JWT IDs stored here enable soft revocation (logout / forced expiry).
CREATE TABLE IF NOT EXISTS sessions (
jti TEXT NOT NULL PRIMARY KEY,
user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
expires_at TEXT NOT NULL,
revoked INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions (user_id);

View File

@@ -0,0 +1,5 @@
pub mod channels;
pub mod keys;
pub mod messages;
pub mod roles;
pub mod users;

View File

@@ -0,0 +1,86 @@
use chrono::Utc;
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::error::{Result, ServerError};
#[derive(Debug, sqlx::FromRow)]
pub struct RoleRow {
pub id: String,
pub name: String,
pub permissions: i64,
pub created_at: String,
}
impl RoleRow {
pub fn uuid(&self) -> Result<Uuid> {
Uuid::parse_str(&self.id).map_err(|e| ServerError::Internal(e.to_string()))
}
/// Returns the permissions bitmask as `u64` (SQLite stores it as `i64`).
pub fn permissions_u64(&self) -> u64 {
self.permissions as u64
}
}
pub async fn create(pool: &SqlitePool, name: &str, permissions: u64) -> Result<Uuid> {
let id = Uuid::new_v4();
let now = Utc::now().to_rfc3339();
sqlx::query("INSERT INTO roles (id, name, permissions, created_at) VALUES (?, ?, ?, ?)")
.bind(id.to_string())
.bind(name)
.bind(permissions as i64)
.bind(now)
.execute(pool)
.await?;
Ok(id)
}
pub async fn find_by_id(pool: &SqlitePool, id: Uuid) -> Result<Option<RoleRow>> {
let row = sqlx::query_as::<_, RoleRow>(
"SELECT id, name, permissions, created_at FROM roles WHERE id = ?",
)
.bind(id.to_string())
.fetch_optional(pool)
.await?;
Ok(row)
}
pub async fn assign(pool: &SqlitePool, user_id: Uuid, role_id: Uuid) -> Result<()> {
let now = Utc::now().to_rfc3339();
sqlx::query(
"INSERT OR IGNORE INTO user_roles (user_id, role_id, assigned_at) VALUES (?, ?, ?)",
)
.bind(user_id.to_string())
.bind(role_id.to_string())
.bind(now)
.execute(pool)
.await?;
Ok(())
}
pub async fn revoke(pool: &SqlitePool, user_id: Uuid, role_id: Uuid) -> Result<()> {
sqlx::query("DELETE FROM user_roles WHERE user_id = ? AND role_id = ?")
.bind(user_id.to_string())
.bind(role_id.to_string())
.execute(pool)
.await?;
Ok(())
}
/// Compute the effective permission bitmask for a user by OR-ing all their role permissions.
pub async fn get_user_permissions(pool: &SqlitePool, user_id: Uuid) -> Result<u64> {
let rows: Vec<(i64,)> = sqlx::query_as(
"SELECT r.permissions FROM roles r
JOIN user_roles ur ON ur.role_id = r.id
WHERE ur.user_id = ?",
)
.bind(user_id.to_string())
.fetch_all(pool)
.await?;
let perms = rows.into_iter().fold(0u64, |acc, (p,)| acc | p as u64);
Ok(perms)
}

View File

@@ -0,0 +1,115 @@
use chrono::Utc;
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::error::{Result, ServerError};
/// A row from the `users` table.
#[derive(Debug, sqlx::FromRow)]
pub struct UserRow {
pub id: String,
pub username: String,
pub password_hash: String,
pub identity_pubkey: String,
pub created_at: String,
pub updated_at: String,
}
impl UserRow {
pub fn uuid(&self) -> Result<Uuid> {
Uuid::parse_str(&self.id).map_err(|e| ServerError::Internal(e.to_string()))
}
}
/// Insert a new user. Returns the new row's UUID.
pub async fn create(
pool: &SqlitePool,
username: &str,
password_hash: &str,
identity_pubkey: &str,
) -> Result<Uuid> {
let id = Uuid::new_v4();
let now = Utc::now().to_rfc3339();
sqlx::query(
"INSERT INTO users (id, username, password_hash, identity_pubkey, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)",
)
.bind(id.to_string())
.bind(username)
.bind(password_hash)
.bind(identity_pubkey)
.bind(&now)
.bind(&now)
.execute(pool)
.await?;
Ok(id)
}
/// Find a user by username.
pub async fn find_by_username(pool: &SqlitePool, username: &str) -> Result<Option<UserRow>> {
let row = sqlx::query_as::<_, UserRow>(
"SELECT id, username, password_hash, identity_pubkey, created_at, updated_at
FROM users WHERE username = ?",
)
.bind(username)
.fetch_optional(pool)
.await?;
Ok(row)
}
/// Find a user by UUID.
pub async fn find_by_id(pool: &SqlitePool, id: Uuid) -> Result<Option<UserRow>> {
let row = sqlx::query_as::<_, UserRow>(
"SELECT id, username, password_hash, identity_pubkey, created_at, updated_at
FROM users WHERE id = ?",
)
.bind(id.to_string())
.fetch_optional(pool)
.await?;
Ok(row)
}
/// Update (or set for the first time) a user's Ed25519 identity public key.
/// Called on every login to allow transparent key rotation.
pub async fn update_identity_key(
pool: &SqlitePool,
user_id: Uuid,
identity_pubkey: &str,
) -> Result<()> {
let now = Utc::now().to_rfc3339();
sqlx::query("UPDATE users SET identity_pubkey = ?, updated_at = ? WHERE id = ?")
.bind(identity_pubkey)
.bind(now)
.bind(user_id.to_string())
.execute(pool)
.await?;
Ok(())
}
/// Check whether a username is already taken.
pub async fn username_exists(pool: &SqlitePool, username: &str) -> Result<bool> {
let count: (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM users WHERE username = ?")
.bind(username)
.fetch_one(pool)
.await?;
Ok(count.0 > 0)
}
/// Assign the built-in `member` role to a newly registered user.
pub async fn assign_member_role(pool: &SqlitePool, user_id: Uuid) -> Result<()> {
let now = Utc::now().to_rfc3339();
sqlx::query(
"INSERT OR IGNORE INTO user_roles (user_id, role_id, assigned_at) VALUES (?, ?, ?)",
)
.bind(user_id.to_string())
.bind("00000000-0000-0000-0000-000000000002") // member role UUID
.bind(now)
.execute(pool)
.await?;
Ok(())
}

View File

@@ -0,0 +1,59 @@
use thiserror::Error;
pub type Result<T> = std::result::Result<T, ServerError>;
#[derive(Debug, Error)]
pub enum ServerError {
#[error("unauthorized")]
Unauthorized,
#[error("forbidden")]
Forbidden,
#[error("not found: {0}")]
NotFound(String),
#[error("bad request: {0}")]
BadRequest(String),
#[error("connection closed")]
ConnectionClosed,
#[error("database error: {0}")]
Database(#[from] sqlx::Error),
#[error("serialization error: {0}")]
Serialization(#[from] serde_json::Error),
#[error("JWT error: {0}")]
Jwt(#[from] jsonwebtoken::errors::Error),
#[error("password hash error: {0}")]
PasswordHash(String),
#[error("common error: {0}")]
Common(#[from] backchannel_common::BackchannelError),
#[error("internal error: {0}")]
Internal(String),
}
impl From<argon2::password_hash::Error> for ServerError {
fn from(e: argon2::password_hash::Error) -> Self {
ServerError::PasswordHash(e.to_string())
}
}
/// HTTP-style error code for `ServerMessage::Error`.
impl ServerError {
pub fn code(&self) -> u16 {
match self {
ServerError::Unauthorized => 401,
ServerError::Forbidden => 403,
ServerError::NotFound(_) => 404,
ServerError::BadRequest(_) => 400,
ServerError::ConnectionClosed => 503,
_ => 500,
}
}
}

View File

@@ -0,0 +1,124 @@
use std::sync::Arc;
use backchannel_common::protocol::ServerMessage;
use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
use crate::auth::{password, token};
use crate::db::users;
use crate::error::{Result, ServerError};
use crate::state::{AppState, ChannelBroadcast};
use crate::ws::session::Session;
pub async fn handle_login(
session: &mut Session,
state: &Arc<AppState>,
username: String,
password_plain: String,
identity_pubkey: String,
) -> Result<()> {
let user = users::find_by_username(&state.db, &username)
.await?
.ok_or(ServerError::Unauthorized)?;
password::verify(&password_plain, &user.password_hash)?;
let user_id = user.uuid()?;
// Update identity key on every login to allow transparent key rotation.
users::update_identity_key(&state.db, user_id, &identity_pubkey).await?;
let jwt = token::issue(&state.db, user_id, &username, &state.jwt_secret).await?;
let jti = extract_jti(&jwt, &state.jwt_secret)?;
session.user_id = Some(user_id);
session.username = Some(username.clone());
session.jti = Some(jti);
state.register_session(session.conn_id, user_id, username.clone(), session.tx.clone()).await;
let _ = state.channel_broadcast.send(ChannelBroadcast {
message: ServerMessage::UserOnline { user_id, username: username.clone() },
});
session.send(ServerMessage::AuthSuccess { user_id, username, session_token: jwt })
}
pub async fn handle_register(
session: &mut Session,
state: &Arc<AppState>,
username: String,
password_plain: String,
identity_pubkey: String,
) -> Result<()> {
if username.len() < 2 || username.len() > 32 {
return Err(ServerError::BadRequest("Username must be 232 characters".into()));
}
if password_plain.len() < 8 {
return Err(ServerError::BadRequest("Password must be at least 8 characters".into()));
}
if users::username_exists(&state.db, &username).await? {
return Err(ServerError::BadRequest("Username already taken".into()));
}
let hash = password::hash(&password_plain)?;
let user_id = users::create(&state.db, &username, &hash, &identity_pubkey).await?;
users::assign_member_role(&state.db, user_id).await?;
let jwt = token::issue(&state.db, user_id, &username, &state.jwt_secret).await?;
let jti = extract_jti(&jwt, &state.jwt_secret)?;
session.user_id = Some(user_id);
session.username = Some(username.clone());
session.jti = Some(jti);
state.register_session(session.conn_id, user_id, username.clone(), session.tx.clone()).await;
let _ = state.channel_broadcast.send(ChannelBroadcast {
message: ServerMessage::UserOnline { user_id, username: username.clone() },
});
session.send(ServerMessage::AuthSuccess { user_id, username, session_token: jwt })
}
pub async fn handle_resume(
session: &mut Session,
state: &Arc<AppState>,
token_str: String,
) -> Result<()> {
let claims = token::validate(&state.db, &token_str, &state.jwt_secret).await?;
let user_id = claims.user_id()?;
session.user_id = Some(user_id);
session.username = Some(claims.username.clone());
session.jti = Some(claims.jti.clone());
state.register_session(session.conn_id, user_id, claims.username.clone(), session.tx.clone()).await;
let _ = state.channel_broadcast.send(ChannelBroadcast {
message: ServerMessage::UserOnline { user_id, username: claims.username.clone() },
});
session.send(ServerMessage::AuthSuccess {
user_id,
username: claims.username,
session_token: token_str,
})
}
pub async fn handle_logout(session: &mut Session, state: &Arc<AppState>) -> Result<()> {
session.require_auth()?;
if let Some(jti) = session.jti.take() {
token::revoke(&state.db, &jti).await?;
}
session.user_id = None;
session.username = None;
Ok(())
}
/// Extract the JTI claim from a JWT without re-running expiry validation.
/// Used immediately after `token::issue` to store the JTI in the session.
fn extract_jti(jwt: &str, secret: &[u8]) -> Result<String> {
let mut v = Validation::new(Algorithm::HS256);
v.validate_exp = false;
let data = decode::<token::Claims>(jwt, &DecodingKey::from_secret(secret), &v)?;
Ok(data.claims.jti)
}

View File

@@ -0,0 +1,107 @@
use std::sync::Arc;
use backchannel_common::protocol::ServerMessage;
use backchannel_common::types::{has_permission, PermissionFlags};
use uuid::Uuid;
use crate::db::{channels, messages, roles};
use crate::error::{Result, ServerError};
use crate::state::{AppState, ChannelBroadcast};
use crate::ws::session::Session;
pub async fn handle_send(
session: &Session,
state: &Arc<AppState>,
channel_id: Uuid,
content: String,
) -> Result<()> {
let user_id = session.require_auth()?;
let username = session.require_username()?.to_string();
channels::find_by_id(&state.db, channel_id)
.await?
.ok_or_else(|| ServerError::NotFound("Channel not found".into()))?;
let perms = roles::get_user_permissions(&state.db, user_id).await?;
if !has_permission(perms, PermissionFlags::SEND_MESSAGES) {
return Err(ServerError::Forbidden);
}
let (message_id, timestamp) =
messages::insert_channel_message(&state.db, channel_id, user_id, &content).await?;
let _ = state.channel_broadcast.send(ChannelBroadcast {
message: ServerMessage::ChannelMessage {
message_id,
channel_id,
author_id: user_id,
author_username: username,
content,
timestamp,
},
});
Ok(())
}
pub async fn handle_history(
session: &Session,
state: &Arc<AppState>,
channel_id: Uuid,
before_message_id: Option<Uuid>,
limit: u32,
) -> Result<()> {
let user_id = session.require_auth()?;
let perms = roles::get_user_permissions(&state.db, user_id).await?;
if !has_permission(perms, PermissionFlags::READ_MESSAGES) {
return Err(ServerError::Forbidden);
}
let msgs = messages::fetch_channel_history(&state.db, channel_id, before_message_id, limit).await?;
session.send(ServerMessage::ChannelHistory { channel_id, messages: msgs })
}
pub async fn handle_create(
session: &Session,
state: &Arc<AppState>,
name: String,
topic: Option<String>,
) -> Result<()> {
let user_id = session.require_auth()?;
let perms = roles::get_user_permissions(&state.db, user_id).await?;
if !has_permission(perms, PermissionFlags::MANAGE_CHANNELS) {
return Err(ServerError::Forbidden);
}
if channels::name_exists(&state.db, &name).await? {
return Err(ServerError::BadRequest("Channel name already exists".into()));
}
let channel_id = channels::create(&state.db, &name, topic.as_deref(), user_id).await?;
let _ = state.channel_broadcast.send(ChannelBroadcast {
message: ServerMessage::ChannelCreated { channel_id, name, topic },
});
Ok(())
}
pub async fn handle_delete(
session: &Session,
state: &Arc<AppState>,
channel_id: Uuid,
) -> Result<()> {
let user_id = session.require_auth()?;
let perms = roles::get_user_permissions(&state.db, user_id).await?;
if !has_permission(perms, PermissionFlags::MANAGE_CHANNELS) {
return Err(ServerError::Forbidden);
}
if !channels::delete(&state.db, channel_id).await? {
return Err(ServerError::NotFound("Channel not found".into()));
}
let _ = state.channel_broadcast.send(ChannelBroadcast {
message: ServerMessage::ChannelDeleted { channel_id },
});
Ok(())
}

View File

@@ -0,0 +1,115 @@
use std::sync::Arc;
use backchannel_common::protocol::ServerMessage;
use uuid::Uuid;
use crate::db::messages;
use crate::error::Result;
use crate::state::AppState;
use crate::ws::session::Session;
pub async fn handle_init_key_exchange(
session: &Session,
state: &Arc<AppState>,
recipient_id: Uuid,
sender_ephemeral_pubkey: String,
) -> Result<()> {
let initiator_id = session.require_auth()?;
let initiator_username = session.require_username()?.to_string();
// Store pending exchange in AppState (cleared once recipient accepts).
state
.pending_key_exchanges
.lock()
.await
.insert((initiator_id, recipient_id), sender_ephemeral_pubkey.clone());
// Relay to recipient if they're currently connected.
state
.send_to_user(
recipient_id,
&ServerMessage::DmKeyExchangeRequest {
initiator_id,
initiator_username,
sender_ephemeral_pubkey,
},
)
.await;
Ok(())
}
pub async fn handle_accept_key_exchange(
session: &Session,
state: &Arc<AppState>,
initiator_id: Uuid,
recipient_ephemeral_pubkey: String,
) -> Result<()> {
let recipient_id = session.require_auth()?;
let recipient_username = session.require_username()?.to_string();
// Clear the pending exchange.
state
.pending_key_exchanges
.lock()
.await
.remove(&(initiator_id, recipient_id));
// Relay response back to initiator.
state
.send_to_user(
initiator_id,
&ServerMessage::DmKeyExchangeResponse {
recipient_id,
recipient_username,
recipient_ephemeral_pubkey,
},
)
.await;
Ok(())
}
pub async fn handle_send_dm(
session: &Session,
state: &Arc<AppState>,
recipient_id: Uuid,
ciphertext: String,
nonce: String,
) -> Result<()> {
let sender_id = session.require_auth()?;
let sender_username = session.require_username()?.to_string();
let (message_id, timestamp) =
messages::insert_dm(&state.db, sender_id, recipient_id, &ciphertext, &nonce).await?;
let msg = ServerMessage::DirectMessage {
message_id,
sender_id,
sender_username,
ciphertext,
nonce,
timestamp,
};
// Deliver to recipient and to sender's other sessions (multi-client sync).
state.send_to_user(recipient_id, &msg).await;
state.send_to_user(sender_id, &msg).await;
Ok(())
}
pub async fn handle_dm_history(
session: &Session,
state: &Arc<AppState>,
peer_id: Uuid,
before_message_id: Option<Uuid>,
limit: u32,
) -> Result<()> {
let user_id = session.require_auth()?;
let history =
messages::fetch_dm_history(&state.db, user_id, peer_id, before_message_id, limit).await?;
session.send(ServerMessage::DmHistory { peer_id, messages: history })
}

View File

@@ -0,0 +1,4 @@
pub mod auth;
pub mod channels;
pub mod dms;
pub mod roles;

View File

@@ -0,0 +1,87 @@
use std::sync::Arc;
use backchannel_common::protocol::ServerMessage;
use backchannel_common::types::{has_permission, PermissionFlags};
use uuid::Uuid;
use crate::db::{keys, roles};
use crate::error::{Result, ServerError};
use crate::state::{AppState, ChannelBroadcast};
use crate::ws::session::Session;
pub async fn handle_create(
session: &Session,
state: &Arc<AppState>,
name: String,
permissions: u64,
) -> Result<()> {
let user_id = session.require_auth()?;
let perms = roles::get_user_permissions(&state.db, user_id).await?;
if !has_permission(perms, PermissionFlags::MANAGE_ROLES) {
return Err(ServerError::Forbidden);
}
let role_id = roles::create(&state.db, &name, permissions).await?;
let _ = state.channel_broadcast.send(ChannelBroadcast {
message: ServerMessage::RoleCreated { role_id, name, permissions },
});
Ok(())
}
pub async fn handle_assign(
session: &Session,
state: &Arc<AppState>,
user_id: Uuid,
role_id: Uuid,
) -> Result<()> {
let caller_id = session.require_auth()?;
let perms = roles::get_user_permissions(&state.db, caller_id).await?;
if !has_permission(perms, PermissionFlags::MANAGE_ROLES) {
return Err(ServerError::Forbidden);
}
roles::find_by_id(&state.db, role_id)
.await?
.ok_or_else(|| ServerError::NotFound("Role not found".into()))?;
roles::assign(&state.db, user_id, role_id).await?;
let _ = state.channel_broadcast.send(ChannelBroadcast {
message: ServerMessage::RoleAssigned { user_id, role_id },
});
Ok(())
}
pub async fn handle_revoke(
session: &Session,
state: &Arc<AppState>,
user_id: Uuid,
role_id: Uuid,
) -> Result<()> {
let caller_id = session.require_auth()?;
let perms = roles::get_user_permissions(&state.db, caller_id).await?;
if !has_permission(perms, PermissionFlags::MANAGE_ROLES) {
return Err(ServerError::Forbidden);
}
roles::revoke(&state.db, user_id, role_id).await?;
let _ = state.channel_broadcast.send(ChannelBroadcast {
message: ServerMessage::RoleRevoked { user_id, role_id },
});
Ok(())
}
pub async fn handle_query_key(
session: &Session,
state: &Arc<AppState>,
user_id: Uuid,
) -> Result<()> {
session.require_auth()?;
let pubkey = keys::get_identity_key(&state.db, user_id).await?;
session.send(ServerMessage::IdentityKeyResponse { user_id, identity_pubkey: pubkey })
}

View File

@@ -0,0 +1,3 @@
mod server;
pub use server::serve;

View File

@@ -0,0 +1,65 @@
use anyhow::{Context, Result};
use axum::body::Body;
use axum::extract::Path;
use axum::http::{header, HeaderValue, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing::get;
use axum::Router;
use mime_guess::from_path;
use rust_embed::RustEmbed;
#[derive(RustEmbed)]
#[folder = "../backchannel-web/dist"]
struct WebAssets;
pub async fn serve(bind_addr: &str) -> Result<()> {
let app = Router::new()
.route("/", get(index))
.route("/*path", get(asset_or_index));
let listener = tokio::net::TcpListener::bind(bind_addr)
.await
.with_context(|| format!("Failed to bind HTTP server to {bind_addr}"))?;
tracing::info!("BackChannel web UI listening on http://{bind_addr}");
axum::serve(listener, app).await.context("HTTP server failed")
}
async fn index() -> Response {
render_asset("index.html")
}
async fn asset_or_index(Path(path): Path<String>) -> Response {
if path.is_empty() {
return render_asset("index.html");
}
render_asset(&path)
}
fn render_asset(path: &str) -> Response {
let canonical = path.trim_start_matches('/');
if let Some(content) = WebAssets::get(canonical) {
return build_response(canonical, content.data.as_ref(), StatusCode::OK);
}
if let Some(index) = WebAssets::get("index.html") {
return build_response("index.html", index.data.as_ref(), StatusCode::OK);
}
(StatusCode::NOT_FOUND, "Web UI asset not found").into_response()
}
fn build_response(path: &str, body: &[u8], status: StatusCode) -> Response {
let mime = from_path(path).first_or_octet_stream();
let mut response = Response::new(Body::from(body.to_vec()));
*response.status_mut() = status;
response.headers_mut().insert(
header::CONTENT_TYPE,
HeaderValue::from_str(mime.as_ref()).unwrap_or(HeaderValue::from_static("application/octet-stream")),
);
response
}

View File

@@ -0,0 +1,62 @@
mod auth;
mod config;
mod db;
mod error;
mod handlers;
mod http;
mod state;
mod ws;
use std::sync::Arc;
use anyhow::Result;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use std::str::FromStr;
use tokio::sync::{broadcast, RwLock, Mutex};
use state::AppState;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "backchannel_server=debug,info".into()),
)
.init();
let config = config::ServerConfig::from_env()?;
let opts = SqliteConnectOptions::from_str(&config.db_url)?.create_if_missing(true);
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(opts)
.await?;
sqlx::migrate!("./src/db/migrations").run(&pool).await?;
tracing::info!("Database migrations applied");
let (bc_tx, _) = broadcast::channel(1024);
let state = Arc::new(AppState {
sessions: RwLock::new(std::collections::HashMap::new()),
user_sessions: RwLock::new(std::collections::HashMap::new()),
pending_key_exchanges: Mutex::new(std::collections::HashMap::new()),
channel_broadcast: bc_tx,
db: pool,
jwt_secret: config.jwt_secret,
});
tokio::try_join!(
ws::listen(
Arc::clone(&state),
&config.bind_addr,
config.tls_enabled,
config.tls_cert_path,
config.tls_key_path,
),
http::serve(&config.http_bind_addr)
)?;
Ok(())
}

View File

@@ -0,0 +1,123 @@
use std::collections::HashMap;
use std::sync::Arc;
use backchannel_common::protocol::ServerMessage;
use tokio::sync::{broadcast, mpsc, Mutex, RwLock};
use tokio_tungstenite::tungstenite::Message as WsMessage;
use uuid::Uuid;
/// A handle to send WebSocket frames directly to one connection.
pub type WsSender = mpsc::UnboundedSender<WsMessage>;
/// State for a single authenticated WebSocket connection.
pub struct ConnectedSession {
pub user_id: Uuid,
pub username: String,
/// Sending on this channel pushes a frame to the client's write task.
pub tx: WsSender,
}
/// A channel event broadcast to all connected clients.
///
/// Channel messages are fanned out via a `tokio::sync::broadcast` channel
/// so that the handler never has to hold a lock while iterating sessions.
#[derive(Clone, Debug)]
pub struct ChannelBroadcast {
pub message: ServerMessage,
}
/// The shared application state. Wrapped in `Arc` and passed to every task.
pub struct AppState {
/// All live WebSocket sessions, keyed by a per-connection UUID.
/// One user may hold multiple connections.
pub sessions: RwLock<HashMap<Uuid, ConnectedSession>>,
/// Reverse index: user_id → set of connection UUIDs.
/// Allows sending to all clients of a specific user (e.g. for DMs).
pub user_sessions: RwLock<HashMap<Uuid, Vec<Uuid>>>,
/// In-flight DM key-exchange requests.
/// Key: (initiator_user_id, recipient_user_id)
/// Value: initiator's base64 X25519 ephemeral public key.
/// Cleared once the recipient accepts the exchange.
pub pending_key_exchanges: Mutex<HashMap<(Uuid, Uuid), String>>,
/// Server-wide broadcast for channel events. Each connection handler
/// calls `.subscribe()` to get its own receiver.
pub channel_broadcast: broadcast::Sender<ChannelBroadcast>,
/// SQLite connection pool.
pub db: sqlx::SqlitePool,
/// Bytes used as the HMAC secret for JWT signing.
pub jwt_secret: Vec<u8>,
}
impl AppState {
/// Look up all active sender handles for a given user ID.
/// Used when routing DMs or presence events to a specific user.
pub async fn senders_for_user(&self, user_id: Uuid) -> Vec<WsSender> {
let sessions = self.sessions.read().await;
let user_sessions = self.user_sessions.read().await;
user_sessions
.get(&user_id)
.map(|conn_ids| {
conn_ids
.iter()
.filter_map(|id| sessions.get(id).map(|s| s.tx.clone()))
.collect()
})
.unwrap_or_default()
}
/// Send a `ServerMessage` to every connection belonging to `user_id`.
pub async fn send_to_user(&self, user_id: Uuid, msg: &ServerMessage) {
if let Ok(json) = serde_json::to_string(msg) {
for tx in self.senders_for_user(user_id).await {
let _ = tx.send(WsMessage::Text(json.clone()));
}
}
}
/// Register a new connection in the session maps.
pub async fn register_session(
self: &Arc<Self>,
conn_id: Uuid,
user_id: Uuid,
username: String,
tx: WsSender,
) {
let session = ConnectedSession { user_id, username, tx };
self.sessions.write().await.insert(conn_id, session);
self.user_sessions
.write()
.await
.entry(user_id)
.or_default()
.push(conn_id);
}
/// Remove a connection from the session maps.
pub async fn remove_session(self: &Arc<Self>, conn_id: Uuid) -> Option<ConnectedSession> {
let session = self.sessions.write().await.remove(&conn_id)?;
let mut user_sessions = self.user_sessions.write().await;
if let Some(ids) = user_sessions.get_mut(&session.user_id) {
ids.retain(|id| id != &conn_id);
if ids.is_empty() {
user_sessions.remove(&session.user_id);
}
}
Some(session)
}
/// Returns true if the given user has at least one active connection.
pub async fn is_user_online(&self, user_id: Uuid) -> bool {
self.user_sessions
.read()
.await
.get(&user_id)
.map(|v| !v.is_empty())
.unwrap_or(false)
}
}

View File

@@ -0,0 +1,179 @@
use std::io::BufReader;
use std::net::SocketAddr;
use std::sync::Arc;
use anyhow::{Context, Result};
use futures_util::{SinkExt, StreamExt};
use tokio::net::TcpListener;
use tokio::sync::{broadcast, mpsc};
use tokio_tungstenite::{accept_async, tungstenite::Message as WsMessage, WebSocketStream};
use backchannel_common::protocol::{ClientMessage, ServerMessage};
use crate::state::{AppState, ChannelBroadcast};
use crate::ws::{router, session::Session};
/// Bind the TCP listener and accept connections in a loop.
pub async fn run(
state: Arc<AppState>,
bind_addr: &str,
tls_enabled: bool,
tls_cert_path: Option<String>,
tls_key_path: Option<String>,
) -> Result<()> {
let listener = TcpListener::bind(bind_addr)
.await
.with_context(|| format!("Failed to bind to {}", bind_addr))?;
tracing::info!("BackChannel server listening on ws{}://{}", if tls_enabled { "s" } else { "" }, bind_addr);
if tls_enabled {
let cert_path = tls_cert_path.expect("tls_cert_path required when TLS enabled");
let key_path = tls_key_path.expect("tls_key_path required when TLS enabled");
let acceptor = build_tls_acceptor(&cert_path, &key_path)?;
loop {
let (stream, addr) = listener.accept().await?;
let state = Arc::clone(&state);
let acceptor = acceptor.clone();
tokio::spawn(async move {
match acceptor.accept(stream).await {
Ok(tls_stream) => match accept_async(tls_stream).await {
Ok(ws) => handle_ws(ws, state, addr).await,
Err(e) => tracing::debug!("WS handshake failed from {}: {}", addr, e),
},
Err(e) => tracing::debug!("TLS handshake failed from {}: {}", addr, e),
}
});
}
} else {
loop {
let (stream, addr) = listener.accept().await?;
let state = Arc::clone(&state);
tokio::spawn(async move {
match accept_async(stream).await {
Ok(ws) => handle_ws(ws, state, addr).await,
Err(e) => tracing::debug!("WS handshake failed from {}: {}", addr, e),
}
});
}
}
}
/// Per-connection WebSocket handler — generic over the underlying stream type
/// so it works with both plain TCP and TLS.
async fn handle_ws<S>(ws_stream: WebSocketStream<S>, state: Arc<AppState>, addr: SocketAddr)
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
tracing::debug!("Connection established from {}", addr);
let (mut ws_sink, mut ws_source) = ws_stream.split();
// Per-connection mpsc: any task can push to this sender; the write task
// drains it to the WebSocket sink.
let (tx, mut rx) = mpsc::unbounded_channel::<WsMessage>();
let mut session = Session::new(tx);
let conn_id = session.conn_id;
// Subscribe to the server-wide channel broadcast before the loop starts.
let mut bc_rx = state.channel_broadcast.subscribe();
// Write task: merges direct messages (mpsc) + broadcast events → WS sink.
let write_task = tokio::spawn(async move {
loop {
tokio::select! {
msg = rx.recv() => {
match msg {
Some(m) => { if ws_sink.send(m).await.is_err() { break; } }
None => break,
}
}
event = bc_rx.recv() => {
match event {
Ok(ChannelBroadcast { message }) => {
if let Ok(json) = serde_json::to_string(&message) {
if ws_sink.send(WsMessage::Text(json)).await.is_err() { break; }
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!("conn {} lagged by {} broadcast messages", conn_id, n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
}
});
// Read loop: receive, deserialize, dispatch.
while let Some(msg_result) = ws_source.next().await {
match msg_result {
Ok(WsMessage::Text(text)) => {
match serde_json::from_str::<ClientMessage>(&text) {
Ok(client_msg) => {
if let Err(e) = router::route(client_msg, &mut session, &state).await {
let _ = session.send(ServerMessage::Error {
code: e.code(),
message: e.to_string(),
});
}
}
Err(_) => {
let _ = session.send(ServerMessage::Error {
code: 400,
message: "Invalid message format".into(),
});
}
}
}
Ok(WsMessage::Close(_)) | Err(_) => break,
Ok(_) => {} // Ignore Ping/Pong/Binary frames.
}
}
tracing::debug!("Connection closed from {}", addr);
// Clean up session maps and broadcast offline status.
if let Some(removed) = state.remove_session(conn_id).await {
if !state.is_user_online(removed.user_id).await {
let _ = state.channel_broadcast.send(ChannelBroadcast {
message: ServerMessage::UserOffline {
user_id: removed.user_id,
username: removed.username,
},
});
}
}
write_task.abort();
}
/// Build a `tokio_rustls::TlsAcceptor` from PEM cert and key files.
fn build_tls_acceptor(cert_path: &str, key_path: &str) -> Result<tokio_rustls::TlsAcceptor> {
use rustls::ServerConfig;
use rustls_pemfile::{certs, private_key};
use std::fs::File;
let cert_file = File::open(cert_path)
.with_context(|| format!("Cannot open TLS cert: {}", cert_path))?;
let key_file = File::open(key_path)
.with_context(|| format!("Cannot open TLS key: {}", key_path))?;
let certs: Vec<_> = certs(&mut BufReader::new(cert_file))
.collect::<std::result::Result<_, _>>()
.context("Failed to parse TLS certificates")?;
let key = private_key(&mut BufReader::new(key_file))
.context("Failed to parse TLS private key")?
.context("No private key found in key file")?;
let config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
.context("Invalid TLS certificate/key combination")?;
Ok(tokio_rustls::TlsAcceptor::from(Arc::new(config)))
}

View File

@@ -0,0 +1,18 @@
pub mod handler;
pub mod router;
pub mod session;
use std::sync::Arc;
use anyhow::Result;
use crate::state::AppState;
/// Bind the TCP listener and accept WebSocket connections.
pub async fn listen(
state: Arc<AppState>,
bind_addr: &str,
tls_enabled: bool,
tls_cert_path: Option<String>,
tls_key_path: Option<String>,
) -> Result<()> {
handler::run(state, bind_addr, tls_enabled, tls_cert_path, tls_key_path).await
}

View File

@@ -0,0 +1,64 @@
use std::sync::Arc;
use backchannel_common::protocol::{ClientMessage, ServerMessage};
use crate::error::Result;
use crate::handlers;
use crate::state::AppState;
use crate::ws::session::Session;
/// Dispatch a `ClientMessage` to the appropriate handler.
pub async fn route(msg: ClientMessage, session: &mut Session, state: &Arc<AppState>) -> Result<()> {
use ClientMessage::*;
match msg {
Login { username, password, identity_pubkey } => {
handlers::auth::handle_login(session, state, username, password, identity_pubkey).await
}
Register { username, password, identity_pubkey } => {
handlers::auth::handle_register(session, state, username, password, identity_pubkey).await
}
ResumeSession { token } => {
handlers::auth::handle_resume(session, state, token).await
}
Logout => {
handlers::auth::handle_logout(session, state).await
}
SendChannelMessage { channel_id, content } => {
handlers::channels::handle_send(session, state, channel_id, content).await
}
FetchChannelHistory { channel_id, before_message_id, limit } => {
handlers::channels::handle_history(session, state, channel_id, before_message_id, limit).await
}
CreateChannel { name, topic } => {
handlers::channels::handle_create(session, state, name, topic).await
}
DeleteChannel { channel_id } => {
handlers::channels::handle_delete(session, state, channel_id).await
}
InitDmKeyExchange { recipient_id, sender_ephemeral_pubkey } => {
handlers::dms::handle_init_key_exchange(session, state, recipient_id, sender_ephemeral_pubkey).await
}
AcceptDmKeyExchange { initiator_id, recipient_ephemeral_pubkey } => {
handlers::dms::handle_accept_key_exchange(session, state, initiator_id, recipient_ephemeral_pubkey).await
}
SendDm { recipient_id, ciphertext, nonce } => {
handlers::dms::handle_send_dm(session, state, recipient_id, ciphertext, nonce).await
}
FetchDmHistory { peer_id, before_message_id, limit } => {
handlers::dms::handle_dm_history(session, state, peer_id, before_message_id, limit).await
}
CreateRole { name, permissions } => {
handlers::roles::handle_create(session, state, name, permissions).await
}
AssignRole { user_id, role_id } => {
handlers::roles::handle_assign(session, state, user_id, role_id).await
}
RevokeRole { user_id, role_id } => {
handlers::roles::handle_revoke(session, state, user_id, role_id).await
}
QueryIdentityKey { user_id } => {
handlers::roles::handle_query_key(session, state, user_id).await
}
Ping => session.send(ServerMessage::Pong),
}
}

View File

@@ -0,0 +1,45 @@
use backchannel_common::protocol::ServerMessage;
use tokio_tungstenite::tungstenite::Message as WsMessage;
use uuid::Uuid;
use crate::error::{Result, ServerError};
use crate::state::WsSender;
/// Per-connection mutable state, owned exclusively by the connection's read task.
pub struct Session {
pub conn_id: Uuid,
pub user_id: Option<Uuid>,
pub username: Option<String>,
/// JWT ID stored for revocation on logout.
pub jti: Option<String>,
/// Sender to the write task for this connection.
pub tx: WsSender,
}
impl Session {
pub fn new(tx: WsSender) -> Self {
Self {
conn_id: Uuid::new_v4(),
user_id: None,
username: None,
jti: None,
tx,
}
}
/// Serialize and send a `ServerMessage` to this connection's write task.
pub fn send(&self, msg: ServerMessage) -> Result<()> {
let json = serde_json::to_string(&msg)?;
self.tx
.send(WsMessage::Text(json))
.map_err(|_| ServerError::ConnectionClosed)
}
pub fn require_auth(&self) -> Result<Uuid> {
self.user_id.ok_or(ServerError::Unauthorized)
}
pub fn require_username(&self) -> Result<&str> {
self.username.as_deref().ok_or(ServerError::Unauthorized)
}
}