Ah, maybe my previous mail is wrong. Did you make sure that the Kafka dependencies are packed into the job jar by building a fat jar? You could also try putting the Kafka dependencies into the lib folder of the Flink directory.
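For reference, one common way to build such a fat jar is the Maven Shade plugin. A minimal pom.xml sketch (the plugin version and filter entries here are illustrative, adjust them to your build):

```xml
<!-- Sketch: bundle all runtime dependencies (including kafka-clients)
     into a single job jar at package time. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <filters>
          <filter>
            <!-- Strip signature files from dependencies so the
                 shaded jar stays loadable. -->
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
      </configuration>
    </execution>
  </executions>
</plugin>
```

After `mvn package`, you can confirm the missing class actually made it into the jar, e.g. with `jar tf target/your-job.jar | grep ByteArrayDeserializer`.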
On Fri, 6 May 2016 at 11:17 Aljoscha Krettek <[email protected]> wrote:

> Could you try running it with:
>
>     options.setStreaming(true)
>
> I think the Flink Batch runner has problems dealing with the Kafka
> source due to some bug. I'm investigating.
>
> On Fri, 6 May 2016 at 10:20 amir bahmanyari <[email protected]> wrote:
>
>> I am running a Beam app in a Flink cluster (FlinkPipelineRunner).
>> I think all the necessary artifacts are there, but it's still complaining
>> about:
>>
>> Class org.apache.kafka.common.serialization.ByteArrayDeserializer could
>> not be found.
>>
>> Here is the full stack trace. It throws at p.run().
>> I appreciate your help.
>> Have a great weekend.
>>
>> ...about to run pipeline
>> ...Running thread threw: java.lang.RuntimeException: Pipeline execution failed
>>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:119)
>>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:51)
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>>     at com.myco.tech.arc.ReadFromKafka2.main(ReadFromKafka2.java:320)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1184)
>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1234)
>> Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job fcb7f7aa808b49cbb47ff8921ce228cc (readfromkafka2-abahman-0506073521)
>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:146)
>>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:116)
>>     ... 14 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job fcb7f7aa808b49cbb47ff8921ce228cc (readfromkafka2-abahman-0506073521)
>>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1223)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:469)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:113)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not create input splits from Source.
>>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:701)
>>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1147)
>>     ... 23 more
>> Caused by: java.io.IOException: Could not create input splits from Source.
>>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:109)
>>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:41)
>>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>>     ... 25 more
>> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration value.deserializer: Class org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.
>>     at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)
>>     at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:145)
>>     at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)
>>     at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
>>     at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:336)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
>>     at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
>>     at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
>>     at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
>>     at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
>>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
>>     ... 27 more
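For what it's worth, the root cause at the bottom of the trace is Kafka resolving the `value.deserializer` setting by loading the class by name, so "could not be found" means kafka-clients is not on the classpath of the JVM that constructs the consumer config. A small self-contained sketch of that kind of check (the class and method names here are my own, not Beam's or Kafka's):

```java
// Sketch: reproduce the lookup Kafka's config parsing performs when it
// resolves a deserializer class name. If this returns false on the
// JobManager/TaskManager JVM, the jar is missing from the job classpath.
public class DeserializerCheck {

    /** Returns true if the named class is loadable on this JVM's classpath. */
    static boolean isLoadable(String className) {
        try {
            // Same style of by-name lookup; 'false' skips static initialization.
            Class.forName(className, false,
                    Thread.currentThread().getContextClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class Kafka tries to load for value.deserializer in the trace above:
        String name = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        System.out.println(name + (isLoadable(name) ? " is" : " is NOT")
                + " on the classpath");
    }
}
```

Running this from inside the packaged job jar (or on the cluster's classpath) tells you immediately whether the fat-jar or lib-folder fix took effect.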
