ElasticSearch-Tips

这里记录一下es使用经验

1.ES资源评估公式

各规格下最大索引速度和查询速度预估
集群规模 3master+n data
连接方式 直连同步写及同步查询
数据量 查询基于320W数据,单条数据1K,索引1个副本、刷新时间1s,其他索引配置默认。
评估依据 基于data节点数的影响结果,预估不同data节点个数时的最大索引速度和查询速度,n为数据节点数。
虚拟机资源规格(CPU/JVM/主机) 索引速度(/s) 查询速度(/s)
0.5C/1G/2G 30_(1+n_0.2) 10_(1+n_0.2)
1C/2G/4G 100_(1+n_0.2) 55_(1+n_0.2)
2C/4G/8G 430_(1+n_0.2) 170_(1+n_0.2)
4C/8G/16G 1150_(1+n_0.2) 500_(1+n_0.2)
8C/16G/32G 2600_(1+n_0.2) 1800_(1+n_0.2)
容器资源规格(CPU/JVM/主机) 索引速度(/s) 查询速度(/s)
1C/2G/4G 110+n*20 50+n*10
2C/8G/16G 420+n*80 170+n*30
4C/16G/32G 1100+n*200 550+n*110
8C/32G/64G 2700+n*500 1900+n*300
12C/32G/64G 4600+n*900 4000+n*800
16C/32G/64G 6800+n*1400 7000+n*1400
24C/32G/64G 12300+n*2400 15300+n*3100
主分片数的评估方式
要求 1、同一个节点上分配的索引分片数不超过3个。
2、每个索引分片数大小不超过30G。
根据集群节点数计算承载最大的主分片数 节点数*3/2
根据数据大小计算最大的主分片数 XG/节点数/2

2.ES配置原则

服务内存在 1-64G之间的:

export ES_HEAP_SIZE = 一半服务器的内存把50%的内存给elasticsearch,剩下的50%也不会没有用处的,Lucene会很快吞噬剩下的这部分内存用于文件缓存。

服务器内存大于64G的

  • 方案一:更多的全文检索: export ES_HEAP_SIZE = 31G,剩下的给Lucene
  • 方案二:更多的排序和聚合:一台机器上创建两个或者更多ES节点,而不要部署一个使用32+GB内存的节点export ES_HEAP_SIZE = 31G剩下的给Lucene
  • 备注:cluster.routing.allocation.same_shard.host:true。这会防止同一个shard的主副本存在同一个物理机上(因为如果存在一个机器上,副本的高可用性就没有了)。

为什么32G是JVM堆内存设置的一个坎?

JVM 在内存小于 32 GB 的时候会采用一个内存对象指针压缩技术。在 Java 中,所有的对象都分配在堆上,并通过一个指针进行引用。 普通对象指针(OOP)指向这些对象,通常为 CPU 字长 的大小:32 位或 64 位,取决于你的处理器。指针引用的就是这个 OOP 值的字节位置。对于 32 位的系统,意味着堆内存大小最大为 4 GB。对于 64 位的系统, 可以使用更大的内存,但是 64 位的指针意味着更大的浪费,因为你的指针本身大了。更糟糕的是, 更大的指针在主内存和各级缓存(例如 LLC,L1 等)之间移动数据的时候,会占用更多的带宽。Java 使用一个叫作 内存指针压缩(compressed oops)的技术来解决这个问题。 它的指针不再表示对象在内存中的精确位置,而是表示 偏移量 。这意味着 32 位的指针可以引用 40 亿个 对象 , 而不是 40 亿个字节。最终, 也就是说堆内存增长到 32 GB 的物理内存,也可以用 32 位的指针表示。一旦你越过那个神奇的 ~32 GB 的边界,指针就会切回普通对象的指针。 每个对象的指针都变长了,就会使用更多的 CPU 内存带宽,也就是说你实际上失去了更多的内存。事实上,当内存到达 40–50 GB 的时候,有效内存才相当于使用内存对象指针压缩技术时候的 32 GB 内存。这段描述的意思就是说:即便你有足够的内存,也尽量不要 超过 32 GB。因为它浪费了内存,降低了 CPU 的性能,还要让 GC 应对大内存。

指针压缩

因为java需要记录对象在内存中的位置啊,这个记录在对象头中,32位机器对象头默认占了8个字节(内存指针+kclass),4字节=4*8=32位=4G,但是Java使用的是压缩指针不是12345这样记录的,是0,8,16这样记录的,所以8字节的对象头最大记录内存地址32G如果超过32G就需要更大的对象头记录位置和kclass,可以想象超过32G之后对象头占用内存那么多不值得,所以31G最优。

压缩指针是怎么实现的

不再保存所有引用,而是每隔8个字节保存一个引用。例如,原来保存每个引用0、1、2…,现在只保存0、8、16…。因此,指针压缩后,并不是所有引用都保存在堆中,而是以8个字节为间隔保存引用。在实现上,堆中的引用其实还是按照0x0、0x1、0x2…进行存储。只不过当引用被存入64位的寄存器时,JVM将其左移3位(相当于末尾添加3个0),例如0x0、0x1、0x2…分别被转换为0x0、0x8、0x10。而当从寄存器读出时,JVM又可以右移3位,丢弃末尾的0。(oop在堆中是32位,在寄存器中是35位,2的35次方=32G。也就是说,使用32位,来达到35位oop所能引用的堆内存空间)

3.ES部署原则

  • 【强制】开启内存锁,禁止swapping
  • 【强制】文件描述符数量调整至65536
  • 【强制】最大映射数调整至262144
  • 【强制】重要数据至少有一个副本。
  • 【强制】主分片和副本分片不能在同一个物理机上
  • 【强制】部署最后严格划分ES节点角色
  • 【强制】添加快照任务
  • 【建议】使用RestClient连接
  • 【强制】读连接client节点,写连接data节点。
  • 【强制】连接时,必须配置上所有的对应节点,避免单节点配置。

