feat: support gracefully shutdown server

This commit is contained in:
sigoden
2022-06-03 10:59:54 +08:00
parent 4167e5c07e
commit c3dd0f0ec5
3 changed files with 73 additions and 44 deletions

View File

@@ -53,52 +53,65 @@ macro_rules! status {
}
pub async fn serve(args: Args) -> BoxResult<()> {
match args.tls.as_ref() {
Some(_) => serve_https(args).await,
None => serve_http(args).await,
}
}
pub async fn serve_https(args: Args) -> BoxResult<()> {
let args = Arc::new(args);
let socket_addr = args.address()?;
let (certs, key) = args.tls.clone().unwrap();
let inner = Arc::new(InnerService::new(args.clone()));
let config = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(certs, key)?;
let tls_acceptor = TlsAcceptor::from(Arc::new(config));
let arc_acceptor = Arc::new(tls_acceptor);
let listener = TcpListener::bind(&socket_addr).await?;
let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
let incoming = hyper::server::accept::from_stream(incoming.filter_map(|socket| async {
match socket {
Ok(stream) => match arc_acceptor.clone().accept(stream).await {
Ok(val) => Some(Ok::<_, Infallible>(val)),
Err(_) => None,
},
Err(_) => None,
}
}));
let server = hyper::Server::builder(incoming).serve(make_service_fn(move |_| {
let inner = inner.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let inner = inner.clone();
inner.call(req)
}))
}
}));
print_listening(args.address.as_str(), args.port, true);
let graceful = server.with_graceful_shutdown(shutdown_signal());
graceful.await?;
Ok(())
}
pub async fn serve_http(args: Args) -> BoxResult<()> {
let args = Arc::new(args);
let socket_addr = args.address()?;
let inner = Arc::new(InnerService::new(args.clone()));
if let Some((certs, key)) = args.tls.as_ref() {
let config = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(certs.clone(), key.clone())?;
let tls_acceptor = TlsAcceptor::from(Arc::new(config));
let arc_acceptor = Arc::new(tls_acceptor);
let listener = TcpListener::bind(&socket_addr).await?;
let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
let incoming = hyper::server::accept::from_stream(incoming.filter_map(|socket| async {
match socket {
Ok(stream) => match arc_acceptor.clone().accept(stream).await {
Ok(val) => Some(Ok::<_, Infallible>(val)),
Err(_) => None,
},
Err(_) => None,
}
}));
let server = hyper::Server::builder(incoming).serve(make_service_fn(move |_| {
let inner = inner.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let inner = inner.clone();
inner.call(req)
}))
}
}));
print_listening(args.address.as_str(), args.port, true);
server.await?;
} else {
let server = hyper::Server::try_bind(&socket_addr)?.serve(make_service_fn(move |_| {
let inner = inner.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let inner = inner.clone();
inner.call(req)
}))
}
}));
print_listening(args.address.as_str(), args.port, false);
server.await?;
}
let server = hyper::Server::try_bind(&socket_addr)?.serve(make_service_fn(move |_| {
let inner = inner.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let inner = inner.clone();
inner.call(req)
}))
}
}));
print_listening(args.address.as_str(), args.port, false);
let graceful = server.with_graceful_shutdown(shutdown_signal());
graceful.await?;
Ok(())
}
@@ -751,3 +764,9 @@ fn retrive_listening_addrs(address: &str) -> Vec<String> {
}
vec![address.to_owned()]
}
async fn shutdown_signal() {
tokio::signal::ctrl_c()
.await
.expect("Failed to install CTRL+C signal handler")
}