每次使用 Docker 中止写的代码都会超过宽限时间被强行中止,这并不是一件好事,因此需要添加处理关闭的逻辑。在使用时发现 sse 导致 axum
无法退出,因此记录一下
当使用命令 docker stop xxx
时,Docker 会向容器里应用的进程发送 SIGTERM
信号,因此需要监听该信号。
这里使用 tokio_util::sync::CancellationToken
来传递即将停止的消息,在接收到 SIGTERM
信号后就调用 CancellationToken
的 cancel
方法
#创建 axum
服务
let shutdown_signal = CancellationToken::new();
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?;
tokio::spawn(async move {
sigterm.recv().await;
shutdown_signal.cancel();
})
shutdown_signal
需要储存在 axum
的 state
中,axum
的 with_graceful_shutdown
也需要使用到它
tokio::spawn(async move{
let (tx, _) = tokio::sync::broadcast::channel(8);
let state = AppState{
shutdown_signal: shutdown_signal.clone(),
broadcast: tx // sse 的广播通道也记录于此
};
let routes = /**/
axum::serve(
listener,
routes.with_state(state)
)
.with_graceful_shutdown(async move {
shutdown_signal.cancelled().await;
})
.await
})
#SSE 处理
存在 sse 会导致 axum
无法关闭,具体可查看 Graceful shutdown never stops the server when there is an open sse connection,因此需要监听 shutdown_signal
。前面将 shutdown_signal
记录在了 axum
的 state
中,读取还是很方便。
use axum::{
extract::State,
response::{sse, Sse},
BoxError,
};
use serde_json::json;
use tokio_stream::wrappers::BroadcastStream;
use futures::stream;
use tokio_stream::StreamExt;
pub async sse(
State(state): State<AppState>
) -> Sse<impl tokio_stream::Stream<Item = Result<sse::Event, BoxError>>>{
// 这是 SSE 的事件广播
let receiver = state.broadcast.subscribe();
// 将广播转化为 stream
let notify_stream = BroadcastStream::new(receiver).map(|it| -> Result<sse::Event, BoxError> {
match it {
Ok(payload) => Ok(sse::Event::default().data(payload.to_json())),
Err(err) => {
tracing::error!(reason = ?err, "failed to read broadcast message.");
Err(Box::new(err))
}
}
},
);
// 创建一个心跳 stream
let heart_stream = stream::repeat_with(|| {
let now = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default();
sse::Event::default().data(
json!({
"type": "HEART",
"time": now.as_millis()
})
.to_string(),
)
})
.map(|it| -> Result<sse::Event, BoxError> { Ok(it) })
.throttle(Duration::from_secs(1));
// 组合两个 stream
let combined_stream = stream::select(notify_stream, heart_stream);
// 将其包装成可中止的 stream
let (combined_stream, handler) = stream::abortable(combined_stream);
// 当 shutdown_signal 取消时,中止 stream
tokio::spawn(async move {
shutdown_signal.cancelled().await;
handler.abort()
});
// 返回
Sse::new(combined_stream).keep_alive(
sse::KeepAlive::new()
.interval(Duration::from_secs(1))
.text("keep"),
)
}
sse
关闭后,axum
便能正常停止了