4.ES索引模板设计原则

  • 【建议】设置索引的refresh_interval,默认1s建议调至60s减少写入压力
  • 【强制】禁止不使用的字段存入Elasticsearch
  • 【强制】不作为查询字段的index值设置为false
  • 【强制】不允许_all字段(7.5.1版本不再支持)
  • 【强制】禁止es自动创建字段,防止索引结构混乱字段。dynamic属性设置为false遇到陌生字段,就忽略;strict:遇到陌生字段,就报错
  • 【强制】索引的分片数在一个节点上不超过3个,每个分片数量不超过30G
  • 【强制】对于大表必须设置分月或分天,一般为单个索引不超过100G
  • 【强制】数据必须有对应的生命周期,禁止永久保存,且周期性数据存储最长为6个月
  • 【强制】必须开启副本
  • 【强制】禁止不需要排序和聚合的字段doc_values属性设置为true
  • 【强制】表结构字段不超过50个
  • 【强制】不允许使用type,一个索引只存储一种结构(7.5.1版本不再支持type)
  • 【强制】index_patterns只允许包含一种索引模式
  • 【强制】禁止keyword类型字段超过256个字符,默认超过256个字符不会被索引

5.ES读写规范

  • 【建议】建议使用match_phrase替代match提高查询性能
  • 【建议】善用filter过滤器,过滤适合在大范围筛选数据,而查询则适合精确匹配数据。一般应用时,应先使用过滤操作过滤数据,然后使用查询匹配数据
  • 【建议】善用路由routing,极大提升查询效率
  • 【建议】大量数据提交时使用批量提交方法
  • 【建议】不得一次超大返回结果上限为100条,应需分页查询。具体实际限制根据集群规模
  • 【建议】单个查询条件字数长度不得超过30字
  • 【建议】尽量少使用delete_by_query删除文档,更好的方案是直接删除索引
  • 【建议】使用 datastrem 和 ILM 索引生命周期管理管理时序数据
  • 【建议】分片大小控制在 10GB-50GB
  • 【建议】控制在每 GB 堆内存 20 个分片以内,避免单个节点分片过多、负载过重
  • 【建议】增加字段限制,避免mapping爆炸
  • 【强制】不能使用index*的操作,同时跨多个es索引进行查询,会严重消耗es性能,建议单次查询不跨索引查询,跨索引查询不得超过3个
  • 【强制】设置查询所使用字段和返回结果列,查询结果返回不能超过1万条,数据量过大会导致es堆内存溢出
  • 【强制】查询条件不能超过10K,条件过大会导致查询缓慢和es栈内存溢出
  • 【强制】禁止使用前缀wildcards查询。
  • 【强制】查询条件禁止超过1024个。
  • 【强制】ES不支持事务性读写操作。
  • 【强制】避免内嵌查询。
  • 【强制,使用请进行评审】避免聚合查询。
  • 【强制】避免使用from和size深度分页,可以考虑使用searchAfter来实现。
  • 【强制】ES本身不支持事务!!!!

6.ES慢查询日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# index数据的慢日志   
PUT /you index name/_settings
{
"index.indexing.slowlog.threshold.index.warn": "10s",
"index.indexing.slowlog.threshold.index.info": "5s",
"index.indexing.slowlog.threshold.index.debug": "2s",
"index.indexing.slowlog.threshold.index.trace": "500ms",
"index.indexing.slowlog.level": "info",
"index.indexing.slowlog.source": "1000"
}
# search数据的慢日志 查询分为fetch和query,fetch先查出来id和score,再根据id查找全文,根据score排序。就像数据库的回表一样。索引fetch会较慢一些
PUT /you index name/_settings
{
"index.search.slowlog.threshold.query.warn": "10s",
"index.search.slowlog.threshold.query.info": "5s",
"index.search.slowlog.threshold.query.debug": "2s",
"index.search.slowlog.threshold.query.trace": "500ms",
"index.search.slowlog.threshold.fetch.warn": "1s",
"index.search.slowlog.threshold.fetch.info": "800ms",
"index.search.slowlog.threshold.fetch.debug": "500ms",
"index.search.slowlog.threshold.fetch.trace": "200ms",
"index.search.slowlog.level": "info"
}
# 查看慢日志所在位置
GET _nodes/settings?pretty=true

# 可以通过logstash将慢查询日志收集进ES中,然后进行分析
# 注:docker安装ES,会出现日志只输出控制台,不写入文件的情况,请自定义配置log4j2.properties文件

7.ES索引新增字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
PUT user_order_2010(表名)/_mapping
{
"properties":{ //新增字段combineOrderNumber
"combineOrderNumber": {
"type": "keyword"
}
}
}
curl -X PUT http://127.0.0.1:9200/user_order_2010/_mapping -H 'Content-Type: application/json' -d '{
"properties":{ //新增字段combineOrderNumber
"combineOrderNumber": {
"type": "keyword"
}
}
}'

8.Docker安装ES及相关技术栈

注:如果是wsl中部署,存在ip地址重启之后变动的情况请使用网桥模式,将相关组件全使用一个网桥,通过docker 容器名称连接。

1
2
3
4
5
6
7
8
9
10
11
12
# 拉取镜像
docker pull elasticsearch:7.6.1
docker pull kibana:7.6.1
# 创建文件夹
mkdir -p /mydata/elasticsearch/config
mkdir -p /mydata/elasticsearch/data
echo "http.host: 0.0.0.0" >/mydata/elasticsearch/config/elasticsearch.yml
mkdir -p /mydata/elasticsearch/kibana/config/
vim /mydata/elasticsearch/kibana/config/kibana.yml
chmod -R 777 /mydata/elasticsearch/config
chmod -R 777 /mydata/elasticsearch/data
chmod -R 777 /mydata/elasticsearch/kibana/config/
1
2
3
4
5
6
7
8
9
#kibana.yaml
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://192.168.170.132:9200" ] # 根据自己情况选择是否使--network=es-network [[ "http://es容器名称:9200" ]]
xpack.monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: "zh-CN"
#密码
elasticsearch.username: "elastic"
elasticsearch.password: "passwd"
1
2
3
4
5
6
7
8
9
10
11
12
# elasticsearch.yml 高版本增加如下
http.host: 0.0.0.0
cluster.name: "docker-cluster"
network.hosts: 0.0.0.0
# 跨域
http.cors.allow-origin: "*"
http.cors.enabled: true
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
# 密码
xpack.security.enabled: true
xpack.license.self_generated.type: basic
xpack.security.transport.ssl.enabled: true
1
2
3
4
5
# 启动ES 根据自己情况选择是否使用网桥 --network=es-network 
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms64m -Xmx512m" -v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /mydata/elasticsearch/data:/usr/share/elasticsearch/data -v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins -d elasticsearch:7.6.1

