Hi, I used beam API to write code to read Kafka data and run with Flink, but
run to throw the following exception:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
load user class:
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
ClassLoader info: URL ClassLoader:
file:
'/var/folders/vq/887txtm92w508tj7gknjpflc0000gp/T/blobStore-78ed2c9c-03e5-438b-b2ec-514ac394eb11/cache/blob_7123d3372a6822ba044c5ea153ee6801cd5377ff'
(valid JAR)
Class not resolvable through given classloader.
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:210)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:81)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
at java.lang.Thread.run(Thread.java:745)
code:
KafkaOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaOptions.class);
options.setJobName("KafkaExample - WindowSize: " +
options.getWindowSize() + " seconds");
options.setStreaming(true);
options.setCheckpointingInterval(1000L);
options.setNumberOfExecutionRetries(5);
options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers("localhost:9092")
.withTopic(KAFKA_TOPIC) // use
withTopics(List<String>) to read from multiple topics.
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata() // PCollection<KV<String,
String>>
).apply(Values.<String> create());
// .apply(KafkaIO.<Void, String>write()
// .withBootstrapServers("localhost:9092")
// .withTopic(KAFKA_OUTPUT_TOPIC)
//
.withValueSerializer(StringSerializer.class)
// .values());
pipeline.run();//.waitUntilFinish();
How to fix it ?
Thank you??