1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::cmp;
use std::io::SeekFrom;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use futures::ready;
use futures::AsyncRead;
use futures::AsyncSeek;
use crate::raw::*;
use crate::*;
pub fn from_fd<R>(fd: R, start: u64, end: u64) -> FdReader<R>
where
R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
{
FdReader {
inner: fd,
start,
end,
offset: 0,
}
}
pub struct FdReader<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> {
inner: R,
start: u64,
end: u64,
offset: u64,
}
impl<R> FdReader<R>
where
R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
{
pub(crate) fn current_size(&self) -> i64 {
debug_assert!(self.offset >= self.start, "offset must in range");
self.end as i64 - self.offset as i64
}
}
impl<R> oio::Read for FdReader<R>
where
R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
{
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
if self.current_size() <= 0 {
return Poll::Ready(Ok(0));
}
let max = cmp::min(buf.len() as u64, self.current_size() as u64) as usize;
let n =
ready!(Pin::new(&mut self.inner).poll_read(cx, &mut buf[..max])).map_err(|err| {
Error::new(ErrorKind::Unexpected, "read data from FdReader")
.with_context("source", "FdReader")
.set_source(err)
})?;
self.offset += n as u64;
Poll::Ready(Ok(n))
}
fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
let (base, offset) = match pos {
SeekFrom::Start(n) => (self.start as i64, n as i64),
SeekFrom::End(n) => (self.end as i64, n),
SeekFrom::Current(n) => (self.offset as i64, n),
};
match base.checked_add(offset) {
Some(n) if n < 0 => Poll::Ready(Err(Error::new(
ErrorKind::Unexpected,
"invalid seek to a negative or overflowing position",
))),
Some(n) => {
let cur =
ready!(Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::Start(n as u64)))
.map_err(|err| {
Error::new(ErrorKind::Unexpected, "seek data from FdReader")
.with_context("source", "FdReader")
.set_source(err)
})?;
self.offset = cur;
Poll::Ready(Ok(self.offset - self.start))
}
None => Poll::Ready(Err(Error::new(
ErrorKind::Unexpected,
"invalid seek to a negative or overflowing position",
))),
}
}
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<bytes::Bytes>>> {
let _ = cx;
Poll::Ready(Some(Err(Error::new(
ErrorKind::Unsupported,
"output reader doesn't support seeking",
))))
}
}