Re: Job fails when running with flink-sql-connector-elasticsearch6_2.11_1.10.0.jar

2020-08-07  Shengkai Fang
Sorry, my mistake: the corresponding fix was actually applied to es6 as well.
But this still looks like the same kind of problem.
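
If it helps narrow things down, one generic sanity check for shading mix-ups (a common trick, not something from the ticket) is to print which jar the sink class was actually loaded from, e.g. with a small throwaway program like this:

    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

    // Hypothetical debugging aid: prints the jar that actually provided ElasticsearchSink.
    // getCodeSource() may be null for JDK bootstrap classes, but is set for jar-loaded classes.
    public class WhichJar {
        public static void main(String[] args) {
            System.out.println(
                    ElasticsearchSink.class.getProtectionDomain().getCodeSource().getLocation());
        }
    }

If the class resolves from a different jar than the one the job was compiled against, a "Cannot instantiate user function" failure like the one below is a typical symptom.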

Shengkai Fang wrote on Fri, Aug 7, 2020, at 7:52 PM:

> Do you mean the job works with the 1.10 es jar but fails with the 1.11 one?
> This looks like a bug: it was fixed for es7 in 1.11, but the fix was not applied to es6.
> See [1] https://issues.apache.org/jira/browse/FLINK-18006?filter=-2
>
费文杰 wrote on Fri, Aug 7, 2020, at 3:56 PM:
>
>>
>> Here is my code:
>> import com.alibaba.fastjson.JSONObject;
>> import lombok.extern.slf4j.Slf4j;
>> import org.apache.flink.api.common.functions.RuntimeContext;
>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
>> import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
>> import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
>> // Elasticsearch classes as relocated inside flink-sql-connector-elasticsearch6:
>> import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.index.IndexRequest;
>> import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.Requests;
>> import org.apache.http.HttpHost;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>> import java.util.*;
>>
>> @Slf4j
>> public class TrackToEsJob {
>>     public static void main(String[] args) throws Exception {
>>         // Obtain the execution environment
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         // Set the parallelism (usually kept in line with the number of cores)
>>         env.setParallelism(1);
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>         final Properties kafkaProps = new Properties();
>>         kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.100:9092");
>>         kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "track-flink-group");
>>         kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>>         FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
>>                 "bi-track-log-client", new SimpleStringSchema(), kafkaProps);
>>         // Start consuming from the latest offset by default
>>         flinkKafkaConsumer.setStartFromLatest();
>>
>>         // 1. Kafka source stream; use a parallelism equal to the partition count
>>         int partitionCount = 1;
>>         DataStream<String> sourceStream = env.addSource(flinkKafkaConsumer)
>>                 .setParallelism(partitionCount)
>>                 .name("source_kafka_trackTopics")
>>                 .uid("source_kafka_trackTopics");
>>
>>         List<HttpHost> httpHosts = new ArrayList<>();
>>         httpHosts.add(new HttpHost("192.168.1.101", 9200, "http"));
>>         ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
>>                 httpHosts, new ElasticsearchSinkFunction<String>() {
>>             public IndexRequest createIndexRequest(String o) {
>>                 JSONObject jsonObject = JSONObject.parseObject(o);
>>                 System.out.println("saving data " + jsonObject.toJSONString());
>>                 Map<String, Object> esData = new HashMap<>();
>>                 esData.put("appId", jsonObject.getString("appId"));
>>                 esData.put("indexKey", jsonObject.getString("indexKey"));
>>                 esData.put("indexValue", jsonObject.getString("indexValue"));
>>                 return Requests.indexRequest()
>>                         .index("bi_track_log_es")
>>                         .type("doc")
>>                         .source(esData);
>>             }
>>
>>             @Override
>>             public void process(String o, RuntimeContext runtimeContext,
>>                                 RequestIndexer requestIndexer) {
>>                 requestIndexer.add(createIndexRequest(o));
>>             }
>>         });
>>         esSinkBuilder.setBulkFlushMaxActions(1);
>>         sourceStream.addSink(esSinkBuilder.build());
>>         env.execute("TrackToEsJob");
>>     }
>> }
>>
>> I thought flink-sql-connector-elasticsearch6 was supposed to be a superset of
>> flink-connector-elasticsearch6, yet if I switch the dependency to
>> flink-connector-elasticsearch6_2.11_1.10.0 the job runs fine. I'm rather
>> confused now and would appreciate some guidance. Thanks!
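>>
>> (For comparison, a minimal sketch of the variant that runs fine for me with
>> flink-connector-elasticsearch6_2.11_1.10.0: that jar does not relocate the
>> Elasticsearch classes, so only the two imports below change and the rest of
>> the job stays the same.)
>>
>> // Unshaded imports, as used with the plain flink-connector-elasticsearch6 dependency:
>> import org.elasticsearch.action.index.IndexRequest;
>> import org.elasticsearch.client.Requests;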
>>
>> Here is the error message:
>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>> at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: unexpected exception type
>> at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
>> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
>> at
