Mapping??????template???????????? "xxx-2020.04.23": { "mappings": { "doc": { "dynamic_templates": [ { "string_fields": { "match": "*", "match_mapping_type": "string", "mapping": { "type": "keyword" } } } ], "properties": { "cost": { "type": "long" }, "result": { "type": "keyword" } } } } }
setting?????? "xxx-2020.04.23": { "settings": { "index": { "routing": { "allocation": { "total_shards_per_node": "1" } }, "refresh_interval": "10s", "number_of_shards": "2", "provided_name": "xxx-2020.04.23", "creation_date": "1587509965602", -- 2020/4/22 6:59:25 "number_of_replicas": "0", "uuid": "f9OqpCmJQnyqlqTeYpt1Sg", "version": { "created": "6020499" } } } } ------------------ ???????? ------------------ ??????: "zhisheng"<zhisheng2...@gmail.com>; ????????: 2020??4??22??(??????) ????4:47 ??????: "user-zh"<user-zh@flink.apache.org>; ????: Re: ????0??????????Elasticsearch??????kafka???????? hi, es index ?? mapping ?????????????????? ?????????? : > failed to process cluster event (put-mapping) within 30s ?????????? mapping ?????? Leonard Xu <xbjt...@gmail.com> ??2020??4??22?????? ????4:41?????? > Hi, > > ????????????????shard??????????????????????reallocation??relocation???????????? > ???????????????????????????? > > ?????? > Leonard Xu > > > > > ?? 2020??4??22????16:10??Oliver <zzh...@foxmail.com> ?????? > > > > hi?? > > > > > > ??????????????????flink??kafka????????ES????ETL?????? > > > ??????????????????????0??????????????ES????,????????????kafka??????????????????????????kafka???????????????????????????????????????????????????? > > > > > > ?????????????????????????????????????? > > > > > > flink??????1.10 > > ES??????6.x&nbsp; > > > > > > ????jar??flink-sql-connector-elasticsearch6_2.12 > > > > > > ??????????????????00:00-00:01???????????????????????????????????????????????????????????????????????????? > > ????????????????????????????????????????????????0?????????????????????????????????? > > > > > > ES?????????? > > > > > > 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed > Elasticsearch item request: ElasticsearchException[Elasticsearch exception > [type=process_cluster_event_timeout_exception, reason=failed to process > cluster event (put-mapping) within 30s]]org.apache.flink. > elasticsearch6.shaded.org.elasticsearch.ElasticsearchException: > Elasticsearch exception [type=process_cluster_event_timeout_exception, > reason=failed to process cluster event (put-mapping) within 30s] > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .elasticsearch.client.RestClient$1.completed(RestClient.java:375) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .elasticsearch.client.RestClient$1.completed(RestClient.java:366) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) > > &nbsp; &nbsp; at org.apache.flink.elasticsearch6.shaded.org > .apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588) > > &nbsp; &nbsp; at java.lang.Thread.run(Thread.java:748) > > > > > > > > flinkSQL: > > CREATE TABLE source_table ( > > &nbsp; `time` VARCHAR > > &nbsp; ,`level` VARCHAR > > &nbsp; ,`thread` VARCHAR > > &nbsp; ,`class` VARCHAR > > ) WITH ( > > &nbsp; &nbsp;'connector.type' = 'kafka', > > &nbsp; &nbsp;'connector.version' = 'universal', > > &nbsp; &nbsp;'connector.topic' = 'xxxx', > > &nbsp; &nbsp;'connector.startup-mode' = 'latest-offset', > > &nbsp; &nbsp;'connector.properties.group.id' = 'xxxx', > > &nbsp; &nbsp;'connector.properties.zookeeper.connect' = 'ip:2181', > > &nbsp; &nbsp;'connector.properties.bootstrap.servers' = 'ip:9092', > > &nbsp; &nbsp;'format.type' = 'json', > > &nbsp; &nbsp;'format.derive-schema' = 'true' > > ); > > > > > > CREATE TABLE result_table ( > > &nbsp; `time` VARCHAR > > &nbsp; ,`level` VARCHAR > > &nbsp; ,`thread` VARCHAR > > &nbsp; ,`class` VARCHAR > > ) WITH ( > > &nbsp; 'connector.type' = 'elasticsearch', > > &nbsp; 'connector.version' = '6', > > &nbsp; 'connector.hosts' = 'xxxx, > > &nbsp; 'connector.index' = 'xxxx-yyyy.MM.dd', > > &nbsp; 'connector.document-type' = 'doc', > > &nbsp; 'update-mode' = 'append', > > &nbsp; 'connector.bulk-flush.interval' = '1000', > > &nbsp; 'connector.bulk-flush.backoff.type' = 'exponential', > > &nbsp; 'connector.bulk-flush.backoff.max-retries' = '10', > > &nbsp; 'connector.bulk-flush.backoff.delay' = '60000', > > &nbsp; 'connector.failure-handler' = 'ignore', > > &nbsp; 'format.type' = 'json' > > ); > > > > > > INSERT INTO result_table > > SELECT > > &nbsp; &nbsp; `time`,`level`,thread,class > > FROM source_table > > WHERE `method`='xxxx'; > >