mirror of
https://github.com/sigoden/dufs.git
synced 2026-04-09 00:59:02 +03:00
feat: supports resumable uploads (#343)
This commit is contained in:
107
src/server.rs
107
src/server.rs
@@ -42,6 +42,7 @@ use std::time::SystemTime;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite};
|
||||
use tokio::{fs, io};
|
||||
|
||||
use tokio_util::compat::FuturesAsyncWriteCompatExt;
|
||||
use tokio_util::io::{ReaderStream, StreamReader};
|
||||
use uuid::Uuid;
|
||||
@@ -57,7 +58,8 @@ const INDEX_JS: &str = include_str!("../assets/index.js");
|
||||
const FAVICON_ICO: &[u8] = include_bytes!("../assets/favicon.ico");
|
||||
const INDEX_NAME: &str = "index.html";
|
||||
const BUF_SIZE: usize = 65536;
|
||||
const TEXT_MAX_SIZE: u64 = 4194304; // 4M
|
||||
const EDITABLE_TEXT_MAX_SIZE: u64 = 4194304; // 4M
|
||||
const RESUMABLE_UPLOAD_MIN_SIZE: u64 = 20971520; // 20M
|
||||
|
||||
pub struct Server {
|
||||
args: Args,
|
||||
@@ -327,10 +329,37 @@ impl Server {
|
||||
set_webdav_headers(&mut res);
|
||||
}
|
||||
Method::PUT => {
|
||||
if !allow_upload || (!allow_delete && is_file && size > 0) {
|
||||
if is_dir || !allow_upload || (!allow_delete && size > 0) {
|
||||
status_forbid(&mut res);
|
||||
} else {
|
||||
self.handle_upload(path, req, &mut res).await?;
|
||||
self.handle_upload(path, None, size, req, &mut res).await?;
|
||||
}
|
||||
}
|
||||
Method::PATCH => {
|
||||
if is_miss {
|
||||
status_not_found(&mut res);
|
||||
} else if !allow_upload {
|
||||
status_forbid(&mut res);
|
||||
} else {
|
||||
let offset = match parse_upload_offset(headers, size) {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
status_bad_request(&mut res, &err.to_string());
|
||||
return Ok(res);
|
||||
}
|
||||
};
|
||||
match offset {
|
||||
Some(offset) => {
|
||||
if offset < size && !allow_delete {
|
||||
status_forbid(&mut res);
|
||||
}
|
||||
self.handle_upload(path, Some(offset), size, req, &mut res)
|
||||
.await?;
|
||||
}
|
||||
None => {
|
||||
*res.status_mut() = StatusCode::METHOD_NOT_ALLOWED;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Method::DELETE => {
|
||||
@@ -417,17 +446,27 @@ impl Server {
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
async fn handle_upload(&self, path: &Path, req: Request, res: &mut Response) -> Result<()> {
|
||||
async fn handle_upload(
|
||||
&self,
|
||||
path: &Path,
|
||||
upload_offset: Option<u64>,
|
||||
size: u64,
|
||||
req: Request,
|
||||
res: &mut Response,
|
||||
) -> Result<()> {
|
||||
ensure_path_parent(path).await?;
|
||||
|
||||
let mut file = match fs::File::create(&path).await {
|
||||
Ok(v) => v,
|
||||
Err(_) => {
|
||||
status_forbid(res);
|
||||
return Ok(());
|
||||
let (mut file, status) = match upload_offset {
|
||||
None => (fs::File::create(path).await?, StatusCode::CREATED),
|
||||
Some(offset) if offset == size => (
|
||||
fs::OpenOptions::new().append(true).open(path).await?,
|
||||
StatusCode::NO_CONTENT,
|
||||
),
|
||||
Some(offset) => {
|
||||
let mut file = fs::OpenOptions::new().write(true).open(path).await?;
|
||||
file.seek(SeekFrom::Start(offset)).await?;
|
||||
(file, StatusCode::NO_CONTENT)
|
||||
}
|
||||
};
|
||||
|
||||
let stream = IncomingStream::new(req.into_body());
|
||||
|
||||
let body_with_io_error = stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
|
||||
@@ -436,13 +475,19 @@ impl Server {
|
||||
pin_mut!(body_reader);
|
||||
|
||||
let ret = io::copy(&mut body_reader, &mut file).await;
|
||||
let size = fs::metadata(path)
|
||||
.await
|
||||
.map(|v| v.len())
|
||||
.unwrap_or_default();
|
||||
if ret.is_err() {
|
||||
tokio::fs::remove_file(&path).await?;
|
||||
|
||||
if upload_offset.is_none() && size < RESUMABLE_UPLOAD_MIN_SIZE {
|
||||
let _ = tokio::fs::remove_file(&path).await;
|
||||
}
|
||||
ret?;
|
||||
}
|
||||
|
||||
*res.status_mut() = StatusCode::CREATED;
|
||||
*res.status_mut() = status;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -830,7 +875,8 @@ impl Server {
|
||||
);
|
||||
let mut buffer: Vec<u8> = vec![];
|
||||
file.take(1024).read_to_end(&mut buffer).await?;
|
||||
let editable = meta.len() <= TEXT_MAX_SIZE && content_inspector::inspect(&buffer).is_text();
|
||||
let editable =
|
||||
meta.len() <= EDITABLE_TEXT_MAX_SIZE && content_inspector::inspect(&buffer).is_text();
|
||||
let data = EditData {
|
||||
href,
|
||||
kind,
|
||||
@@ -867,7 +913,7 @@ impl Server {
|
||||
Some(v) => match v.to_str().ok().and_then(|v| v.parse().ok()) {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
*res.status_mut() = StatusCode::BAD_REQUEST;
|
||||
status_bad_request(res, "");
|
||||
return Ok(());
|
||||
}
|
||||
},
|
||||
@@ -1545,6 +1591,13 @@ fn status_no_content(res: &mut Response) {
|
||||
*res.status_mut() = StatusCode::NO_CONTENT;
|
||||
}
|
||||
|
||||
fn status_bad_request(res: &mut Response, body: &str) {
|
||||
*res.status_mut() = StatusCode::BAD_REQUEST;
|
||||
if !body.is_empty() {
|
||||
*res.body_mut() = body_full(body.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
fn set_content_diposition(res: &mut Response, inline: bool, filename: &str) -> Result<()> {
|
||||
let kind = if inline { "inline" } else { "attachment" };
|
||||
let filename: String = filename
|
||||
@@ -1584,10 +1637,12 @@ fn is_hidden(hidden: &[String], file_name: &str, is_dir_type: bool) -> bool {
|
||||
fn set_webdav_headers(res: &mut Response) {
|
||||
res.headers_mut().insert(
|
||||
"Allow",
|
||||
HeaderValue::from_static("GET,HEAD,PUT,OPTIONS,DELETE,PROPFIND,COPY,MOVE"),
|
||||
HeaderValue::from_static("GET,HEAD,PUT,OPTIONS,DELETE,PATCH,PROPFIND,COPY,MOVE"),
|
||||
);
|
||||
res.headers_mut().insert(
|
||||
"DAV",
|
||||
HeaderValue::from_static("1, 2, 3, sabredav-partialupdate"),
|
||||
);
|
||||
res.headers_mut()
|
||||
.insert("DAV", HeaderValue::from_static("1,2"));
|
||||
}
|
||||
|
||||
async fn get_content_type(path: &Path) -> Result<String> {
|
||||
@@ -1620,3 +1675,17 @@ async fn get_content_type(path: &Path) -> Result<String> {
|
||||
};
|
||||
Ok(content_type)
|
||||
}
|
||||
|
||||
fn parse_upload_offset(headers: &HeaderMap<HeaderValue>, size: u64) -> Result<Option<u64>> {
|
||||
let value = match headers.get("x-update-range") {
|
||||
Some(v) => v,
|
||||
None => return Ok(None),
|
||||
};
|
||||
let err = || anyhow!("Invalid X-Updage-Range header");
|
||||
let value = value.to_str().map_err(|_| err())?;
|
||||
if value == "append" {
|
||||
return Ok(Some(size));
|
||||
}
|
||||
let (start, _) = parse_range(value, size).ok_or_else(err)?;
|
||||
Ok(Some(start))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user