配置加载类设计
上面我们已经对网关项目的各个模块都进行了了解了,那么接下来我们要做的事情就是去动手实现。 再上面的架构设计中我们已经提出,我们的项目需要提供足够的扩展性,方便我们的项目运行时支持各类配置,因此,我们首先从配置这一块下手。 最基础的,我们首先设计一个类用于为网关提供配置,这个类支持我们的项目从配置文件、JVM参数、运行时参数、环境变量中进行我们配置的读取。 对于配置加载类的设计就有点像我们SpringBoot项目了,都会在启动的时候按照约定读取特定的配置文件进行加载。 那么Java也为我们提供了许多非常方便的从不同地方读取配置信息的API。 这里我们直接看代码,代码比较容易理解。
package cyou.breathe.gateway.config.loader;
import cyou.breathe.gateway.common.utils.PropertiesUtil;
import cyou.breathe.gateway.config.config.Config;
import cyou.breathe.gateway.config.util.ConfigUtil;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.Properties;
import static cyou.breathe.gateway.common.constants.ConfigConstant.*;
/**
* 配置加载类
* @author: breathe
* @createTime: 2025-10-02
*/@NoArgsConstructor
public class ConfigLoader {
public static final ConfigLoader INSTANCE = new ConfigLoader();
public static ConfigLoader getInstance() {
return INSTANCE;
}
private Config config;
public static Config getConfig() {
return INSTANCE.config;
}
/**
* 优先级高的会覆盖优先级低的
* 运行参数 -> jvm参数 -> 环境变量 -> 配置文件 -> 配置对象对默认值
*/
public Config load(String[] args) {
//配置类默认值
config = new Config();
//配置文件
loadFromConfigFile();
//环境变量
loadFromEnv();
//jvm参数
loadFromJvm();
//运行参数
loadFromArgs(args);
return config;
}
private void loadFromConfigFile() {
config = ConfigUtil.loadConfigFromYaml(CONFIG_PATH, Config.class, CONFIG_PREFIX);
}
private void loadFromEnv() {
Map<String, String> env = System.getenv();
Properties properties = new Properties();
properties.putAll(env);
PropertiesUtil.properties2Object(properties, config, ENV_PREFIX);
}
private void loadFromJvm() {
Properties properties = System.getProperties();
PropertiesUtil.properties2Object(properties, config, JVM_PREFIX);
}
private void loadFromArgs(String[] args) {
if (args != null && args.length > 0) {
Properties properties = new Properties();
for (String arg : args) {
if (arg.startsWith("--") && arg.contains("=")) {
properties.put(arg.substring(2, arg.indexOf("=")),
arg.substring(arg.indexOf("=") + 1));
}
} PropertiesUtil.properties2Object(properties, config);
}
}}这里的代码其实就是按顺序读取各个地方的配置,并且按照优先级顺序,高优先级会覆盖低优先级的配置。 最后读取完毕所有的配置之后得到一个Config类,这个就是当前我们项目的配置信息了。 下面附上配置方法。 ![[../../photo/Pasted image 20251019150330.png]] 在其中你的environment variables来配置 配置文件的话: ![[../../photo/Pasted image 20251019150542.png]]
基于Netty的网络通信层设计
Netty概述
在基于Netty进行设计之前,我们先按照老方法,介绍一下Netty,以及它的作用和使用场景。
- Netty 是什么: Netty 是一个高性能的、异步事件驱动的网络应用程序框架,支持快速开发可维护的高性能协议服务器和客户端。它是一个在 Java NIO 的基础上构建的网络编程框架,提供了易于使用的API。
- Netty 的具体功能:
- 异步和事件驱动:Netty 提供了一个多线程的事件循环,用于处理所有网络事件,例如连接、数据发送和接收。
- 支持多协议:它可以支持多种传输协议,包括 TCP、UDP,以及更高级的协议如 HTTP、HTTPS、WebSocket。
- 高度可定制:可以通过ChannelHandler来定制处理网络事件的逻辑,支持编解码器、拦截器等。
- 性能优化:利用池化和复用技术来减少资源消耗,减少GC压力,优化内存使用。
- 安全性:内置了对 SSL/TLS 协议的支持,确保数据传输安全。
- Netty 中的核心概念:
- Boss 和 Worker 线程:在 Netty 的服务器端,"Boss" 线程负责处理连接的建立,而 "Worker" 线程负责处理已连接的通道的IO操作。这种模型允许Boss线程迅速处理新的连接,并将数据传输的处理任务委托给Worker线程。
- Channel:代表一个到远程节点的开放连接,可以进行读写操作。
- EventLoop:用于处理连接的生命周期中的所有事件,每个Channel都分配给了一个EventLoop。
- ChannelHandler:核心处理器,可以响应入站和/或出站事件和数据。
- Netty 的使用场景:
- Web服务器和客户端:使用Netty作为底层通信组件来构建自己的Web服务器和HTTP客户端。
- 实时通信系统:如在线游戏的服务器、聊天服务器,因为Netty支持WebSocket和TCP协议,适合需要低延迟和大量并发连接的应用。
服务端实现
首先,在 Maven pom.xml 文件中引入Netty的依赖:
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.59.Final</version>
<!-- 使用最新版本 -->
</dependency>
</dependencies>在基于 Netty设计服务端之前我们首先需要了解一下几个Netty中的核心概念:
- EventLoopGroup: 这是Netty中的一个核心组件,负责处理所有的I/O操作。EventLoopGroup是一个包含多个EventLoop的组,每个EventLoop都是一个单线程循环,负责处理连接的生命周期内的所有事件。其分为boss和worker两种类型的线程组。boss线程组通常负责接受新的客户端连接,而worker线程组负责处理boss线程组接受的连接的后续I/O操作。
- ServerBootstrap: 这个类是一个帮助类,用于设置服务器。它允许我们设置服务器所需的所有参数,如端口、使用的EventLoopGroup等。ServerBootstrap还允许为新接受的连接以及连接后的通道设置属性和处理程序。
- Channel: Channel接口代表一个到远程节点的开放连接,可以进行读写操作。在Netty中,Channel是网络通信的基础组件,每个连接都会创建一个新的Channel。
- ChannelInitializer: 这是一个特殊的处理程序,用于配置新注册的Channel的ChannelPipeline,它提供了一个容易扩展的方式来初始化Channel,一旦Channel注册到EventLoop上,就会调用ChannelInitializer。
- ChannelPipeline: 这个接口表示一个ChannelHandler的链表,用于处理或拦截入站和出站操作。它使得可以容易地添加或删除处理程序。
- ChannelHandler: 接口定义了很多事件处理方法,你可以通过实现这些方法来进行自定义的事件处理。事件可以是入站也可以是出站的,例如数据读取、写入、连接开启和关闭。
- ChannelHandlerContext: 提供了一个接口,用于在ChannelHandler中进行交互操作。通过这个上下文对象,处理程序可以传递事件、修改管道、存储处理信息等。
- ChannelOption 和 ChannelConfig: 这些类和接口用于配置Channel的参数,如连接超时、缓冲区大小等。
- NioEventLoopGroup 和 EpollEventLoopGroup: 这些类是EventLoopGroup的实现,分别对应于使用Java NIO和Epoll(只在Linux上可用)作为传输类型。Netty自动选择使用哪个实现,通常基于操作系统的能力和应用程序的需求。
- NioServerSocketChannel 和 EpollServerSocketChannel: 这些是Channel实现,表示服务器端的套接字通道。选择哪个实现通常取决于你选择的EventLoopGroup。
- 编解码器(Codec): Netty提供了一系列的编解码器用于数据的编码和解码,例如HttpServerCodec用于HTTP协议的编码和解码。 在你已经大致的知道了设计一个Netty客户端所涉及到的一些知识之后,我们来基于代码进行分析。
package cyou.breathe.gateway.core.netty;
import cyou.breathe.gateway.common.utils.RemotingUtil;
import cyou.breathe.gateway.config.config.Config;
import cyou.breathe.gateway.core.config.LifeCycle;
import cyou.breathe.gateway.core.netty.handler.NettyHttpServerHandler;
import cyou.breathe.gateway.core.netty.handler.NettyServerConnectManagerHandler;
import cyou.breathe.gateway.core.netty.processor.NettyProcessor;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerExpectContinueHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
* NettyHttpServer * @author: breathe
* @createTime: 2025-10-04
*/@Slf4j
@Data
public class NettyHttpServer implements LifeCycle {
/**
* 配置
*/
private final Config config;
/**
* NettyProcessor */ private final NettyProcessor nettyProcessor;
/**
* 服务器
*/
private ServerBootstrap serverBootstrap;
/**
* boss线程组
*/
private EventLoopGroup eventLoopGroupBoss;
/**
* worker线程组
*/
private EventLoopGroup eventLoopGroupWorker;
public NettyHttpServer(Config config, NettyProcessor nettyProcessor) {
this.config = config;
this.nettyProcessor = nettyProcessor;
init();
}
@Override
public void init() {
this.serverBootstrap = new ServerBootstrap();
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup(config.getNetty().getEventLoopGroupBossNum(),
new DefaultThreadFactory("netty-boss-nio"));
this.eventLoopGroupWorker = new EpollEventLoopGroup(config.getNetty().getEventLoopGroupWorkerNum(),
new DefaultThreadFactory("netty-worker-nio"));
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(config.getNetty().getEventLoopGroupBossNum(),
new DefaultThreadFactory("netty-boss-nio"));
this.eventLoopGroupWorker = new NioEventLoopGroup(config.getNetty().getEventLoopGroupWorkerNum(),
new DefaultThreadFactory("netty-worker-nio"));
}
}
/**
* 是否使用epoll
*/ public boolean useEpoll() {
return RemotingUtil.isLinuxPlatform() && Epoll.isAvailable();
}
/**
* 启动
*/
@Override
public void start() {
this.serverBootstrap
.group(eventLoopGroupBoss, eventLoopGroupWorker)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
// 设置发送数据缓冲区大小
.childOption(ChannelOption.SO_SNDBUF, 65535)
// 设置接收数据缓冲区大小
.childOption(ChannelOption.SO_RCVBUF, 65535)
// 本机+port=netty的服务器
.localAddress(new InetSocketAddress(config.getPort()))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
//http编解码
new HttpServerCodec(),
//请求报文聚合成FullHttpRequest
new HttpObjectAggregator(config.getNetty().getMaxContentLength()),
new HttpServerExpectContinueHandler(),
new NettyHttpServerHandler(nettyProcessor),
new NettyServerConnectManagerHandler()
);
}
});
try {
this.serverBootstrap.bind().sync();
log.info("netty 服务启动,端口: {}", this.config.getPort());
} catch (Exception e) {
throw new RuntimeException();
}
}
/**
* 优雅停机
*/
@Override
public void shutdown() {
if (eventLoopGroupBoss != null) {
eventLoopGroupBoss.shutdownGracefully();
}
if (eventLoopGroupWorker != null) {
eventLoopGroupWorker.shutdownGracefully();
}
}}大部分地方都比较容易理解,在init方法中我们初始化了EventLoopGroup来帮助我们处理我们的IO请求。 在start这个重点方法中我们基于ServerBootStrap进行了对Netty的配置。 我们依靠ChannelInitializer来添加通道处理类。 在这个ChannelInitializer中,只有两行代码是最重要的:
// 自定义的处理器
new NettyHttpServerHandler(nettyProcessor),
// 连接管理处理器
new NettyServerConnectManagerHandler()这行代码意味着进入到Netty通道中的请求需要被我的这个自定义的处理器类所处理。 所以我们来分析一下这个处理器类都起到了什么作用。 首先是先分析NettyHttpServerHandler这个类是什么,它的作用是什么:
package cyou.breathe.gateway.core.netty.handler;
import cyou.breathe.gateway.core.context.HttpRequestWrapper;
import cyou.breathe.gateway.core.netty.processor.NettyProcessor;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
/**
* netty 服务端处理器
* @author: breathe
* @createTime: 2025-10-04
*/public class NettyHttpServerHandler extends ChannelInboundHandlerAdapter {
private final NettyProcessor nettyProcessor;
public NettyHttpServerHandler(NettyProcessor nettyProcessor) {
//TODO
// 1. 因为我们现在的性能瓶颈是netty,所以我们可以在这里添加一个批处理的优化,也是一个无锁设计,我们可以将他在这里引入一个无锁队列来提高性能
// 2. 修改垃圾回收器为ZGC
this.nettyProcessor = nettyProcessor;
}
/**
* 读取数据
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
FullHttpRequest request = (FullHttpRequest) msg;
HttpRequestWrapper httpRequestWrapper = new HttpRequestWrapper();
httpRequestWrapper.setCtx(ctx);
httpRequestWrapper.setRequest(request);
nettyProcessor.process(httpRequestWrapper);
}
}ChannelInboundHandlerAdapter提供了一种简便的方式来帮助我们处理入站的网络事件。 其中channelRead比较重要,它可以帮助我解析和处理接收到的数据,因为在Netty中,消息通常是 ByteBuf 的形式,但也可以是任何我在ChannelPipeline 中设置的解码器能够处理的类型。因此我可以在这个方法中实现我的业务逻辑,也可以调用业务逻辑处理器来处理我接受到的数据。
这个请求和FullHttpRequest有着巨大的关系,在Netty框架中,FullHttpRequest 类是一个接口,它代表了一个完整的HTTP请求。这个类的作用是封装了HTTP请求的所有部分,包括请求行(如方法GET/POST、URI、HTTP版本)、请求头(Headers)以及请求体(Body)。因此我们只要拿到了这个类的信息并且保存,我们就可以在后续随时的对这一次的请求信息进行分析并做出对应的处理 再来看看上面提到的ChannelHandlerContext。可以看到他有点类似于过滤器链,指向了下一个要处理当前请求的类。它的作用在上面我也已经讲解到了,详细的讲解之后再说 然后我们来分析这里的NettyProcessor这个接口的实现类的作用。 ![[../../photo/Pasted image 20251019151009.png]]
可以发现,请求在走到这里的时候,其实接下来就即将开始我的正式对请求的处理过程了,也就是保存并填充我的网关请求上下文,也就是这里的GatewayContext,这个类中包含了当前请求所需要使用的规则,请求体与响应体。保存这个信息是为了接下来后续方便我对请求的不同规则有不同的处理。 不过我并没有在这里就打算马上开始讲解对请求的过滤器链的处理,因为这一节我将侧重在Netty这一块的设计。 那么接下来我们来看Netty对于网络连接这一块的处理吧。
网络链接管理
代码中已经比较详细的讲解了这个类的作用。它负责对我们网络请求链接的生命周期进行处理。 这个类对于我们的设计并不是最重要的,所以这里我选择一笔带过这个类。自行查看代码并且进行Debug了解一下什么时候会执行这个类中的方法即可。 比如第一次发送请求创建链接的时候就会调用register和active方法。
package cyou.breathe.gateway.core.netty.handler;
import cyou.breathe.gateway.common.utils.RemotingHelper;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
/**
* 连接管理器,管理连接的生命周期
* @author: breathe
* @createTime: 2025-10-04
*/@Slf4j
public class NettyServerConnectManagerHandler extends ChannelDuplexHandler {
/**
* 当Channel注册到它的EventLoop并且能够处理I/O时调用
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.debug("NETTY 服务器管道:channel注册 {}", remoteAddr);
super.channelRegistered(ctx);
}
/**
* 当Channel从它的EventLoop中注销并且无法处理任何I/O时调用
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.debug("NETTY 服务器管道:channelUnregistered {}", remoteAddr);
super.channelUnregistered(ctx);
}
/**
* 当Channel处理于活动状态时被调用,可以接收与发送数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.debug("NETTY 服务器管道:channelActive {}", remoteAddr);
super.channelActive(ctx);
}
/**
* 不再是活动状态且不再连接它的远程节点时被调用
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.debug("NETTY 服务器管道: channelInactive {}", remoteAddr);
super.channelInactive(ctx);
}
/**
* 当ChannelInboundHandler.fireUserEventTriggered()方法被调用时触发
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent event) {
//有一段时间没有收到或发送任何数据
if(event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY 服务器管道: userEventTriggered: IDLE {}", remoteAddr);
ctx.channel().close();
}
} ctx.fireUserEventTriggered(evt);
}
/**
* 当ChannelHandler在处理过程中出现异常时调用
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY 服务器管道: remoteAddr: {}, exceptionCaught {}", remoteAddr, cause);
ctx.channel().close();
}
}客户端实现
由于前面我已经在服务端实现的时候列举了所需要的一些关键的组件,而客户端的实现所需要用到的也差不多,所以就不在重复罗列。 这里我讲解一下实现一个异步的HTTP通信客户端如何去实现,这里我用到的是 AsyncHttpClient 这样的高层库,它基于Netty构建,提供了异步的HTTP客户端功能,它可以非阻塞地发送HTTP请求,并且能够高效地处理HTTP响应。 讲解了这些,我们再来分析我们的代码就会比较容易理解了:
package cyou.breathe.gateway.core.netty;
import cyou.breathe.gateway.config.config.Config;
import cyou.breathe.gateway.config.config.HttpClientConfig;
import cyou.breathe.gateway.core.config.LifeCycle;
import cyou.breathe.gateway.core.helper.AsyncHttpHelper;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* NettyHttpClient * @author: breathe
* @createTime: 2025-10-04
*/@Slf4j
public class NettyHttpClient implements LifeCycle {
/**
* 配置对象,包含HTTP客户端的配置参数
*/
private final Config config;
/**
* Netty的事件循环组,用于处理客户端的网络事件
*/
private final EventLoopGroup eventLoopGroupWorker;
/**
* 异步HTTP客户端实例
*/
private AsyncHttpClient asyncHttpClient;
/**
* 构造函数,创建NettyHttpClient的实例。
* @param config 包含客户端配置的对象。
* @param eventLoopGroupWorker 用于客户端事件处理的Netty事件循环组。
*/
public NettyHttpClient(Config config, EventLoopGroup eventLoopGroupWorker) {
this.config = config;
this.eventLoopGroupWorker = eventLoopGroupWorker;
init();
}
/**
* 初始化异步HTTP客户端,设置其配置参数。
*/
@Override
public void init() {
// 创建异步HTTP客户端配置的构建器
DefaultAsyncHttpClientConfig.Builder builder = new DefaultAsyncHttpClientConfig.Builder()
//设置事件循环组
.setEventLoopGroup(eventLoopGroupWorker)
//设置连接超时时间
.setConnectTimeout(config.getHttpClient().getHttpConnectTimeout())
//设置请求超时时间
.setRequestTimeout(config.getHttpClient().getHttpRequestTimeout())
//设置最大重定向次数
.setMaxRedirects(config.getHttpClient().getHttpMaxRequestRetry())
//池化的byteBuf分配器,提升性能
.setAllocator(PooledByteBufAllocator.DEFAULT)
//强制启用压缩
.setCompressionEnforced(true)
//设置最大连接数
.setMaxConnections(config.getHttpClient().getHttpMaxConnections())
//设置每个主机的最大连接数
.setMaxConnectionsPerHost(config.getHttpClient().getHttpConnectionsPerHost())
//设置连接池中连接的空闲时间
.setPooledConnectionIdleTimeout(config.getHttpClient().getHttpPooledConnectionIdleTimeout());
//创建异步HTTP客户端
this.asyncHttpClient = new DefaultAsyncHttpClient(builder.build());
}
@Override
public void start() {
// 使用AsyncHttpHelper单例模式初始化异步HTTP客户端
AsyncHttpHelper.getInstance().initialized(asyncHttpClient);
}
@Override
public void shutdown() {
// 如果客户端实例不为空,则尝试关闭它
if (asyncHttpClient != null) {
try {
// 关闭客户端,并处理可能的异常
this.asyncHttpClient.close();
} catch (IOException e) {
// 记录关闭时发生的错误
log.error("netty http client 停机时发生异常", e);
}
}
}
}我的客户端就是基于AsyncHttpClient这样子的高层库实现的异步数据交互。 其实到此,对于Netty这一块的设计基本就已经简述了,讲述的比较简单,但是我想可以帮助你顺利的去理解如何基于Netty实现一个网络通信。
容器构造
目前我们已经实现了对于当前项目最重要的一个模块,也就是网络通信模块,而我们的网关项目的所有操作其实都依赖于基于Netty实现的模块。 所以我又实现了一个Container容器用于管理管控Netty的模块,负责管控其生命周期。
package cyou.breathe.gateway.core.config;
import cyou.breathe.gateway.config.config.Config;
import cyou.breathe.gateway.config.config.NettyConfig;
import cyou.breathe.gateway.core.netty.NettyHttpClient;
import cyou.breathe.gateway.core.netty.NettyHttpServer;
import cyou.breathe.gateway.core.netty.processor.DisruptorNettyCoreProcessor;
import cyou.breathe.gateway.core.netty.processor.NettyCoreProcessor;
import cyou.breathe.gateway.core.netty.processor.NettyProcessor;
import lombok.extern.slf4j.Slf4j;
import static cyou.breathe.gateway.common.constants.GatewayConstant.BUFFER_TYPE_PARALLEL;
/**
* 容器类
* @author: breathe
* @createTime: 2025-10-04
*/@Slf4j
public class Container implements LifeCycle {
private final Config config;
private NettyHttpServer nettyHttpServer;
private NettyHttpClient nettyHttpClient;
private NettyProcessor nettyProcessor;
public Container(Config config) {
this.config = config;
init();
}
@Override
public void init() {
NettyCoreProcessor nettyCoreProcessor = new NettyCoreProcessor();
if (BUFFER_TYPE_PARALLEL.equals(config.getNetty().getBufferType())) {
this.nettyProcessor = new DisruptorNettyCoreProcessor(config, nettyCoreProcessor);
} else {
this.nettyProcessor = nettyCoreProcessor;
}
this.nettyHttpServer = new NettyHttpServer(config, nettyProcessor);
this.nettyHttpClient = new NettyHttpClient(config, nettyHttpServer.getEventLoopGroupWorker());
}
@Override
public void start() {
nettyProcessor.start();
nettyHttpServer.start();;
nettyHttpClient.start();
log.info("breathe gateway 启动!!!");
}
@Override
public void shutdown() {
nettyProcessor.shutdown();
nettyHttpServer.shutdown();
nettyHttpClient.shutdown();
}
}代码依旧比较容易理解,其实就是将之前基于Netty实现的处理类封装到容器中,由容器进行统一管控。 ![[../../photo/Pasted image 20251019151420.png]] 这个是在bootstrap中的 之后我们只需要启动容器,我们的基于Netty搭建的网络通信框架就运行起来了,再此时就已经可以接收我们的网络处理请求了。
整合Nacos---服务注册与服务订阅的实现
什么是Nacos?
Nacos是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。它是阿里巴巴开源的一个项目,专门用于微服务架构的服务治理。Nacos帮助实现了微服务架构中的服务自动发现、服务配置管理、服务元数据及流量管理等核心功能。以下是Nacos的一些主要功能:
- 服务发现和服务健康监测:Nacos支持基于DNS或HTTP的服务发现机制,能够实现云端服务的自动注册与发现。它还提供服务健康监测,确保请求仅被发送到健康的主机。
- 动态配置服务:动态管理所有服务的配置信息,支持配置自动更新,减少了服务配置变更带来的管理工作与更新延迟。
- 动态DNS服务:利用DNS服务,管理云服务的域名解析,实现服务的动态路由和负载均衡。
- 服务及其元数据管理:Nacos能够管理服务的元数据信息,如权重、区域、版本等,为服务路由、负载均衡提供依据。
- 支持AP和CP模式的服务:根据CAP理论,Nacos支持在AP(可用性和分区容错性)和CP(一致性和分区容错性)模式之间的切换,以满足不同场景的需求。
简而言之,作为目前最火热的注册中心和配置中心,Nacos提供了许多强大的功能,无论是可视化的Web网页方便操作,还是说开箱即用的特性,开源的代码和活跃的社区,以及CP和AP双支持的特性,都是我选择Nacos作为我项目注册中心和配置中心的原因。
几个重要概念
在讲解Nacos服务注册的具体代码之前,我们先简单的了解一下Nacos中的几个重要的概念。
- 服务(Service):在Nacos中,服务是指一个或多个相同功能的实例集合,服务通常对应于一个微服务或一个应用。
- 服务实例(Instance):服务实例是指运行服务的最小单位,通常是一个服务的单个运行节点。
- 配置管理(Configuration Management):配置管理允许您集中存储和管理在分布式系统环境中使用的所有配置。
- 服务注册(Service Registry):服务注册是指服务实例启动时,将自己的网络地址(如IP+Port)注册到Nacos中的过程。
- 服务发现(Service Discovery):服务发现是指消费者从Nacos获取服务实例信息的过程,以便进行网络调用。
- 命名空间(Namespace):用于实现环境隔离,不同的命名空间下可以有相同名称的服务。
- 数据持久化:Nacos支持数据的持久化存储,它可以将服务信息和配置信息持久化到外部存储(如MySQL数据库)中。
- 分组(Group):用于进一步进行环境隔离。
如何将服务注册到Nacos?
上面说了那么多Nacos的优点,接下来就是分析,如何才能将我们的服务注册到Nacos上呢? 添加Nacos客户端依赖:在项目的pom.xml文件中,添加Nacos客户端的依赖。
<dependencies>
<!-- 添加Nacos客户端依赖 -->
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>2.2.0</version>
</dependency>
<!-- 添加日志依赖,因为nacos-client使用SLF4J记录日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>之后,为了增加通用性以及为了后续方便修改,我们将和注册中心有关的服务抽取到一个接口中:
package cyou.breathe.gateway.register.center.api;
import cyou.breathe.gateway.config.config.Config;
import cyou.breathe.gateway.config.entity.ServiceDefinition;
import cyou.breathe.gateway.config.entity.ServiceInstance;
/**
* 注册中心处理器
* @author: breathe
* @createTime: 2025-10-02
*/public interface RegisterCenter {
/**
* 注册中心初始化
*/
void init(Config config);
/**
* 注册服务实例
*/
void register(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance);
/**
* 注销服务实例
*/
void deregister(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance);
/**
* 订阅注册中心实例变化
*/
void subscribeAllServices(RegisterCenterListener listener);
}同时,由于注册中心中注册的服务实例可能发生变化,所以我们还需要提供一个注册中心的监听器来监听注册中心配置的变更。
package cyou.breathe.gateway.register.center.api;
import cyou.breathe.gateway.config.entity.ServiceDefinition;
import cyou.breathe.gateway.config.entity.ServiceInstance;
import java.util.Set;
/**
* 注册中心监听器
* @author: breathe
* @createTime: 2025-10-02
*/public interface RegisterCenterListener {
/**
* 某服务有实例变化时调用此方法
*/
void onChange(ServiceDefinition serviceDefinition, Set<ServiceInstance> newInstances);
}整合完毕之后,我们就可以开始考虑如何基于Nacos-Client提供的各类服务来将我们的服务注册到注册中心了。 首先我先贴出Nacos继承的将当前服务注册到服务中心的测试代码:
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import cyou.breathe.gateway.common.utils.NetUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
/**
* 测试nacos注册中心
* @author: breathe
* @createTime: 2025-10-04
*/@Slf4j
public class TestNacosRegisterCenter {
public static void main(String[] args) {
String serverAddress = "127.0.0.1:8848";
try {
// 创建NamingService
NamingService namingService = NamingFactory.createNamingService(serverAddress);
// 创建实例
Instance instance = new Instance();
instance.setIp(NetUtil.getLocalIp());
instance.setPort(10101);
instance.setServiceName("test-service");
instance.setClusterName("test-cluster-service");
instance.addMetadata("version", "1.0");
instance.addMetadata("env", "test");
// 元数据
Map<String, String> metadata = buildMetadata();
instance.setMetadata(metadata);
System.out.println(JSON.toJSONString(metadata));
// 注册实例
namingService.registerInstance(instance.getServiceName(), instance);
System.out.println("服务注册成功");
Thread.sleep(1000000);
} catch (NacosException e) {
log.error("create NamingService failed, serverAddress: {}", serverAddress, e);
} catch (InterruptedException e) {
log.info("服务结束");
}
}
private static Map<String, String> buildMetadata() {
Map<String, String> metadata = new HashMap<>();
metadata.put("version", "1.0");
metadata.put("env", "test");
metadata.put("appName", "test-service");
return metadata;
}
}运行结果是: ![[../../photo/Pasted image 20251019192853.png]] 可以发现其实想要将我们的服务的信息注册到注册中心是非常容易的。 只需要基于我们的NamingService即可,然后调用registerInstance方法将我们设定好的服务实例信息注册上去即可。 那么有了上面的基础知识,再来看看我写的代码:
package cyou.breathe.gateway.register.center.nacos;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingMaintainFactory;
import com.alibaba.nacos.api.naming.NamingMaintainService;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.Service;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.utils.CollectionUtils;
import cyou.breathe.gateway.config.config.Config;
import cyou.breathe.gateway.config.config.RegisterCenterConfig;
import cyou.breathe.gateway.config.entity.ServiceDefinition;
import cyou.breathe.gateway.config.entity.ServiceInstance;
import cyou.breathe.gateway.register.center.api.RegisterCenter;
import cyou.breathe.gateway.register.center.api.RegisterCenterListener;
import lombok.extern.slf4j.Slf4j;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static cyou.breathe.gateway.common.constants.NacosConstant.META_DATA_KEY;
/**
* nacos注册中心
* @author: breathe
* @createTime: 2025-10-02
*/@Slf4j
public class NacosRegisterCenter implements RegisterCenter {
/**
* 总配置
*/
private RegisterCenterConfig registerCenterConfig;
/**
* 维护服务实例信息
*/
private NamingService namingService;
/**
* 维护服务定义信息
*/
private NamingMaintainService namingMaintainService;
/**
* 监听器列表
*/
private final List<RegisterCenterListener> registerCenterListenerList = new CopyOnWriteArrayList<>();
/**
* 定时任务线程池
*/
private final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1, new NameThreadFactory("doSubscribeAllServices"));
/**
* 是否完成初始化
*/
private final AtomicBoolean init = new AtomicBoolean(false);
@Override
public void init(Config config) {
if (!init.compareAndSet(false, true)) {
return ;
}
this.registerCenterConfig = config.getRegisterCenter();
try {
this.namingMaintainService = NamingMaintainFactory.createMaintainService(config.getRegisterCenter().getAddress());
this.namingService = NamingFactory.createNamingService(config.getRegisterCenter().getAddress());
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
@Override
public void register(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance) {
try {
//构造nacos实例信息
Instance nacosInstance = new Instance();
nacosInstance.setInstanceId(serviceInstance.getServiceInstanceId());
nacosInstance.setPort(serviceInstance.getPort());
nacosInstance.setIp(serviceInstance.getIp());
nacosInstance.setMetadata(Map.of(META_DATA_KEY, JSON.toJSONString(serviceInstance)));
//注册
namingService.registerInstance(
serviceDefinition.getServiceId(),
registerCenterConfig.getNacos().getGroup(),
nacosInstance);
//更新服务定义
namingMaintainService.updateService(
serviceDefinition.getServiceId(),
registerCenterConfig.getNacos().getGroup(),
0,
Map.of(META_DATA_KEY, JSON.toJSONString(serviceDefinition)));
log.info("注册 {} {}", serviceDefinition, serviceInstance);
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
@Override
public void deregister(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance) {
try {
namingService.registerInstance(
serviceDefinition.getServiceId(),
registerCenterConfig.getNacos().getGroup(),
serviceInstance.getIp(),
serviceInstance.getPort());
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
@Override
public void subscribeAllServices(RegisterCenterListener registerCenterListener) {
// 添加监听器
registerCenterListenerList.add(registerCenterListener);
//进行服务订阅
doSubscribeAllServices();
//循环执行服务发现与订阅操作
scheduledThreadPool.scheduleWithFixedDelay(this::doSubscribeAllServices,
10, 10, TimeUnit.SECONDS);
}
private void doSubscribeAllServices() {
try {
//得到当前服务已经订阅的服务
//这里其实已经在init的时候初始化过 naming service 了,所以这里可以直接拿到当前服务已经订阅的服务
Set<String> subscribeService = namingService.getSubscribeServices().stream()
.map(ServiceInfo::getName).collect(Collectors.toSet());
int pageNo = 1;
int pageSize = 100;
//分页从nacos拿到服务列表
List<String> serviseList = namingService
.getServicesOfServer(pageNo, pageSize, registerCenterConfig.getNacos().getGroup()).getData();
//拿到所有的服务名称后进行遍历
while (CollectionUtils.isNotEmpty(serviseList)) {
log.info("订阅的服务数: {}", serviseList.size());
for (String service : serviseList) {
//判断是否已经订阅了当前服务
if (subscribeService.contains(service)) {
continue ;
}
//nacos事件监听器 订阅当前服务
//这里我们需要自己实现一个nacos的事件订阅类 来具体执行订阅执行时的操作
EventListener eventListener = new NacosRegisterListener();
//当前服务之前不存在 调用监听器方法进行添加处理,进行初始化
eventListener.onEvent(new NamingEvent(service, null));
//为指定的服务和环境注册一个事件监听器
namingService.subscribe(service, registerCenterConfig.getNacos().getGroup(), eventListener);
log.info("订阅 {} {}", service, registerCenterConfig.getNacos().getGroup());
}
//遍历下一页的服务列表
serviseList = namingService.getServicesOfServer(++ pageNo, pageSize, registerCenterConfig.getNacos().getGroup()).getData();
}
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
/**
* 实现对nacos事件的监听器 这个事件监听器会在Nacos发生事件变化的时候进行回调
* NamingEvent 是一个事件对象,用于表示与服务命名空间(Naming)相关的事件。
* NamingEvent 的作用是用于监听和处理命名空间中的服务实例(Service Instance)的变化,
* 以便应用程序可以根据这些变化来动态地更新服务实例列表,以保持与注册中心的同步。
*/
public class NacosRegisterListener implements EventListener {
@Override
public void onEvent(Event event) {
//先判断是否是注册中心事件。服务命名
if (event instanceof NamingEvent namingEvent) {
String serviceName = namingEvent.getServiceName();
try {
//获取服务定义信息
Service service = namingMaintainService.queryService(serviceName, registerCenterConfig.getNacos().getGroup());
ServiceDefinition serviceDefinition = JSON.parseObject(service.getMetadata()
.get(META_DATA_KEY), ServiceDefinition.class);
//获取服务实例信息
List<Instance> allInstances = namingService.getAllInstances(service.getName(), registerCenterConfig.getNacos().getGroup());
Set<ServiceInstance> set = new HashSet<>();
/**
* meta-data数据如下
* {
* "version": "1.0.0", * "environment": "production", * "weight": 80, * "region": "us-west", * "labels": "web, primary", * "description": "Main production service" * } */ for (Instance instance : allInstances) {
ServiceInstance serviceInstance = JSON.parseObject(instance.getMetadata().get(META_DATA_KEY), ServiceInstance.class);
set.add(serviceInstance);
}
/**
* 调用每个监听器的onChange方法,传入服务定义信息和服务实例集合。
*/
registerCenterListenerList.forEach(l -> l.onChange(serviceDefinition, set));
} catch (NacosException e) {
throw new RuntimeException(e);
}
} } }}代码比较多,我们一个一个来看,NamingService我们已经了解了,这里多了一个namingMaintainService。 NamingMaintainService 是 Nacos 客户端提供的一个接口,其作用是维护服务信息,包括服务的更新、查询和删除。它与 NamingService 接口不同,后者主要用于服务的注册和发现。 NamingMaintainService 提供了更细粒度的控制,允许服务提供者对服务的元数据进行维护,这对于服务的版本控制、区域设置、权重调整等是非常有用的。 在使用 NamingMaintainService 时,我们可以执行以下操作:
- 更新服务信息:可以更新一个服务的元数据,这包括服务的保护阈值、元数据等。
- 查询服务信息:可以查询服务的当前配置状态,以便进行审查或者其他操作。
- 删除服务信息:如果一个服务不再需要在注册中心注册,可以使用 NamingMaintainService 将其删除。
在我的代码中,NamingMaintainService 被用于更新服务定义信息,即当服务定义发生变更时,可以使用这个服务来推送新的服务定义到 Nacos,从而使得注册中心的服务列表保持最新状态。 因此,register方法我们就不多说了,比较好理解,同理deregister方法也比较好理解。 我们按照顺序先来讲解subscribeAllServices方法。 顾名思义,这是一个订阅方法。subscribeAllServices方法的作用是在Nacos客户端订阅所有服务的变化事件。这是微服务架构中的一个常见需求,因为服务实例可能会动态地上线或下线,服务列表可能会频繁变化。这个方法允许客户端保持对服务状态的最新视图,并且可以在服务变化时做出响应。以下是该方法的详细工作流程和作用:
- 添加监听器到本地列表:
- 将参数中的registerCenterListener添加到registerCenterListenerList中,这个列表维护了所有的监听器,这些监听器将对Nacos服务中心的变化做出响应。
- 执行服务订阅逻辑:
- 通过调用doSubscribeAllServices方法来执行实际的服务订阅逻辑。
- 定时任务检查服务变更:
- 创建一个定时执行的线程池scheduledThreadPool,这个线程池负责周期性地调用doSubscribeAllServices方法。
- 使用scheduleWithFixedDelay方法设置定时任务,每隔一定时间(在这里是10秒)就重新执行doSubscribeAllServices方法,以此来检查新的服务是否已经添加或现有服务的状态是否有变更。
- 处理服务订阅更新:
- doSubscribeAllServices方法将检查Nacos服务列表与当前已订阅服务的差异,并订阅任何新的服务。
- 如果发现新服务,则会创建一个新的EventListener,并用它订阅这个服务的变化。一旦服务状态有变化,就会触发事件,然后通过NamingEvent事件传递给所有监听器。
- 事件监听与变更通知:
- 在NacosRegisterListener内部类中定义的onEvent方法会在每个服务变化时被调用。
- 当onEvent方法被触发时,它会从Nacos服务中心查询服务的当前定义和实例信息,并通知所有注册的RegisterCenterListener监听器,这样客户端就可以采取相应的动作,如更新其内部服务列表、重新负载均衡等。
这么多的代码中,也包含了对服务订阅的代码,但是我们先不进行讲解,我们先主要进行对服务注册的代码理解。
如何实现服务订阅
在上面的章节中我们已经顺带的讲解了服务订阅的一个方式。 这里我将会更加具体的分析服务订阅和服务变更的监听的实现思路。 我们先需要明确一个概念就是,对于注册中心的服务,我们不单单需要对他们进行服务拉取,还需要订阅注册中心中的服务变更的事件。也就是当注册中心中出现了服务变更,我们也是需要配置监听器去处理对应的变更事件的。 因此,当我们的网关服务启动之后,我们就需要将我们当前的服务信息注册到注册中心,同时监听订阅注册中心的配置变更事件。 ![[../../photo/Pasted image 20251019193102.png]] 注册流程比较简单,上文已经讲解,这里我们主要分析订阅事件。 我们会调用我自己实现的注册中心实现类来调用订阅接口,并且传入一个监听器,并且实现这个监听器的onChange方法,这个方法的作用就是当注册中心发生变更事件之后,执行的具体代码操作。 这个方法所执行的内容就是重新加载一次服务实例,确保当前服务实例信息是最新的。同时再一次执行服务定义信息修改方法,修改当前发生变更的服务实例信息。
private void doSubscribeAllServices() {
try {
//得到当前服务已经订阅的服务
//这里其实已经在init的时候初始化过 naming service 了,所以这里可以直接拿到当前服务已经订阅的服务
Set<String> subscribeService = namingService.getSubscribeServices().stream()
.map(ServiceInfo::getName).collect(Collectors.toSet());
int pageNo = 1;
int pageSize = 100;
//分页从nacos拿到服务列表
List<String> serviseList = namingService
.getServicesOfServer(pageNo, pageSize, registerCenterConfig.getNacos().getGroup()).getData();
//拿到所有的服务名称后进行遍历
while (CollectionUtils.isNotEmpty(serviseList)) {
log.info("订阅的服务数: {}", serviseList.size());
for (String service : serviseList) {
//判断是否已经订阅了当前服务
if (subscribeService.contains(service)) {
continue ;
}
//nacos事件监听器 订阅当前服务
//这里我们需要自己实现一个nacos的事件订阅类 来具体执行订阅执行时的操作
EventListener eventListener = new NacosRegisterListener();
//当前服务之前不存在 调用监听器方法进行添加处理,进行初始化
eventListener.onEvent(new NamingEvent(service, null));
//为指定的服务和环境注册一个事件监听器
namingService.subscribe(service, registerCenterConfig.getNacos().getGroup(), eventListener);
log.info("订阅 {} {}", service, registerCenterConfig.getNacos().getGroup());
}
//遍历下一页的服务列表
serviseList = namingService.getServicesOfServer(++ pageNo, pageSize, registerCenterConfig.getNacos().getGroup()).getData();
}
} catch (NacosException e) {
throw new RuntimeException(e);
}
}/**
* 实现对nacos事件的监听器 这个事件监听器会在Nacos发生事件变化的时候进行回调
* NamingEvent 是一个事件对象,用于表示与服务命名空间(Naming)相关的事件。
* NamingEvent 的作用是用于监听和处理命名空间中的服务实例(Service Instance)的变化,
* 以便应用程序可以根据这些变化来动态地更新服务实例列表,以保持与注册中心的同步。
*/
public class NacosRegisterListener implements EventListener {
@Override
public void onEvent(Event event) {
//先判断是否是注册中心事件。服务命名
if (event instanceof NamingEvent namingEvent) {
String serviceName = namingEvent.getServiceName();
try {
//获取服务定义信息
Service service = namingMaintainService.queryService(serviceName, registerCenterConfig.getNacos().getGroup());
ServiceDefinition serviceDefinition = JSON.parseObject(service.getMetadata()
.get(META_DATA_KEY), ServiceDefinition.class);
//获取服务实例信息
List<Instance> allInstances = namingService.getAllInstances(service.getName(), registerCenterConfig.getNacos().getGroup());
Set<ServiceInstance> set = new HashSet<>();
/**
* meta-data数据如下
* {
* "version": "1.0.0", * "environment": "production", * "weight": 80, * "region": "us-west", * "labels": "web, primary", * "description": "Main production service" * } */ for (Instance instance : allInstances) {
ServiceInstance serviceInstance = JSON.parseObject(instance.getMetadata().get(META_DATA_KEY), ServiceInstance.class);
set.add(serviceInstance);
}
/**
* 调用每个监听器的onChange方法,传入服务定义信息和服务实例集合。
*/
registerCenterListenerList.forEach(l -> l.onChange(serviceDefinition, set));
} catch (NacosException e) {
throw new RuntimeException(e);
}
} }}上面这段代码就是服务订阅和服务变更监听的相关代码。 其作用就是在发生服务变更,比如服务实例的上线或者下线的时候,根据当前发生变更的服务名称,比如当前上线了一个api-user的服务实例,那么就会触发更新操作,然后获取当前服务实例对应的服务,并且根据当前的服务拉取当前服务所对应的存在的所有服务实例,并且进行保存,那么此时我们就得到了更新后的服务实例信息和服务订阅信息了,因为这里的onInstancesChange方法会将我们的服务定义和服务实例信息放入到我们一开始设定的DynamicConfigManager中,而它就是我们存储服务实例、服务定义信息的一个管理容器。 下面附上一张服务实例信息的元数据信息来帮助你更好的理解元数据到底是什么样子的。 ![[../../photo/Pasted image 20251019193312.png]] 原数据就是服务信息中的一个数据而已 到此,其实我们就已经完成了服务订阅。 其实服务订阅和服务注册的代码都比较通用,只要编写一次之后按照固定的类似的模板去编写代码即可。
整合Nacos---使用配置中心与配置变更事件监听
这里我们依旧按照上面的方式来讲解这一节。 我想你一定至少简单了解或者使用过配置中心,在Nacos中,我们可以将我们的配置编写在配置中心中,然后再服务启动的时候主动的拉取配置中心中的代码并且作用在本地,同时当配置中心中的配置发生变更的时候我们也可以根据服务订阅事件得到配置中心中变更的配置信息。 ![[../../photo/Pasted image 20251019193413.png]] 从上图就可以比较容易的了解配置中心的构成,这里就不在赘述。
这里我依旧是写了一套入门的代码来帮助你了解如何从Nacos的配置中心拉取我们的配置。
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import lombok.extern.slf4j.Slf4j;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
* 测试nacos配置中心
* @author: breathe
* @createTime: 2025-10-04
*/@Slf4j
public class TestNacosConfig {
public static void main(String[] args) {
String serverAddress = "127.0.0.1:8848";
String dataId = "test-data-id";
String group = "DEFAULT_GROUP";
Properties properties = new Properties();
properties.put("serverAddr", serverAddress);
properties.put("dataId", dataId);
properties.put("group", group);
try {
ConfigService configService = NacosFactory.createConfigService(properties);
//获取配置
String config = configService.getConfig(dataId, group, 5000);
log.info("获取配置:\n{}", config);
configService.addListener(dataId, group, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String s) {
log.info("配置更新:\n{}", s);
}
});
while (true) {
Thread.sleep(5000);
}
} catch (NacosException e) {
log.error("配置获取失败", e);
} catch (InterruptedException e) {
log.info("暂停失败");
}
}}可以发现和进行服务注册一样,使用配置中心进行配置的拉取以及配置变更的订阅都非常容易,我们先从配置的拉取来实现。 还是老方法,为了增加通用性,我们编写一套对应配置中心的接口。
package cyou.breathe.gateway.config.center.api;
import cyou.breathe.gateway.config.config.ConfigCenterConfig;
/**
* 配置中心接口
* @author: breathe
* @createTime: 2025-10-02
*/public interface ConfigCenter {
/**
* 初始化配置中心配置
* @param configCenterConfig 配置中心配置
*/
void init(ConfigCenterConfig configCenterConfig);
/**
* 将规则变更
* 订阅配置中心配置变更
* @param listener 配置变更监听器
*/
void subscribeRulesChange(RulesChangeListener listener);
}之后我们开始分析如何基于Nacos-Client来实现配置拉取和配置变更订阅。
package cyou.breathe.gateway.config.center.nacos;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import cyou.breathe.gateway.config.center.api.ConfigCenter;
import cyou.breathe.gateway.config.center.api.RulesChangeListener;
import cyou.breathe.gateway.config.config.ConfigCenterConfig;
import cyou.breathe.gateway.config.config.centerimpl.NacosConfig;
import cyou.breathe.gateway.config.entity.Rule;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* nacos配置中心
* @author: breathe
* @createTime: 2025-10-02
*/@Slf4j
public class NacosConfigCenter implements ConfigCenter {
/**
* 配置中心配置
*/
private ConfigCenterConfig configCenterConfig;
/**
* Nacos提供的与配置中心进行交互的接口
*/
private ConfigService configService;
/**
* 是否完成初始化
*/
private final AtomicBoolean init = new AtomicBoolean(false);
@Override
public void init(ConfigCenterConfig configCenterConfig) {
if (!configCenterConfig.isEnabled() || !init.compareAndSet(false, true)) {
return ;
}
this.configCenterConfig = configCenterConfig;
try {
configService = NacosFactory.createConfigService(configCenterConfig.getAddress());
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
/**
* 实现订阅方法
* @param listener 配置变更监听器
*/
@Override
public void subscribeRulesChange(RulesChangeListener listener) {
try {
// 获取nacos配置
NacosConfig nacos = configCenterConfig.getNacos();
//初始化通知 DATA_ID是自己定义的 返回值就是一个json
String config = configService.getConfig(nacos.getDataId(), nacos.getGroup(), nacos.getTimeout());
//{"rules":[{}, {}]} 定义的规则:key:rules,value:数组Rule
log.info("config from nacos: \n{}", config);
List<Rule> rules = JSON.parseObject(config).getJSONArray("rules").toJavaList(Rule.class);
//调用监听器 参数就是拿到的rules
listener.onRulesChange(rules);
//监听变化
configService.addListener(nacos.getDataId(), nacos.getGroup(), new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
log.info("config from nacos: {}", configInfo);
List<Rule> rules = JSON.parseObject(configInfo).getJSONArray("rules")
.toJavaList(Rule.class);
listener.onRulesChange(rules);
}
});
} catch (NacosException e) {
throw new RuntimeException(e);
}
}}可以发现代码和上面的入门代码差不多,都是直接调用getConfig方法来获取配置,然后对配置进行解析,这里的配置我放到这段的末尾了,由于我们已经知道了我们配置的格式和信息,所以我们可以自定义一个类来进行转化和解析。 在我们的项目首次启动的时候就会调用subscribeConfigChange这个方法来拉取配置,从而初始化我们的过滤器的规则,同时我们也会配置一个监听器来监听我们的配置中心的配置变更事件。 而具体的监听代码再启动类中给出,如下: ![[../../photo/Pasted image 20251019193606.png]] 这里可以发现有rules,这个rule其实就是我们的路由规则,我的设计是有一个rules对于不同的过滤器链,可以进行一个汇总,到时候我们不需要所有的请求都去buildFilterChain,从而提高效率。
可以发现,当配置发生变更的时候,我们做的事情其实就是拉取此时的最新配置,然后再次将其解析为Rule对象,然后我们会将这个包含了大量Rule的List集合进行处理,将其放入到DynamicConfigManager。 此时我们就基于路径和服务存放了我们的规则,因为我们的规则可能是按照路径生效,也可能按照服务生效。
配置中心配置文件格式
{
"rules" : [
{
"id" : "backend-http-server:1.0.0",
"serviceId" : "backend-http-server",
"prefix" : "/http-server",
"paths" : [
"/ping"
],
"order" : 0,
"protocol" : "http",
"name" : "test"
},
{
"id" : "admin:1.0.0",
"serviceId" : "admin",
"prefix" : "/api/admin",
"paths" : [
"/**"
],
"order" : 0,
"protocol" : "http",
"name" : "test"
},
{
"id" : "user:1.0.0",
"serviceId" : "user",
"prefix" : "/api/user",
"paths" : [
"/ping1"
],
"order" : 0,
"protocol" : "http",
"name" : "test"
}
]
}到此为止,配置中心的配置拉取和配置变更事件的监听我们都已经完成了。 Nacos的部分到此基本结束。
网关路由管控
路由的加载
在上面的章节中,我们已经实现了包含注册中心配置中心、网络通信框架、容器、配置的所有代码的实现。 接下来我们就要开始真正的处理我们的网络请求部分的代码的设计和实现了。 首先我们都明白,网关中有一个特别核心的概念就是,对于不同的路径,会有不同的正则表达式去匹配,从而对这些路径进行不同的处理。 我称这些表达式为:Rule。 也就是在这个模块,我将和你分析如何实现所谓的: 路由。 首先我们先定义Rule类,确定好我们的Rule所需要用到的一些基本信息。 这些配置对应的含义我也已经在代码中进行标注,代码比较简单好理解,就是定义了一些对应的规则会用到的类信息,比如Resilience4j会用到的信息,我也为其定义了一个类。
package cyou.breathe.gateway.config.entity;
import cyou.breathe.gateway.common.enums.CircuitBreakerEnum;
import cyou.breathe.gateway.common.enums.ResilienceEnum;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.util.*;
import static cyou.breathe.gateway.common.constants.FallbackConstant.DEFAULT_FALLBACK_HANDLER_NAME;
import static cyou.breathe.gateway.common.enums.ResilienceEnum.*;
/**
* 路由定义
* 他就是我们在配置中心的配置设置
* @author: breathe
* @createTime: 2025-10-01
*/@Data
public class Rule implements Comparable<Rule>, Serializable {
@Serial
private static final long serialVersionUID = 113248791L;
/**
* 规则ID,全局唯一
*/
private String id;
/**
* 规则名称
*/
private String name;
/**
* 协议
*/
private String protocol;
/**
* 后端服务ID
*/ private String serviceId;
/**
* 请求前缀
*/
private String prefix;
/**
* 路径集合
*/
private List<String> paths;
/**
* 规则排序,对应场景:一个路径对应多条规则,然后只执行一条规则的情况
*/
private Integer order;
/**
* 系统弹性配置,熔断、降级等
*/
private Set<ResilienceConfig> resilience = new HashSet<>();
/**
* 重试配置
*/
private RetryConfig retryConfig = new RetryConfig();
/**
* 路由需要走的过滤器
*/
private Set<FilterConfig> filterConfigs = new HashSet<>();
/**
* 限流规则
*/
private Set<FlowCtlConfig> flowCtlConfigs = new HashSet<>();
/**
* 熔断降级配置
*/
@Data
public static class ResilienceConfig {
/**
* 弹性处理路径
*/
private String path;
/**
* 是否开启弹性配置
*/
private boolean enabled = true;
/**
* 是否开启重试
*/
private boolean retryEnabled = true;
/**
* 是否开启熔断
*/
private boolean circuitBreakerEnabled = true;
/**
* 是否开启降级
*/
private boolean fallbackEnabled = true;
/**
* 是否开启信号量隔离
*/
private boolean bulkheadEnabled = false;
/**
* 是否开启线程池隔离
*/
private boolean threadPoolBulkheadEnabled = false;
/**
* 弹性处理顺序
*/
private List<ResilienceEnum> order = Arrays.asList(THREADPOOLBULKHEAD, BULKHEAD, RETRY, CIRCUITBREAKER, FALLBACK);
// Retry
/** * 重试次数
*/
private int maxAttempts = 3;
/**
* 重试间隔
*/
private int waitDuration = 500;
// CircuitBreaker
/** * 以百分比配置失败率阈值。当失败率大于等于阈值时,进行熔断,并进行服务降级
*/
private int failureRateThreshold = 50;
/**
* 慢调用比例超过这个则进行熔断,并进行服务降级
*/
private int slowCallRateThreshold = 100;
/**
* 单位ms,超过这个视为慢调用,这个应该需要比httpclient的请求超时时间httpRequestTimeout大,否则不会生效
*/
private int slowCallDurationThreshold = 60000;
/**
* 断路器在半开状态下允许通过的调用次数
*/
private int permittedNumberOfCallsInHalfOpenState = 10;
/**
* 断路器在半开状态下的最长等待时间,超过该配置值的话,断路器会从半开状态恢复为开启状态。配置是0时表示断路器会一直处于半开状态,直到所有允许通过的访问结束
*/
private int maxWaitDurationInHalfOpenState = 0;
/**
* 滑动窗口类型,如果是COUNT_BASED,则是计数,如果是TIME_BASED,则是时间,单位是秒
*/
private CircuitBreakerEnum type = CircuitBreakerEnum.COUNT_BASED;
/**
* 滑动窗口大小
*/
private int slidingWindowSize = 100;
/**
* 统计失败率或慢调用率的最小调用数
*/
private int minimumNumberOfCalls = 100;
/**
* 断路器从开启过渡到半开应等待的时间,单位ms
*/ private int waitDurationInOpenState = 60000;
/**
* 是否开启额外线程监听断路器从开启到半开的状态变化,如果不开启,则需时间到了并且有请求才会到半开状态
*/
private boolean automaticTransitionFromOpenToHalfOpenEnabled = false;
// Fallback
/** * 默认降级策略名
*/
private String fallbackHandlerName = DEFAULT_FALLBACK_HANDLER_NAME;
// Bulkhead
/** * 信号数量
*/
private int maxConcurrentCalls = 1000;
/**
* 最大等待时间
*/
private int maxWaitDuration = 0;
/**
* 是否公平竞争信号量
*/
private boolean fairCallHandlingEnabled = false;
// ThreadPoolBulkhead
/** * 核心线程数
*/
private int coreThreadPoolSize = 5;
/**
* 最大线程数
*/
private int maxThreadPoolSize = 10;
/**
* 队列容量
*/
private int queueCapacity = 100;
}
/**
* 过滤器配置
*/
@Data
public static class FilterConfig {
/**
* 过滤器唯一ID
*/ private String id;
/**
* 过滤器规则描述,{"timeOut":500,"balance":random}
*/ private String config;
@Override
public boolean equals(Object o){
if (this == o) {
return true;
}
if((o== null) || getClass() != o.getClass()){
return false;
}
FilterConfig that =(FilterConfig)o;
return id.equals(that.id);
}
@Override
public int hashCode(){
return Objects.hash(id);
}
}
/**
* 重试配置
*/
@Data
public static class RetryConfig {
//重试次数
private int times;
}
/**
* 流量控制配置
*/
@Data
public static class FlowCtlConfig {
/**
* 限流类型-可能是path,也可能是IP或者服务
*/
private String type;
/**
* 限流对象的值
*/
private String value;
/**
* 限流模式-单机还有分布式
*/
private String model;
/**
* 限流规则,是一个JSON
*/ private String config;
}
/**
* 向 Rule 里面添加过滤器
*/
public boolean addFilterConfig(FilterConfig filterConfig) {
return filterConfigs.add(filterConfig);
}
/**
* 通过一个指定的FilterID获取FilterConfig
*/ public FilterConfig getFilterConfig(String id) {
for (FilterConfig config : filterConfigs) {
if (config.getId().equalsIgnoreCase(id)) {
return config;
}
} return null;
}
/**
* 根据filterID判断当前Rule是否存在
*/
public boolean hashId(String id) {
for (FilterConfig filterConfig : filterConfigs) {
if (filterConfig.getId().equalsIgnoreCase(id)) {
return true;
}
} return false;
}
@Override
public int compareTo(Rule rule) {
int result = Integer.compare(this.order, rule.getOrder());
if (result == 0) {
return getId().compareTo(rule.getId());
}
return result;
}
@Override
public boolean equals(Object rule) {
if (this == rule) {
return true;
}
if (rule == null || getClass() != rule.getClass()) {
return false;
}
Rule that = (Rule) rule;
return id.equals(that.id);
}
@Override
public int hashCode() {
return Objects.hash(id);
}
}之后,每当我们项目启动运行的时候,都会从配置中心拉取对应的配置。 此时,我们对应的配置就已经加载好了,并且当配置中心发生配置变更的时候也会触发配置变更事件从而重新导入加载配置。 到此规则的加载就结束了,我将会在下文提到过滤器的时候通过对规则的配置来使得过滤器有不同的效果。
如何存储规则?
规则的加载比较容易,并且我们也看到了rule的编写方法,只需要定义一个类,这个类提供你的规则使用时所需要用到的属性即可。 那么如何去使用rule呢?rule是如何生效的呢? 为了便于理解,我将从rule获取与存放的地方开始,从0为你讲解规则的存放方式。 ![[../../photo/Pasted image 20251019194056.png]] 可以发现,当我们的项目启动的时候,我们就已经从配置中心拉取了配置,也就是route,此时规则通过JSON的解析,我们就得到了规则集合 List<Rule>。 而这些集合中的规则将会被进一步的处理。 在上文中我提到了,rule分为基于路径的rule和基于服务的rule。并且每一个规则有它的唯一编号防止重复。 其中id就是规则的唯一编号,用于表示我们的规则,name表示规则的名称,paths表示规则将会对那些路径进行生效,prefix表示路径的前缀,protocol表示规则对什么协议生效,serviceId表示当前规则适用于什么服务(名称),filterConfigs就是对我们的过滤器的具体配置信息了,过滤器的部分是我们项目的核心部分,我也会在接下来的章节中具体的进行讲解。
{
"routes": [
{
"id": "user-service-route",
"serviceName": "user-service",
"uri": "/api/user/**"
}
]
}当我们从配置中心拉取配置并解析完毕之后,我们就开始了rule的存放。 我们rule的存放时根据DynamicConfigManager管理的 主要是这几个map
/**
* 服务的定义集合:uniqueId代表服务的唯一标识
* key: uniqueId
* value: 服务定义
*/
private ConcurrentHashMap<String, ServiceDefinition> serviceDefinitionMap = new ConcurrentHashMap<>();
/**
* 服务的实例集合:uniqueId与一对服务实例对应
* key: uniqueId
* value: 服务实例集合
*/
private ConcurrentHashMap<String, Set<ServiceInstance>> serviceInstanceMap = new ConcurrentHashMap<>();
/**
* 规则集合
* key: ruleId
* value: rule */private ConcurrentHashMap<String, Rule> ruleMap = new ConcurrentHashMap<>();
/**
* 路径以及规则集合
* key: path
* value: rule */private ConcurrentHashMap<String, Rule> pathRuleMap = new ConcurrentHashMap<>();
/**
* 每个服务名对应规则集合
* key: serviceName
* value: 规则集合
*/
private ConcurrentHashMap<String, List<Rule>> serviceRuleMap = new ConcurrentHashMap<>();
/**
* 路由规则变化监听器
* key: serviceId
* value: 监听器集合
*/
private final ConcurrentHashMap<String, List<RouteListener>> routeListenerMap = new ConcurrentHashMap<>();这里如果单独只是用请求的路径,那么可能会出现路径重复的问题,因此我自定义了一个规则,按照如下方式来设定我们的key。
String key = rule.getId + "." + path;到此为止我们就已经在项目启动之后将我们的各种不同的rule进行了保存。以方便我们在后续需要使用的时候取出进行使用。 那么有了这些前置知识,我们就可以完成对网关请求上下文的封装了。 (netty服务端,网关,Http客户端通过这个RequestHelper来进行转换)
package cyou.breathe.gateway.core.helper;
import cn.hutool.core.text.AntPathMatcher;
import cyou.breathe.gateway.common.constants.GatewayConstant;
import cyou.breathe.gateway.common.exception.ResponseException;
import cyou.breathe.gateway.config.entity.Rule;
import cyou.breathe.gateway.config.entity.ServiceDefinition;
import cyou.breathe.gateway.config.invoker.ServiceInvoker;
import cyou.breathe.gateway.config.invoker.impl.HttpServiceInvoker;
import cyou.breathe.gateway.config.manager.DynamicConfigManager;
import cyou.breathe.gateway.core.context.GatewayContext;
import cyou.breathe.gateway.core.request.GatewayRequest;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import org.apache.commons.lang3.StringUtils;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static cyou.breathe.gateway.common.constants.HttpConstant.HTTP_FORWARD_SEPARATOR;
import static cyou.breathe.gateway.common.enums.ResponseCode.PATH_NO_MATCHED;
/**
* Netty服务端、网关、Http客户端之间的请求转换
* @author: breathe
* @createTime: 2025-10-04
*/public class RequestHelper {
public static GatewayContext doContext(FullHttpRequest request, ChannelHandlerContext ctx) {
// 构建请求对象GatewayRequest
GatewayRequest gateWayRequest = doRequest(request, ctx);
ServiceDefinition serviceDefinition = DynamicConfigManager.getInstance().getServiceDefinition(gateWayRequest.getUniqueId());
// 根据请求对象获取服务定义对应的方法调用,然后获取对应的规则
ServiceInvoker serviceInvoker = new HttpServiceInvoker();
serviceInvoker.setInvokerPath(gateWayRequest.getPath());
serviceInvoker.setTimeout(500);
Rule rule = getRule(gateWayRequest,serviceDefinition.getServiceId());
// 判断路径是否匹配
String prefix = rule.getPrefix();
List<String> paths = rule.getPaths();
AntPathMatcher antPathMatcher = new AntPathMatcher();
String uri = gateWayRequest.getPath();
boolean isMatch = false;
for (String path : paths) {
if (antPathMatcher.match(prefix + path, uri)) {
isMatch = true;
break;
}
}
if (!isMatch) {
throw new ResponseException(PATH_NO_MATCHED);
}
return new GatewayContext(
serviceDefinition.getProtocol(),
ctx,
HttpUtil.isKeepAlive(request),
gateWayRequest,
rule,
0);
}
/**
* 构建Request请求对象
*/
private static GatewayRequest doRequest(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx) {
HttpHeaders headers = fullHttpRequest.headers();
String uniqueId = headers.get(GatewayConstant.UNIQUE_ID) == null ? getUniqueIdFromUri(fullHttpRequest.uri()) : headers.get(GatewayConstant.UNIQUE_ID);
String host = DynamicConfigManager.getInstance()
.getServiceInstanceByUniqueId(uniqueId, false)
.stream().findFirst()
.get()
.getServiceInstanceId();
HttpMethod method = fullHttpRequest.method();
String uri = fullHttpRequest.uri();
String clientIp = getClientIp(ctx, fullHttpRequest);
String contentType = HttpUtil.getMimeType(fullHttpRequest) == null ? null : HttpUtil.getMimeType(fullHttpRequest).toString();
Charset charset = HttpUtil.getCharset(fullHttpRequest, StandardCharsets.UTF_8);
return new GatewayRequest(uniqueId,
charset,
clientIp,
host,
uri,
method,
contentType,
headers,
fullHttpRequest);
}
/**
* 获取客户端ip
*/ private static String getClientIp(ChannelHandlerContext ctx, FullHttpRequest request) {
String xForwardedValue = request.headers().get(HTTP_FORWARD_SEPARATOR);
String clientIp = null;
if(StringUtils.isNotEmpty(xForwardedValue)) {
List<String> values = Arrays.asList(xForwardedValue.split(", "));
if(!values.isEmpty() && StringUtils.isNotBlank(values.get(0))) {
clientIp = values.get(0);
}
} if(clientIp == null) {
InetSocketAddress inetSocketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
clientIp = inetSocketAddress.getAddress().getHostAddress();
}
return clientIp;
}
/**
* 获取Rule对象,完成了规则和路径的匹配
*/
private static Rule getRule(GatewayRequest gateWayRequest,String serviceId){
String key = serviceId + "." + gateWayRequest.getPath();
//配置中心拿到对应的Rule
Rule rule = DynamicConfigManager.getInstance().getRuleByPath(key);
if (rule != null){
return rule;
}
//没找到的话,从配置中心拿到对应的Rule
return DynamicConfigManager.getInstance().getRuleByServiceId(serviceId)
.stream().filter(r -> gateWayRequest.getPath().startsWith(r.getPrefix()))
.findAny().orElseThrow(()-> new ResponseException(PATH_NO_MATCHED));
}
private static String getUniqueIdFromUri(String uri) {
// 解析URI中的路径部分
String path = new QueryStringDecoder(uri).path();
System.out.println();
// 获取所有服务定义
DynamicConfigManager configManager = DynamicConfigManager.getInstance();
// 遍历所有服务定义,尝试匹配路径
for (Map.Entry<String, ServiceDefinition> entry : configManager.getServiceDefinitionMap().entrySet()) {
ServiceDefinition serviceDefinition = entry.getValue();
// 检查服务是否启用
if (!serviceDefinition.isEnable()) {
continue;
}
// 如果服务定义中有invokerMap,则检查路径是否匹配
if (serviceDefinition.getInvokerMap() != null) {
for (String invokerPath : serviceDefinition.getInvokerMap().keySet()) {
// 简单的前缀匹配,可以根据需要实现更复杂的匹配逻辑
if (path.startsWith(invokerPath)) {
// 返回匹配服务的uniqueId
return entry.getKey();
}
} } }
// 没有找到匹配的服务
return null;
}
}过滤器链的设计
过滤器的设计
在设计过滤器链之前,我们先设计出来过滤器的结构。 根据之前的文章,其实我们可以知道,过滤器对请求的处理方式其实就是将我们的网关请求上下文GatewayContext放入到我们的某一个具体的过滤器中然后执行过滤器方法即可。 同时,参考SpringCloudGateway我们知道,过滤器链中的过滤器是有序执行的,因此我们还需要使得当前过滤器提供一个方法来返回其顺序。 那么我们可以设计出过滤器结构如下:
![[../../photo/Pasted image 20251019194410.png]]
过滤器链工厂
上文中已经提到了,对于每一个请求,我们的处理方式是通过过滤器链的方式来对进行处理,也就是每一个请求都将按照route走完一套过滤器链流程。 我们的过滤器流程在这里介绍一下吧(个人认为更像拦截器):
- 在请求发送之前会走preFilter,进行一次前置校验链,相当于是对请求做一次修饰和过滤
- 在响应返回的时候,我们还会经过这些过滤器链,对响应进行一次修饰和过滤,可以把它看成拦截器。 那么接下来我们来分析一下如何设计过滤器链条。 首先从请求接收处开始考虑,我们的每一个请求都需要通过NettyCoreProcessor的处理,这个类中我们将先将我们的请求信息封装为我们的网关上下文信息。在这里就会走请求前的过滤,也就是preFilter
GatewayContext gatewayContext = ContextHelper.buildGatewayContext(wrapper.getRequest(), wrapper.getCtx());
FilterChainFactory.buildFilterChain(gatewayContext);
gatewayContext.doFilter();之后我们就可以基于网关上下文信息构建过滤器链,然后开始执行过滤器链中的过滤器方法。 这里之所以要构建过滤器链是因为,我们的每一个不同的请求,每一个携带不同参数的请求,都有可能触发不同的过滤器链规则,因此,我们需要使用过滤器链工厂,为我们的请求信息专门的生成过滤器链去执行请求。当然,这样子的性能会有影响,所以我会在后续对这个地方进行优化。 既然上文提到我们需要对不同的请求构建过滤器链,那么我们就按照工厂方法的方式去实现,提供过滤器链生产方法以及根据过滤器ID获得过滤器的方法即可。
package cyou.breathe.gateway.core.filter;
import cyou.breathe.gateway.core.context.GatewayContext;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
/**
* 过滤器链
* @author: breathe
* @createTime: 2025-10-11
*/@Slf4j
public class GatewayFilterChain {
/**
* 一系列过滤器集合
*/
private final List<Filter> filters = new ArrayList<>();
/**
* 添加过滤器
*/
public GatewayFilterChain addFilter(Filter filter){
filters.add(filter);
return this;
}
/**
* 添加过滤器集合
*/
public GatewayFilterChain addFilterList(List<Filter> filter) {
filters.addAll(filter);
return this;
}
/**
* 执行过滤器
*/
public GatewayContext doFilter(GatewayContext ctx) throws Exception {
// 若为空直接返回
if (filters.isEmpty()) {
return ctx;
}
// 遍历执行过滤器
try {
for (Filter fl : filters) {
//执行过滤器
fl.doFilter(ctx);
//判断是否执行完成
if (ctx.isTerminated()) {
break ;
}
} } catch (Exception e) {
log.error("执行过滤器发生异常,异常信息:{}",e.getMessage());
throw e;
}
return ctx;
}
}而过滤器链的实现可以按照如下的步骤去实现:
- 加载所有过滤器
- 遍历规则,并判断当前路径是否启用当前过滤器
- 将适用的过滤器添加到集合中
- 路由过滤器是最后生效的过滤器,上述步骤完成后最后放入路由过滤器
- 按照过滤器的生效顺序对过滤器进行排序
- 过滤器链创建成功
所以,按照上面的步骤,并结合我们之前的代码,不难得出如下代码:
package cyou.breathe.gateway.core.filter;
import cyou.breathe.gateway.core.context.GatewayContext;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
/**
* 过滤器链
* @author: breathe
* @createTime: 2025-10-11
*/@Slf4j
public class GatewayFilterChain {
/**
* 一系列过滤器集合
*/
private final List<Filter> filters = new ArrayList<>();
/**
* 添加过滤器
*/
public GatewayFilterChain addFilter(Filter filter){
filters.add(filter);
return this;
}
/**
* 添加过滤器集合
*/
public GatewayFilterChain addFilterList(List<Filter> filter) {
filters.addAll(filter);
return this;
}
/**
* 执行过滤器
*/
public GatewayContext doFilter(GatewayContext ctx) throws Exception {
// 若为空直接返回
if (filters.isEmpty()) {
return ctx;
}
// 遍历执行过滤器
try {
for (Filter fl : filters) {
//执行过滤器
fl.doFilter(ctx);
//判断是否执行完成
if (ctx.isTerminated()) {
break ;
}
} } catch (Exception e) {
log.error("执行过滤器发生异常,异常信息:{}",e.getMessage());
throw e;
}
return ctx;
}
}之后,过滤器链工厂方法也已经实现了,最后就是轮到了对过滤器链中过滤器的执行,由于我们知道其实过滤器链就是一个过滤器集合,因此我们可以容易的的得出只需要将网关上下文放入到过滤器中进行执行,并且使用迭代器遍历过滤器让他们去执行过滤方法即可。
package cyou.breathe.gateway.core.filter;
import cyou.breathe.gateway.core.context.GatewayContext;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
/**
* 过滤器链
* @author: breathe
* @createTime: 2025-10-11
*/@Slf4j
public class GatewayFilterChain {
/**
* 一系列过滤器集合
*/
private final List<Filter> filters = new ArrayList<>();
/**
* 添加过滤器
*/
public GatewayFilterChain addFilter(Filter filter){
filters.add(filter);
return this;
}
/**
* 添加过滤器集合
*/
public GatewayFilterChain addFilterList(List<Filter> filter) {
filters.addAll(filter);
return this;
}
/**
* 执行过滤器
*/
public GatewayContext doFilter(GatewayContext ctx) throws Exception {
// 若为空直接返回
if (filters.isEmpty()) {
return ctx;
}
// 遍历执行过滤器
try {
for (Filter fl : filters) {
//执行过滤器
fl.doFilter(ctx);
//判断是否执行完成
if (ctx.isTerminated()) {
break ;
}
} } catch (Exception e) {
log.error("执行过滤器发生异常,异常信息:{}",e.getMessage());
throw e;
}
return ctx;
}
}到此为止,过滤器链条和过滤器的设计我们都已经完成了。 接下来的章节其实就是基本思考如何对于某一些具体的需求来实现一个具体的过滤器了,比如负载均衡过滤器,路由过滤器,鉴权过滤器等等过滤器。
不同过滤器的具体实现
这一章节我们来思考如何基于不同的需求实现不同的过滤器。 参考SpringCloudGateway的实现,不同过滤器一般提供如下的这些功能。
- 决定路由:路由过滤器负责确定请求应该被路由到哪个微服务。基于请求的 URL、头信息或其他参数,路由过滤器可以决定合适的服务实例进行处理。
- 修改请求和响应:在路由请求到目标服务之前,路由过滤器可以修改请求。这包括添加、删除或修改请求头,改变请求的目的地,或者增加查询参数等。
- 认证和授权:虽然通常由专门的过滤器处理,但路由过滤器也可以参与检查用户的认证和授权信息,以决定是否允许请求被路由到下游服务。
- 负载均衡:在一个微服务架构中,一个服务可能有多个实例。路由过滤器可以实现负载均衡,确保请求均匀地分布在不同的服务实例上。
- 容错和重试机制:路由过滤器可以实现容错机制,比如当目标服务不可用时,自动重试其他实例或提供备用响应。
- 日志和监控:在路由请求的过程中,路由过滤器可以记录关于请求的重要信息,这对于监控和分析系统性能至关重要。
- 处理响应:在从目标服务接收到响应后,路由过滤器可以对响应进行修改或处理,例如修改响应头、更改响应状态码或者对返回的数据进行转换。
- 限流和熔断:在高流量场景中,路由过滤器可以实现限流策略来防止系统过载,或者在后端服务故障时提供熔断机制。
而我的网关项目也将提供这些最基本的过滤器功能。 我的网关中包含:鉴权、限流、熔断、负载均衡、Mock、监控、路由七大过滤器。 其中,最最基础的过滤器其实就是我们的路由过滤器和负载均衡过滤器了。 这里我选择先讲解负载均衡过滤器,之后讲解路由过滤器。 并且接下来对每一个过滤器的实现的讲解,我都会按照设计思路,代码实现两步走的方式来讲解。
负载均衡过滤器
设计思路
我们知道,负载均衡其实就是按照一定的策略,从多个的后端服务实例中选取一个实例,然后将请求发送到当前实例上,比较常用的负载均衡策略有:轮询、随机、权重。 这里我选择对轮询和随机两个比较简单的来进行实现。 对于轮询,对于我们的网关项目,我们直接使用AtomicInteger进行累加计数即可。 然后我们从Nacos获取服务实例的数量之后,使用AtomicInteger中的数据和服务实例的数量进行取余运算或者进行与运算,我们就能得到需要负载均衡到的机器的路由了。 而随机则更加简单,我们得到服务实例数量之后,将随机数分为设定为小于等于这个数量即可。 思考清楚了两个负载均衡策略如何实现,我们得思考如何选择具体使用那个负载均衡策略,按照我们的规则,我们可以在规则中提供一个字段,这个字段用于选择具体使用的策略。 ![[../../photo/Pasted image 20251019194553.png]] 例如,按照上面的代码,我们解析完毕Rule之后,得到filterConfigs属性,并获取到其中的id和config配置,而这里的config配置就代表了负载均衡策略具体选择的是哪一个策略。 而具体的规则我们也在请求进入到网关并且被NettyCoreProcessor处理的时候进行过封装了。
因此,我们可以直接从网关上下文中获取到当前请求对应的规则,然后从配置中心中拿出当前服务对应的规则。 查看代码比较容易理解,其实就是从网关上下文中获取到规则配置之后,拿出对应的config字段中的load_balance字段,这个字段就是我们选择的负载均衡的具体策略。 默认我们使用随机的负载均衡策略。 之后,我们对JSON进行解析并且得到具体选择的负载均衡策略即可。
代码实现
package cyou.breathe.gateway.core.filter.loadbalance;
import com.alibaba.fastjson.JSON;
import cyou.breathe.gateway.common.exception.NotFoundException;
import cyou.breathe.gateway.config.entity.Rule;
import cyou.breathe.gateway.config.entity.ServiceInstance;
import cyou.breathe.gateway.core.context.GatewayContext;
import cyou.breathe.gateway.core.filter.Filter;
import cyou.breathe.gateway.core.filter.FilterAspect;
import cyou.breathe.gateway.core.request.GatewayRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import static cyou.breathe.gateway.common.constants.FilterConstant.*;
import static cyou.breathe.gateway.common.enums.ResponseCode.SERVICE_INSTANCE_NOT_FOUND;
/**
* 负载均衡过滤器
* @author: breathe
* @createTime: 2025-10-12
*/@Slf4j
@FilterAspect(id=LOAD_BALANCE_FILTER_ID,
name = LOAD_BALANCE_FILTER_NAME,
order = LOAD_BALANCE_FILTER_ORDER)
public class LoadBalanceFilter implements Filter {
@Override
public void doFilter(GatewayContext ctx){
//服务id
String serviceId = ctx.getUniqueId();
//对应实现的类
IGatewayLoadBalanceRule gatewayLoadBalanceRule = getLoadBalanceRule(ctx);
ServiceInstance serviceInstance = gatewayLoadBalanceRule.choose(serviceId,ctx.isGray());
System.out.println("IP为"+serviceInstance.getIp()+",端口号:"+serviceInstance.getPort());
GatewayRequest request = ctx.getRequest();
if (request != null) {
String host = serviceInstance.getIp()+":"+serviceInstance.getPort();
request.setModifyHost(host);
} else {
log.warn("没有可获取的实例 :{}",serviceId);
throw new NotFoundException(SERVICE_INSTANCE_NOT_FOUND);
}
}
/**
* 根据配置获取负载均衡器
*/
public IGatewayLoadBalanceRule getLoadBalanceRule(GatewayContext ctx) {
IGatewayLoadBalanceRule loadBalanceRule = null;
Rule configRule = ctx.getRule();
if (configRule != null) {
Set<Rule.FilterConfig> filterConfigs = configRule.getFilterConfigs();
Iterator iterator = filterConfigs.iterator();
Rule.FilterConfig filterConfig;
while (iterator.hasNext()) {
filterConfig = (Rule.FilterConfig) iterator.next();
if (filterConfig == null) {
continue;
}
String filterId = filterConfig.getId();
// 如果是负载均衡过滤器
if (filterId.equals(LOAD_BALANCE_FILTER_ID)) {
String config = filterConfig.getConfig();
String strategy = LOAD_BALANCE_STRATEGY_RANDOM;
if (StringUtils.isNotEmpty(config)) {
Map<String, String> mapTypeMap = JSON.parseObject(config, Map.class);
strategy = mapTypeMap.getOrDefault(LOAD_BALANCE_KEY, strategy);
}
loadBalanceRule = switch (strategy) {
case LOAD_BALANCE_STRATEGY_RANDOM ->
RandomLoadBalanceRule.getInstance(configRule.getServiceId());
case LOAD_BALANCE_STRATEGY_ROUND_ROBIN ->
RoundRobinLoadBalanceRule.getInstance(configRule.getServiceId());
default -> {
log.warn("该服务没有负载均衡策略: {}", strategy);
yield RandomLoadBalanceRule.getInstance(configRule.getServiceId());
}
};
}
} } return loadBalanceRule;
}
}随机负载均衡过滤器实现: 这里由于我们对于不同的服务会有不同的负载均衡的策略,但是这些策略在大多数时候是不会改变的,因此我们可以选择将其进行缓存,方法为使用一个HashMap即可。于是就有了如下代码:
package cyou.breathe.gateway.core.filter.loadbalance;
import cyou.breathe.gateway.common.exception.NotFoundException;
import cyou.breathe.gateway.config.entity.ServiceInstance;
import cyou.breathe.gateway.config.manager.DynamicConfigManager;
import cyou.breathe.gateway.core.context.GatewayContext;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import static cyou.breathe.gateway.common.enums.ResponseCode.SERVICE_INSTANCE_NOT_FOUND;
/**
* 随机负载均衡
* @author: breathe
* @createTime: 2025-10-12
*/@Slf4j
public class RandomLoadBalanceRule implements IGatewayLoadBalanceRule {
private final String serviceId;
/**
* 服务id对应的实例列表
*/
private Set<ServiceInstance> serviceInstanceSet;
public RandomLoadBalanceRule(String serviceId) {
this.serviceId = serviceId;
}
private static final ConcurrentHashMap<String,RandomLoadBalanceRule>
SERVICE_MAP = new ConcurrentHashMap<>();
public static RandomLoadBalanceRule getInstance(String serviceId){
return SERVICE_MAP.computeIfAbsent(serviceId, RandomLoadBalanceRule::new);
}
@Override
public ServiceInstance choose(GatewayContext ctx) {
String serviceId = ctx.getUniqueId();
return choose(serviceId, ctx.isGray());
}
@Override
public ServiceInstance choose(String serviceId,boolean gray) {
Set<ServiceInstance> serviceInstanceSet =
DynamicConfigManager.getInstance().getServiceInstanceByUniqueId(serviceId,gray);
if(serviceInstanceSet.isEmpty()){
log.warn("没有可获取的实例: {}",serviceId);
throw new NotFoundException(SERVICE_INSTANCE_NOT_FOUND);
}
List<ServiceInstance> instances = new ArrayList<ServiceInstance>(serviceInstanceSet);
int index = ThreadLocalRandom.current().nextInt(instances.size());
return (ServiceInstance)instances.get(index);
}
}轮询负载均衡过滤器实现: 同理,轮询的负载均衡过滤器的实现方式只不过变为了需要维护一个AtomicInteger用于计数。
package cyou.breathe.gateway.core.filter.loadbalance;
import cyou.breathe.gateway.common.exception.NotFoundException;
import cyou.breathe.gateway.config.entity.ServiceInstance;
import cyou.breathe.gateway.config.manager.DynamicConfigManager;
import cyou.breathe.gateway.core.context.GatewayContext;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import static cyou.breathe.gateway.common.enums.ResponseCode.SERVICE_INSTANCE_NOT_FOUND;
/**
* 轮训负载均衡过滤器
* @author: breathe
* @createTime: 2025-10-12
*/@Slf4j
public class RoundRobinLoadBalanceRule implements IGatewayLoadBalanceRule{
/**
* 线程安全
*/
private final AtomicInteger position = new AtomicInteger(1);
private final String serviceId;
public RoundRobinLoadBalanceRule(String serviceId) {
this.serviceId = serviceId;
}
private static final ConcurrentHashMap<String,RoundRobinLoadBalanceRule>
SERVICE_MAP = new ConcurrentHashMap<>();
public static RoundRobinLoadBalanceRule getInstance(String serviceId) {
RoundRobinLoadBalanceRule loadBalanceRule = SERVICE_MAP.get(serviceId);
if (loadBalanceRule == null) {
loadBalanceRule = new RoundRobinLoadBalanceRule(serviceId);
SERVICE_MAP.put(serviceId,loadBalanceRule);
}
return loadBalanceRule;
}
@Override
public ServiceInstance choose(GatewayContext ctx) {
return choose(ctx.getUniqueId(), ctx.isGray());
}
@Override
public ServiceInstance choose(String serviceId, boolean gray) {
Set<ServiceInstance> serviceInstanceSet =
DynamicConfigManager.getInstance().getServiceInstanceByUniqueId(serviceId,gray);
if(serviceInstanceSet.isEmpty()){
log.warn("没有可获取的服务实例: {}",serviceId);
throw new NotFoundException(SERVICE_INSTANCE_NOT_FOUND);
}
List<ServiceInstance> instances = new ArrayList<>(serviceInstanceSet);
if (instances.isEmpty()) {
log.warn("没有可获取的服务实例: {}",serviceId);
return null;
} else {
int pos = Math.abs(this.position.incrementAndGet());
return instances.get(pos%instances.size());
}
}}至此,我们的负载均衡过滤器就已经设计完毕了,如果有兴趣可以自己实现基于权重的负载均衡过滤器。
路由过滤器
在讲解具体的路由过滤器的实现之前,先来了解一下请求重试,因为我们的路由过滤器如果请求失败,一般会选择按照一定的次数进行重试。
请求重试
请求重试是指在请求失败之后再次尝试请求,一般情况下重试可以减少请求因为服务GC卡顿、网络丢包、网络阻塞等短暂问题而导致的失败;然而重试会增加请求总数量,不合理的重试策略甚至可能在服务端不稳定时,导致重试流量风暴,从而压垮服务端导致故障。 请求失败一般可以按照层级划分为连接失败和请求失败;而请求一般可分为幂等和非幂等请求。 连接失败:由于TCP握手失败,实际业务请求并未发送至服务端,所以对此类错误是可以安全的重试的,配合超时配置将链接超时设置在毫秒级别,可以有效的避免偶发网络拥塞、网络丢包等网络故障导致的报错,提升整体稳定性。 请求失败:由于网络连接已经完成,实际业务请求可能已经发送至服务端,服务端的业务逻辑可能已经执行过了;比如服务端超时,而实际服务端业务逻辑会继续执行完成。因此对于幂等请求相对安全,但是对于非幂等的请求,重试可能会有较大风险。
在短视频APP例子中,比如获取账户信息的请求就是幂等请求,不会有服务端数据的修改,重试操作是比较安全的;但是对于添加评论的请求,如果请求超时进行重试,就可能导致评论服务最终收到多个添加评论的请求,最终添加多个重复的评论,显然这是不正确的,会最终导致数据异常。
因此重试配置一般可归于以下几类
- 连接重试: 因为连接重试风险低,收益高,一般情况下默认开启。
- 超时重试: 需要判断业务接口是否幂等,如非幂等风险是否可控,来决定是否启用;提供重试退避策略:重试等待固定时长或逐次提升等待时长。
- Backup Request: 为减少服务的延迟波动。在设置时间内未返回,再次发送请求;例如使用P99作为阈值,来降低长尾问题。
同时为了防止大规模重试导致请求量总量成倍上升,最终压垮服务,重试一般需提供熔断错误率阈值,当请求错误率超过阈值时停止重试。
设计思路
有了负载均衡过滤器的基础,设计路由过滤器就会简单很多了。 路由过滤器其实要做的就是将我们在经过之前过滤器处理好的请求信息发送出去,去请求具体的后端服务实例。 因此我们要做的其实就是在过滤器方法中调用具体的执行路由的逻辑。 在这个部分中,我们的主要逻辑其实就是使用AsyncHttpClient发送异步请求,我们在之前已经成功的封装好了我们的请求信息,也就是这里的Request,之后我们我们使用AsyncHttpClient来执行这个异步请求,这个异步请求将会返回一个CompletableFuture对象,而我们知道CompletableFuture非常适合于异步任务的场景,这里我们基于CompletableFuture来实现当请求处理完毕之后的收尾工作。 这里我将花费一定的笔墨来介绍一下这里两个Completable调用的方法的区别。 主要区别在于whenComplete和whenCompleteAsync方法的使用。让我逐一分析这两种方法的不同之处:
- whenComplete方法:
- whenComplete是一个非异步的完成方法。
- 当CompletableFuture的执行完成或者发生异常时,它提供了一个回调。
- 这个回调将在CompletableFuture执行的相同线程中执行。这意味着,如果CompletableFuture的操作是阻塞的,那么回调也会在同一个阻塞的线程中执行。
- 在这段代码中,如果whenComplete为true,则在future完成时使用whenComplete方法。这意味着complete方法将在future所在的线程中被调用。
- whenCompleteAsync方法:
- whenCompleteAsync是异步的完成方法。
- 它也提供了一个在CompletableFuture执行完成或者发生异常时执行的回调。
- 与whenComplete不同,这个回调将在不同的线程中异步执行。通常情况下,它将在默认的ForkJoinPool中的某个线程上执行,除非提供了自定义的Executor。
- 在代码中,如果whenComplete为false,则使用whenCompleteAsync。这意味着complete方法将在不同的线程中异步执行。
- 特别注意哦,由于ForkJoinPool中的线程是共用的,ParallelStream中的线程也是用的ForkJoinPool,因此我推荐你手动设定这个线程池的大小,否则会出现一些异常哦。
好的,那我们接下来就可以看执行完毕请求之后,我们如何对请求的结果进行处理了。也就是我们的complete方法。 代码比较简单直接看注释就可以明白其中的意思,这里的代码涉及到了重试机制,重试的次数也是由配置中心的配置给出的,当我们的请求失败之后我们可以获取到,然后进行对应次数的重试。 并在请求的最后对请求信息进行日志记录。
代码实现
package cyou.breathe.gateway.core.filter.router;
import cyou.breathe.gateway.common.enums.ResponseCode;
import cyou.breathe.gateway.common.exception.ConnectException;
import cyou.breathe.gateway.common.exception.ResponseException;
import cyou.breathe.gateway.config.entity.Rule;
import cyou.breathe.gateway.config.loader.ConfigLoader;
import cyou.breathe.gateway.core.context.GatewayContext;
import cyou.breathe.gateway.core.filter.Filter;
import cyou.breathe.gateway.core.filter.FilterAspect;
import cyou.breathe.gateway.core.helper.AsyncHttpHelper;
import cyou.breathe.gateway.core.helper.ResponseHelper;
import cyou.breathe.gateway.core.resilience.Resilience;
import cyou.breathe.gateway.core.response.GatewayResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import static cyou.breathe.gateway.common.constants.FilterConstant.*;
/**
* 路由过滤器(熔断降级也在这里实现)
* @author: breathe
* @createTime: 2025-10-11
*/@Slf4j
@FilterAspect(id=ROUTER_FILTER_ID,
name = ROUTER_FILTER_NAME,
order = ROUTER_FILTER_ORDER)
public class RouterFilter implements Filter {
@Override
public void doFilter(GatewayContext gatewayContext) throws Exception {
Optional<Rule.ResilienceConfig> resilienceConfig = getResilienceConfig(gatewayContext);
//没有熔断配置就不走
if(resilienceConfig.isPresent()){
routeWithResilience(gatewayContext,resilienceConfig);
}else{
route(gatewayContext,resilienceConfig);
}
}
private static Optional<Rule.ResilienceConfig> getResilienceConfig(GatewayContext gatewayContext){
return gatewayContext.getRule().getResilience().stream()
.filter(c-> StringUtils.equals(c.getPath(),
gatewayContext.getRequest().getPath()))
.findFirst();
}
private CompletableFuture<Response> route(GatewayContext gatewayContext,
Optional<Rule.ResilienceConfig> resilienceConfig){
Request request = gatewayContext.getRequest().build();
CompletableFuture<Response> future = AsyncHttpHelper.getInstance().executeRequest(request);
boolean whenComplete = ConfigLoader.getConfig().getNetty().isWhenComplete();
/**
* 将值写回客户端
*/
if (whenComplete) {
future.whenComplete((response, throwable) -> {
complete(request, response, throwable, gatewayContext,resilienceConfig);
});
} else {
future.whenCompleteAsync((response, throwable) -> {
complete(request, response, throwable, gatewayContext,resilienceConfig);
});
}
return future;
}
private void routeWithResilience(GatewayContext gatewayContext, Optional<Rule.ResilienceConfig> resilienceConfig) {
Resilience.getInstance().executeResilienceRequest(gatewayContext,resilienceConfig, this::route);
}
private void complete(Request request,
Response response,
Throwable throwable,
GatewayContext gatewayContext,Optional<Rule.ResilienceConfig> resilienceConfig) {
gatewayContext.releaseRequest();
Rule rule = gatewayContext.getRule();
try {
if (Objects.nonNull(throwable)) {
String url = request.getUrl();
if (throwable instanceof TimeoutException) {
log.warn("complete time out {}", url);
gatewayContext.setThrowable(new ResponseException(ResponseCode.REQUEST_TIMEOUT));
gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.REQUEST_TIMEOUT));
} else {
gatewayContext.setThrowable(new ConnectException(throwable,
gatewayContext.getUniqueId(),
url, ResponseCode.HTTP_RESPONSE_ERROR));
gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.HTTP_RESPONSE_ERROR));
}
} else {
gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(response));
}
} catch (Throwable t) {
gatewayContext.setThrowable(new ResponseException(ResponseCode.INTERNAL_ERROR));
gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.INTERNAL_ERROR));
log.error("complete error", t);
} finally {
gatewayContext.written();
ResponseHelper.writeResponse(gatewayContext);
}
}}限流过滤器
服务限流是在高并发系统中,用于保护服务方的一种手段。通过限流功能,可以限制QPS的最大值、Connection的最大值,以避免被突发流量击溃,从而保障系统的高可用性。 业务场景:
- 突发流量保护
- 防止调用方高频误刷
在短视频APP的例子中,假如遇到突发热点新闻「xxx事件」,短视频流量突增,视频信息服务请求量成倍增加,导致CPU被打崩,服务崩溃,在突发流量消退前都将无法正常服务,所有用户都将受到影响;如果可以限制请求视频信息服务的请求总量,对无法支持的流量丢弃,使得部分用户能够正常使用APP,将损失降低,同时对服务进行扩容,进一步降低损失。
针对基础的使用场景,我们介绍以下2类限流:
- 单实例限流
- 分布式限流 在后续高阶章节我们还会介绍智能化的限流模式,通过程序自动识别是否限流。
单实例限流
单实例限流是为防止本实例的服务被打挂而进行的限流设置,单实例限流是为每个实例分配限流额度,确保单个实例接收到的QPS和Connection在设定阈值内,从而保障实例的稳定性。 一般对稳定性要求较高的服务都会先进行单实例压测,确定本服务单实例的Max QPS,然后利用本功能进行单实例QPS && Connection 的限制。 单实例限流对每个实例的阈值是一样的,所以当被调用服务的实例权重差别较大的时候,权重较高的实例因为流量更大,因此更容易触发限流阈值。
分布式限流
分布式限流则是通过SDK+中心存储令牌模式实现精准的集群限流模式。每个实例都会运行SDK限流逻辑(令牌桶限流算法),限流令牌将向中心存储(Redis / MySQL)申请令牌,从而实现精准的集群流量限制。 分布式限流优点:
- 能够精准的控制集群整体的MAX Qps,即使限流值小于实例数。
- 实例的权重并不影响触发限流的概率,所有实例共享令牌桶。 分布式限流缺点:
- 由于引入了SDK和中心存储,带来更大的运维成本。
- 请求前需要从中心存储申请令牌,带来一定性能开销。 常见的限流算法有令牌桶算法和漏桶算法,这里两种算法我们都可以考虑使用一下。
如何选择限流模式
这两种限流模式如何选择?
- 单实例限流:一般适用于保护服务自身不被打垮,例如计算型服务、缓存型服务、代理型服务等
- 分布式限流:一般适用于对集群整体限流精度有要求的服务,主要保护其依赖的第三方服务(数据库、缓存、中台服务等)。 限流是服务端将一视同仁的将无法承载的请求丢弃,在这种极端情况下,我们如何提高服务端的”性价比“,将有限的资源发挥最大的价值呢?嘿嘿,那这里卖个关子,留到服务降级的时候说。 首先,对于限流这一块,我们需要在配置中心配置限流规则。 例如限流的路径或者限流的服务。同时,根据你的服务是分布式服务还是单体服务,也需要考虑使用不同的方式来存储信息。 比如如果是分布式服务,就需要使用Redis,而如果是单体,那么考虑使用本地缓存即可,比如Guava或者Caffeine。 老样子,我们先编写一个接口,这个接口用于获取对应的限流过滤器。
![[../../photo/Pasted image 20251019195516.png]]
按照上面的思路,我们将限流过滤器分为基于Redis和基于Guava两种方式实现的,一种为了分布式项目服务,一种为了单体项目服务。 代码比较好理解,不做过多的赘述。这里主要实现点在于实现基于Redis和基于Guava的计数器,比较简单,网络上教程比较多,篇幅有限,我就不贴出这一部分的具体代码了。
package cyou.breathe.gateway.core.filter.flowctl;
import com.alibaba.fastjson.JSON;
import cyou.breathe.gateway.config.entity.Rule;
import cyou.breathe.gateway.core.redis.JedisUtil;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static cyou.breathe.gateway.common.constants.FilterConstant.*;
/**
* 根据路基进行路由
* @author: breathe
* @createTime: 2025-10-12
*/public class FlowCtlByPathRule implements IGatewayFlowCtlRule {
@Getter
@Setter private String serviceId;
private final String path;
private final RedisCountLimiter redisCountLimiter;
private static final String LIMIT_MESSAGE ="您的请求过于频繁,请稍后重试";
public FlowCtlByPathRule(String serviceId, String path, RedisCountLimiter redisCountLimiter) {
this.serviceId = serviceId;
this.path = path;
this.redisCountLimiter = redisCountLimiter;
}
private static final ConcurrentHashMap<String,FlowCtlByPathRule> SERVICE_PATH_MAP = new ConcurrentHashMap<>();
public static FlowCtlByPathRule getInstance(String serviceId, String path){
String key = serviceId + "." + path;
FlowCtlByPathRule flowCtlByPathRule = SERVICE_PATH_MAP.get(key);
if(flowCtlByPathRule == null){
flowCtlByPathRule = new FlowCtlByPathRule(serviceId,path,new RedisCountLimiter(new JedisUtil()));
SERVICE_PATH_MAP.put(key,flowCtlByPathRule);
}
return flowCtlByPathRule;
}
/**
* 根据路径执行流控
*/
@Override
public void doFlowCtlFilter(Rule.FlowCtlConfig flowCtlConfig, String serviceId) {
if(flowCtlConfig == null || StringUtils.isEmpty(serviceId) || StringUtils.isEmpty(flowCtlConfig.getConfig())){
return;
}
Map<String,Integer> configMap = JSON.parseObject(flowCtlConfig.getConfig(),Map.class);
if(!configMap.containsKey(FLOW_CTL_LIMIT_DURATION) || !configMap.containsKey(FLOW_CTL_LIMIT_PERMITS)){
return;
}
double duration = configMap.get(FLOW_CTL_LIMIT_DURATION);
double permits = configMap.get(FLOW_CTL_LIMIT_PERMITS);
boolean flag = true;
String key = serviceId + "." + path;
if(FLOW_CTL_MODEL_DISTRIBUTED.equalsIgnoreCase(flowCtlConfig.getModel())){
flag = redisCountLimiter.doFlowCtl(key,(int)permits,(int)duration);
}else {
GuavaCountLimiter guavaCountLimiter = GuavaCountLimiter.getInstance(serviceId,flowCtlConfig);
if(guavaCountLimiter == null){
throw new RuntimeException("获取单机限流工具类为空");
}
double count = Math.ceil(permits/duration);
flag = guavaCountLimiter.acquire((int)count);
}
if(!flag){
throw new RuntimeException(LIMIT_MESSAGE);
}
}}之后,我们的限流方法实现了之后,我们就可以将我们的请求通过这个限流方法去处理。
package cyou.breathe.gateway.core.filter.flowctl;
import cyou.breathe.gateway.config.entity.Rule;
import cyou.breathe.gateway.core.context.GatewayContext;
import cyou.breathe.gateway.core.filter.Filter;
import cyou.breathe.gateway.core.filter.FilterAspect;
import lombok.extern.slf4j.Slf4j;
import java.util.Iterator;
import java.util.Set;
import static cyou.breathe.gateway.common.constants.FilterConstant.*;
/**
* 限流流控过滤器
* @author: breathe
* @createTime: 2025-10-12
*/@Slf4j
@FilterAspect(id=FLOW_CTL_FILTER_ID,
name = FLOW_CTL_FILTER_NAME,
order = FLOW_CTL_FILTER_ORDER)
public class FlowCtlFilter implements Filter {
@Override
public void doFilter(GatewayContext ctx) throws Exception {
Rule rule = ctx.getRule();
if(rule != null){
Set<Rule.FlowCtlConfig> flowCtlConfigs = rule.getFlowCtlConfigs();
Iterator iterator = flowCtlConfigs.iterator();
Rule.FlowCtlConfig flowCtlConfig;
while (iterator.hasNext()){
IGatewayFlowCtlRule flowCtlRule = null;
flowCtlConfig = (Rule.FlowCtlConfig)iterator.next();
if(flowCtlConfig == null){
continue;
}
String path = ctx.getRequest().getPath();
if(flowCtlConfig.getType().equalsIgnoreCase(FLOW_CTL_TYPE_PATH)
&& path.equals(flowCtlConfig.getValue())){
flowCtlRule = FlowCtlByPathRule.getInstance(rule.getServiceId(),path);
}
if(flowCtlRule != null){
flowCtlRule.doFlowCtlFilter(flowCtlConfig,rule.getServiceId());
}
} } }}熔断与服务降级过滤器
服务降级是在服务所发出「实际请求需求」大于下游「稳定/可提供QPS」阈值时所使用的一种「服务维稳手段」,保障在部分极端情况下整体系统可以通过牺牲部分能力方式换来一定程度的可用性,而不是超出阈值后导致系统雪崩。 服务降级常见有两种方式,业务根据自身需求进行对应选择:
- 弃车保帅:按一定丢弃规则,仅丢弃部分请求,保障部分高优/核心 请求可以获得稳定服务。
- 贫富相均:对于所有请求一视同仁,按照相同比例丢弃。
对于基础系统/核心业务/关键服务,其组件可用性有着极高要求,如果因其承载资源需求过高而整体完全不可用,会导致大面积服务调用链直接断裂,并使故障进一步扩散,引起「系统雪崩」,其造成的巨大损失是我们不能接受的。
在短视频APP的例子中,如下图,假如因为突发事件流量,导致账户服务的流量增加触发了限流;但是对于短视频的场景下,视频播放功能价值显然高于评论功能的价值,在有限的资源情况下,账户服务如果主动将评论服务的流量进行降级,将资源腾挪给视频信息服务,舍弃评论功能,保护视频播放能力显然能获得更高性价比。
![[../../photo/Pasted image 20251019195631.png]] 服务降级究其根本,即是“断臂求生”。对于实际业务场景而言,降级方式的评估即是对「付出成本」和「实际收获」的评估,可以从以下三个维度去抽象细化:
- 尽可能少丢弃—— “每一个请求都有价值”
- 尽可能丢弃价值较低的请求—— “请求与请求的价值是不同的”
- 尽可能丢弃性价比低的请求—— “吃了两份资源却只能产出一份价值的请求” 接下来我们开始实现熔断与服务降级。熔断与服务降级, 熔断降级限流我们在项目中是根据resilience4j来实现的,先引入依赖。具体是通过实现一个熔断降级工厂来对请求逻辑进行装饰,从而起到熔断降级限流的目的,请你自行看resilience中的代码实现即可。
灰度过滤器
在上面的基础知识中我想你已经简单的对灰度发布有了了解,接下来我们开始复习如何实现一个灰度发布。 我们首先分析一下实现灰度发布的一些前置知识。 首先,灰度发布意味着我们的少量的流量走的并不是我们常规的服务器,而是我们迭代后的一些还在测试的服务器,因此,我们可以对服务器集群中选择部分的机器,将他们设定为灰度发布的机器。也就是给这些服务实例一些标识,让他们成为灰度发布流量的处理者。 因此,我们首先需要在注册服务的时候注册少量的灰度发布实例。 这件比较容易实现,我们直接在Nacos注册中心中编辑服务实例的信息即可。 当然也可以在项目启动的时候手动的修改gray的属性为true。 之后,我们需要将灰度发布的过滤器添加到我们的过滤器链中。 ![[../../photo/Pasted image 20251019195948.png]] 然后来看看灰度发布过滤器的实现,比较简单,就是获取请求头中的gray字段判断当前全球是否走的是灰度发布的流量。
package cyou.breathe.gateway.core.filter.gray;
import cyou.breathe.gateway.core.context.GatewayContext;
import cyou.breathe.gateway.core.filter.Filter;
import cyou.breathe.gateway.core.filter.FilterAspect;
import lombok.extern.slf4j.Slf4j;
import static cyou.breathe.gateway.common.constants.FilterConstant.*;
/**
* @author: breathe
* @createTime: 2025-10-11
*/@Slf4j
@FilterAspect(id=GRAY_FILTER_ID,
name = GRAY_FILTER_NAME,
order = GRAY_FILTER_ORDER)
public class GrayFilter implements Filter {
@Override
public void doFilter(GatewayContext ctx) throws Exception {
//测试灰度功能待时候使用
String gray = ctx.getRequest().getHeaders().get("gray");
if ("true".equals(gray)) {
ctx.setGray(true);
}
//选取一部分用户作为灰度用户
//获取客户端ip
String clientIp = ctx.getRequest().getClientIp();
int res = clientIp.hashCode() & (1024 - 1);
if (res == 1) {
//1024分之一的概率
ctx.setGray(true);
}
}}然后我们发起一个常规的请求。这个请求由于并非灰度流量,那么他就会按照负载均衡的策略选择一个后端服务去处理。 而如果我们的请求是一个灰度流量,那么就必须寻找后端服务中为灰度发布的实例。 而如果当前流量并非灰度流量,我们就直接返回所有的服务实例即可,然后让其选择其中一个。 当然你也可以进行修改,让其过滤出对应的实例,比如灰度发布请求只能请求灰度发布的实例,非灰度发布的请求只能请求非灰度发布的实例,这里的实现比较简单,不多赘述。 至此,我们的灰度发布也已经实现了。
Mock过滤器
Mock过滤器的作用就是,当我们的前端需要一些数据进行测试的时候,但是此时我们的代码还没有开发完毕,还没有上线,那么此时我们可以先提供一些Mock数据给前端帮助前端进行测试,所以既有了Mock过滤器,这个过滤器可以返回一些特定的数据给前端,用于模拟后端服务已经完成的情况。 具体的实现思路可以为前端发送请求之后,如果当前的请求路径被我们配置为Mock,那么我们就可以让Mock过滤器去处理它,并且由于Mock过滤器只是返回数据,因此我们可以提高Mock过滤器的优先级,并且使用Mock过滤器后就不需要真正的再去请求后端服务了,我们直接返回即可。 首先开启Mock过滤器的配置。
"filterConfigs": [
{
"id": "load_balance_filter",
"config": {
"load_balance": "RoundRobin"
}
},
{
"id": "auth_filter"
},
{
"id": "mock_filter",
"config": {
"GET /http-server/ping": "mock"
}
}
]当我们发送一个Mock路径的请求的时候,就会执行Mock过滤器的逻辑,然后按照我们的逻辑返回我们的Mock数据,这里的Mock数据我写的比较随意,按理来说应该按照后端实体类的方式来进行处理。
项目优化
基于缓存的优化
在之前我们就已经埋了一个伏笔,也就是我们的网关每次遇到请求的时候都需要重新构建一个过滤器链来进行对请求的处理,从而影响了请求的处理速度,那么接下来,我们就使用缓存的方式来缓存我们的过滤器链,从而优化项目的执行速度。 我们的解决方法其实就是在项目中引入Caffeine,并且在对于相同的规则的时候,我们复用之前创建的过滤器链即可。
![[../../photo/Pasted image 20251019200423.png]] 这里其实就是将我们的过滤器链进行了一套缓存,之后只要是通用的规则都将使用这一套过滤器链,从而对项目的性能进行了优化。
基于JVM调优和GC的优化
Netty线程介绍
在Netty中有两个比较重要的线程概念,一个是BOSS线程,一个是Woker线程。
- Boss线程组: Boss线程组通常负责处理接受客户端连接的工作,即处理ServerSocketChannel的连接事件。 Boss线程会监听并接受客户端的连接请求,然后将连接注册到Worker线程池中的某个Worker线程上。 通常情况下,建议将Boss线程数配置为1,因为在大多数情况下,一个Boss线程足以处理大量的连接请求。
- Worker线程组: Worker线程组负责处理已经被Boss线程接受的连接,处理IO事件、执行业务逻辑等。 Worker线程池中的每个线程都有一个独立的EventLoop,它负责处理多个Channel的事件。 通过配置多个Worker线程,可以实现并发处理多个连接,提高系统的吞吐量。
- 配置建议:
- Boss线程数: 通常情况下,一个Boss线程足以处理大量的连接请求,因此可以将其配置为1。 可以通过ServerBootstrap的group方法来配置Boss线程组。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
// ...其他配置
.bind(port);- Worker线程数: Worker线程的数量应该根据服务器的性能和处理能力来配置。 如果服务器是多核的,通常可以配置多个Worker线程,以充分利用多核处理器的优势。 一般建议配置的线程数为核心数的两倍到四倍之间,具体取决于应用的性能需求和硬件条件。
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 1 Boss线程
EventLoopGroup workerGroup = new NioEventLoopGroup(4); // 4 Worker线程线程数的调优: 可以根据应用的性能测试结果来动态调整Boss和Worker线程的数量,以达到最佳的性能表现。 通过监控系统资源利用率、CPU使用率等指标,可以调整线程池大小以平衡吞吐量和资源消耗。
Netty实战配置
上面我们已经简单的讲解了一下Netty中涉及到的两个线程,这里我们开始进行编码,然后使用压测工具JMeter进行测试,我们就能明显的看到修改Netty的线程数量在一定程度上可以提升我们网关的性能。 按照上面的Netty的Worker线程的数量进行配置,然后我们进行1000 * 1000数量的压测。 吞吐量大概为2w。 之后,我们修改我们的Worker线程数量为CPU的核心数量。 吞吐量大概为3w。所以合理的配置Worker线程数量可以带来一定的性能提升。 (自行通过jmeter测试,可以联系我要jmeter的压测视频)
JVM参数与ZGC
JVM提供了许多的参数,我们可以通过合理的配置这些参数来做到性能调优的作用,这里我就不列举出具体的JVM参数了。 其次,我们知道影响Java项目性能的一个点在于垃圾回收器的执行,也就是我们的GC。 比较著名的GC有:G1、ZGC、CMS。 我的项目使用的是JDK19搭建的,所以我的项目支持使用ZGC。 并且ZGC有如下优点: ![[../../photo/Pasted image 20251019200557.png]]
JVM与ZGC调优
上面讲解了一些理论,接下来我们就基于实际的配置开始对我们的项目进行性能测试。 我们首先进行JVM参数上的调优。 首先设定堆大小。
-Xms10g -Xmx10g然后,我们知道项目启动的时候,一般都没有进行预热,比如我们的JIT等优化都还没有生效,所以我们先发送少量请求进行预热,防止影响平均值。 之后我们就可以开始配置正常的压测数据,依旧是1000 * 1000。 这里我先使用的是G1垃圾回收器。
-Xms10g -Xmx10g -Xlog:gc*=info:file=logs/gc.log:time,tags -XX:+UseG1GC之后,我们换用ZGC
-Xms10g -Xmx10g -Xlog:gc*=info:file=logs/gc.log:time,tags -XX:+UseZGC通过压测发现,ZGC比G1快了10倍,太夸张了
基于无界无锁缓冲区Disruptor的优化
什么是缓冲区队列
JDK中提供的一些队列,他们之间包含了有锁的实现,也包含了无锁的实现,这意味着在并发情况下,如果是不支持线程安全的队列,则会出现线程不安全、线程覆盖、数据丢失等线程安全问题。 所以我们需要使用线程安全的队列来保证线程安全,如下是JDK中提供的线程安全的队列。 但是他们之间有一些问题,比如有锁队列性能稍差但是更安全,他是有界的,无锁队列性能好但是无界,无界意味着容易出现OOM等问题。所以我们肯定首先排除使用无界队列。 当然并不是说无界队列就没有用,只是在某些场景下我们需要剔除他们,不使用他们。
| Queue Type | Data Structure | Key Technique | Has Lock | Bounded | Lock Type |
|---|---|---|---|---|---|
| ArrayBlockingQueue | Array | Reentrant | Yes | Yes | Lock |
| LinkedBlockingQueue | Linked List | Reentrant | Yes | Yes | Lock |
| LinkedTransferQueue | Linked List | CAS | No | No | CAS |
| ConcurrentLinkedQueue | Linked List | CAS | No | No | CAS |
我们在开发中使用的比较多的就是ArrayBlockingQueue了,底层基于数组,使用的是ReentrantLock来提供线程安全的有锁访问。 当然,由于有锁,所以性能稍差一些,并且底层数组也意味着其容量受到了一定的限制。 所以,我们希望有更好的性能,并且希望队列无界的同时保证不出现OOM,那么是否存在这样的队列? 是的,这篇文章我就将基于Disruptor队列来优化项目性能。提供网关缓冲区。首先明确一点,之所以要抛弃ArrayBlockingQueue的原因是因为使用ReentrantLock的性能小于CAS,而使用CAS的性能小于无锁性能。 所以我们至少应该将使用Lock锁的方式替换为CAS,毕竟如果获取锁失败,是需要进行等待的,那么此时线程就只能阻塞,同时还得保证底层不直接使用数组,因为使用数组意味着有界。并且扩容数组也是一部分的性能开销。
Disruptor高性能的原因
Disruptor在如下几点上进行了优化,使得其提供了一个高性能的队列。
- 无阻塞算法: Disruptor内部使用一系列的无锁(lock-free)算法,例如CAS(Compare and Swap)等,来实现高效的并发操作。这些算法的使用减少了竞争条件,提高了系统的并发性。
- 解决伪共享问题 在 Disruptor 的设计中,关键的优化是通过缓存行填充(Cache Line Padding)来避免伪共享。伪共享通常发生在多个线程同时修改共享缓存行内的不同变量,导致不必要的缓存同步。通过在缓存行内填充一些无关的变量,可以确保不同变量不共享同一个缓存行,从而减少了伪共享的影响。 具体来说,Disruptor 在设计 Ring Buffer(环形缓冲区)时,通过在每个槽(slot)之间填充 padding 变量,使得相邻的槽不会共享同一缓存行。这样,当一个线程修改一个槽时,不会影响到其他槽,减少了缓存同步的开销。 在 Disruptor 中,对于 Java 对象的数组,其大小通常是 2 的幂次方。这样,每个槽之间的距离正好是缓存行的大小。这种设计有效地解决了伪共享的问题,提高了 Disruptor 的性能。 需要注意的是,这种缓存行填充的做法可能在某些情况下会增加内存的消耗,但相对于性能提升而言,这是一个可以接受的权衡。
- 环形缓冲区(Ring Buffer): Disruptor内部使用环形缓冲区作为数据存储结构,这种数据结构的设计使得读写操作可以在不涉及锁的情况下高效进行。生产者和消费者可以在缓冲区上独立进行读写操作,减少了线程之间的竞争。
Disruptor实战
这里额外补充一个知识点,就是Disruptor的等待策略。 Disruptor 中的等待策略(Wait Strategy)是用于在消费者等待可用事件时决定其行为的一种机制。不同的等待策略在不同的场景中有不同的性能表现和行为特点。以下是 Disruptor 中常见的几种等待策略及其区别:
- BlockingWaitStrategy(阻塞等待策略): BlockingWaitStrategy 是最基本的等待策略,它使用 Object.wait() 和 Object.notifyAll() 方法来进行线程间的通信。 当消费者等待事件时,会释放 CPU 资源,降低了消费者线程的活跃度,适合于线程数较少的场景。
- SleepingWaitStrategy(自旋等待策略): SleepingWaitStrategy 在消费者等待事件时使用自旋的方式,避免了阻塞,但在一定时间内如果没有获取到事件,会进入睡眠状态。适用于对低延迟要求较高的场景,但可能会占用一定的 CPU 资源。
- YieldingWaitStrategy(礼让等待策略): YieldingWaitStrategy 在消费者等待事件时会尝试进行自旋,如果自旋一定次数后仍未获取到事件,则会进行线程礼让(Yield)。 适用于对低延迟要求高的场景,但可能占用较多的 CPU 资源。
- BusySpinWaitStrategy(忙等待策略): BusySpinWaitStrategy 是一种非常简单的等待策略,它会一直自旋等待事件的到来,不进行任何的线程礼让或睡眠。 适用于对延迟极为敏感的场景,但可能会占用大量的 CPU 资源。
- PhasedBackoffWaitStrategy(分阶段退避等待策略): PhasedBackoffWaitStrategy 是一种自适应的等待策略,会根据不同的等待阶段选择不同的等待方式,例如自旋、睡眠等。 可以在不同的场景中平衡延迟和 CPU 资源占用。
接下来我们开始实现使用Disruptor的一些必要条件: 我们先自定义一个事件监听器。
package cyou.breathe.gateway.core.disruptor;
/**
* 事件监听器
* 监听器接口
* @author: breathe
* @createTime: 2025-10-13
*/public interface EventListener<E> {
/**
* 监听事件
*/
void onEvent(E event);
/**
* 监听异常
*/
void onException(Throwable ex,long sequence,E event);
}并且实现一个并发多线程使用的队列接口
package cyou.breathe.gateway.core.disruptor;
/**
* 多生产者多消费者处理接口
* @author: breathe
* @createTime: 2025-10-13
*/public interface ParallelQueue<E> {
/**
* 添加元素
*/
void add(E event);
/**
* 添加多个元素
*/
void add(E... event);
/**
* 添加多个元素, 如果队列已满,则返回false
*/ boolean tryAdd(E event);
/**
* 添加多个元素, 如果队列已满,则返回false
*/ boolean tryAdd(E... event);
/**
* 启动
*/
void start();
/**
* 销毁
*/
void shutdown();
/**
* 判断是否已经销毁
*/
boolean isShutdown();
}之后,我们基于Disruptor的要求,实现核心代码
package cyou.breathe.gateway.core.disruptor;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 并行队列处理
* @author: breathe
* @createTime: 2025-10-13
*/@Slf4j
public class ParallelQueueHandler<E> implements ParallelQueue<E> {
private RingBuffer<Holder> ringBuffer;
private final EventListener<E> eventListener;
private final WorkerPool<Holder> workerPool;
private final ExecutorService executorService;
private final EventTranslatorOneArg<Holder,E> eventTranslator;
public ParallelQueueHandler(Builder<E> builder) {
this.executorService = Executors.newFixedThreadPool(builder.threads,
new ThreadFactoryBuilder().setNameFormat("ParallelQueueHandler"+builder.namePrefix+"-pool-%d").build());
this.eventListener = builder.listener;
this.eventTranslator = new HolderEventTranslator();
//创建RingBuffer
RingBuffer<Holder> ringBuffer = RingBuffer.create(builder.producerType,
new HolderEventFactory(),
builder.bufferSize,
builder.waitStrategy);
//通过RingBuffer 创建屏障
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//创建多个消费者组
WorkHandler<Holder>[] workHandlers = new WorkHandler[builder.threads];
for (int i = 0; i < workHandlers.length; i ++) {
workHandlers[i] = new HolderWorkHandler();
}
//创建多消费者线程池
WorkerPool<Holder> workerPool = new WorkerPool<>(ringBuffer,
sequenceBarrier,
new HolderExceptionHandler(),
workHandlers);
//设置多消费者的Sequence序号,主要用于统计消费进度,
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
this.workerPool = workerPool;
}
@Override
public void add(E event) {
final RingBuffer<Holder> holderRing = ringBuffer;
if(holderRing == null){
process(this.eventListener,new IllegalStateException("批量队列处理器已经关闭了"),event);
}
try {
ringBuffer.publishEvent(this.eventTranslator,event);
}catch (NullPointerException e){
process(this.eventListener,new IllegalStateException("批量队列处理器已经关闭了"),event);
}
}
@Override
public void add(E... events) {
final RingBuffer<Holder> holderRing = ringBuffer;
if (holderRing == null) {
process(this.eventListener,new IllegalStateException("批量队列处理器已经关闭了"),events);
}
try {
ringBuffer.publishEvents(this.eventTranslator,events);
} catch (NullPointerException e) {
process(this.eventListener,new IllegalStateException("批量队列处理器已经关闭了"),events);
}
}
@Override
public boolean tryAdd(E event) {
final RingBuffer<Holder> holderRing = ringBuffer;
if (holderRing == null) {
return false;
}
try {
return ringBuffer.tryPublishEvent(this.eventTranslator,event);
} catch (NullPointerException e) {
return false;
}
}
@Override
public boolean tryAdd(E... events) {
final RingBuffer<Holder> holderRing = ringBuffer;
if (holderRing == null) {
return false;
}
try {
return ringBuffer.tryPublishEvents(this.eventTranslator,events);
}catch (NullPointerException e){
return false;
}
}
@Override
public void start() {
this.ringBuffer = workerPool.start(executorService);
}
@Override
public void shutdown() {
RingBuffer<Holder> holder = ringBuffer;
ringBuffer = null;
if(holder ==null){
return;
}
if(workerPool != null){
workerPool.drainAndHalt();
}
if(executorService != null){
executorService.shutdown();
}
}
@Override
public boolean isShutdown() {
return ringBuffer == null;
}
private static <E> void process(EventListener<E> listener,Throwable e,E event){
listener.onException(e,-1,event);
}
private static <E> void process(EventListener<E> listener,Throwable e,E... events){
for(E event: events){
process(listener,e,event);
}
}
public static class Builder<E>{
private ProducerType producerType = ProducerType.MULTI;
private int bufferSize = 1024 * 16;
private int threads =1;
private String namePrefix = "";
private WaitStrategy waitStrategy = new BlockingWaitStrategy();
private EventListener<E> listener;
public Builder<E> setProducerType(ProducerType producerType) {
Preconditions.checkNotNull(producerType);
this.producerType = producerType;
return this;
}
public Builder<E> setBufferSize(int bufferSize) {
Preconditions.checkArgument(Integer.bitCount(bufferSize) ==1);
this.bufferSize = bufferSize;
return this;
}
public Builder<E> setThreads(int threads) {
Preconditions.checkArgument(threads>0);
this.threads = threads;
return this;
}
public Builder<E> setNamePrefix(String namePrefix) {
Preconditions.checkNotNull(namePrefix);
this.namePrefix = namePrefix;
return this;
}
public Builder<E> setWaitStrategy(WaitStrategy waitStrategy) {
Preconditions.checkNotNull(waitStrategy);
this.waitStrategy = waitStrategy;
return this;
}
public Builder<E> setListener(EventListener<E> listener) {
Preconditions.checkNotNull(listener);
this.listener = listener;
return this;
}
public ParallelQueueHandler<E> build(){
return new ParallelQueueHandler<>(this);
}
}
public class Holder{
private E event;
public void setValue(E event) {
this.event = event;
}
@Override
public String toString() {
return "Holder{" +
"event=" + event +
'}';
}
}
private class HolderExceptionHandler implements ExceptionHandler<Holder>{
@Override
public void handleEventException(Throwable throwable, long l, Holder event) {
Holder holder = (Holder)event;
try {
eventListener.onException(throwable,l,holder.event);
} catch (Exception e){
log.error("异常处理失败: {}", e.getMessage());
} finally {
holder.setValue(null);
}
}
@Override
public void handleOnStartException(Throwable throwable) {
throw new UnsupportedOperationException(throwable);
}
@Override
public void handleOnShutdownException(Throwable throwable) {
throw new UnsupportedOperationException(throwable);
}
}
private class HolderWorkHandler implements WorkHandler<Holder> {
@Override
public void onEvent(Holder holder) throws Exception {
eventListener.onEvent(holder.event);
holder.setValue(null);
}
}
private class HolderEventFactory implements EventFactory<Holder> {
@Override
public Holder newInstance() {
return new Holder();
}
}
private class HolderEventTranslator implements EventTranslatorOneArg<Holder,E>{
@Override
public void translateTo(Holder holder, long l, E e) {
holder.setValue(e);
}
}}这一套代码中,我们基于对Disruptor的了解提供了一些使用Disruptor中必须用到的一些配置,比如我们的RingBuffer。 之后,我们对原先的NettyCoreProcessor进行修改。 添加一个新的Netty处理器,并且整合Disruptor。
package cyou.breathe.gateway.core.netty.processor;
import com.lmax.disruptor.dsl.ProducerType;
import cyou.breathe.gateway.common.enums.ResponseCode;
import cyou.breathe.gateway.config.config.Config;
import cyou.breathe.gateway.core.context.HttpRequestWrapper;
import cyou.breathe.gateway.core.disruptor.EventListener;
import cyou.breathe.gateway.core.disruptor.ParallelQueueHandler;
import cyou.breathe.gateway.core.helper.ResponseHelper;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* disruptor 线程核心处理
* DisruptorNettyCoreProcessor 使用 Disruptor 提高性能的 Netty 处理器。
* 这个处理器是一个缓存层,通过 Disruptor 来异步处理 HTTP 请求,减轻 Netty 核心处理器的负担。
* @author: breathe
* @createTime: 2025-10-13
*/@Slf4j
public class DisruptorNettyCoreProcessor implements NettyProcessor {
/**
* 线程前缀
*/
private static final String THREAD_NAME_PREFIX = "gateway-queue-";
@Getter
@Setter private Config config;
/**
* Disruptor 只是缓存依然需要使用到 Netty 核心处理器
*/
private final NettyCoreProcessor nettyCoreProcessor;
/**
* 处理类
*/
private final ParallelQueueHandler<HttpRequestWrapper> parallelQueueHandler;
/**
* 构造方法,初始化 DisruptorNettyCoreProcessor。
*
* @param config 配置信息对象。
* @param nettyCoreProcessor Netty 核心处理器。
*/
public DisruptorNettyCoreProcessor(Config config, NettyCoreProcessor nettyCoreProcessor) {
this.config = config;
this.nettyCoreProcessor = nettyCoreProcessor;
// 使用 Disruptor 创建并配置处理队列。
ParallelQueueHandler.Builder<HttpRequestWrapper> builder = new ParallelQueueHandler.Builder<HttpRequestWrapper>()
.setBufferSize(config.getNetty().getBufferSize())
.setThreads(config.getNetty().getProcessThread())
.setProducerType(ProducerType.MULTI)
.setNamePrefix(THREAD_NAME_PREFIX)
.setWaitStrategy(config.getNetty().getWaitStrategy());
// 监听事件处理类
BatchEventListenerProcessor batchEventListenerProcessor = new BatchEventListenerProcessor();
builder.setListener(batchEventListenerProcessor);
this.parallelQueueHandler = builder.build();
}
/**
* 处理 HTTP 请求,将请求添加到 Disruptor 处理队列中。
* @param wrapper HttpRequestWrapper 包装类。
*/
@Override
public void process(HttpRequestWrapper wrapper) {
this.parallelQueueHandler.add(wrapper);
}
/**
* 监听处理类,处理从 Disruptor 处理队列中取出的事件。
*/
public class BatchEventListenerProcessor implements EventListener<HttpRequestWrapper> {
@Override
public void onEvent(HttpRequestWrapper event) {
// 使用 Netty 核心处理器处理事件。
nettyCoreProcessor.process(event);
}
@Override
public void onException(Throwable ex, long sequence, HttpRequestWrapper event) {
HttpRequest request = event.getRequest();
ChannelHandlerContext ctx = event.getCtx();
try {
log.error("BatchEventListenerProcessor onException 请求写回失败,request:{}, errMsg:{} ", request, ex.getMessage(), ex);
// 构建响应对象
FullHttpResponse fullHttpResponse = ResponseHelper.getHttpResponse(ResponseCode.INTERNAL_ERROR);
if (!HttpUtil.isKeepAlive(request)) {
ctx.writeAndFlush(fullHttpResponse).addListener(ChannelFutureListener.CLOSE);
} else {
fullHttpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ctx.writeAndFlush(fullHttpResponse);
}
} catch (Exception e) {
log.error("BatchEventListenerProcessor onException 请求写回失败,request:{}, errMsg:{} ", request, e.getMessage(), e);
}
} }
/**
* 启动 DisruptorNettyCoreProcessor,启动处理队列。
*/
@Override
public void start() {
parallelQueueHandler.start();
}
/**
* 关闭 DisruptorNettyCoreProcessor,关闭处理队列。
*/
@Override
public void shutdown() {
parallelQueueHandler.shutdown();
}
}原始的 NettyCoreProcessor 直接处理每个 HTTP 请求,而 DisruptorNettyCoreProcessor 使用了 Disruptor 框架,将 HTTP 请求异步地添加到一个处理队列中,然后由 BatchEventListenerProcessor 来处理这个队列中的事件。 Disruptor 是一个高性能的异步事件处理框架,它采用了无锁的设计,通过利用 RingBuffer 的结构,实现了高效的事件发布和消费。在这里,使用 Disruptor 的好处是可以提高并发处理能力,减轻了 Netty 核心处理器的负担。因为网络请求通常是 I/O 密集型的操作,通过异步处理可以提高系统的吞吐量。 同时,我们要在创建Container的时候使用我们的新NettyCoreProcessor,代码变更如下: ![[../../photo/Pasted image 20251019201026.png]] 这个将网关的吞吐又提升了4000 到此为止,我们将我们的网关整合Disruptor缓冲区也已经完毕了,至此,网关的所有重要功能都已经开发完毕。