hi??

??????????????????flink??kafka????????ES????ETL??????
??????????????????????0??????????????ES????,????????????kafka??????????????????????????kafka????????????????????????????????????????????????????


??????????????????????????????????????


flink??????1.10
ES??????6.x 


????jar??flink-sql-connector-elasticsearch6_2.12


??????????????????00:00-00:01????????????????????????????????????????????????????????????????????????????
????????????????????????????????????????????????0??????????????????????????????????


ES??????????


2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed Elasticsearch item 
request: ElasticsearchException[Elasticsearch exception 
[type=process_cluster_event_timeout_exception, reason=failed to process cluster 
event (put-mapping) within 
30s]]org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException:
 Elasticsearch exception [type=process_cluster_event_timeout_exception, 
reason=failed to process cluster event (put-mapping) within 30s]
    at 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
    at 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
    at 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
    at 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
    at 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
    at 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
    at 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
    at 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
    at 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
    at 
org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at 
org.apache.flink.elasticsearch6.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
    at java.lang.Thread.run(Thread.java:748)



flinkSQL:
CREATE TABLE source_table (
  `time` VARCHAR
  ,`level` VARCHAR
  ,`thread` VARCHAR
  ,`class` VARCHAR
) WITH (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'connector.topic' = 'xxxx',
   'connector.startup-mode' = 'latest-offset',
   'connector.properties.group.id' = 'xxxx',
   'connector.properties.zookeeper.connect' = 'ip:2181',
   'connector.properties.bootstrap.servers' = 'ip:9092',
   'format.type' = 'json',
   'format.derive-schema' = 'true'
);


CREATE TABLE result_table (
  `time` VARCHAR
  ,`level` VARCHAR
  ,`thread` VARCHAR
  ,`class` VARCHAR
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.hosts' = 'xxxx,
  'connector.index' = 'xxxx-yyyy.MM.dd',
  'connector.document-type' = 'doc',
  'update-mode' = 'append',
  'connector.bulk-flush.interval' = '1000',
  'connector.bulk-flush.backoff.type' = 'exponential',
  'connector.bulk-flush.backoff.max-retries' = '10',
  'connector.bulk-flush.backoff.delay' = '60000',
  'connector.failure-handler' = 'ignore',
  'format.type' = 'json'
);


INSERT INTO result_table
SELECT
    `time`,`level`,thread,class
FROM source_table
WHERE `method`='xxxx';

Reply via email to