👍👍👍

oliver yunchang <zzh...@foxmail.com> wrote on Thu, Apr 23, 2020 at 12:32 AM:
> Many thanks to Leonard Xu and zhisheng for the replies.
>
> > Was the ES index's mapping set up in advance?
> It was. The mapping of the pre-created index is as follows:
> {
>     "xxx-2020.04.23": {
>         "mappings": {
>             "doc": {
>                 "dynamic_templates": [
>                     {
>                         "string_fields": {
>                             "match": "*",
>                             "match_mapping_type": "string",
>                             "mapping": {
>                                 "type": "keyword"
>                             }
>                         }
>                     }
>                 ],
>                 "properties": {
>                     "cost": {
>                         "type": "long"
>                     },
>                     "result": {
>                         "type": "keyword"
>                     }
>                 }
>             }
>         }
>     }
> }
> But the data being written has far more fields than just cost and result.
> The official Elasticsearch documentation on dynamic_templates says: "When putting new dynamic templates through the put mapping <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html> API, all existing templates are overwritten." [1]
> My guess: because the pre-created mapping did not cover all of the data fields, writing to ES still triggered put mapping API calls to update it, and that caused the exception.
>
> After adjusting the new index's mapping to cover all fields, the "failed to process cluster event (put-mapping) within 30s" exception disappeared.
>
> [1] https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html#dynamic-templates
>
> Best,
> Oliver yunchang
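For concreteness, a minimal sketch of what "a mapping covering all fields" could look like for such a daily index. The extra field names are borrowed from the flinkSQL further down in this thread and typed as keyword purely for illustration; the real job may have more fields and different types:

    PUT xxx-2020.04.24
    {
      "mappings": {
        "doc": {
          "dynamic_templates": [
            {
              "string_fields": {
                "match": "*",
                "match_mapping_type": "string",
                "mapping": { "type": "keyword" }
              }
            }
          ],
          "properties": {
            "cost":   { "type": "long" },
            "result": { "type": "keyword" },
            "time":   { "type": "keyword" },
            "level":  { "type": "keyword" },
            "thread": { "type": "keyword" },
            "class":  { "type": "keyword" }
          }
        }
      }
    }

Once every field the sink writes is declared up front, a bulk request no longer has to submit a put-mapping cluster event at all, which matches the observation that the 30s timeout then disappeared.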
> On Apr 22, 2020, at 4:47 PM, zhisheng <zhisheng2...@gmail.com> wrote:
> >
> > hi,
> >
> > Was the ES index's mapping set up in advance?
> >
> > I see the exception:
> >
> >> failed to process cluster event (put-mapping) within 30s
> >
> > which looks like automatic mapping creation timing out.
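The 30s in that message is Elasticsearch's default timeout for a dynamic mapping update to be applied by the master. If, as a stopgap, one wanted to give the master more time instead of (or in addition to) fixing the mapping, something like the following cluster-settings call should work on 6.x; the setting name here is my assumption and worth verifying against your exact ES version:

    PUT _cluster/settings
    {
      "transient": {
        "indices.mapping.dynamic_timeout": "60s"
      }
    }

Note this only papers over the symptom: a master that cannot process put-mapping events within 30s around midnight is still a sign of cluster-state pressure.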
> > Leonard Xu <xbjt...@gmail.com> wrote on Wed, Apr 22, 2020 at 4:41 PM:
> >
> >> Hi,
> >>
> >> What is the shard configuration of the pre-created index, and how are the cluster's reallocation/relocation settings configured?
> >> That would be a good angle to start troubleshooting from.
> >>
> >> Best,
> >> Leonard Xu
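For the checks Leonard suggests, a few stock read-only 6.x APIs could help (the index pattern is a placeholder):

    GET _cat/shards/xxx-*?v            # shard count, state and node placement per daily index
    GET _cluster/health?level=indices  # initializing/relocating shard counts around midnight
    GET _cluster/settings              # any cluster.routing.allocation / rebalance overrides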
> >>> On Apr 22, 2020, at 16:10, Oliver <zzh...@foxmail.com> wrote:
> >>>
> >>> hi,
> >>>
> >>> I have a Flink job that writes Kafka data into ES; it is a pure ETL pipeline.
> >>> The problem: every day right after midnight, writes to ES fail and monitoring shows Kafka messages starting to pile up. After restarting the job the backlog gradually drains; without a restart it persists.
> >>>
> >>> How should I troubleshoot and deal with this kind of problem?
> >>>
> >>> Flink version: 1.10
> >>> ES version: 6.x
> >>>
> >>> Jar used: flink-sql-connector-elasticsearch6_2.12
> >>>
> >>> One more detail: between 00:00 and 00:01 a small amount of data is written successfully, but the bulk of the writes fail. The index name carries a date suffix, so midnight involves switching indices; however, the next day's index is created in advance, so writes after midnight do not involve creating a new index.
> >>>
> >>> The ES exception is as follows:
> >>>
> >>> 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed Elasticsearch item request: ElasticsearchException[Elasticsearch exception [type=process_cluster_event_timeout_exception, reason=failed to process cluster event (put-mapping) within 30s]]
> >>> org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=process_cluster_event_timeout_exception, reason=failed to process cluster event (put-mapping) within 30s]
> >>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
> >>>     at org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
> >>>     at java.lang.Thread.run(Thread.java:748)
> >>>
> >>> flinkSQL:
> >>> CREATE TABLE source_table (
> >>>     `time` VARCHAR
> >>>     ,`level` VARCHAR
> >>>     ,`thread` VARCHAR
> >>>     ,`class` VARCHAR
> >>>     ,`method` VARCHAR  -- filtered on in the INSERT below, so it must be declared here
> >>> ) WITH (
> >>>     'connector.type' = 'kafka',
> >>>     'connector.version' = 'universal',
> >>>     'connector.topic' = 'xxxx',
> >>>     'connector.startup-mode' = 'latest-offset',
> >>>     'connector.properties.group.id' = 'xxxx',
> >>>     'connector.properties.zookeeper.connect' = 'ip:2181',
> >>>     'connector.properties.bootstrap.servers' = 'ip:9092',
> >>>     'format.type' = 'json',
> >>>     'format.derive-schema' = 'true'
> >>> );
> >>>
> >>> CREATE TABLE result_table (
> >>>     `time` VARCHAR
> >>>     ,`level` VARCHAR
> >>>     ,`thread` VARCHAR
> >>>     ,`class` VARCHAR
> >>> ) WITH (
> >>>     'connector.type' = 'elasticsearch',
> >>>     'connector.version' = '6',
> >>>     'connector.hosts' = 'xxxx',
> >>>     'connector.index' = 'xxxx-yyyy.MM.dd',
> >>>     'connector.document-type' = 'doc',
> >>>     'update-mode' = 'append',
> >>>     'connector.bulk-flush.interval' = '1000',
> >>>     'connector.bulk-flush.backoff.type' = 'exponential',
> >>>     'connector.bulk-flush.backoff.max-retries' = '10',
> >>>     'connector.bulk-flush.backoff.delay' = '60000',
> >>>     'connector.failure-handler' = 'ignore',
> >>>     'format.type' = 'json'
> >>> );
> >>>
> >>> INSERT INTO result_table
> >>> SELECT `time`, `level`, `thread`, `class`
> >>> FROM source_table
> >>> WHERE `method` = 'xxxx';
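Since the indices here are created daily with a date suffix, an index template would guarantee that every new xxx-* index is born with the full mapping, taking put-mapping out of the midnight picture entirely. A sketch for ES 6.x, with the template name assumed:

    PUT _template/xxx_daily
    {
      "index_patterns": ["xxx-*"],
      "mappings": {
        "doc": { ... the same full-field mapping sketched earlier in the thread ... }
      }
    }

Whether the next day's index is then pre-created by a cron job or auto-created on first write, it picks up the template's mapping, so the first documents after midnight never have to trigger a dynamic mapping update.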