# 启动kibana 根据自己情况选择是否使用网桥 --network=es-network
docker run -d --name=kibana --restart=always -p 5601:5601 -v /mydata/elasticsearch/kibana/config/kibana.yml:/usr/share/kibana/config/kibana.yml kibana:7.6.1

docker 部署metricbea:

1
2
docker pull docker.elastic.co/beats/metricbeat:7.10.1
vim /mydata/elasticsearch/metricbeat/metricbeat.yml # 注意不能给777,给755
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
metricbeat.config:
modules:
path: ${path.config}/modules.d/*.yml
# Reload module configs as they change:
reload.enabled: false
metricbeat.autodiscover:
providers:
- type: docker
hints.enabled: true
metricbeat.modules:
- module: elasticsearch # 开启elasticsearch
xpack.enabled: true
period: 10s
hosts: ["http://elasticsearch:9200"] # 跨容器交互使用桥接
- module: docker
metricsets:
- "container"
- "cpu"
- "diskio"
- "healthcheck"
- "info"
#- "image"
- "memory"
- "network"
hosts: ["unix:///var/run/docker.sock"]
period: 10s
enabled: true
processors:
- add_cloud_metadata: ~
# 直接发送elasticsearch
output.elasticsearch:
hosts: ["elasticsearch:9200"] # 跨容器交互使用桥接

# 要加载仪表板,可以在metricbeat设置中启用仪表板加载。当仪表板加载被启用时,Metricbeat使用Kibana API来加载样本仪表板。只有当Metricbeat启动时,才会尝试仪表板加载。
# 设置kibana服务地址
setup.kibana.host: "kibana:5601" # 跨容器交互使用桥接
# 加载默认的仪表盘样式
setup.dashboards.enabled: true
# 设置如果存在模板,则不覆盖原有模板
setup.template.overwrite: false
1
2
3
4
5
6
7
# 根据自己情况选择是否使用网桥
docker run -d --network=es-network --name=metricbeat \
--user=root --volume="$(pwd)/metricbeat.yml:/usr/share/metricbeat/metricbeat.yml:ro" \
--volume="/var/run/docker.sock:/var/run/docker.sock:ro"\
--volume="/sys/fs/cgroup:/hostfs/sys/fs/cgroup:ro" \
--volume="/proc:/hostfs/proc:ro" --volume="/:/hostfs:ro" \
docker.elastic.co/beats/metricbeat:7.10.1 metricbeat -e

9.ES修复log4j2 bug

  1. 关闭ES集群
  2. 备份 elasticsearch 目录下lib下的 log4j-api-2.11.1.jar和log4j-core-2.11.1.jar
  3. cd /xxx/xxxx/elasticsearch/lib
  4. mkdir log_bak
  5. mv log4j* ./log_bak
  6. 上传log4j-core-2.17.0.jar 和 log4j-api-2.17.0.jar 到 lib目录下
  7. 启动ES集群

10.ES监控dashboard

1.python收集脚本如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#!/usr/bin/env python
import datetime
import time
import json
import urllib2
import os
import sys

# ElasticSearch Cluster to Monitor
#elasticServer = os.environ.get('ES_METRICS_CLUSTER_URL', 'http://192.168.2.13:9200')
#interval = int(os.environ.get('ES_METRICS_INTERVAL', '60'))

elasticServer = 'http://192.168.2.13:9200'
interval = 60
# ElasticSearch Cluster to Send Metrics
#elasticIndex = os.environ.get('ES_METRICS_INDEX_NAME', 'elasticsearch_metrics')
#elasticMonitoringCluster = os.environ.get('ES_METRICS_MONITORING_CLUSTER_URL', 'http://192.168.2.13:9200')

elasticIndex = 'elasticsearch_metrics'
elasticMonitoringCluster = 'http://192.168.2.13:9200'
# Enable Elasticsearch Security
# read_username and read_password for read ES cluster information
# write_username and write_passowrd for write monitor metric to ES.
read_es_security_enable = False
read_username = "read_username"
read_password = "read_password"

write_es_security_enable = False
write_username = "write_username"
write_password = "write_password"

def handle_urlopen(urlData, read_username, read_password):
if read_es_security_enable:
try:
password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None, urlData, read_username, read_password)
handler = urllib2.HTTPBasicAuthHandler(password_mgr)
opener = urllib2.build_opener(handler)
urllib2.install_opener(opener)
response = urllib2.urlopen(urlData)
return response
except Exception as e:
print "Error: {0}".format(str(e))
else:
try:
response = urllib2.urlopen(urlData)
return response
except Exception as e:
print "Error: {0}".format(str(e))

def fetch_clusterhealth():
try:
utc_datetime = datetime.datetime.utcnow()
endpoint = "/_cluster/health"
urlData = elasticServer + endpoint
response = handle_urlopen(urlData,read_username,read_password)
jsonData = json.loads(response.read())
clusterName = jsonData['cluster_name']
jsonData['@timestamp'] = str(utc_datetime.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3])
if jsonData['status'] == 'green':
jsonData['status_code'] = 0
elif jsonData['status'] == 'yellow':
jsonData['status_code'] = 1
elif jsonData['status'] == 'red':
jsonData['status_code'] = 2
post_data(jsonData)
return clusterName
except IOError as err:
print "IOError: Maybe can't connect to elasticsearch."
clusterName = "unknown"
return clusterName


def fetch_clusterstats():
utc_datetime = datetime.datetime.utcnow()
endpoint = "/_cluster/stats"
urlData = elasticServer + endpoint
response = handle_urlopen(urlData,read_username,read_password)
jsonData = json.loads(response.read())
jsonData['@timestamp'] = str(utc_datetime.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3])
post_data(jsonData)


def fetch_nodestats(clusterName):
utc_datetime = datetime.datetime.utcnow()
endpoint = "/_cat/nodes?v&h=n"
urlData = elasticServer + endpoint
response = handle_urlopen(urlData,read_username,read_password)
nodes = response.read()[1:-1].strip().split('\n')
for node in nodes:
endpoint = "/_nodes/%s/stats" % node.rstrip()
urlData = elasticServer + endpoint
response = handle_urlopen(urlData,read_username,read_password)
jsonData = json.loads(response.read())
nodeID = jsonData['nodes'].keys()
try:
jsonData['nodes'][nodeID[0]]['@timestamp'] = str(utc_datetime.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3])
jsonData['nodes'][nodeID[0]]['cluster_name'] = clusterName
newJsonData = jsonData['nodes'][nodeID[0]]
post_data(newJsonData)
except:
continue


def fetch_indexstats(clusterName):
utc_datetime = datetime.datetime.utcnow()
endpoint = "/_stats"
urlData = elasticServer + endpoint
response = handle_urlopen(urlData,read_username,read_password)
jsonData = json.loads(response.read())
jsonData['_all']['@timestamp'] = str(utc_datetime.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3])
jsonData['_all']['cluster_name'] = clusterName
post_data(jsonData['_all'])


def post_data(data):
utc_datetime = datetime.datetime.utcnow()
url_parameters = {'cluster': elasticMonitoringCluster, 'index': elasticIndex,
'index_period': utc_datetime.strftime("%Y.%m.%d"), }
url = "%(cluster)s/%(index)s-%(index_period)s/message" % url_parameters
headers = {'content-type': 'application/json'}
try:
req = urllib2.Request(url, headers=headers, data=json.dumps(data))
if write_es_security_enable:
password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None, url, write_username, write_password)
handler = urllib2.HTTPBasicAuthHandler(password_mgr)
opener = urllib2.build_opener(handler)
urllib2.install_opener(opener)
response = urllib2.urlopen(req)
else:
response = urllib2.urlopen(req)
except Exception as e:
print "Error: {0}".format(str(e))


def main():
clusterName = fetch_clusterhealth()
print(clusterName)
if clusterName != "unknown":
fetch_clusterstats()
fetch_nodestats(clusterName)
fetch_indexstats(clusterName)


if __name__ == '__main__':
try:
nextRun = 0
while True:
if time.time() >= nextRun:
nextRun = time.time() + interval
now = time.time()
main()
elapsed = time.time() - now
print "Total Elapsed Time: %s" % elapsed
timeDiff = nextRun - time.time()

# Check timediff , if timediff >=0 sleep, if < 0 send metrics to es
if timeDiff >= 0:
time.sleep(timeDiff)

except KeyboardInterrupt:
print 'Interrupted'
try:
sys.exit(0)
except SystemExit:
os._exit(0)

2.服务启动脚本

1
2
#!/bin/bash
nohup python2 ./elasticsearch2elastic.py > ./elastic.log 2>&1 &

3.服务停止脚本

1
2
3
4
5
6
7
8
9
10
11
12
#! /bin/shell
appName='elasticsearch2elastic.py'
pid=$(ps -ef | grep ${appName} | grep -v grep | awk '{print $2}')
echo -e $pid

kill -9 ${pid}
sleep 2
if [ $? -eq 0 ];then
echo "kill ${appName} success..."
else
echo "kill ${appName} fail"
fi

4.索引清理脚本-shell

1
2
3
4
5
6
7
8
9
10
#!/bin/bash
ES_IP=10.0.0.73
ES_PORT=9200
INDEX_PREFIX='elasticsearch_metrics-'
INDEX_SUFF='*'
DATE_MONTH=`date -d "1 month ago" +%Y.%m`
INDEX_NAME=${INDEX_PREFIX}${DATE_MONTH}${INDEX_SUFF}
ES_URL=http://${ES_IP}:${ES_PORT}/${INDEX_NAME}
LOG_FILE=esindex_del-${DATE_MONTH}.out
curl -XDELETE ${ES_URL} >> ${LOG_FILE}

5.索引清理脚本-python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project :es-monitor
@File :index_clear.py
@IDE :PyCharm
@Author :Suaf
"""
import datetime
import os

ES_PORT = '9200'
ES_IP = '10.0.0.73'
INDEX_PREFIX = 'elasticsearch_metrics-'
INDEX_SUFF = '*'


def getLastMonth():
today = datetime.date.today()
first = today.replace(day=1)
last_month = first - datetime.timedelta(days=1)
month = last_month.strftime("%Y.%m")
index_name = INDEX_PREFIX + month + INDEX_SUFF
log_file = 'esindex_del-' + month + '.out'
command = ('curl -XDELETE http://%s:%s/%s > %s') % (ES_IP, ES_PORT, index_name, log_file)
print(command)
os.system(command)


if __name__ == '__main__':
getLastMonth()

11.IP转GeoIp

GeoIp processor 根据来自 Maxmind 数据库的数据添加有关IP地址地理位置的信息。默认情况下,GeoIp processor 将此信息添加到 geoip 字段下。GeoIp processor 可以解析 IPv4 和 IPv6 地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 创建一个预处理管道
PUT _ingest/pipeline/geoip_pipeline
{
"description" : "Add geoip info",
"processors" : [
{
"geoip" : {
"field" : "ip"
}
}
]
}

# 创建一个索引
# 考虑到后面要批量导入数千条+数据,我们采用了取巧的方式。使用了在创建索引的时候指定缺省管道(index.default_pipeline)的方式。
# 这样的好处是:
# 灵活:用户只关心 bulk 批量写入数据。
# 零写入代码修改:甚至写入数据的代码一行都不需要改就可以。
PUT ip_index
{
"settings": {
"index.default_pipeline": "geoip_pipeline"
},
"mappings": {
"properties": {
"geoip": {
"properties": {
"location": {
"type": "geo_point"
}
}
},
"ip":{
"type":"keyword"
}
}
}
}
# 写入一条数据
PUT ip_index/_doc/1
{
"ip":"8.8.8.8"
}

12.ES存储深入了解

ES配置的目录:

  • path.home:运行Elasticsearch进程的用户的主目录。默认为Java系统属性user.dir,它是进程所有者的默认主目录
  • path.conf:包含配置文件的目录。这通常通过设置Java系统属性es.config来设置,因为在找到配置文件之前它必然会被解析
  • path.plugins:子文件夹为Elasticsearch插件的目录
  • path.logs:存储生成的日志的位置。如果其中一个卷的磁盘空间不足,则将它放在与数据目录不同的卷上可能是有意义的
  • path.data:包含Elasticsearch存储的数据的文件夹的路径(只看他)

文件从哪里来:

  • Lucene负责写和维护Lucene索引文件
  • 而Elasticsearch在Lucene之上写与功能相关的元数据,例如字段映射,索引设置和其他集群元数据。 最终用户和支持功能
  • 在低级Lucene中不存在,由Elasticsearch提供

文件结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
data
|__ elasticsearch
|__ nodes
|__ 0
|__ _state
| |__ global-0.st # 集群的全局元数据
|__ node.lock # 文件用于确保一次只能从一个数据目录读取/写入一个Elasticsearch相关安装信息
|__ indices
|__ 45aonVA8RPSlHSKGnK_yaA
|__ 0 # 0包含与索引的第一个(也是唯一的)分片相关的数据(分片0
| |__ _state
| |__ index #
| |__ translog # 每个分片的 事务日志,保可以安全地将数据索引到Elasticsearch
|__ _state # 前者_state包含所谓的索引状态文件,存储索引的元数据

data/
└── nodes
└── 0
├── _state # 集群的全局元数据
├── indices # 存储索引的目录
│   ├── 45aonVA8RPSlHSKGnK_yaA # 索引唯一标识
│   │   ├── 0 # 0包含与索引的第一个(也是唯一的)分片相关的数据(分片0
│   │   │   ├── _state # _state包含所谓的索引状态文件,存储索引的元数据
│   │   │   │   ├── retention-leases-41.st
│   │   │   │   └── state-1.st
│   │   │   ├── index # 索引数据
│   │   │   │   ├── _0.cfe
│   │   │   │   ├── _0.cfs
│   │   │   │   ├── _0.si
│   │   │   │   ├── segments_3
│   │   │   │   └── write.lock
│   │   │   └── translog # 每个分片的 事务日志,保可以安全地将数据索引到Elasticsearch
│   │   │   ├── translog-4.tlog
│   │   │   └── translog.ckp
│   │   └── _state # 索引的元数据,它的创建时间戳,它还包含唯一标识符以及索引的设置和映射
│   │   └── state-5.s
└── node.lock # 文件用于确保一次只能从一个数据目录读取/写入一个Elasticsearch相关安装信息

13.ES查询出现数据量倒退问题

两次查询数据量不一致,因为ES查询为了降低查询压力默认会轮询查询主分片与副本分片,但是主副本之间同步会存在同步时差,所以限制只查询主分片就会避免该问题出现,具体解决方式如下:

1
2
3
4
5
6
7
8
9
GET /_search?preference=_primary    # preference查询偏好设置
{
"query": {
"match": {
"title": "elasticsearch"
}
}
}
# searchRequest.preference("_primary");
  • _primary: 指查询只在主分片中查询
  • _primary_first: 指查询会先在主分片中查询,如果主分片找不到(挂了),就会在副本中查询
  • _local::指查询操作会优先在本地节点有的分片中查询,没有的话再在其它节点查询。
  • __only_node:指在指定id的节点里面进行查询,如果该节点只有要查询索引的部分分片,就只在这部分分片中查找,所以查询结果可能不完整。如_only_node:123在节点id为123的节点中查询。
  • _prefer_node:nodeid 优先在指定的节点上执行查询
  • _shards:0,1,2,3,4:查询指定分片的数据
  • Custom (string) value:用户自定义值,指在参数cluster.routing.allocation.awareness.attributes指定的值,如这个值设置为了zone,那么preference=zone的话就在awareness.attributes=zone*这样的节点搜索,如zone1、zone2

14.基于拼音和中文进行搜索

一般情况下,有些搜索需求是需要根据拼音和中文来搜索的,那么在elasticsearch中是如何来实现基于拼音来搜索的,可以通过elasticsearch-analysis-pinyin分析器来实现,那我们的需求:自定义一个分词器,即可以实现拼音搜索,也可以实现中文搜索。参考这里

14.1.安装拼音分词器Pinyin Analysis for Elasticsearch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# 进入 es 的插件目录
cd /Users/suaofeng/Downloads/docker/elasticsearch/plugins
# 下载
wget https://github.com/medcl/elasticsearch-analysis-pinyin/releases/tag/v7.10.1# 新建目录
mkdir analysis-pinyin
# 解压
mv elasticsearch-analysis-pinyin-7.10.1.zip analysis-pinyin && cd analysis-pinyin && unzip elasticsearch-analysis-pinyin-7.10.1.zip && rm -rvf elasticsearch-analysis-pinyin-7.10.1.zip
cd ../ && chown -R es:es analysis-pinyin

# 启动es并测试,结果发现实现了拼音分词,但是这个不一定满足我们的需求,
# 比如没有中文了,单个的拼音(比如:wo)是没有什么用的,需要对拼音分词器进行定制化。
GET _analyze
{
"text": ["我是程序员"]
, "analyzer": "pinyin"
}

{
"tokens" : [
{
"token" : "wo",
"start_offset" : 0,
"end_offset" : 0,
"type" : "word",
"position" : 0
},
{
"token" : "wscxy",
"start_offset" : 0,
"end_offset" : 0,
"type" : "word",
"position" : 0
},
{
"token" : "shi",
"start_offset" : 0,
"end_offset" : 0,
"type" : "word",
"position" : 1
},
{
"token" : "cheng",
"start_offset" : 0,
"end_offset" : 0,
"type" : "word",
"position" : 2
},
{
"token" : "xu",
"start_offset" : 0,
"end_offset" : 0,
"type" : "word",
"position" : 3
},
{
"token" : "yuan",
"start_offset" : 0,
"end_offset" : 0,
"type" : "word",
"position" : 4
}
]
}

14.2.自定义分词器

再次分析es分词的三个过程

  • character filters: 用于在tokenizer之前对文本进行处理。比如:删除字符,替换字符等。
  • tokenizer: 将文本按照一定的规则分成独立的token。即实现分词功能。
  • tokenizer filter: 将tokenizer输出的词条做进一步的处理。比如: 同义词处理,大小写转换、移除停用词,拼音处理等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
PUT /test_pinyin
{
"settings": {
// 分析阶段的设置
"analysis": {
// 分析器设置
"analyzer": {
// 自定义分析器,在tokenizer阶段使用ik_max_word,在filter上使用py
"custom_analyzer": {
"tokenizer": "ik_max_word",
"filter": "custom_pinyin"
}
},
// 由于不满足pinyin分词器的默认设置,所以我们基于pinyin
// 自定义了一个filter,叫custom_pinyin,其中修改了一些设置
// 这些设置可以在pinyin分词器官网找到
"filter": {
"custom_pinyin": {
"type": "pinyin",
// 不会这样分:刘德华 > [liu, de, hua]
"keep_full_pinyin": false,
// 这样分:刘德华 > [liudehua]
"keep_joined_full_pinyin": true,
// 保留原始token(即中文)
"keep_original": true,
// 设置first_letter结果的最大长度,默认值:16
"limit_first_letter_length": 16,
// 当启用此选项时,将删除重复项以保存索引,例如:de的> de,默认值:false,注意:位置相关查询可能受影响
"remove_duplicated_term": true,
// 如果非汉语字母是拼音,则将其拆分为单独的拼音术语,默认值:true,如:liudehuaalibaba13zhuanghan- > liu,de,hua,a,li,ba,ba,13,zhuang,han,注意:keep_none_chinese和keep_none_chinese_together应首先启用
"none_chinese_pinyin_tokenize": false
}
}
}
},
// 定义mapping
"mappings": {
"properties": {
"name": {
"type": "text",
// 创建倒排索引时使用的分词器
"analyzer": "custom_analyzer",
// 搜索时使用的分词器,搜索时不使用custom_analyzer是为了防止 词语的拼音一样,但是中文含义不一样,导致搜索错误。 比如: 科技 和 客机,拼音一样,但是含义不一样
"search_analyzer": "ik_smart"
}
}
}
}

注意:
可以看到 我们的 name字段 使用的分词器是 custom_analyzer,这个是我们在上一步定义的。但是搜索的时候使用的是 ik_smart,这个为甚么会这样呢?
假设我们存在如下2个文本 科技强国和 这是一架客机, 那么科技和客机的拼音是不是就是一样的。 这个时候如果搜索时使用的分词器也是custom_analyzer那么,搜索科技的时候客机也会搜索出来,这样是不对的。因此在搜索的时候中文就以中文搜,拼音就以拼音搜。当 analyzer和search_analyzer的值都是custom_analyzer,搜索时也会通过拼音搜索,这样的结果可能就不是我们想要的。

关于拼音分词器的一些可选配置:
The plugin includes analyzer: pinyin , tokenizer: pinyin and token-filter: pinyin.

  • keep_first_letter when this option enabled, eg: 刘德华>ldh, default: true
  • keep_separate_first_letter when this option enabled, will keep first letters separately, eg: 刘德华>l,d,h, default: false, NOTE: query result maybe too fuzziness due to term too frequency
  • limit_first_letter_length set max length of the first_letter result, default: 16
  • keep_full_pinyin when this option enabled, eg: 刘德华> [liu,de,hua], default: true
  • keep_joined_full_pinyin when this option enabled, eg: 刘德华> [liudehua], default: false
  • keep_none_chinese keep non chinese letter or number in result, default: true
  • keep_none_chinese_together keep non chinese letter together, default: true, eg: DJ音乐家 -> DJ,yin,yue,jia, when set to false, eg: DJ音乐家 -> D,J,yin,yue,jia, NOTE: keep_none_chinese should be enabled first
  • keep_none_chinese_in_first_letter keep non Chinese letters in first letter, eg: 刘德华AT2016->ldhat2016, default: true
  • keep_none_chinese_in_joined_full_pinyin keep non Chinese letters in joined full pinyin, eg: 刘德华2016->liudehua2016, default: false
  • none_chinese_pinyin_tokenize break non chinese letters into separate pinyin term if they are pinyin, default: true, eg: liudehuaalibaba13zhuanghan -> liu,de,hua,a,li,ba,ba,13,zhuang,han, NOTE: keep_none_chinese and keep_none_chinese_together should be enabled first
  • keep_original when this option enabled, will keep original input as well, default: false
  • lowercase lowercase non Chinese letters, default: true
  • trim_whitespace default: true
  • remove_duplicated_term when this option enabled, duplicated term will be removed to save index, eg: de的>de, default: false, NOTE: position related query maybe influenced
  • ignore_pinyin_offset after 6.0, offset is strictly constrained, overlapped tokens are not allowed, with this parameter, overlapped token will allowed by ignore offset, please note, all position related query or highlight will become incorrect, you should use multi fields and specify different settings for different query purpose. if you need offset, please set it to false. default: true.

14.3.插入数据并测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#插入数据
PUT /test_pinyin/_bulk
{"index":{"_id":1}}
{"name": "科技强国"}
{"index":{"_id":2}}
{"name": "这是一架客机"}
{"index":{"_id":3}}

# 中文query
GET test_pinyin/_search
{
"query": {
"match": {
"name": "科技"
}
}
}

# 拼音query
GET test_pinyin/_search
{
"query": {
"match": {
"name": "keji"
}
}
}

15.数据迁移

elasticsearch-dump备份文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 不一定要a集群到b集群,到文件到s3都行,具体看文档说明
docker run --network host --rm -ti -v /data:/tmp elasticdump/elasticsearch-dump \
--input=http://elastic:law2022@192.168.0.22:9200/law_event_info@1665926463 \
--output=https://elastic:law2022@lvfa-v.com:8085/law_event_info@1665926463 \
--type=analyzer

# 开启index
POST /law_event_info@1665926463/_open

docker run --network host --rm -ti -v /data:/tmp elasticdump/elasticsearch-dump \
--input=http://elastic:law2022@192.168.0.22:9200/law_event_info@1665926463 \
--output=https://elastic:law2022@lvfa-v.com:8085/law_event_info@1665926463 \
--type=mapping

docker run --network host --rm -ti -v /data:/tmp elasticdump/elasticsearch-dump \
--input=http://elastic:law2022@192.168.0.22:9200/law_event_info@1665926463 \
--output=https://elastic:law2022@lvfa-v.com:8085/law_event_info@1665926463 \
--type=data

spring boot data elasticsearch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* <br>
* 主题信息 对映主题表
*
* @author: su af
* @version: v1.0
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
@Document(indexName = "law_forum_theme")
@Setting(shards = 2, refreshInterval = "10s")
//@Setting(settingPath = "/elasticsearch_config.json") 自定义分词器与拼音过滤器,实现拼音补全
public class ThemeEntity implements Serializable {

/**
* id 关联数据库id
*/
@Id
private Long id;

