WebSocket
Middleware that provides support for WebSocket
.
Example
use salvo::prelude::*;
use salvo::websocket::WebSocketUpgrade;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, Serialize)]
struct User {
id: usize,
name: String,
}
#[handler]
async fn connect(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
let user = req.parse_queries::<User>();
WebSocketUpgrade::new()
.upgrade(req, res, |mut ws| async move {
println!("{:#?} ", user);
while let Some(msg) = ws.recv().await {
let msg = if let Ok(msg) = msg {
msg
} else {
// client disconnected
return;
};
if ws.send(msg).await.is_err() {
// client disconnected
return;
}
}
})
.await
}
#[handler]
async fn index(res: &mut Response) {
res.render(Text::Html(INDEX_HTML));
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt().init();
let router = Router::new()
.get(index)
.push(Router::with_path("ws").goal(connect));
let acceptor = TcpListener::new("127.0.0.1:5800").bind().await;
Server::new(acceptor).serve(router).await;
}
static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html>
<head>
<title>WS</title>
</head>
<body>
<h1>WS</h1>
<div id="status">
<p><em>Connecting...</em></p>
</div>
<script>
const status = document.getElementById('status');
const msg = document.getElementById('msg');
const submit = document.getElementById('submit');
const ws = new WebSocket(`ws://${location.host}/ws?id=123&name=dddf`);
ws.onopen = function() {
status.innerHTML = '<p><em>Connected!</em></p>';
};
</script>
</body>
</html>
"#;
[package]
name = "example-websocket"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
futures-util = { version = "0.3", default-features = false }
salvo = { workspace = true, features = ["websocket"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"
serde = "1"
serde_json = "1"
WebSocket Chat
// Copyright (c) 2018-2020 Sean McArthur
// Licensed under the MIT license http://opensource.org/licenses/MIT
//
// port from https://github.com/seanmonstar/warp/blob/master/examples/websocket_chat.rs
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures_util::{FutureExt, StreamExt};
use once_cell::sync::Lazy;
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::UnboundedReceiverStream;
use salvo::prelude::*;
use salvo::websocket::{Message, WebSocket, WebSocketUpgrade};
type Users = RwLock<HashMap<usize, mpsc::UnboundedSender<Result<Message, salvo::Error>>>>;
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
static ONLINE_USERS: Lazy<Users> = Lazy::new(Users::default);
#[tokio::main]
async fn main() {
tracing_subscriber::fmt().init();
let router = Router::new()
.goal(index)
.push(Router::with_path("chat").goal(user_connected));
let acceptor = TcpListener::new("127.0.0.1:5800").bind().await;
Server::new(acceptor).serve(router).await;
}
#[handler]
async fn user_connected(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
WebSocketUpgrade::new()
.upgrade(req, res, handle_socket)
.await
}
async fn handle_socket(ws: WebSocket) {
// Use a counter to assign a new unique ID for this user.
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
tracing::info!("new chat user: {}", my_id);
// Split the socket into a sender and receive of messages.
let (user_ws_tx, mut user_ws_rx) = ws.split();
// Use an unbounded channel to handle buffering and flushing of messages
// to the websocket...
let (tx, rx) = mpsc::unbounded_channel();
let rx = UnboundedReceiverStream::new(rx);
let fut = rx.forward(user_ws_tx).map(|result| {
if let Err(e) = result {
tracing::error!(error = ?e, "websocket send error");
}
});
tokio::task::spawn(fut);
let fut = async move {
ONLINE_USERS.write().await.insert(my_id, tx);
while let Some(result) = user_ws_rx.next().await {
let msg = match result {
Ok(msg) => msg,
Err(e) => {
eprintln!("websocket error(uid={}): {}", my_id, e);
break;
}
};
user_message(my_id, msg).await;
}
user_disconnected(my_id).await;
};
tokio::task::spawn(fut);
}
async fn user_message(my_id: usize, msg: Message) {
let msg = if let Ok(s) = msg.to_str() {
s
} else {
return;
};
let new_msg = format!("<User#{}>: {}", my_id, msg);
// New message from this user, send it to everyone else (except same uid)...
for (&uid, tx) in ONLINE_USERS.read().await.iter() {
if my_id != uid {
if let Err(_disconnected) = tx.send(Ok(Message::text(new_msg.clone()))) {
// The tx is disconnected, our `user_disconnected` code
// should be happening in another task, nothing more to
// do here.
}
}
}
}
async fn user_disconnected(my_id: usize) {
eprintln!("good bye user: {}", my_id);
// Stream closed up, so remove from the user list
ONLINE_USERS.write().await.remove(&my_id);
}
#[handler]
async fn index(res: &mut Response) {
res.render(Text::Html(INDEX_HTML));
}
static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html>
<head>
<title>WS Chat</title>
</head>
<body>
<h1>WS Chat</h1>
<div id="chat">
<p><em>Connecting...</em></p>
</div>
<input type="text" id="text" />
<button type="button" id="submit">Submit</button>
<script>
const chat = document.getElementById('chat');
const msg = document.getElementById('msg');
const submit = document.getElementById('submit');
const ws = new WebSocket(`ws://${location.host}/chat`);
ws.onopen = function() {
chat.innerHTML = '<p><em>Connected!</em></p>';
};
ws.onmessage = function(msg) {
showMessage(msg.data);
};
ws.onclose = function() {
chat.getElementsByTagName('em')[0].innerText = 'Disconnected!';
};
submit.onclick = function() {
const msg = text.value;
ws.send(msg);
text.value = '';
showMessage('<You>: ' + msg);
};
function showMessage(data) {
const line = document.createElement('p');
line.innerText = data;
chat.appendChild(line);
}
</script>
</body>
</html>
"#;
[package]
name = "example-websocket-chat"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
futures-util = { version = "0.3", default-features = false }
once_cell = "1"
salvo = {workspace = true, features = ["websocket"] }
tokio = { version = "1", features = ["macros"] }
tokio-stream = { version = "0.1", features = ["net"] }
tracing = "0.1"
tracing-subscriber = "0.3"