基于通讯模型
通讯模型
双向通讯
笔记:前面TCP 服务器的例子里,所做的都是双向通讯。这是最典型的一种通讯方式:
- 服务端监听9527端口
- 客户端连接9527端口
- 一旦连接建立,服务器和客户端都可以根据需要主动向对方发起传输。
- 服务器收到消息后就spawn一个线程处理
- 整个网络运行在全双工模式下(full duplex)
我们熟悉的 TCP / WebSocket 就运行在这种模型下。
双向通讯的好处是:数据的流向是没有限制的,一端不必等待另一端才能发送数据,网络可以进行比较实时地处理。
请求响应
笔记:在 Web 开发的世界里,请求 - 响应模型是我们最熟悉的模型。
客户端发送请求,服务器根据请求返回响应。
整个网络处在半双工模式下(half duplex)。HTTP/1.x 就运行在这种模式下。
- 一般而言,请求响应模式下,在客户端没有发起请求时,服务器不会也无法主动向客户端发送数据。
- 除此之外,请求发送的顺序和响应返回的顺序是一一对应的,不会也不能乱序,这种处理方式会导致**应用层的队头阻塞(Head-Of-Line blocking)**。
请求响应模型处理起来很简单,由于 HTTP 协议的流行,尽管有很多限制,请求响应模型还是得到了非常广泛的应用。
笔记:基于请求-响应 和 双向通讯的组合模型
基础模型只有
请求-响应模型
和双向通讯模型
两个, 其他的组合模型是在这两个基础模型的基础上进行了不同方式的组合和扩展。
一、基础模型
- 请求-响应模型
- 双向通讯模型
二、组合模型
-
请求-回调模型 1.1 客户端发送请求,服务器异步处理请求并回调客户端。 1.2 整个网络处于异步通信状态。 1.3 Rust相关开源项目:actix-web、Rocket等。
-
请求-流式处理模型 2.1 客户端发送请求,服务器以流的形式异步处理请求。 2.2 整个网络处于异步通信状态。 2.3 Rust相关开源项目:tokio、async-std等。
-
请求-队列模型 3.1 客户端发送请求,服务器将请求放入队列中异步处理。 3.2 整个网络处于异步通信状态。 3.3 Rust相关开源项目:RabbitMQ、Kafka等。
-
请求-推送模型 4.1 客户端发送请求,服务器将处理结果主动推送给客户端。 4.2 整个网络处于异步通信状态。 4.3 Rust相关开源项目:actix-web、Rocket等。
-
异步通知模型 5.1 客户端发送请求,服务器在之后通过Webhooks等技术进行异步通知。 5.2 整个网络处于异步通信状态。 5.3 Rust相关开源项目:actix-web、Rocket等。
-
请求-响应-异步通知模型 6.1 客户端发送请求,服务器根据请求返回响应,并在之后异步通知客户端相关数据更新。 6.2 整个网络处于异步通信状态。 6.3 Rust相关开源项目:actix-web、Rocket等。
-
请求-流式处理-异步通知模型 7.1 客户端发送请求,服务器将请求以流的形式异步处理,并在之后异步通知客户端相关数据更新。 7.2 整个网络处于异步通信状态。 7.3 Rust相关开源项目:tokio、async-std等。
-
请求-队列-回调模型 8.1 客户端发送请求,服务器将请求放入队列中异步处理,并在完成后回调客户端。 8.2 整个网络处于异步通信状态。 8.3 Rust相关开源项目:RabbitMQ、Kafka等。
-
请求-队列-推送模型 9.1 客户端发送请求,服务器将请求放入队列中异步处理,并将处理结果主动推送给客户端。 9.2 整个网络处于异步通信状态。 9.3 Rust相关开源项目:RabbitMQ、Kafka等。
-
请求-回调-推送模型 10.1 客户端发送请求,服务器异步处理请求并回调客户端,同时将处理结果主动推送给客户端。 10.2 整个网络处于异步通信状态。 10.3 Rust相关开源项目:actix-web、Rocket等。
-
请求-响应-流式处理模型 11.1 客户端发送请求,服务器根据请求返回响应,并以流的形式异步处理相关数据并返回给客户端。 11.2 整个网络处于异步通信状态。 11.3 Rust相关开源项目:tokio、async-std等。
-
请求-响应-队列模型 12.1 客户端发送请求,服务器根据请求返回响应,并将请求放入队列中异步处理。 12.2 整个网络处于异步通信状态。 12.3 Rust相关开源项目:RabbitMQ、Kafka等。
-
请求-响应-推送模型 13.1 客户端发送请求,服务器根据请求返回响应,并将处理结果主动推送给客户端。 13.2 整个网络处于异步通信状态。 13.3 Rust相关开源项目:actix-web、Rocket等。
-
请求-响应-异步通知模型 14.1 客户端发送请求,服务器根据请求返回响应,并在之后通过Webhooks等技术进行异步通知。 14.2 整个网络处于异步通信状态。 14.3 Rust相关开源项目:actix-web、Rocket等。
-
请求-流式处理-队列模型 15.1 客户端发送请求,服务器以流的形式异步处理请求,并将请求放入队列中异步处理相关数据并返回给客户端。 15.2 整个网络处于异步通信状态。 15.3 Rust相关开源项目:tokio、async-std、RabbitMQ、Kafka等。
-
请求-流式处理-推送模型 16.1 客户端发送请求,服务器以流的形式异步处理请求,并将处理结果主动推送给客户端。 16.2 整个网络处于异步通信状态。 16.3 Rust相关开源项目:tokio、async-std、actix-web、Rocket等。
-
请求-队列-回调-推送模型 17.1 客户端发送请求,服务器将请求放入队列中异步处理,并在完成后回调客户端并将处理结果主动推送给客户端。 17.2 整个网络处于异步通信状态。 17.3 Rust相关开源项目:RabbitMQ、Kafka、actix-web、Rocket等。
-
请求-回调-异步通知模型 18.1 客户端发送请求,服务器异步处理请求并回调客户端,在之后通过Webhooks等技术进行异步通知。 18.2 整个网络处于异步通信状态。 18.3 Rust相关开源项目:actix-web、Rocket等。
-
请求-响应-流式处理-异步通知模型 19.1 客户端发送请求,服务器根据请求返回响应,并以流的形式异步处理相关数据并返回给客户端,在之后通过Webhooks等技术进行异步通知。 19.2 整个网络处于异步通信状态。 19.3 Rust相关开源项目:tokio、async-std、actix-web、Rocket等。
-
请求-队列-流式处理模型 20.1 客户端发送请求,服务器将请求放入队列中异步处理,并以流的形式异步处理相关数据并返回给客户端。 20.2 整个网络处于异步通信状态。 20.3 Rust相关开源项目:RabbitMQ、Kafka、tokio、async-std等。
-
请求-队列-推送-异步通知模型 21.1 客户端发送请求,服务器将请求放入队列中异步处理,并将处理结果主动推送给客户端,在之后通过Webhooks等技术进行异步通知。 21.2 整个网络处于异步通信状态。 21.3 Rust相关开源项目:RabbitMQ、Kafka、actix-web、Rocket等。
-
请求-回调-推送-异步通知模型 22.1 客户端发送请求,服务器异步处理请求并回调客户端,同时将处理结果主动推送给客户端,在之后通过Webhooks等技术进行异步通知。 22.2 整个网络处于异步通信状态。 22.3 Rust相关开源项目:actix-web、Rocket等。
控制平面 / 数据平面分离
但有时候,服务器和客户端之间会进行复杂的通讯,这些通讯包含控制信令和数据流。
因为 TCP 有天然的网络层的队头阻塞,所以当控制信令和数据交杂在同一个连接中时,过大的数据流会阻塞控制信令,使其延迟加大,无法及时响应一些重要的命令。
以 FTP 为例:
- 如果用户在传输一个 1G 的文件后,再进行 ls 命令
- 如果文件传输和 ls 命令都在同一个连接中进行,那么,只有文件传输结束,用户才会看到 ls 命令的结果,这样显然对用户非常不友好。
所以,我们会采用控制平面和数据平面分离的方式,进行网络处理:
所以,我们会采用控制平面和数据平面分离的方式,进行网络处理:
-
connect()/ctrl_send/ctrl_recv:
客户端会首先连接服务器,建立控制连接。控制连接是一个长连接,会一直存在,直到交互终止。
-
connect/data_send/data_recv:
然后,二者会根据需要额外创建新的临时的数据连接,用于传输大容量的数据,数据连接在完成相应的工作后,会自动关闭。
除 FTP 外,还有很多协议都是类似的处理方式:
-
比如**多媒体通讯协议SIP 协议**。
-
HTTP/2 和借鉴了 HTTP/2 的用于多路复用的 Yamux 协议,虽然运行在同一个 TCP 连接之上,它们在应用层也构建了类似的控制平面和数据平面。
以 HTTP/2 为例
-
控制平面(ctrl stream)可以创建很多新的 stream,用于并行处理多个应用层的请求
-
比如使用 HTTP/2 的 gRPC,各个请求可以并行处理
-
不同 stream 之间的数据可以乱序返回,而不必受请求响应模型的限制
虽然 HTTP/2 依旧受困于 TCP 层的队头阻塞,但它解决了应用层的队头阻塞。
P2P 网络
前面我们谈论的网络通讯模型,都是传统的客户端 / 服务器交互模型(C/S 或 B/S):
客户端和服务器在网络中的作用是不对等的,客户端永远是连接的发起方,而服务器是连接的处理方。
不对等的网络模型有很多好处
比如客户端不需要公网地址,可以隐藏在网络地址转换(NAT)设备(比如 NAT 网关、防火墙)之后,只要服务器拥有公网地址,这个网络就可以连通。
所以,客户端 / 服务器模型是天然中心化的,所有连接都需要经过服务器这个中间人,即便是两个客户端的数据交互也不例外。
这种模型随着互联网的大规模使用成为了网络世界的主流。
然而,很多应用场景需要通讯的两端可以直接交互,而无需一个中间人代为中转。
比如 A 和 B 分享一个 1G 的文件,如果通过服务器中转,数据相当于传输了两次,效率很低。
P2P 模型打破了这种不对等的关系,使得任意两个节点在理论上可以直接连接,每个节点既是客户端,又是服务器。
如何构建 P2P 网络
可是由于历史上 IPv4 地址的缺乏,以及对隐私和网络安全的担忧,互联网的运营商在接入端,大量使用了 NAT 设备,使得普通的网络用户,缺乏直接可以访问的公网 IP。
因而,构建一个 P2P 网络首先需要解决网络的连通性。
解决网络连通
笔记:主流的解决方法
-
探索
P2P 网络的每个节点,都会首先会通过 STUN 服务器探索自己的公网 IP/port
-
注册
然后在 bootstrap/signaling server 上注册自己的公网 IP/port,让别人能发现自己,从而和潜在的“邻居”建立连接。
在一个大型的 P2P 网络中,一个节点常常会拥有几十个邻居,通过这些邻居以及邻居掌握的网络信息,每个节点都能构建一张如何找到某个节点(某个数据)的路由表。
在此之上,节点还可以加入某个或者某些 topic,然后通过某些协议(比如 gossip)在整个 topic 下扩散消息:
承载多个协议
P2P 网络的构建,一般要比客户端 / 服务器网络复杂,因为节点间的连接要承载很多协议:
-
节点发现(mDNS、bootstrap、Kad DHT)
-
节点路由(Kad DHT)
-
内容发现(pubsub、Kad DHT)
-
应用层协议
同时,连接的安全性受到的挑战也和之前不同。
网络安全解决
在网络安全方面,TLS 虽然能很好地保护客户端 / 服务器模型,然而证书的创建、发放以及信任对 P2P 网络是个问题
所以 P2P 网络倾向于使用自己的安全协议,或者使用 noise protocol,来构建安全等级可以媲美 TLS 1.3 的安全协议。
Rust 如何处理 P2P 网络
在 Rust 下,有 **libp2p 这个比较成熟的库**来处理 P2P 网络。
P2P聊天应用
例子: 下面是一个简单的 P2P 聊天应用:在本地网络中通过 MDNS 做节点发现,使用 floodpub 做消息传播。 (github)
在关键位置都写了注释:
use anyhow::Result; use futures::StreamExt; use libp2p::{ core::upgrade, floodsub::{self, Floodsub, FloodsubEvent, Topic}, identity, mdns::{Mdns, MdnsEvent}, noise, swarm::{NetworkBehaviourEventProcess, SwarmBuilder, SwarmEvent}, tcp::TokioTcpConfig, yamux, NetworkBehaviour, PeerId, Swarm, Transport, }; use std::borrow::Cow; use tokio::io::{stdin, AsyncBufReadExt, BufReader}; /// 处理 p2p 网络的 behavior 数据结构 /// 里面的每个域需要实现 NetworkBehaviour,或者使用 #[behaviour(ignore)] #[derive(NetworkBehaviour)] #[behaviour(event_process = true)] struct ChatBehavior { /// flood subscription,比较浪费带宽,gossipsub 是更好的选择 floodsub: Floodsub, /// 本地节点发现机制 mdns: Mdns, // 在 behavior 结构中,你也可以放其它数据,但需要 ignore // #[behaviour(ignore)] // _useless: String, } impl ChatBehavior { /// 创建一个新的 ChatBehavior pub async fn new(id: PeerId) -> Result<Self> { Ok(Self { mdns: Mdns::new(Default::default()).await?, floodsub: Floodsub::new(id), }) } } impl NetworkBehaviourEventProcess<FloodsubEvent> for ChatBehavior { // 处理 floodsub 产生的消息 fn inject_event(&mut self, event: FloodsubEvent) { if let FloodsubEvent::Message(msg) = event { let text = String::from_utf8_lossy(&msg.data); println!("{:?}: {:?}", msg.source, text); } } } impl NetworkBehaviourEventProcess<MdnsEvent> for ChatBehavior { fn inject_event(&mut self, event: MdnsEvent) { match event { MdnsEvent::Discovered(list) => { // 把 mdns 发现的新的 peer 加入到 floodsub 的 view 中 for (id, addr) in list { println!("Got peer: {} with addr {}", &id, &addr); self.floodsub.add_node_to_partial_view(id); } } MdnsEvent::Expired(list) => { // 把 mdns 发现的离开的 peer 加入到 floodsub 的 view 中 for (id, addr) in list { println!("Removed peer: {} with addr {}", &id, &addr); self.floodsub.remove_node_from_partial_view(&id); } } } } } #[tokio::main] async fn main() -> Result<()> { // 如果带参数,当成一个 topic let name = match std::env::args().nth(1) { Some(arg) => Cow::Owned(arg), None => Cow::Borrowed("lobby"), }; // 创建 floodsub topic let topic = floodsub::Topic::new(name); // 创建 swarm let mut swarm = create_swarm(topic.clone()).await?; swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse()?)?; // 获取 stdin 的每一行 let mut stdin = BufReader::new(stdin()).lines(); // main loop loop { tokio::select! { line = stdin.next_line() => { let line = line?.expect("stdin closed"); swarm.behaviour_mut().floodsub.publish(topic.clone(), line.as_bytes()); } event = swarm.select_next_some() => { if let SwarmEvent::NewListenAddr { address, .. } = event { println!("Listening on {:?}", address); } } } } } async fn create_swarm(topic: Topic) -> Result<Swarm<ChatBehavior>> { // 创建 identity(密钥对) let id_keys = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(id_keys.public()); println!("Local peer id: {:?}", peer_id); // 使用 noise protocol 来处理加密和认证 let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys)?; // 创建传输层 let transport = TokioTcpConfig::new() .nodelay(true) .upgrade(upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(yamux::YamuxConfig::default()) .boxed(); // 创建 chat behavior let mut behavior = ChatBehavior::new(peer_id).await?; // 订阅某个主题 behavior.floodsub.subscribe(topic.clone()); // 创建 swarm let swarm = SwarmBuilder::new(transport, behavior, peer_id) .executor(Box::new(|fut| { tokio::spawn(fut); })) .build(); Ok(swarm) }
要运行这段代码,你需要在 Cargo.toml 中使用 futures 和 libp2p:
futures = "0.3"
libp2p = { version = "0.39", features = ["tcp-tokio"] }
演示
例子: 开一个窗口 A 运行:
❯ cargo run --example p2p_chat --quiet
Local peer id: PeerId("12D3KooWDJtZVKBCa7B9C8ZQmRpP7cB7CgeG7PWLXYCnN3aXkaVg")
Listening on "/ip4/127.0.0.1/tcp/51654"
// 下面的内容在新节点加入时逐渐出现
Got peer: 12D3KooWAw1gTLCesw1bvTiKNYFyacwbAcjvKwfDsJiH8AuBFgFA with addr /ip4/192.168.86.23/tcp/51656
Got peer: 12D3KooWAw1gTLCesw1bvTiKNYFyacwbAcjvKwfDsJiH8AuBFgFA with addr /ip4/127.0.0.1/tcp/51656
Got peer: 12D3KooWMRQvxJcjcexCrNfgSVd2iChpiDWzbgRRS6c5mn9bBzdT with addr /ip4/192.168.86.23/tcp/51661
Got peer: 12D3KooWMRQvxJcjcexCrNfgSVd2iChpiDWzbgRRS6c5mn9bBzdT with addr /ip4/127.0.0.1/tcp/51661
Got peer: 12D3KooWRy9r8j7UQMxavqTcNmoz1JmnLcTU5UZvzvE5jz4Zw3eh with addr /ip4/192.168.86.23/tcp/51670
Got peer: 12D3KooWRy9r8j7UQMxavqTcNmoz1JmnLcTU5UZvzvE5jz4Zw3eh with addr /ip4/127.0.0.1/tcp/51670
例子: 窗口 B
#![allow(unused)] fn main() { ❯ cargo run --example p2p_chat --quiet Local peer id: PeerId("12D3KooWAw1gTLCesw1bvTiKNYFyacwbAcjvKwfDsJiH8AuBFgFA") Listening on "/ip4/127.0.0.1/tcp/51656" Got peer: 12D3KooWDJtZVKBCa7B9C8ZQmRpP7cB7CgeG7PWLXYCnN3aXkaVg with addr /ip4/192.168.86.23/tcp/51654 Got peer: 12D3KooWDJtZVKBCa7B9C8ZQmRpP7cB7CgeG7PWLXYCnN3aXkaVg with addr /ip4/127.0.0.1/tcp/51654 // 下面的内容在新节点加入时逐渐出现 Got peer: 12D3KooWMRQvxJcjcexCrNfgSVd2iChpiDWzbgRRS6c5mn9bBzdT with addr /ip4/192.168.86.23/tcp/51661 Got peer: 12D3KooWMRQvxJcjcexCrNfgSVd2iChpiDWzbgRRS6c5mn9bBzdT with addr /ip4/127.0.0.1/tcp/51661 Got peer: 12D3KooWRy9r8j7UQMxavqTcNmoz1JmnLcTU5UZvzvE5jz4Zw3eh with addr /ip4/192.168.86.23/tcp/51670 Got peer: 12D3KooWRy9r8j7UQMxavqTcNmoz1JmnLcTU5UZvzvE5jz4Zw3eh with addr /ip4/127.0.0.1/tcp/51670 }
例子: 窗口C
❯ cargo run --example p2p_chat --quiet
Local peer id: PeerId("12D3KooWMRQvxJcjcexCrNfgSVd2iChpiDWzbgRRS6c5mn9bBzdT")
Listening on "/ip4/127.0.0.1/tcp/51661"
Got peer: 12D3KooWAw1gTLCesw1bvTiKNYFyacwbAcjvKwfDsJiH8AuBFgFA with addr /ip4/192.168.86.23/tcp/51656
Got peer: 12D3KooWAw1gTLCesw1bvTiKNYFyacwbAcjvKwfDsJiH8AuBFgFA with addr /ip4/127.0.0.1/tcp/51656
Got peer: 12D3KooWDJtZVKBCa7B9C8ZQmRpP7cB7CgeG7PWLXYCnN3aXkaVg with addr /ip4/192.168.86.23/tcp/51654
Got peer: 12D3KooWDJtZVKBCa7B9C8ZQmRpP7cB7CgeG7PWLXYCnN3aXkaVg with addr /ip4/127.0.0.1/tcp/51654
// 下面的内容在新节点加入时逐渐出现
Got peer: 12D3KooWRy9r8j7UQMxavqTcNmoz1JmnLcTU5UZvzvE5jz4Zw3eh with addr /ip4/192.168.86.23/tcp/51670
Got peer: 12D3KooWRy9r8j7UQMxavqTcNmoz1JmnLcTU5UZvzvE5jz4Zw3eh with addr /ip4/127.0.0.1/tcp/51670
例子: 窗口 D 使用 topic 参数,让它和其它的 topic 不同
#![allow(unused)] fn main() { ❯ cargo run --example p2p_chat --quiet -- hello Local peer id: PeerId("12D3KooWRy9r8j7UQMxavqTcNmoz1JmnLcTU5UZvzvE5jz4Zw3eh") Listening on "/ip4/127.0.0.1/tcp/51670" Got peer: 12D3KooWMRQvxJcjcexCrNfgSVd2iChpiDWzbgRRS6c5mn9bBzdT with addr /ip4/192.168.86.23/tcp/51661 Got peer: 12D3KooWMRQvxJcjcexCrNfgSVd2iChpiDWzbgRRS6c5mn9bBzdT with addr /ip4/127.0.0.1/tcp/51661 Got peer: 12D3KooWAw1gTLCesw1bvTiKNYFyacwbAcjvKwfDsJiH8AuBFgFA with addr /ip4/192.168.86.23/tcp/51656 Got peer: 12D3KooWAw1gTLCesw1bvTiKNYFyacwbAcjvKwfDsJiH8AuBFgFA with addr /ip4/127.0.0.1/tcp/51656 Got peer: 12D3KooWDJtZVKBCa7B9C8ZQmRpP7cB7CgeG7PWLXYCnN3aXkaVg with addr /ip4/192.168.86.23/tcp/51654 Got peer: 12D3KooWDJtZVKBCa7B9C8ZQmRpP7cB7CgeG7PWLXYCnN3aXkaVg with addr /ip4/127.0.0.1/tcp/51654 }
你会看到,每个节点运行时,都会通过 MDNS 广播,来发现本地已有的 P2P 节点。
现在 A/B/C/D 组成了一个 P2P 网络,其中 A/B/C 都订阅了 lobby,而 D 订阅了 hello。
例子: 我们在 A/B/C/D 四个窗口中分别输入 “Hello from X”,可以看到
- 窗口 A:
hello from A
PeerId("12D3KooWAw1gTLCesw1bvTiKNYFyacwbAcjvKwfDsJiH8AuBFgFA"): "hello from B"
PeerId("12D3KooWMRQvxJcjcexCrNfgSVd2iChpiDWzbgRRS6c5mn9bBzdT"): "hello from C"
- 窗口 B:
PeerId("12D3KooWDJtZVKBCa7B9C8ZQmRpP7cB7CgeG7PWLXYCnN3aXkaVg"): "hello from A"
hello from B
PeerId("12D3KooWMRQvxJcjcexCrNfgSVd2iChpiDWzbgRRS6c5mn9bBzdT"): "hello from C"
- 窗口 C:
#![allow(unused)] fn main() { PeerId("12D3KooWDJtZVKBCa7B9C8ZQmRpP7cB7CgeG7PWLXYCnN3aXkaVg"): "hello from A" PeerId("12D3KooWAw1gTLCesw1bvTiKNYFyacwbAcjvKwfDsJiH8AuBFgFA"): "hello from B" hello from C }
- 窗口 D:
#![allow(unused)] fn main() { hello from D }
可以看到,在 lobby 下的 A/B/C 都收到了各自的消息。
这个使用 libp2p 的聊天代码,如果你读不懂,没关系。
P2P 有大量的新的概念和协议需要预先掌握,所以如果你对这些概念和协议感兴趣,可以自行阅读 * libp2p 的文档* ,以及它的**示例代码**。
通讯模型练习题
- 看一看 libp2p 的文档和示例代码,把 libp2p clone 到本地,运行每个示例代码。
- 阅读 libp2p 的 NetworkBehaviour trait ,以及 floodsub 对应的实现。
- 尝试把P2P聊天应用例子中的 floodsub 替换成**更高效更节省带宽的 gossipsub**。