/**
* tag #话题#
* 想做词云?field data需要设置
* mainField = @Field(type = FieldType.Text, fielddata = true)
*/
@MultiField(
mainField = @Field(type = FieldType.Text, fielddata = true),
otherFields = {
@InnerField(suffix = "keyword",type = FieldType.Keyword)
}
)
private String title;


@CompletionField(analyzer="ik_max_word",searchAnalyzer="ik_smart", maxInputLength = 20)
private Completion titleSuggest;

/**
* 描述
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
private String description;

/**
* 审核状态
*/
@Field(type = FieldType.Integer)
private Integer status;

/**
* 删除状态
* 默认值: 0
*/
@Field(type = FieldType.Integer)
private Integer isDelete;

/**
* 坐标 点
*/
@GeoPointField
private GeoPoint location;

/**
* 经度
*/
@Field(type = FieldType.Double)
private double longitude;

/**
* 纬度
*/
@Field(type = FieldType.Double)
private double latitude;

/**
* 坐标 区域
* "type" : "polygon",
* "coordinates" : [
* [ [10.0, 0.0], [11.0, 0.0], [11.0, 1.0], [10.0, 1.0], [10.0, 0.0] ]
* ]
*/
@GeoShapeField
private GeoJson landmark;

/**
* //@LastModifiedDate
* //@Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second,pattern = "yyyy-MM-dd HH:mm:ss:SSS")
* 创建时间
*/
@Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second_millis)
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime createTime;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Repository
public interface ThemeEntityRepository extends ElasticsearchRepository<ThemeEntity, Long> {

/**
* 根据描述或者title查询贴吧
*
* @param describe 描述
* @param title title
* @return list
*/
@Highlight(fields = {
@HighlightField(name = "content"),
@HighlightField(name = "description"),
@HighlightField(name = "title"),
}, parameters = @HighlightParameters(
preTags = "<span style='color:red'>",
postTags = "</strong>"
))
List<SearchHit<ThemeEntity>> findThemeEntityByDescriptionOrTitle(String describe, String title);


/**
* 令人费解,如果获取page 则不能获取高亮数据
* @param describe
* @param title
* @param pageable
* @return
*/
@Highlight(fields = {
@HighlightField(name = "content"),
@HighlightField(name = "description"),
@HighlightField(name = "title"),
}, parameters = @HighlightParameters(
preTags = "<span style='color:red'>",
postTags = "</strong>"
))
Page<ThemeEntity> findThemeEntityByDescriptionOrTitle(String describe, String title, Pageable pageable);

/**
* sort query
* @param describe desc
* @param title title
* @param sort sort
* @return list
*/
@Highlight(fields = {
@HighlightField(name = "content"),
@HighlightField(name = "description"),
@HighlightField(name = "title"),
}, parameters = @HighlightParameters(
preTags = "<span style='color:red'>",
postTags = "</strong>"
))
List<ThemeEntity> findThemeEntityByDescriptionOrTitle(String describe, String title, Sort sort);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@Test
@SneakyThrows
public void testCompletion() {
ThemeInfoEntity themeInfoEntity = new ThemeInfoEntity();
themeInfoEntity.setId(1L);
themeInfoEntity.setTitle("这是一个有意思的标题");
Completion completion = new Completion(Lists.newArrayList("这是一个有意思的标题"));
themeInfoEntity.setTitleSuggest(completion);
themeInfoEntityRepository.save(themeInfoEntity);

SuggestBuilder suggestBuilder = new SuggestBuilder().addSuggestion("simple_suggest",
SuggestBuilders.completionSuggestion("titleSuggest")
.prefix("这是")
.analyzer("ik_max_word")
.size(3)
);
SearchResponse suggestResp = elasticsearchOperations.suggest(suggestBuilder, ThemeInfoEntity.class);
Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>> completionSuggest = suggestResp.getSuggest().getSuggestion("simple_suggest");
List<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>> entries = completionSuggest.getEntries();
List<Object> collect = entries.stream().map(entry -> {
if (!entry.getOptions().isEmpty()) {
Text text = entry.getOptions().get(0).getText();
return text.string();
}
return "";
}).collect(Collectors.toList());
collect.forEach(System.out::println);
}

@Test
@SneakyThrows
public void testSuggestTerm() {
/**
* {
* "errorCorrection" : {
* "text" : "The suggest feature suggestts",
* "term" : {
* "analyzer" : "ik_max_word",
* "field" : "description",
* "size" : 3,
* "suggest_mode" : "MISSING",
* "accuracy" : 0.5,
* "sort" : "SCORE",
* "string_distance" : "INTERNAL",
* "max_edits" : 2,
* "max_inspections" : 5,
* "max_term_freq" : 0.01,
* "prefix_length" : 1,
* "min_word_length" : 4,
* "min_doc_freq" : 0.0
* }
* }
* }
*/
SuggestBuilder suggestBuilder = new SuggestBuilder().addSuggestion("errorCorrection",
SuggestBuilders.termSuggestion("description")
.text("The suggest feature suggestts")
.suggestMode(TermSuggestionBuilder.SuggestMode.MISSING)
.analyzer("ik_max_word")
.size(3).sort(SortBy.SCORE)
);
SearchResponse suggestResp = elasticsearchOperations.suggest(suggestBuilder, ThemeEntity.class);
Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>> errorCorrection = suggestResp.getSuggest().getSuggestion("errorCorrection");
List<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>> entries = errorCorrection.getEntries();
List<Object> collect = entries.stream().map(entry -> {
if (!entry.getOptions().isEmpty()) {
Text text = entry.getOptions().get(0).getText();
return text.string();
}
return "";
}).collect(Collectors.toList());
collect.forEach(System.out::println);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
@Test
public void query() {
//分页
Pageable pageInfo = PageRequest.of(1 - 1, 10);
//查询条件构建
GeoPoint geoPoint = new GeoPoint(39.927, 116.322);
NativeSearchQuery searchQuery = getSearchQuery(geoPoint, pageInfo);
//查询并返回结果
SearchHits<ThemeEntity> themeEntitySearchHits = elasticsearchOperations.search(searchQuery, ThemeEntity.class);

List<Integer> nearbyStoreList = new ArrayList<>();
themeEntitySearchHits.forEach(hitTheme -> {
ThemeEntity theme = hitTheme.getContent();
int distance = 0;
//坐标不为空时计算出距离(单位:km)
if (!StringUtils.isEmpty(geoPoint.getLat() + "") && !StringUtils.isEmpty(geoPoint.getLon() + "")) {
//LocationUtils.getDistance,通过经纬度获得实际距离的方法,可百度
distance = (int) LocationUtils.getDistance(geoPoint, theme.getLocation());
}
nearbyStoreList.add(distance);
}
);
nearbyStoreList.forEach(System.out::println);
}

@Test
public void queryShape() {
//分页
Pageable pageInfo = PageRequest.of(1 - 1, 10);
//查询条件构建
GeoPoint geoPoint1 = new GeoPoint(39.927, 116.322);
GeoPoint geoPoint2 = new GeoPoint(39.227, 116.722);
NativeSearchQuery searchQuery = getSearchQuery(geoPoint1, geoPoint2, pageInfo);
//查询并返回结果
SearchHits<ThemeEntity> themeEntitySearchHits = elasticsearchOperations.search(searchQuery, ThemeEntity.class);

themeEntitySearchHits.forEach(hitTheme -> {
ThemeEntity theme = hitTheme.getContent();
System.out.println(theme);
}
);
}

/**
* @description: 入参条件查询构建
**/
private NativeSearchQuery getSearchQuery(GeoPoint locationModel, Pageable pageInfo) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//boolQuery.must(boolQuery.must(QueryBuilders.termQuery("is_delete", false)));
//不为空查询经度与纬度
boolQuery.must(QueryBuilders.existsQuery("latitude"));
boolQuery.must(QueryBuilders.existsQuery("longitude"));
//构建查询
NativeSearchQueryBuilder nativeBuilder = new NativeSearchQueryBuilder();
//调用地理位置查询构建方法
if (!StringUtils.isEmpty(locationModel.getLat() + "") && !StringUtils.isEmpty(locationModel.getLon() + "")) {
getLocation(locationModel, boolQuery, nativeBuilder);
}
if (StringUtils.isEmpty(locationModel.getLat() + "") || StringUtils.isEmpty(locationModel.getLon() + "")) {
nativeBuilder.withSort(SortBuilders.fieldSort("latitude").order(SortOrder.DESC));
}
//构建完成
return nativeBuilder
.withQuery(boolQuery)
.withPageable(pageInfo)
.build();
}

private NativeSearchQuery getSearchQuery(GeoPoint locationModel1, GeoPoint locationModel2, Pageable pageInfo) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//storeBool.must(storeBool.must(QueryBuilders.termQuery("is_delete", false)));
//不为空查询经度与纬度
boolQuery.must(QueryBuilders.existsQuery("latitude"));
boolQuery.must(QueryBuilders.existsQuery("longitude"));
//构建查询
NativeSearchQueryBuilder nativeBuilder = new NativeSearchQueryBuilder();
//调用地理位置查询构建方法
if (!StringUtils.isEmpty(locationModel1.getLat() + "") && !StringUtils.isEmpty(locationModel2.getLon() + "")) {
getShape(locationModel1, locationModel2, boolQuery, nativeBuilder);
}
if (StringUtils.isEmpty(locationModel1.getLat() + "") || StringUtils.isEmpty(locationModel1.getLon() + "")) {
nativeBuilder.withSort(SortBuilders.fieldSort("latitude").order(SortOrder.DESC));
}
//构建完成
return nativeBuilder
.withQuery(boolQuery)
.withPageable(pageInfo)
.build();
}

/**
* @description: 地理位置查询条件构建
**/
private void getLocation(GeoPoint geoPoint, BoolQueryBuilder queryBool, NativeSearchQueryBuilder searchQueryBuilder) {
// 以某点为中心,搜索指定范围
GeoDistanceQueryBuilder distanceQueryBuilder = new GeoDistanceQueryBuilder("location");
//指定从哪个位置搜索
distanceQueryBuilder.point(geoPoint.getLat(), geoPoint.getLon());
//指定搜索多少km,distance可为自定义数值
distanceQueryBuilder.distance(10, DistanceUnit.KILOMETERS);
queryBool.filter(distanceQueryBuilder);
// 按距离升序排列
GeoDistanceSortBuilder distanceSortBuilder = new GeoDistanceSortBuilder("location", geoPoint.getLat(), geoPoint.getLon());
distanceSortBuilder.unit(DistanceUnit.KILOMETERS);
distanceSortBuilder.order(SortOrder.ASC);
searchQueryBuilder.withSort(distanceSortBuilder);
}


ElasticSearch-Tips
https://vegetablest.github.io/2021/04/24/ElasticSearch-Tips/
作者
af su
发布于
2021年4月24日
许可协议