这里记录一下es使用经验 1.ES资源评估公式
各规格下最大索引速度和查询速度预估
集群规模
3master+n data
连接方式
直连同步写及同步查询
数据量
查询基于320W数据,单条数据1K,索引1个副本、刷新时间1s,其他索引配置默认。
评估依据
基于data节点数的影响结果,预估不同data节点个数时的最大索引速度和查询速度,n为数据节点数。
虚拟机资源规格(CPU/JVM/主机)
索引速度(/s)
查询速度(/s)
0.5C/1G/2G
30_(1+n_0.2)
10_(1+n_0.2)
1C/2G/4G
100_(1+n_0.2)
55_(1+n_0.2)
2C/4G/8G
430_(1+n_0.2)
170_(1+n_0.2)
4C/8G/16G
1150_(1+n_0.2)
500_(1+n_0.2)
8C/16G/32G
2600_(1+n_0.2)
1800_(1+n_0.2)
容器资源规格(CPU/JVM/主机)
索引速度(/s)
查询速度(/s)
1C/2G/4G
110+n*20
50+n*10
2C/8G/16G
420+n*80
170+n*30
4C/16G/32G
1100+n*200
550+n*110
8C/32G/64G
2700+n*500
1900+n*300
12C/32G/64G
4600+n*900
4000+n*800
16C/32G/64G
6800+n*1400
7000+n*1400
24C/32G/64G
12300+n*2400
15300+n*3100
主分片数的评估方式
要求
1、同一个节点上分配的索引分片数不超过3个。
2、每个索引分片数大小不超过30G。
根据集群节点数计算承载最大的主分片数
节点数*3/2
根据数据大小计算最大的主分片数
XG/节点数/2
2.ES配置原则 服务内存在 1-64G之间的 :
export ES_HEAP_SIZE = 一半服务器的内存把50%的内存给elasticsearch,剩下的50%也不会没有用处的,Lucene会很快吞噬剩下的这部分内存用于文件缓存。
服务器内存大于64G的 :
方案一:更多的全文检索: export ES_HEAP_SIZE = 31G,剩下的给Lucene
方案二:更多的排序和聚合:一台机器上创建两个或者更多ES节点,而不要部署一个使用32+GB内存的节点export ES_HEAP_SIZE = 31G剩下的给Lucene
备注:cluster.routing.allocation.same_shard.host:true。这会防止同一个shard的主副本存在同一个物理机上(因为如果存在一个机器上,副本的高可用性就没有了)。
为什么32G是JVM堆内存设置的一个坎?
JVM 在内存小于 32 GB 的时候会采用一个内存对象指针压缩技术。在 Java 中,所有的对象都分配在堆上,并通过一个指针进行引用。 普通对象指针(OOP)指向这些对象,通常为 CPU 字长 的大小:32 位或 64 位,取决于你的处理器。指针引用的就是这个 OOP 值的字节位置。对于 32 位的系统,意味着堆内存大小最大为 4 GB。对于 64 位的系统, 可以使用更大的内存,但是 64 位的指针意味着更大的浪费,因为你的指针本身大了。更糟糕的是, 更大的指针在主内存和各级缓存(例如 LLC,L1 等)之间移动数据的时候,会占用更多的带宽。Java 使用一个叫作 内存指针压缩(compressed oops)的技术来解决这个问题。 它的指针不再表示对象在内存中的精确位置,而是表示 偏移量 。这意味着 32 位的指针可以引用 40 亿个 对象 , 而不是 40 亿个字节。最终, 也就是说堆内存增长到 32 GB 的物理内存,也可以用 32 位的指针表示。一旦你越过那个神奇的 ~32 GB 的边界,指针就会切回普通对象的指针。 每个对象的指针都变长了,就会使用更多的 CPU 内存带宽,也就是说你实际上失去了更多的内存。事实上,当内存到达 40–50 GB 的时候,有效内存才相当于使用内存对象指针压缩技术时候的 32 GB 内存。这段描述的意思就是说:即便你有足够的内存,也尽量不要 超过 32 GB。因为它浪费了内存,降低了 CPU 的性能,还要让 GC 应对大内存。
指针压缩
因为java需要记录对象在内存中的位置啊,这个记录在对象头中,32位机器对象头默认占了8个字节(内存指针+kclass),4字节=4*8=32位=4G,但是Java使用的是压缩指针不是12345这样记录的,是0,8,16这样记录的,所以8字节的对象头最大记录内存地址32G如果超过32G就需要更大的对象头记录位置和kclass,可以想象超过32G之后对象头占用内存那么多不值得,所以31G最优。
压缩指针是怎么实现的
不再保存所有引用,而是每隔8个字节保存一个引用。例如,原来保存每个引用0、1、2…,现在只保存0、8、16…。因此,指针压缩后,并不是所有引用都保存在堆中,而是以8个字节为间隔保存引用。在实现上,堆中的引用其实还是按照0x0、0x1、0x2…进行存储。只不过当引用被存入64位的寄存器时,JVM将其左移3位(相当于末尾添加3个0),例如0x0、0x1、0x2…分别被转换为0x0、0x8、0x10。而当从寄存器读出时,JVM又可以右移3位,丢弃末尾的0。(oop在堆中是32位,在寄存器中是35位,2的35次方=32G。也就是说,使用32位,来达到35位oop所能引用的堆内存空间)
3.ES部署原则
【强制】开启内存锁,禁止swapping
【强制】文件描述符数量调整至65536
【强制】最大映射数调整至262144
【强制】重要数据至少有一个副本。
【强制】主分片和副本分片不能在同一个物理机上
【强制】部署最后严格划分ES节点角色
【强制】添加快照任务
【建议】使用RestClient连接
【强制】读连接client节点,写连接data节点。
【强制】连接时,必须配置上所有的对应节点,避免单节点配置。
4.ES索引模板设计原则
【建议】设置索引的refresh_interval,默认1s建议调至60s减少写入压力
【强制】禁止不使用的字段存入Elasticsearch
【强制】不作为查询字段的index值设置为false
【强制】不允许_all字段(7.5.1版本不再支持)
【强制】禁止es自动创建字段,防止索引结构混乱字段。dynamic属性设置为false遇到陌生字段,就忽略;strict:遇到陌生字段,就报错
【强制】索引的分片数在一个节点上不超过3个,每个分片数量不超过30G
【强制】对于大表必须设置分月或分天,一般为单个索引不超过100G
【强制】数据必须有对应的生命周期,禁止永久保存,且周期性数据存储最长为6个月
【强制】必须开启副本
【强制】禁止不需要排序和聚合的字段doc_values属性设置为true
【强制】表结构字段不超过50个
【强制】不允许使用type,一个索引只存储一种结构(7.5.1版本不再支持type)
【强制】index_patterns只允许包含一种索引模式
【强制】禁止keyword类型字段超过256个字符,默认超过256个字符不会被索引
5.ES读写规范
【建议】建议使用match_phrase替代match提高查询性能
【建议】善用filter过滤器,过滤适合在大范围筛选数据,而查询则适合精确匹配数据。一般应用时,应先使用过滤操作过滤数据,然后使用查询匹配数据
【建议】善用路由routing,极大提升查询效率
【建议】大量数据提交时使用批量提交方法
【建议】不得一次超大返回结果上限为100条,应需分页查询。具体实际限制根据集群规模
【建议】单个查询条件字数长度不得超过30字
【建议】尽量少使用delete_by_query删除文档,更好的方案是直接删除索引
【建议】使用 datastrem 和 ILM 索引生命周期管理管理时序数据
【建议】分片大小控制在 10GB-50GB
【建议】控制在每 GB 堆内存 20 个分片以内,避免单个节点分片过多、负载过重
【建议】增加字段限制,避免mapping爆炸
【强制】不能使用index*的操作,同时跨多个es索引进行查询,会严重消耗es性能,建议单次查询不跨索引查询,跨索引查询不得超过3个
【强制】设置查询所使用字段和返回结果列,查询结果返回不能超过1万条,数据量过大会导致es堆内存溢出
【强制】查询条件不能超过10K,条件过大会导致查询缓慢和es栈内存溢出
【强制】禁止使用前缀wildcards查询。
【强制】查询条件禁止超过1024个。
【强制】ES不支持事务性读写操作。
【强制】避免内嵌查询。
【强制,使用请进行评审】避免聚合查询。
【强制】避免使用from和size深度分页,可以考虑使用searchAfter来实现。
【强制】ES本身不支持事务!!!!
6.ES慢查询日志 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 # index数据的慢日志 PUT /you index name/_settings{ "index.indexing.slowlog.threshold.index.warn" : "10s" , "index.indexing.slowlog.threshold.index.info" : "5s" , "index.indexing.slowlog.threshold.index.debug" : "2s" , "index.indexing.slowlog.threshold.index.trace" : "500ms" , "index.indexing.slowlog.level" : "info" , "index.indexing.slowlog.source" : "1000" } # search数据的慢日志 查询分为fetch和query,fetch先查出来id和score,再根据id查找全文,根据score排序。就像数据库的回表一样。索引fetch会较慢一些 PUT /you index name/_settings{ "index.search.slowlog.threshold.query.warn" : "10s" , "index.search.slowlog.threshold.query.info" : "5s" , "index.search.slowlog.threshold.query.debug" : "2s" , "index.search.slowlog.threshold.query.trace" : "500ms" , "index.search.slowlog.threshold.fetch.warn" : "1s" , "index.search.slowlog.threshold.fetch.info" : "800ms" , "index.search.slowlog.threshold.fetch.debug" : "500ms" , "index.search.slowlog.threshold.fetch.trace" : "200ms" , "index.search.slowlog.level" : "info" } # 查看慢日志所在位置 GET _nodes/settings?pretty=true # 可以通过logstash将慢查询日志收集进ES中,然后进行分析 # 注:docker安装ES,会出现日志只输出控制台,不写入文件的情况,请自定义配置log4j2.properties文件
7.ES索引新增字段 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 PUT user_order_2010(表名)/_mapping { "properties" :{ //新增字段combineOrderNumber "combineOrderNumber" : { "type" : "keyword" } } } curl -X PUT http://127.0.0.1:9200/user_order_2010/_mapping -H 'Content-Type: application/json' -d '{ "properties":{ //新增字段combineOrderNumber "combineOrderNumber": { "type": "keyword" } } }'
8.Docker安装ES及相关技术栈 注:如果是wsl中部署,存在ip地址重启之后变动的情况请使用网桥模式,将相关组件全使用一个网桥,通过docker 容器名称连接。
1 2 3 4 5 6 7 8 9 10 11 12 docker pull elasticsearch:7.6.1 docker pull kibana:7.6.1mkdir -p /mydata/elasticsearch/configmkdir -p /mydata/elasticsearch/dataecho "http.host: 0.0.0.0" >/mydata/elasticsearch/config/elasticsearch.ymlmkdir -p /mydata/elasticsearch/kibana/config/ vim /mydata/elasticsearch/kibana/config/kibana.ymlchmod -R 777 /mydata/elasticsearch/configchmod -R 777 /mydata/elasticsearch/datachmod -R 777 /mydata/elasticsearch/kibana/config/
1 2 3 4 5 6 7 8 9 server.name: kibana server.host: "0" elasticsearch.hosts: [ "http://192.168.170.132:9200" ] xpack.monitoring.ui.container.elasticsearch.enabled: true i18n.locale: "zh-CN" elasticsearch.username: "elastic" elasticsearch.password: "passwd"
1 2 3 4 5 6 7 8 9 10 11 12 http.host: 0.0 .0 .0 cluster.name: "docker-cluster" network.hosts: 0.0 .0 .0 http.cors.allow-origin: "*" http.cors.enabled: true http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type xpack.security.enabled: true xpack.license.self_generated.type: basic xpack.security.transport.ssl.enabled: true
1 2 3 4 5 docker run --name elasticsearch -p 9200 :9200 -p 9300 :9300 -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms64m -Xmx512m" -v /mydata/ elasticsearch/config/ elasticsearch.yml:/usr/ share/elasticsearch/ config/elasticsearch.yml -v /my data/elasticsearch/ data:/usr/ share/elasticsearch/ data -v /mydata/ elasticsearch/plugins:/u sr/share/ elasticsearch/plugins -d elasticsearch:7.6 .1 docker run -d --name=kibana --restart=always -p 5601 :5601 -v /mydata/ elasticsearch/kibana/ config/kibana.yml:/u sr/share/ kibana/config/ kibana.yml kibana:7.6 .1
docker 部署metricbea :
1 2 docker pull docker.elastic.co/beats/metricbeat:7.10.1 vim /mydata/elasticsearch/metricbeat/metricbeat.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 metricbeat.config: modules: path: ${path.config}/modules.d/*.yml reload.enabled: false metricbeat.autodiscover: providers: - type: docker hints.enabled: true metricbeat.modules: - module: elasticsearch xpack.enabled: true period: 10s hosts: ["http://elasticsearch:9200" ] - module: docker metricsets: - "container" - "cpu" - "diskio" - "healthcheck" - "info" - "memory" - "network" hosts: ["unix:///var/run/docker.sock" ] period: 10s enabled: true processors: - add_cloud_metadata: ~ output.elasticsearch: hosts: ["elasticsearch:9200" ] setup.kibana.host: "kibana:5601" setup.dashboards.enabled: true setup.template.overwrite: false
1 2 3 4 5 6 7 docker run -d --network =es-network --name =metricbeat \--user =root --volume ="$(pwd) /metricbeat.yml:/usr/share/metricbeat/metricbeat.yml:ro" \--volume ="/var/run/docker.sock:/var/run/docker.sock:ro" \--volume ="/sys/fs/cgroup:/hostfs/sys/fs/cgroup:ro" \--volume ="/proc:/hostfs/proc:ro" --volume ="/:/hostfs:ro" \ docker.elastic.co/beats/metricbeat:7.10.1 metricbeat -e
9.ES修复log4j2 bug
关闭ES集群
备份 elasticsearch 目录下lib下的 log4j-api-2.11.1.jar和log4j-core-2.11.1.jar
cd /xxx/xxxx/elasticsearch/lib
mkdir log_bak
mv log4j* ./log_bak
上传log4j-core-2.17.0.jar 和 log4j-api-2.17.0.jar 到 lib目录下
启动ES集群
10.ES监控dashboard 1.python收集脚本如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 import datetimeimport timeimport jsonimport urllib2import osimport sys elasticServer = 'http://192.168.2.13:9200' interval = 60 elasticIndex = 'elasticsearch_metrics' elasticMonitoringCluster = 'http://192.168.2.13:9200' read_es_security_enable = False read_username = "read_username" read_password = "read_password" write_es_security_enable = False write_username = "write_username" write_password = "write_password" def handle_urlopen (urlData, read_username, read_password ): if read_es_security_enable: try : password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() password_mgr.add_password(None , urlData, read_username, read_password) handler = urllib2.HTTPBasicAuthHandler(password_mgr) opener = urllib2.build_opener(handler) urllib2.install_opener(opener) response = urllib2.urlopen(urlData) return response except Exception as e: print "Error: {0}" .format (str (e)) else : try : response = urllib2.urlopen(urlData) return response except Exception as e: print "Error: {0}" .format (str (e))def fetch_clusterhealth (): try : utc_datetime = datetime.datetime.utcnow() endpoint = "/_cluster/health" urlData = elasticServer + endpoint response = handle_urlopen(urlData,read_username,read_password) jsonData = json.loads(response.read()) clusterName = jsonData['cluster_name' ] jsonData['@timestamp' ] = str (utc_datetime.strftime('%Y-%m-%dT%H:%M:%S.%f' )[:-3 ]) if jsonData['status' ] == 'green' : jsonData['status_code' ] = 0 elif jsonData['status' ] == 'yellow' : jsonData['status_code' ] = 1 elif jsonData['status' ] == 'red' : jsonData['status_code' ] = 2 post_data(jsonData) return clusterName except IOError as err: print "IOError: Maybe can't connect to elasticsearch." clusterName = "unknown" return clusterNamedef fetch_clusterstats (): utc_datetime = datetime.datetime.utcnow() endpoint = "/_cluster/stats" urlData = elasticServer + endpoint response = handle_urlopen(urlData,read_username,read_password) jsonData = json.loads(response.read()) jsonData['@timestamp' ] = str (utc_datetime.strftime('%Y-%m-%dT%H:%M:%S.%f' )[:-3 ]) post_data(jsonData)def fetch_nodestats (clusterName ): utc_datetime = datetime.datetime.utcnow() endpoint = "/_cat/nodes?v&h=n" urlData = elasticServer + endpoint response = handle_urlopen(urlData,read_username,read_password) nodes = response.read()[1 :-1 ].strip().split('\n' ) for node in nodes: endpoint = "/_nodes/%s/stats" % node.rstrip() urlData = elasticServer + endpoint response = handle_urlopen(urlData,read_username,read_password) jsonData = json.loads(response.read()) nodeID = jsonData['nodes' ].keys() try : jsonData['nodes' ][nodeID[0 ]]['@timestamp' ] = str (utc_datetime.strftime('%Y-%m-%dT%H:%M:%S.%f' )[:-3 ]) jsonData['nodes' ][nodeID[0 ]]['cluster_name' ] = clusterName newJsonData = jsonData['nodes' ][nodeID[0 ]] post_data(newJsonData) except : continue def fetch_indexstats (clusterName ): utc_datetime = datetime.datetime.utcnow() endpoint = "/_stats" urlData = elasticServer + endpoint response = handle_urlopen(urlData,read_username,read_password) jsonData = json.loads(response.read()) jsonData['_all' ]['@timestamp' ] = str (utc_datetime.strftime('%Y-%m-%dT%H:%M:%S.%f' )[:-3 ]) jsonData['_all' ]['cluster_name' ] = clusterName post_data(jsonData['_all' ])def post_data (data ): utc_datetime = datetime.datetime.utcnow() url_parameters = {'cluster' : elasticMonitoringCluster, 'index' : elasticIndex, 'index_period' : utc_datetime.strftime("%Y.%m.%d" ), } url = "%(cluster)s/%(index)s-%(index_period)s/message" % url_parameters headers = {'content-type' : 'application/json' } try : req = urllib2.Request(url, headers=headers, data=json.dumps(data)) if write_es_security_enable: password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() password_mgr.add_password(None , url, write_username, write_password) handler = urllib2.HTTPBasicAuthHandler(password_mgr) opener = urllib2.build_opener(handler) urllib2.install_opener(opener) response = urllib2.urlopen(req) else : response = urllib2.urlopen(req) except Exception as e: print "Error: {0}" .format (str (e))def main (): clusterName = fetch_clusterhealth() print (clusterName) if clusterName != "unknown" : fetch_clusterstats() fetch_nodestats(clusterName) fetch_indexstats(clusterName)if __name__ == '__main__' : try : nextRun = 0 while True : if time.time() >= nextRun: nextRun = time.time() + interval now = time.time() main() elapsed = time.time() - now print "Total Elapsed Time: %s" % elapsed timeDiff = nextRun - time.time() if timeDiff >= 0 : time.sleep(timeDiff) except KeyboardInterrupt: print 'Interrupted' try : sys.exit(0 ) except SystemExit: os._exit(0 )
2.服务启动脚本
1 2 # !/bin/bash nohup python2 ./elasticsearch2elastic.py > ./elastic.log 2>&1 &
3.服务停止脚本
1 2 3 4 5 6 7 8 9 10 11 12 # ! /bin/shell appName='elasticsearch2elastic.py' pid=$(ps -ef | grep ${appName} | grep -v grep | awk '{print $2}') echo -e $pid kill -9 ${pid} sleep 2 if [ $? -eq 0 ];then echo "kill ${appName} success..." else echo "kill ${appName} fail" fi
4.索引清理脚本-shell
1 2 3 4 5 6 7 8 9 10 # !/bin/bash ES_IP=10.0.0.73 ES_PORT=9200 INDEX_PREFIX='elasticsearch_metrics-' INDEX_SUFF='*' DATE_MONTH=`date -d "1 month ago" +%Y.%m` INDEX_NAME=${INDEX_PREFIX}${DATE_MONTH}${INDEX_SUFF} ES_URL=http://${ES_IP}:${ES_PORT}/${INDEX_NAME} LOG_FILE=esindex_del-${DATE_MONTH}.out curl -XDELETE ${ES_URL} >> ${LOG_FILE}
5.索引清理脚本-python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 """ @Project :es-monitor @File :index_clear.py @IDE :PyCharm @Author :Suaf """ import datetimeimport os ES_PORT = '9200' ES_IP = '10.0.0.73' INDEX_PREFIX = 'elasticsearch_metrics-' INDEX_SUFF = '*' def getLastMonth (): today = datetime.date.today() first = today.replace(day=1 ) last_month = first - datetime.timedelta(days=1 ) month = last_month.strftime("%Y.%m" ) index_name = INDEX_PREFIX + month + INDEX_SUFF log_file = 'esindex_del-' + month + '.out' command = ('curl -XDELETE http://%s:%s/%s > %s' ) % (ES_IP, ES_PORT, index_name, log_file) print (command) os.system(command)if __name__ == '__main__' : getLastMonth()
11.IP转GeoIp
GeoIp processor 根据来自 Maxmind 数据库的数据添加有关IP地址地理位置的信息。默认情况下,GeoIp processor 将此信息添加到 geoip 字段下。GeoIp processor 可以解析 IPv4 和 IPv6 地址。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 # 创建一个预处理管道 PUT _ingest/pipeline/geoip_pipeline{ "description" : "Add geoip info" , "processors" : [ { "geoip" : { "field" : "ip" } } ] } # 创建一个索引 # 考虑到后面要批量导入数千条+数据,我们采用了取巧的方式。使用了在创建索引的时候指定缺省管道(index.default_pipeline)的方式。 # 这样的好处是: # 灵活:用户只关心 bulk 批量写入数据。 # 零写入代码修改:甚至写入数据的代码一行都不需要改就可以。 PUT ip_index{ "settings" : { "index.default_pipeline" : "geoip_pipeline" } , "mappings" : { "properties" : { "geoip" : { "properties" : { "location" : { "type" : "geo_point" } } } , "ip" : { "type" : "keyword" } } } } # 写入一条数据 PUT ip_index/_doc/1 { "ip" : "8.8.8.8" }
12.ES存储深入了解 ES配置的目录:
path.home:运行Elasticsearch进程的用户的主目录。默认为Java系统属性user.dir,它是进程所有者的默认主目录
path.conf:包含配置文件的目录。这通常通过设置Java系统属性es.config来设置,因为在找到配置文件之前它必然会被解析
path.plugins:子文件夹为Elasticsearch插件的目录
path.logs:存储生成的日志的位置。如果其中一个卷的磁盘空间不足,则将它放在与数据目录不同的卷上可能是有意义的
path.data:包含Elasticsearch存储的数据的文件夹的路径(只看他)
文件从哪里来:
Lucene负责写和维护Lucene索引文件
而Elasticsearch在Lucene之上写与功能相关的元数据,例如字段映射,索引设置和其他集群元数据。 最终用户和支持功能
在低级Lucene中不存在,由Elasticsearch提供
文件结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 data |__ elasticsearch |__ nodes |__ 0 |__ _state | |__ global-0. st # 集群的全局元数据 |__ node.lock # 文件用于确保一次只能从一个数据目录读取/写入一个Elasticsearch相关安装信息 |__ indices |__ 45 aonVA8RPSlHSKGnK_yaA |__ 0 # 0 包含与索引的第一个(也是唯一的)分片相关的数据(分片0 ) | |__ _state | |__ index # | |__ translog # 每个分片的 事务日志, 保可以安全地将数据索引到Elasticsearch |__ _state # 前者_state包含所谓的索引状态文件,存储索引的元数据 data/ └── nodes └── 0 ├── _state # 集群的全局元数据 ├── indices # 存储索引的目录 │ ├── 45 aonVA8RPSlHSKGnK_yaA # 索引唯一标识 │ │ ├── 0 # 0 包含与索引的第一个(也是唯一的)分片相关的数据(分片0 ) │ │ │ ├── _state # _state包含所谓的索引状态文件,存储索引的元数据 │ │ │ │ ├── retention-leases-41. st │ │ │ │ └── state-1. st │ │ │ ├── index # 索引数据 │ │ │ │ ├── _0.cfe │ │ │ │ ├── _0.cfs │ │ │ │ ├── _0.si │ │ │ │ ├── segments_3 │ │ │ │ └── write.lock │ │ │ └── translog # 每个分片的 事务日志, 保可以安全地将数据索引到Elasticsearch │ │ │ ├── translog-4. tlog │ │ │ └── translog.ckp │ │ └── _state # 索引的元数据,它的创建时间戳,它还包含唯一标识符以及索引的设置和映射 │ │ └── state-5. s └── node.lock # 文件用于确保一次只能从一个数据目录读取/写入一个Elasticsearch相关安装信息
13.ES查询出现数据量倒退问题 两次查询数据量不一致,因为ES查询为了降低查询压力默认会轮询查询主分片与副本分片,但是主副本之间同步会存在同步时差,所以限制只查询主分片就会避免该问题出现,具体解决方式如下:
1 2 3 4 5 6 7 8 9 GET /_search?preference=_primary # preference查询偏好设置{ "query" : { "match" : { "title" : "elasticsearch" } } } # searchRequest.preference("_primary" );
_primary: 指查询只在主分片中查询
_primary_first: 指查询会先在主分片中查询,如果主分片找不到(挂了),就会在副本中查询
_local::指查询操作会优先在本地节点有的分片中查询,没有的话再在其它节点查询。
__only_node:指在指定id的节点里面进行查询,如果该节点只有要查询索引的部分分片,就只在这部分分片中查找,所以查询结果可能不完整。如_only_node:123在节点id为123的节点中查询。
_prefer_node:nodeid 优先在指定的节点上执行查询
_shards:0,1,2,3,4:查询指定分片的数据
Custom (string) value:用户自定义值,指在参数cluster.routing.allocation.awareness.attributes指定的值,如这个值设置为了zone,那么preference=zone的话就在awareness.attributes=zone*这样的节点搜索,如zone1、zone2
14.基于拼音和中文进行搜索 一般情况下,有些搜索需求是需要根据拼音和中文来搜索的,那么在elasticsearch中是如何来实现基于拼音来搜索的,可以通过elasticsearch-analysis-pinyin分析器来实现,那我们的需求:自定义一个分词器,即可以实现拼音搜索,也可以实现中文搜索。参考这里
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 # 进入 es 的插件目录 cd /Users/suaofeng/Downloads/docker/elasticsearch/plugins# 下载 wget https://github.com/medcl/elasticsearch-analysis-pinyin/releases/tag/v7.10.1# 新建目录 mkdir analysis-pinyin# 解压 mv elasticsearch-analysis-pinyin-7.10.1.zip analysis-pinyin && cd analysis-pinyin && unzip elasticsearch-analysis-pinyin-7.10.1.zip && rm -rvf elasticsearch-analysis-pinyin-7.10.1.zip cd ../ && chown -R es:es analysis-pinyin# 启动es并测试,结果发现实现了拼音分词,但是这个不一定满足我们的需求, # 比如没有中文了,单个的拼音(比如:wo)是没有什么用的,需要对拼音分词器进行定制化。 GET _analyze { "text": ["我是程序员"] , "analyzer": "pinyin" } { "tokens" : [ { "token" : "wo", "start_offset" : 0, "end_offset" : 0, "type" : "word", "position" : 0 }, { "token" : "wscxy", "start_offset" : 0, "end_offset" : 0, "type" : "word", "position" : 0 }, { "token" : "shi", "start_offset" : 0, "end_offset" : 0, "type" : "word", "position" : 1 }, { "token" : "cheng", "start_offset" : 0, "end_offset" : 0, "type" : "word", "position" : 2 }, { "token" : "xu", "start_offset" : 0, "end_offset" : 0, "type" : "word", "position" : 3 }, { "token" : "yuan", "start_offset" : 0, "end_offset" : 0, "type" : "word", "position" : 4 } ] }
14.2.自定义分词器 再次分析es分词的三个过程
character filters: 用于在tokenizer之前对文本进行处理。比如:删除字符,替换字符等。
tokenizer: 将文本按照一定的规则分成独立的token。即实现分词功能。
tokenizer filter: 将tokenizer输出的词条做进一步的处理。比如: 同义词处理,大小写转换、移除停用词,拼音处理等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 PUT /test_pinyin{ "settings" : { "analysis" : { "analyzer" : { "custom_analyzer" : { "tokenizer" : "ik_max_word" , "filter" : "custom_pinyin" } } , "filter" : { "custom_pinyin" : { "type" : "pinyin" , "keep_full_pinyin" : false , "keep_joined_full_pinyin" : true , "keep_original" : true , "limit_first_letter_length" : 16 , "remove_duplicated_term" : true , "none_chinese_pinyin_tokenize" : false } } } } , "mappings" : { "properties" : { "name" : { "type" : "text" , "analyzer" : "custom_analyzer" , "search_analyzer" : "ik_smart" } } } }
注意: 可以看到 我们的 name字段 使用的分词器是 custom_analyzer,这个是我们在上一步定义的。但是搜索的时候使用的是 ik_smart,这个为甚么会这样呢? 假设我们存在如下2个文本 科技强国和 这是一架客机, 那么科技和客机的拼音是不是就是一样的。 这个时候如果搜索时使用的分词器也是custom_analyzer那么,搜索科技的时候客机也会搜索出来,这样是不对的。因此在搜索的时候中文就以中文搜,拼音就以拼音搜。当 analyzer和search_analyzer的值都是custom_analyzer,搜索时也会通过拼音搜索,这样的结果可能就不是我们想要的。
关于拼音分词器的一些可选配置: The plugin includes analyzer: pinyin , tokenizer: pinyin and token-filter: pinyin.
keep_first_letter when this option enabled, eg: 刘德华>ldh, default: true
keep_separate_first_letter when this option enabled, will keep first letters separately, eg: 刘德华>l,d,h, default: false, NOTE: query result maybe too fuzziness due to term too frequency
limit_first_letter_length set max length of the first_letter result, default: 16
keep_full_pinyin when this option enabled, eg: 刘德华> [liu,de,hua], default: true
keep_joined_full_pinyin when this option enabled, eg: 刘德华> [liudehua], default: false
keep_none_chinese keep non chinese letter or number in result, default: true
keep_none_chinese_together keep non chinese letter together, default: true, eg: DJ音乐家 -> DJ,yin,yue,jia, when set to false, eg: DJ音乐家 -> D,J,yin,yue,jia, NOTE: keep_none_chinese should be enabled first
keep_none_chinese_in_first_letter keep non Chinese letters in first letter, eg: 刘德华AT2016->ldhat2016, default: true
keep_none_chinese_in_joined_full_pinyin keep non Chinese letters in joined full pinyin, eg: 刘德华2016->liudehua2016, default: false
none_chinese_pinyin_tokenize break non chinese letters into separate pinyin term if they are pinyin, default: true, eg: liudehuaalibaba13zhuanghan -> liu,de,hua,a,li,ba,ba,13,zhuang,han, NOTE: keep_none_chinese and keep_none_chinese_together should be enabled first
keep_original when this option enabled, will keep original input as well, default: false
lowercase lowercase non Chinese letters, default: true
trim_whitespace default: true
remove_duplicated_term when this option enabled, duplicated term will be removed to save index, eg: de的>de, default: false, NOTE: position related query maybe influenced
ignore_pinyin_offset after 6.0, offset is strictly constrained, overlapped tokens are not allowed, with this parameter, overlapped token will allowed by ignore offset, please note, all position related query or highlight will become incorrect, you should use multi fields and specify different settings for different query purpose. if you need offset, please set it to false. default: true.
14.3.插入数据并测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 #插入数据 PUT /test_pinyin/_bulk{ "index" : { "_id" : 1 } } { "name" : "科技强国" } { "index" : { "_id" : 2 } } { "name" : "这是一架客机" } { "index" : { "_id" : 3 } } # 中文query GET test_pinyin/_search{ "query" : { "match" : { "name" : "科技" } } } # 拼音query GET test_pinyin/_search{ "query" : { "match" : { "name" : "keji" } } }
15.数据迁移 elasticsearch-dump 备份文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 # 不一定要a集群到b集群,到文件到s3都行,具体看文档说明 docker run --network host --rm -ti -v /data:/tmp elasticdump/elasticsearch-dump \ --input=http://elastic:law2022@192.168.0.22:9200/law_event_info@1665926463 \ --output=https://elastic:law2022@lvfa-v.com:8085/law_event_info@1665926463 \ --type=analyzer # 开启index POST /law_event_info@1665926463/_open docker run --network host --rm -ti -v /data:/tmp elasticdump/elasticsearch-dump \ --input=http://elastic:law2022@192.168.0.22:9200/law_event_info@1665926463 \ --output=https://elastic:law2022@lvfa-v.com:8085/law_event_info@1665926463 \ --type=mapping docker run --network host --rm -ti -v /data:/tmp elasticdump/elasticsearch-dump \ --input=http://elastic:law2022@192.168.0.22:9200/law_event_info@1665926463 \ --output=https://elastic:law2022@lvfa-v.com:8085/law_event_info@1665926463 \ --type=data
spring boot data elasticsearch 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 @Data @AllArgsConstructor @NoArgsConstructor @Accessors(chain = true) @Document(indexName = "law_forum_theme") @Setting(shards = 2, refreshInterval = "10s") public class ThemeEntity implements Serializable { @Id private Long id; @MultiField( mainField = @Field(type = FieldType.Text, fielddata = true), otherFields = { @InnerField(suffix = "keyword",type = FieldType.Keyword) } ) private String title; @CompletionField(analyzer="ik_max_word",searchAnalyzer="ik_smart", maxInputLength = 20) private Completion titleSuggest; @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart") private String description; @Field(type = FieldType.Integer) private Integer status; @Field(type = FieldType.Integer) private Integer isDelete; @GeoPointField private GeoPoint location; @Field(type = FieldType.Double) private double longitude; @Field(type = FieldType.Double) private double latitude; @GeoShapeField private GeoJson landmark; @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second_millis) @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private LocalDateTime createTime; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 @Repository public interface ThemeEntityRepository extends ElasticsearchRepository <ThemeEntity, Long> { @Highlight(fields = { @HighlightField(name = "content"), @HighlightField(name = "description"), @HighlightField(name = "title"), }, parameters = @HighlightParameters( preTags = "<span style='color:red'>", postTags = "</strong>" )) List<SearchHit<ThemeEntity>> findThemeEntityByDescriptionOrTitle (String describe, String title) ; @Highlight(fields = { @HighlightField(name = "content"), @HighlightField(name = "description"), @HighlightField(name = "title"), }, parameters = @HighlightParameters( preTags = "<span style='color:red'>", postTags = "</strong>" )) Page<ThemeEntity> findThemeEntityByDescriptionOrTitle (String describe, String title, Pageable pageable) ; @Highlight(fields = { @HighlightField(name = "content"), @HighlightField(name = "description"), @HighlightField(name = "title"), }, parameters = @HighlightParameters( preTags = "<span style='color:red'>", postTags = "</strong>" )) List<ThemeEntity> findThemeEntityByDescriptionOrTitle (String describe, String title, Sort sort) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 @Test @SneakyThrows public void testCompletion () { ThemeInfoEntity themeInfoEntity = new ThemeInfoEntity (); themeInfoEntity.setId(1L ); themeInfoEntity.setTitle("这是一个有意思的标题" ); Completion completion = new Completion (Lists.newArrayList("这是一个有意思的标题" )); themeInfoEntity.setTitleSuggest(completion); themeInfoEntityRepository.save(themeInfoEntity); SuggestBuilder suggestBuilder = new SuggestBuilder ().addSuggestion("simple_suggest" , SuggestBuilders.completionSuggestion("titleSuggest" ) .prefix("这是" ) .analyzer("ik_max_word" ) .size(3 ) ); SearchResponse suggestResp = elasticsearchOperations.suggest(suggestBuilder, ThemeInfoEntity.class); Suggest.Suggestion<? extends Suggest .Suggestion.Entry<? extends Suggest .Suggestion.Entry.Option>> completionSuggest = suggestResp.getSuggest().getSuggestion("simple_suggest" ); List<? extends Suggest .Suggestion.Entry<? extends Suggest .Suggestion.Entry.Option>> entries = completionSuggest.getEntries(); List<Object> collect = entries.stream().map(entry -> { if (!entry.getOptions().isEmpty()) { Text text = entry.getOptions().get(0 ).getText(); return text.string(); } return "" ; }).collect(Collectors.toList()); collect.forEach(System.out::println); }@Test @SneakyThrows public void testSuggestTerm () { SuggestBuilder suggestBuilder = new SuggestBuilder ().addSuggestion("errorCorrection" , SuggestBuilders.termSuggestion("description" ) .text("The suggest feature suggestts" ) .suggestMode(TermSuggestionBuilder.SuggestMode.MISSING) .analyzer("ik_max_word" ) .size(3 ).sort(SortBy.SCORE) ); SearchResponse suggestResp = elasticsearchOperations.suggest(suggestBuilder, ThemeEntity.class); Suggest.Suggestion<? extends Suggest .Suggestion.Entry<? extends Suggest .Suggestion.Entry.Option>> errorCorrection = suggestResp.getSuggest().getSuggestion("errorCorrection" ); List<? extends Suggest .Suggestion.Entry<? extends Suggest .Suggestion.Entry.Option>> entries = errorCorrection.getEntries(); List<Object> collect = entries.stream().map(entry -> { if (!entry.getOptions().isEmpty()) { Text text = entry.getOptions().get(0 ).getText(); return text.string(); } return "" ; }).collect(Collectors.toList()); collect.forEach(System.out::println); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 @Test public void query () { Pageable pageInfo = PageRequest.of(1 - 1 , 10 ); GeoPoint geoPoint = new GeoPoint (39.927 , 116.322 ); NativeSearchQuery searchQuery = getSearchQuery(geoPoint, pageInfo); SearchHits<ThemeEntity> themeEntitySearchHits = elasticsearchOperations.search(searchQuery, ThemeEntity.class); List<Integer> nearbyStoreList = new ArrayList <>(); themeEntitySearchHits.forEach(hitTheme -> { ThemeEntity theme = hitTheme.getContent(); int distance = 0 ; if (!StringUtils.isEmpty(geoPoint.getLat() + "" ) && !StringUtils.isEmpty(geoPoint.getLon() + "" )) { distance = (int ) LocationUtils.getDistance(geoPoint, theme.getLocation()); } nearbyStoreList.add(distance); } ); nearbyStoreList.forEach(System.out::println); } @Test public void queryShape () { Pageable pageInfo = PageRequest.of(1 - 1 , 10 ); GeoPoint geoPoint1 = new GeoPoint (39.927 , 116.322 ); GeoPoint geoPoint2 = new GeoPoint (39.227 , 116.722 ); NativeSearchQuery searchQuery = getSearchQuery(geoPoint1, geoPoint2, pageInfo); SearchHits<ThemeEntity> themeEntitySearchHits = elasticsearchOperations.search(searchQuery, ThemeEntity.class); themeEntitySearchHits.forEach(hitTheme -> { ThemeEntity theme = hitTheme.getContent(); System.out.println(theme); } ); } private NativeSearchQuery getSearchQuery (GeoPoint locationModel, Pageable pageInfo) { BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); boolQuery.must(QueryBuilders.existsQuery("latitude" )); boolQuery.must(QueryBuilders.existsQuery("longitude" )); NativeSearchQueryBuilder nativeBuilder = new NativeSearchQueryBuilder (); if (!StringUtils.isEmpty(locationModel.getLat() + "" ) && !StringUtils.isEmpty(locationModel.getLon() + "" )) { getLocation(locationModel, boolQuery, nativeBuilder); } if (StringUtils.isEmpty(locationModel.getLat() + "" ) || StringUtils.isEmpty(locationModel.getLon() + "" )) { nativeBuilder.withSort(SortBuilders.fieldSort("latitude" ).order(SortOrder.DESC)); } return nativeBuilder .withQuery(boolQuery) .withPageable(pageInfo) .build(); } private NativeSearchQuery getSearchQuery (GeoPoint locationModel1, GeoPoint locationModel2, Pageable pageInfo) { BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); boolQuery.must(QueryBuilders.existsQuery("latitude" )); boolQuery.must(QueryBuilders.existsQuery("longitude" )); NativeSearchQueryBuilder nativeBuilder = new NativeSearchQueryBuilder (); if (!StringUtils.isEmpty(locationModel1.getLat() + "" ) && !StringUtils.isEmpty(locationModel2.getLon() + "" )) { getShape(locationModel1, locationModel2, boolQuery, nativeBuilder); } if (StringUtils.isEmpty(locationModel1.getLat() + "" ) || StringUtils.isEmpty(locationModel1.getLon() + "" )) { nativeBuilder.withSort(SortBuilders.fieldSort("latitude" ).order(SortOrder.DESC)); } return nativeBuilder .withQuery(boolQuery) .withPageable(pageInfo) .build(); } private void getLocation (GeoPoint geoPoint, BoolQueryBuilder queryBool, NativeSearchQueryBuilder searchQueryBuilder) { GeoDistanceQueryBuilder distanceQueryBuilder = new GeoDistanceQueryBuilder ("location" ); distanceQueryBuilder.point(geoPoint.getLat(), geoPoint.getLon()); distanceQueryBuilder.distance(10 , DistanceUnit.KILOMETERS); queryBool.filter(distanceQueryBuilder); GeoDistanceSortBuilder distanceSortBuilder = new GeoDistanceSortBuilder ("location" , geoPoint.getLat(), geoPoint.getLon()); distanceSortBuilder.unit(DistanceUnit.KILOMETERS); distanceSortBuilder.order(SortOrder.ASC); searchQueryBuilder.withSort(distanceSortBuilder); }