cfun/
c_lfmap.rs

1#[cfg(feature = "tokio")]
2use std::{collections::HashMap, fmt::Debug};
3
4#[cfg(feature = "tokio")]
5use tokio::sync::{
6    mpsc::{unbounded_channel, UnboundedSender},
7    oneshot,
8};
9
/// A single request sent to the task that owns the underlying `HashMap`.
///
/// Every variant carries a one-shot sender on which the owning task reports
/// the outcome of the operation.
#[cfg(feature = "tokio")]
#[derive(Debug)]
enum MapOpt<K, V>
where
    V: 'static + Debug + Clone + Send,
    K: 'static + Clone + Eq + std::hash::Hash + Send,
{
    /// Store the value under the key; the reply is whatever value the key
    /// held before, if any.
    Insert(oneshot::Sender<Option<V>>, K, V),
    /// Look the key up; the reply is a clone of the stored value, if any.
    Get(oneshot::Sender<Option<V>>, K),
    /// Delete the key; the reply is the value that was removed, if any.
    Remove(oneshot::Sender<Option<V>>, K),
}
21
/// Lock-Free Map
///
/// A handle to a `HashMap` that lives inside a spawned Tokio task. The handle
/// itself holds no map data: each operation is packaged as a message and sent
/// over a channel to that task, which applies the operations one at a time in
/// the order they are received.
#[cfg(feature = "tokio")]
#[derive(Debug)]
pub struct LFMap<K, V>
where
    V: 'static + Debug + Clone + Send,
    K: 'static + Clone + Eq + std::hash::Hash + Send,
{
    /// Sending half of the unbounded channel that delivers operations to the
    /// task owning the `HashMap`.
    opt_sender: UnboundedSender<MapOpt<K, V>>,
}
32
#[cfg(feature = "tokio")]
impl<K, V> LFMap<K, V>
where
    K: 'static + Eq + std::hash::Hash + Clone + Send,
    V: 'static + Clone + Send + Debug,
{
    /// Creates an empty map and spawns the worker task that owns its storage.
    ///
    /// The worker processes requests sequentially and exits once the request
    /// channel is closed, i.e. after this handle (the only sender) is dropped.
    ///
    /// # Panics
    ///
    /// Panics if called outside of a Tokio runtime, because it uses
    /// `tokio::spawn`.
    pub fn new() -> Self {
        let (opt_sender, mut opt_receiver) = unbounded_channel::<MapOpt<K, V>>();
        tokio::spawn(async move {
            let mut entries: HashMap<K, V> = HashMap::new();
            while let Some(opt) = opt_receiver.recv().await {
                // Delivering the reply fails only when the caller stopped
                // waiting (its future was dropped or cancelled). That must not
                // bring the worker down, so the send result is ignored on
                // purpose; the operation itself has already been applied.
                match opt {
                    MapOpt::Insert(reply, key, value) => {
                        let _ = reply.send(entries.insert(key, value));
                    }
                    MapOpt::Remove(reply, key) => {
                        let _ = reply.send(entries.remove(&key));
                    }
                    MapOpt::Get(reply, key) => {
                        let _ = reply.send(entries.get(&key).cloned());
                    }
                }
            }
        });
        Self { opt_sender }
    }

    /// Builds a request around a fresh one-shot reply channel, queues it for
    /// the worker task and returns the receiving end of the reply channel.
    ///
    /// # Panics
    ///
    /// Panics if the worker task is no longer running.
    fn dispatch<F>(&self, build: F) -> oneshot::Receiver<Option<V>>
    where
        F: FnOnce(oneshot::Sender<Option<V>>) -> MapOpt<K, V>,
    {
        let (reply_tx, reply_rx) = oneshot::channel();
        // `expect` (not `unwrap`) so the error payload, which contains the
        // whole request, does not need to be formatted into the message.
        self.opt_sender
            .send(build(reply_tx))
            .ok()
            .expect("LFMap worker task is no longer running");
        reply_rx
    }

    /// Inserts `value` under `key`.
    ///
    /// Returns the value previously stored under `key`, or `None` if the key
    /// was not present.
    ///
    /// # Panics
    ///
    /// Panics if the worker task has stopped before answering.
    pub async fn insert(&self, key: K, value: V) -> Option<V> {
        self.dispatch(|reply| MapOpt::Insert(reply, key, value))
            .await
            .expect("LFMap worker task dropped the reply channel")
    }

    /// Looks up `key`.
    ///
    /// Returns a clone of the stored value, or `None` if the key is absent.
    ///
    /// # Panics
    ///
    /// Panics if the worker task has stopped before answering.
    pub async fn get(&self, key: K) -> Option<V> {
        self.dispatch(|reply| MapOpt::Get(reply, key))
            .await
            .expect("LFMap worker task dropped the reply channel")
    }

    /// Removes `key` from the map.
    ///
    /// Returns the value that was stored under `key`, or `None` if the key
    /// was not present.
    ///
    /// # Panics
    ///
    /// Panics if the worker task has stopped before answering.
    pub async fn remove(&self, key: K) -> Option<V> {
        self.dispatch(|reply| MapOpt::Remove(reply, key))
            .await
            .expect("LFMap worker task dropped the reply channel")
    }
}
94
/// Exercises insert / get / remove, including the values each call returns
/// for keys that are absent and for keys that are overwritten.
#[cfg(feature = "tokio")]
#[tokio::test]
async fn test_lfmap() {
    let map: LFMap<String, String> = LFMap::new();

    // Nothing is stored yet.
    assert_eq!(map.get("key1".to_string()).await, None);

    // First insert of a key reports no previous value.
    let previous = map.insert("key1".to_string(), "value1".to_string()).await;
    assert_eq!(previous, None);

    let value = map.get("key1".to_string()).await;
    assert_eq!(value, Some("value1".to_string()));

    let value = map.remove("key1".to_string()).await;
    assert_eq!(value, Some("value1".to_string()));

    // The entry is really gone after removal.
    assert_eq!(map.get("key1".to_string()).await, None);
    assert_eq!(map.remove("key1".to_string()).await, None);

    // Overwriting an existing key hands back the value it replaced.
    assert_eq!(
        map.insert("key1".to_string(), "value2".to_string()).await,
        None
    );
    assert_eq!(
        map.insert("key1".to_string(), "value3".to_string()).await,
        Some("value2".to_string())
    );
    assert_eq!(
        map.get("key1".to_string()).await,
        Some("value3".to_string())
    );
}
104}