Java技术栈
本文最后更新于:2025年4月29日 下午
SSE 实现消息推送
SSE(Server-Sent Events)
当涉及到部分请求,后端处理时间较长,使用常规 Http 请求,页面等待时间太长,对用户不友好,故考虑使用长链接进行消息推送,
可选方案有WebSocket、SSE。WebSocket 可实现双工通信,SSE 仅支持服务端向客户端推送消息,根据实际使用场景,SSE 即可满足,故选用 SSE。
在SpringBoot
中,先编写 SSE 服务SseEmitterServer
@Slf4j
@Service
public class SseEmitterServer {
private final static Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();
/**
* 创建新连接
*/
public synchronized SseEmitter createSession() throws IOException {
// 设置过期时间为0 表示永不过期
SseEmitter sseEmitter = new SseEmitter(0L);
String id = UUID.randomUUID().toString().replaceAll("-", "");
if (!SSE_CACHE.containsKey(id)) {
SSE_CACHE.put(id, sseEmitter);
log.info("客户端:[{}]新建成功,当前客户端总数为:[{}]", id, SSE_CACHE.size());
}
return sseEmitter;
}
public void closeSession(String clientId){
if (SSE_CACHE.containsKey(clientId)){
SSE_CACHE.get(clientId).complete();
SSE_CACHE.remove(clientId);
log.info("客户端:[{}]关闭成功,当前剩余客户端总数为:[{}]", clientId, SSE_CACHE.size());
}
}
/**
* 定时任务
*/
@Scheduled(fixedDelay = 3, initialDelay = 1)
public void job() {
if (SSE_CACHE.size() > 0){
String msg = UUID.randomUUID().toString();
System.out.println("定时任务发送消息:" + msg);
for (Map.Entry<String, SseEmitter> entry : SSE_CACHE.entrySet()) {
SseEmitter sseEmitter = SSE_CACHE.get(entry.getKey());
try {
sseEmitter.send(SseEmitter.event().reconnectTime(1000).id(entry.getKey()).data(msg));
} catch (IOException e) {
SSE_CACHE.remove(entry.getKey());
}
}
}
}
public void sendMessageToClient(String clientId, String message) {
SseEmitter sseEmitter = SSE_CACHE.get(clientId);
if (sseEmitter != null) {
try {
sseEmitter.send(SseEmitter.event().reconnectTime(1000).id(clientId).data(message));
} catch (IOException e) {
SSE_CACHE.remove(clientId);
}
}
}
public void sendMessageToAllClients(String message) {
for (Map.Entry<String, SseEmitter> entry : SSE_CACHE.entrySet()) {
try {
entry.getValue().send(SseEmitter.event().reconnectTime(1000).id(entry.getKey()).data(message));
} catch (IOException e) {
SSE_CACHE.remove(entry.getKey());
}
}
}
}
接口使用:SseController
:
@RestController
@CrossOrigin
public class SseController {
@Resource
private SseEmitterServer sseEmitterServer;
@GetMapping("/connect")
public SseEmitter connect() throws IOException {
return sseEmitterServer.createSession();
}
@GetMapping("/disconnect")
public String disconnect(@RequestParam String clientId) {
sseEmitterServer.closeSession(clientId);
return "Disconnected";
}
@PostMapping("/send")
public Result send(@RequestParam String msg) {
sseEmitterServer.sendMessageToAllClients(msg);
return Result.ok(null);
}
}
在前端使用也非常的方便,下面是 vue 的一个小示例:
// 创建一个 EventSource 对象实例
let eventSource = null;
onMounted(() => {
eventSource = new EventSource("http://localhost:9091/connect");
eventSource.onmessage = (event) => {
// 获取到后端返回的值
message.value += String.fromCharCode(event.data);
};
});
// 调用接口
await request.post(
"http://localhost:9091/send?msg=" + encodeURIComponent(text.value)
);
我们在后端提供两个接口,一个是前端进页面就进行 SSE 链接,另一个是调用接口,使用 SSE 返回给前端
初次进入页面时,进行连接
当发送消息调用接口,此处调用语言模型接口,以流式返回:
ElasticSearch
ElasticSearch 介绍
ElasticSearch 概述
Elasticsearch (简称 ES)是一个分布式、高扩展、高实时的、RESTful 风格的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。充分利用 Elasticsearch 的水平伸缩性,能使数据在生产环境变得更有价值。Elasticsearch 的实现原理主要分为以下几个步骤,首先用户将数据提交到 Elasticsearch 数据库中,再通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据,当用户搜索数据时候,再根据权重将结果排名,打分,再将返回结果呈现给用户。
Elasticsearch 是面向文档型数据库,一条数据在这里就是一个文档,用 JSON 作为文档序列化的格式,比如下面这条用户数据:
{
"name": "John",
"sex": "Male",
"age": 25,
"birthDate": "1990/05/01",
"about": "I love to go rock climbing",
"interests": ["sports", "music"]
}
用 Mysql 这样的数据库存储就会容易想到建立一张 User 表,有各个字段等,在 ElasticSearch 里这就是一个文档,当然这个文档会属于一个 User 的类型,各种各样的类型存在于一个索引当中。这里有一份简易的将 Elasticsearch 和关系型数据术语对照表:
索引(Index)
一个索引就是一个拥有几分相似特征的文档的集合。比如说,你可以有一个客户数据的索引,另一个产品目录的索引,还有一个订单数据的索引。一个索引由一个名字来标识(必须全部是小写字母),并且当我们要对这个索引中的文档进行索引、搜索、更新和删除的时候,都要使用到这个名字。在一个集群中,可以定义任意多的索引。
能搜索的数据必须索引,这样的好处是可以提高查询速度,比如:新华字典前面的目录就是索引的意思,目录可以提高查询速度。
Elasticsearch 索引的精髓:一切设计都是为了提高搜索的性能。
类型(Type)
在一个索引中,你可以定义一种或多种类型。
一个类型是你的索引的一个逻辑上的分类/分区,其语义完全由你来定。通常,会为具有一组共同字段的文档定义一个类型。不同的版本,类型发生了不同的变化
版本 | Type |
---|---|
5.x | 支持多种 type |
6.x | 只能有一种 type |
7.x | 默认不再支持自定义索引类型(默认类型为:_doc) |
文档(Document)
一个文档是一个可被索引的基础信息单元,也就是一条数据
比如:你可以拥有某一个客户的文档,某一个产品的一个文档,当然,也可以拥有某个订单的一个文档。文档以JSON(Javascript Object Notation)格式来表示,而 JSON 是一个到处存在的互联网数据交互格式。
在一个 index/type 里面,你可以存储任意多的文档。
字段(Field)
相当于是数据表的字段,对文档数据根据不同属性进行的分类标识。
映射(Mapping)
mapping 是处理数据的方式和规则方面做一些限制,如:某个字段的数据类型、默认值、分析器、是否被索引等等。这些都是映射里面可以设置的,其它就是处理 ES 里面数据的一些使用规则设置也叫做映射,按着最优规则处理数据对性能提高很大,因此才需要建立映射,并且需要思考如何建立映射才能对性能更好。
ElasticSearch 安装
下载软件
下载地址:ElasticSearch
推荐下载7.8版本
:Elasticsearch 7.8.0
windows 安装
解压文件,目录结构如下:
目录 | 说明 |
---|---|
bin | 可执行脚本目录 |
config | 配置目录 |
jdk | 内置 jdk 目录 |
lib | 类库 |
logs | 日志目录 |
modules | 模块目录 |
plugins | 插件目录 |
解压完成后进入 bin 目录,双击运行elasticsearch.bat
测试访问: http://localhost:9200/
注意事项一:
ElasticSearch 是使用 java 开发的,且本版本的ES需要 JDK 版本要是 1.8 以上,所以安装 ElasticSearch 之前保证 JDK1.8+安装完毕,并正确的配置好 JDK 环境变量,否则启动 ElasticSearch 失败。
注意事项二:
出现闪退,通过路径访问发现“空间不足”
修改 config/jvm.options 文件的 22 行 23 行,把 2 改成 1,让 Elasticsearch 启动的时候占用 1 个 G 的内存。
-Xmx512m:设置 JVM 最大可用内存为 512M。
-Xms512m:设置 JVM 初始内存 512m。此值可设置与-Xmx 相同,以避免每次垃圾回收完成后 JVM 重新分配内存。
ik 分词器安装
IK 分词器简介
IKAnalyzer 是一个开源的,基于 Java 语言开发的轻量级的中文分词工具包。从 2006 年 12 月推出 1.0 版开始,IKAnalyzer 已经推出 了 3 个大版本。最初,它是以开源项目 Lucene 为应用主体的,结合词典分词和文法分析算法的中文分词组件。新版本的 IKAnalyzer3.0 则发展为面向 Java 的公用分词组件,独立于 Lucene 项目,同时提供了对 Lucene 的默认优化实现。
IK 分词器 3.0 的特性如下:
1)采用了特有的“正向迭代最细粒度切分算法“,具有 60 万字/秒的高速处理能力。
2)采用了多子处理器分析模式,支持:英文字母(IP 地址、Email、URL)、数字(日期,常用中文数量词,罗马数字,科学计数法),中文词汇(姓名、地名处理)等分词处理。
3)对中英联合支持不是很好,在这方面的处理比较麻烦.需再做一次查询,同时是支持个人词条的优化的词典存储,更小的内存占用。
4)支持用户词典扩展定义。
5)针对 Lucene 全文检索优化的查询分析器 IKQueryParser;采用歧义分析算法优化查询关键字的搜索排列组合,能极大的提高 Lucene 检索的命中率。
IK 分词器的安装
下载:
GitHub 仓库地址:https://github.com/medcl/elasticsearch-analysis-ik
解压安装 IK 插件:
将ik压缩包
直接解压到 ElasticSearch 的 plugins\ik\目录下,注意目录结构,解压后的 zip 不要放在 plugins 目录下
kibana 安装-客户端
elasticsearch 服务是一个 restful 风格的 http 服务。我们可以采用 postman 作为客户端来进行操作,elastic stack 官方也给我们提供了 kibana 来进行客户端操作,这个相比 postman 要友好一点,因为里面有些自动补全的代码提示
下载地址: https://www.elastic.co/cn/downloads/past-releases/kibana-7-8-0
解压文件:进入到 config 目录,修改 kibana.yml 文件:
Kibana默认端口:5601
Kibana 连接 elasticsearch 服务器的地址:elasticsearch.hosts: [“http://localhost:9200"]
修改 kibana 配置支持中文:i18n.locale: “zh-CN”
运行访问
执行kibana-7.8.0\bin\kibana.bat
RabbitMQ
RabbitMQ 简介
以商品订单场景为例,
如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。
消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。
RabbitMQ 就是这样一款消息队列。RabbitMQ 是一个开源的消息代理的队列服务器,用来通过普通协议在完全不同的应用之间共享数据。
典型应用场景:
- 异步处理。把消息放入消息中间件中,等到需要的时候再去处理。
- 流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃
安装 rabbitMQ
#拉取镜像
docker pull rabbitmq:3.8-management
#创建容器启动
docker run -d --restart=always -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8-management
abbitMQ 服务后台
管理后台:http://IP:15672 ,用户名和密码默认:guest
在 SpringBoot 中引入,导入依赖:
<!--rabbitmq消息队列-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
配置application.yml
:
spring:
rabbitmq:
host: 192.168.10.100
port: 5672
username: guest
password: guest
publisher-confirm-type: CORRELATED #发布确认模式,消息是否被成功发送到交换机
publisher-returns: true
listener:
simple:
prefetch: 1
concurrency: 3
acknowledge-mode: manual #消费端手动确认