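Hi,

Did you also specify the "-Pflink-runner" flag when building? From your mail it seems you're building with just "mvn clean install -DskipTests". You need to specify the flag at build time so that the Flink runner dependencies are bundled into the resulting fat jar. If they are missing from the jar, Flink cannot resolve classes such as UnboundedSourceWrapper at runtime, which would match the "Cannot load user class" exception you are seeing.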
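If you want to double-check the build result, here is a minimal sketch of a check that looks for a class inside a jar. It is only an illustration, not part of Beam or Flink: FatJarCheck and its two helper methods are names I made up, and the default class name is simply the one from your stack trace.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper (not part of Beam or Flink): checks whether a jar
// contains the .class entry for a given fully qualified class name.
public class FatJarCheck {

    // Class reported as not loadable in the stack trace of this thread.
    static final String DEFAULT_CLASS =
        "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper";

    // Converts "a.b.C" into the jar entry name "a/b/C.class".
    static String toEntryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns true if the jar at jarPath has an entry for className.
    static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(toEntryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.out.println("usage: FatJarCheck <path-to-jar> [class-name]");
            return;
        }
        String className = args.length > 1 ? args[1] : DEFAULT_CLASS;
        boolean found = containsClass(args[0], className);
        System.out.println(className + (found ? " is bundled" : " is MISSING") + " in " + args[0]);
    }
}
```

Compile it and pass the path of the jar you submit to Flink as the first argument. If the class is reported as missing, the runner dependencies were not bundled and the jar needs to be rebuilt with the profile enabled.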
Best,
Aljoscha

> On 13. Jun 2017, at 10:25, 基勇 <[email protected]> wrote:
>
> Beam 2.0.0 and Flink 1.2.1
>
>
> ------------------ Original Message ------------------
> From: "Aljoscha Krettek" <[email protected]>
> Sent: Monday, June 12, 2017, 5:42 PM
> To: "user" <[email protected]>
> Subject: Re: Input/Output data to kafka exception
>
> Hi,
>
> How are you bundling your program for execution? Are you, for example,
> building a fat-jar using Maven? How are you executing the program? Using
> bin/flink or by executing the program using mvn exec? Also, which Beam/Flink
> versions are you using?
>
> Best,
> Aljoscha
>> On 12. Jun 2017, at 11:28, 基勇 <[email protected]> wrote:
>>
>> Hi,
>> I used the Beam API to write code that reads data from Kafka and runs on
>> Flink, but running it throws the following exception:
>>
>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
>> ClassLoader info: URL ClassLoader:
>>     file: '/var/folders/vq/887txtm92w508tj7gknjpflc0000gp/T/blobStore-78ed2c9c-03e5-438b-b2ec-514ac394eb11/cache/blob_7123d3372a6822ba044c5ea153ee6801cd5377ff' (valid JAR)
>> Class not resolvable through given classloader.
>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:210)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:81)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> code:
>> KafkaOptions options =
>>     PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaOptions.class);
>> options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
>> options.setStreaming(true);
>> options.setCheckpointingInterval(1000L);
>> options.setNumberOfExecutionRetries(5);
>> options.setExecutionRetryDelay(3000L);
>> options.setRunner(FlinkRunner.class);
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>> pipeline.apply(KafkaIO.<String, String>read()
>>         .withBootstrapServers("localhost:9092")
>>         .withTopic(KAFKA_TOPIC) // use withTopics(List<String>) to read from multiple topics.
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializer(StringDeserializer.class)
>>         .withoutMetadata() // PCollection<KV<String, String>>
>>     ).apply(Values.<String> create());
>> //  .apply(KafkaIO.<Void, String>write()
>> //      .withBootstrapServers("localhost:9092")
>> //      .withTopic(KAFKA_OUTPUT_TOPIC)
>> //      .withValueSerializer(StringSerializer.class)
>> //      .values());
>>
>> pipeline.run();//.waitUntilFinish();
>>
>> How can I fix it?
>>
>> Thank you!
