I am running a Beam app in a Flink cluster (FlinkPipelineRunner). I think all
the necessary artifacts are there, but it is still complaining:
 Class org.apache.kafka.common.serialization.ByteArrayDeserializer could not be
found.
Here is the full stack trace. It throws at p.run(). I appreciate your help. Have
a great weekend.
...about to run pipeline...
Running thread  threw:  java.lang.RuntimeException: Pipeline execution failed
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:119)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:51)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
        at com.myco.tech.arc.ReadFromKafka2.main(ReadFromKafka2.java:320)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1184)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1234)
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job fcb7f7aa808b49cbb47ff8921ce228cc (readfromkafka2-abahman-0506073521)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:146)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:116)
        ... 14 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job fcb7f7aa808b49cbb47ff8921ce228cc (readfromkafka2-abahman-0506073521)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1223)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:469)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:113)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not create input splits from Source.
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:701)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1147)
        ... 23 more
Caused by: java.io.IOException: Could not create input splits from Source.
        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:109)
        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:41)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
        ... 25 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration value.deserializer: Class org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.
        at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:145)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:336)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
        at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
        ... 27 more
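
In case it helps narrow this down, here is a minimal sketch of a check that tries to load the class by name through two different class loaders. The class name ClassVisibilityCheck and the helper isVisible are hypothetical names made up for this illustration; they are not part of Beam, Flink, or Kafka. It only uses the standard Class.forName(String, boolean, ClassLoader) call, and it does not prove which loader Kafka itself uses when it parses value.deserializer.

```java
public class ClassVisibilityCheck {

    /**
     * Hypothetical helper: returns true if the named class can be resolved
     * through the given class loader, false otherwise.
     */
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            // initialize=false: resolve the class without running its static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the ConfigException above.
        String name = args.length > 0
                ? args[0]
                : "org.apache.kafka.common.serialization.ByteArrayDeserializer";

        // Loader attached to the current thread (may differ inside a cluster).
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        // Loader that loaded this class, i.e. the jar this check was packaged in.
        ClassLoader own = ClassVisibilityCheck.class.getClassLoader();

        System.out.println("visible via context class loader: " + isVisible(name, context));
        System.out.println("visible via this class's loader:  " + isVisible(name, own));
    }
}
```

Run locally against the same fat jar that is submitted to Flink, this should at least show whether the Kafka class is packaged in the jar; a false result on both lines would point at packaging rather than at the cluster.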
