Mapping??????template????????????
"xxx-2020.04.23": {
    "mappings": {
      "doc": {
        "dynamic_templates": [
          {
            "string_fields": {
              "match": "*",
              "match_mapping_type": "string",
              "mapping": {
                "type": "keyword"
              }
            }
          }
        ],
        "properties": {
          "cost": {
            "type": "long"
          },
          "result": {
            "type": "keyword"
          }
        }
      }
    }
  }



setting??????
"xxx-2020.04.23": {
    "settings": {
      "index": {
        "routing": {
          "allocation": {
            "total_shards_per_node": "1"
          }
        },
        "refresh_interval": "10s",
        "number_of_shards": "2",
        "provided_name": "xxx-2020.04.23",
        "creation_date": "1587509965602", -- 2020/4/22 
6:59:25
        "number_of_replicas": "0",
        "uuid": "f9OqpCmJQnyqlqTeYpt1Sg",
        "version": {
          "created": "6020499"
        }
      }
    }
  }





------------------ ???????? ------------------
??????:&nbsp;"zhisheng"<zhisheng2...@gmail.com&gt;;
????????:&nbsp;2020??4??22??(??????) ????4:47
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;Re: ????0??????????Elasticsearch??????kafka????????



hi,

es index ?? mapping ??????????????????

?????????? :

&gt; failed to process cluster event (put-mapping) within 30s

?????????? mapping ??????

Leonard Xu <xbjt...@gmail.com&gt; ??2020??4??22?????? ????4:41??????

&gt; Hi,
&gt;
&gt; 
????????????????shard??????????????????????reallocation??relocation????????????
&gt; ????????????????????????????
&gt;
&gt; ??????
&gt; Leonard Xu
&gt;
&gt;
&gt;
&gt; &gt; ?? 2020??4??22????16:10??Oliver <zzh...@foxmail.com&gt; ??????
&gt; &gt;
&gt; &gt; hi??
&gt; &gt;
&gt; &gt;
&gt; &gt; ??????????????????flink??kafka????????ES????ETL??????
&gt; &gt;
&gt; 
??????????????????????0??????????????ES????,????????????kafka??????????????????????????kafka????????????????????????????????????????????????????
&gt; &gt;
&gt; &gt;
&gt; &gt; ??????????????????????????????????????
&gt; &gt;
&gt; &gt;
&gt; &gt; flink??????1.10
&gt; &gt; ES??????6.x&amp;nbsp;
&gt; &gt;
&gt; &gt;
&gt; &gt; ????jar??flink-sql-connector-elasticsearch6_2.12
&gt; &gt;
&gt; &gt;
&gt; &gt; 
??????????????????00:00-00:01????????????????????????????????????????????????????????????????????????????
&gt; &gt; 
????????????????????????????????????????????????0??????????????????????????????????
&gt; &gt;
&gt; &gt;
&gt; &gt; ES??????????
&gt; &gt;
&gt; &gt;
&gt; &gt; 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed
&gt; Elasticsearch item request: ElasticsearchException[Elasticsearch exception
&gt; [type=process_cluster_event_timeout_exception, reason=failed to process
&gt; cluster event (put-mapping) within 30s]]org.apache.flink.
&gt; elasticsearch6.shaded.org.elasticsearch.ElasticsearchException:
&gt; Elasticsearch exception [type=process_cluster_event_timeout_exception,
&gt; reason=failed to process cluster event (put-mapping) within 30s]
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; .elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; .elasticsearch.client.RestClient$1.completed(RestClient.java:375)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; .elasticsearch.client.RestClient$1.completed(RestClient.java:366)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; .apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; .apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
&gt; &gt; &amp;nbsp; &amp;nbsp; at org.apache.flink.elasticsearch6.shaded.org
&gt; 
.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
&gt; &gt; &amp;nbsp; &amp;nbsp; at java.lang.Thread.run(Thread.java:748)
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; flinkSQL:
&gt; &gt; CREATE TABLE source_table (
&gt; &gt; &amp;nbsp; `time` VARCHAR
&gt; &gt; &amp;nbsp; ,`level` VARCHAR
&gt; &gt; &amp;nbsp; ,`thread` VARCHAR
&gt; &gt; &amp;nbsp; ,`class` VARCHAR
&gt; &gt; ) WITH (
&gt; &gt; &amp;nbsp; &amp;nbsp;'connector.type' = 'kafka',
&gt; &gt; &amp;nbsp; &amp;nbsp;'connector.version' = 'universal',
&gt; &gt; &amp;nbsp; &amp;nbsp;'connector.topic' = 'xxxx',
&gt; &gt; &amp;nbsp; &amp;nbsp;'connector.startup-mode' = 'latest-offset',
&gt; &gt; &amp;nbsp; &amp;nbsp;'connector.properties.group.id' = 'xxxx',
&gt; &gt; &amp;nbsp; &amp;nbsp;'connector.properties.zookeeper.connect' = 
'ip:2181',
&gt; &gt; &amp;nbsp; &amp;nbsp;'connector.properties.bootstrap.servers' = 
'ip:9092',
&gt; &gt; &amp;nbsp; &amp;nbsp;'format.type' = 'json',
&gt; &gt; &amp;nbsp; &amp;nbsp;'format.derive-schema' = 'true'
&gt; &gt; );
&gt; &gt;
&gt; &gt;
&gt; &gt; CREATE TABLE result_table (
&gt; &gt; &amp;nbsp; `time` VARCHAR
&gt; &gt; &amp;nbsp; ,`level` VARCHAR
&gt; &gt; &amp;nbsp; ,`thread` VARCHAR
&gt; &gt; &amp;nbsp; ,`class` VARCHAR
&gt; &gt; ) WITH (
&gt; &gt; &amp;nbsp; 'connector.type' = 'elasticsearch',
&gt; &gt; &amp;nbsp; 'connector.version' = '6',
&gt; &gt; &amp;nbsp; 'connector.hosts' = 'xxxx,
&gt; &gt; &amp;nbsp; 'connector.index' = 'xxxx-yyyy.MM.dd',
&gt; &gt; &amp;nbsp; 'connector.document-type' = 'doc',
&gt; &gt; &amp;nbsp; 'update-mode' = 'append',
&gt; &gt; &amp;nbsp; 'connector.bulk-flush.interval' = '1000',
&gt; &gt; &amp;nbsp; 'connector.bulk-flush.backoff.type' = 'exponential',
&gt; &gt; &amp;nbsp; 'connector.bulk-flush.backoff.max-retries' = '10',
&gt; &gt; &amp;nbsp; 'connector.bulk-flush.backoff.delay' = '60000',
&gt; &gt; &amp;nbsp; 'connector.failure-handler' = 'ignore',
&gt; &gt; &amp;nbsp; 'format.type' = 'json'
&gt; &gt; );
&gt; &gt;
&gt; &gt;
&gt; &gt; INSERT INTO result_table
&gt; &gt; SELECT
&gt; &gt; &amp;nbsp; &amp;nbsp; `time`,`level`,thread,class
&gt; &gt; FROM source_table
&gt; &gt; WHERE `method`='xxxx';
&gt;
&gt;

回复