1#[cfg(feature = "tokio")]
2use futures::{
3 stream::{SplitSink, SplitStream},
4 StreamExt,
5};
6#[cfg(feature = "tokio")]
7use tokio::net::TcpStream;
8#[cfg(feature = "tokio")]
9use tokio_util::codec::{AnyDelimiterCodec, Framed};
10
/// A `TcpStream` wrapped in the delimiter-based framing layer shared by both halves below.
#[cfg(feature = "tokio")]
type DelimitedTcp = Framed<TcpStream, AnyDelimiterCodec>;

/// Sink (write) half of a split, delimiter-framed `TcpStream`.
///
/// `T` is the item type pushed into the sink.
#[cfg(feature = "tokio")]
pub type FrameSender<T> = SplitSink<DelimitedTcp, T>;

/// Stream (read) half of a split, delimiter-framed `TcpStream`.
#[cfg(feature = "tokio")]
pub type FrameReceiver = SplitStream<DelimitedTcp>;
15
/// Wraps `stream` in an [`AnyDelimiterCodec`] framing layer and splits it into a
/// sender half and a receiver half.
///
/// `split` is the delimiter byte sequence. The same bytes are handed to the codec
/// twice: once as the delimiter to look for when decoding and once as the sequence
/// used when encoding.
///
/// The two halves can later be merged back into the raw socket with
/// [`frame_stream_combined`].
#[cfg(feature = "tokio")]
pub fn frame_stream_split<T>(stream: TcpStream, split: Vec<u8>) -> (FrameSender<T>, FrameReceiver)
where
    T: AsRef<str>,
{
    // The codec takes ownership of two separate buffers, so one copy is unavoidable.
    let write_delimiter = split.clone();
    let framed = Framed::new(stream, AnyDelimiterCodec::new(split, write_delimiter));

    // The sink item type `T` is inferred from the declared return type.
    framed.split()
}
26
/// Reunites the two halves produced by [`frame_stream_split`] and returns the
/// underlying `TcpStream`.
///
/// Only the raw I/O object is returned; the codec and whatever the framing layer
/// still holds in its internal buffers are dropped.
///
/// # Panics
///
/// Panics if `reunite` fails, which happens when `sender` and `receiver` did not
/// originate from the same split.
#[cfg(feature = "tokio")]
pub fn frame_stream_combined<T>(sender: FrameSender<T>, receiver: FrameReceiver) -> TcpStream
where
    T: AsRef<str>,
{
    let framed = sender.reunite(receiver).unwrap();
    framed.into_inner()
}
35
/// Relays data between `stream1` and `stream2` in both directions until
/// `tokio::io::copy_bidirectional` completes.
///
/// # Returns
///
/// The pair of byte counts reported by `copy_bidirectional`; per tokio's
/// documentation these are the bytes copied from `stream1` to `stream2` and from
/// `stream2` to `stream1`, in that order.
///
/// # Errors
///
/// Returns the `std::io::Error` produced by `copy_bidirectional`, unchanged.
#[cfg(feature = "tokio")]
pub async fn proxy_tcpstream(
    mut stream1: TcpStream,
    mut stream2: TcpStream,
) -> Result<(u64, u64), std::io::Error> {
    tokio::io::copy_bidirectional(&mut stream1, &mut stream2).await
}
47
/// Round-trips a `TcpStream` through `frame_stream_split` and
/// `frame_stream_combined` and checks that the same connection comes back.
///
/// The test binds its own listener on an ephemeral loopback port instead of
/// relying on an external server at a fixed address, so it is self-contained.
#[cfg(feature = "tokio")]
#[tokio::test]
async fn test_tokio() {
    // Port 0 asks the OS for any free port, avoiding clashes with other processes.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
        .await
        .expect("binding an ephemeral loopback port should succeed");
    let addr = listener
        .local_addr()
        .expect("a bound listener has a local address");

    // `listener` stays alive until the end of the test so the connection is not refused.
    let stream = tokio::net::TcpStream::connect(addr)
        .await
        .expect("connecting to the local listener should succeed");

    let (sender, receiver) = frame_stream_split::<String>(stream, b"\0".to_vec());
    let reunited = frame_stream_combined(sender, receiver);

    assert_eq!(
        reunited
            .peer_addr()
            .expect("a connected stream has a peer address"),
        addr
    );
}