cfun/
c_tokio.rs

1#[cfg(feature = "tokio")]
2use futures::{
3    stream::{SplitSink, SplitStream},
4    StreamExt,
5};
6#[cfg(feature = "tokio")]
7use tokio::net::TcpStream;
8#[cfg(feature = "tokio")]
9use tokio_util::codec::{AnyDelimiterCodec, Framed};
10
/// Delimiter-framed TCP transport shared by both halves below
/// (expands to `Framed<TcpStream, AnyDelimiterCodec>`).
#[cfg(feature = "tokio")]
type TcpFramed = Framed<TcpStream, AnyDelimiterCodec>;

/// Sending half of a delimiter-framed [`TcpStream`]; `T` is the item type
/// written into the sink.
#[cfg(feature = "tokio")]
pub type FrameSender<T> = SplitSink<TcpFramed, T>;

/// Receiving half of a delimiter-framed [`TcpStream`].
#[cfg(feature = "tokio")]
pub type FrameReceiver = SplitStream<TcpFramed>;
15
/// Splits a [`TcpStream`] into a framed sender and a framed receiver.
///
/// The stream is wrapped in a [`Framed`] driven by an [`AnyDelimiterCodec`]
/// and then divided into its sink and stream halves.
///
/// `split` is used twice: as the set of delimiter bytes looked for when
/// decoding, and as the byte sequence appended after every encoded item.
/// Per the `tokio_util` documentation the decoder splits on *any* single byte
/// contained in `split`, not on the sequence as a whole.
///
/// The halves can be merged back with [`frame_stream_combined`].
#[cfg(feature = "tokio")]
pub fn frame_stream_split<T>(stream: TcpStream, split: Vec<u8>) -> (FrameSender<T>, FrameReceiver)
where
    T: AsRef<str>,
{
    // The codec takes ownership of both vectors, hence one copy is required.
    let seek_delimiters = split.clone();
    let sequence_writer = split;
    Framed::new(stream, AnyDelimiterCodec::new(seek_delimiters, sequence_writer)).split::<T>()
}
26
/// Recombines a framed sender/receiver pair into the underlying [`TcpStream`].
///
/// Only the I/O object of the reunited [`Framed`] is returned; the codec and
/// the framing layer's buffers are dropped along with the rest of the parts.
///
/// # Panics
///
/// Panics if `reunite` returns an error (presumably when `sender` and
/// `receiver` were not produced by the same split).
#[cfg(feature = "tokio")]
pub fn frame_stream_combined<T>(sender: FrameSender<T>, receiver: FrameReceiver) -> TcpStream
where
    T: AsRef<str>,
{
    let framed = sender.reunite(receiver).unwrap();
    let parts = framed.into_parts();
    parts.io
}
35
/// Relays data in both directions between two [`TcpStream`]s.
///
/// Delegates to `tokio::io::copy_bidirectional` and resolves once that call
/// finishes.
///
/// # Errors
///
/// Returns the [`std::io::Error`] reported by the underlying copy.
///
/// # Returns
///
/// The pair of byte counts produced by `copy_bidirectional`; per the tokio
/// documentation these are the bytes copied from `stream1` to `stream2`,
/// followed by the bytes copied from `stream2` to `stream1`.
#[cfg(feature = "tokio")]
pub async fn proxy_tcpstream(
    mut stream1: TcpStream,
    mut stream2: TcpStream,
) -> Result<(u64, u64), std::io::Error> {
    tokio::io::copy_bidirectional(&mut stream1, &mut stream2).await
}
47
/// Round-trips a loopback connection through `frame_stream_split` and
/// `frame_stream_combined`.
///
/// The peer is a listener bound to an ephemeral local port, so the test does
/// not depend on an external service listening on a fixed address.
#[tokio::test]
#[cfg(feature = "tokio")]
async fn test_tokio() {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
        .await
        .unwrap();
    let addr = listener.local_addr().unwrap();

    // Drive connect and accept together so neither side waits on the other.
    let (client, accepted) = tokio::join!(
        tokio::net::TcpStream::connect(addr),
        listener.accept()
    );
    let stream = client.unwrap();
    // Keep the accepted side alive until the end of the test.
    let _peer = accepted.unwrap();

    let (sender, receiver) = frame_stream_split::<String>(stream, b"\0".to_vec());
    let stream = frame_stream_combined(sender, receiver);

    // The recombined stream must still be the connection opened above.
    assert_eq!(stream.peer_addr().unwrap(), addr);
}