Beam 2.0.0 and flink 1.2.1
------------------ ???????? ------------------ ??????: "Aljoscha Krettek";<[email protected]>; ????????: 2017??6??12??(??????) ????5:42 ??????: "user"<[email protected]>; ????: Re: Input/Output data to kafka exception Hi, How are you bundling your program for execution? Are you, for example, building a fat-jar using Maven? How are you executing the program? Using bin/flink or by executing the program using mvn exec? Also, which Beam/Flink versions are you using? Best, Aljoscha On 12. Jun 2017, at 11:28, ???? <[email protected]> wrote: Hi, I used beam API to write code to read Kafka data and run with Flink, but run to throw the following exception: Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper ClassLoader info: URL ClassLoader: file: '/var/folders/vq/887txtm92w508tj7gknjpflc0000gp/T/blobStore-78ed2c9c-03e5-438b-b2ec-514ac394eb11/cache/blob_7123d3372a6822ba044c5ea153ee6801cd5377ff' (valid JAR) Class not resolvable through given classloader. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:210) at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:81) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665) at java.lang.Thread.run(Thread.java:745) code: KafkaOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaOptions.class); options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds"); options.setStreaming(true); options.setCheckpointingInterval(1000L); options.setNumberOfExecutionRetries(5); options.setExecutionRetryDelay(3000L); options.setRunner(FlinkRunner.class); Pipeline pipeline = Pipeline.create(options); pipeline.apply(KafkaIO.<String, String>read() .withBootstrapServers("localhost:9092") .withTopic(KAFKA_TOPIC) // use withTopics(List<String>) to read from multiple topics. .withKeyDeserializer(StringDeserializer.class) .withValueDeserializer(StringDeserializer.class) .withoutMetadata() // PCollection<KV<String, String>> ).apply(Values.<String> create()); // .apply(KafkaIO.<Void, String>write() // .withBootstrapServers("localhost:9092") // .withTopic(KAFKA_OUTPUT_TOPIC) // .withValueSerializer(StringSerializer.class) // .values()); pipeline.run();//.waitUntilFinish(); How to fix it ? Thank you??
