Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign
Deserializer >> max.partition.fetch.bytes = 1048576 >> max.poll.interval.ms = 30 >> max.poll.records = 500 >> metadata.max.age.ms = 30 >> metric.reporters = [] >> metrics.num.samples = 2 >> metrics.recording.level = INFO >> metrics.sample.window.ms = 3 >> partition.assignment.strategy = [class >> org.apache.kafka.clients.consumer.RangeAssignor] >> receive.buffer.bytes = 65536 >> reconnect.backoff.max.ms = 1000 >> reconnect.backoff.ms = 50 >> request.timeout.ms = 305000 >> retry.backoff.ms = 100 >> sasl.jaas.config = null >> sasl.kerberos.kinit.cmd = /usr/bin/kinit >> sasl.kerberos.min.time.before.relogin = 6 >> sasl.kerberos.service.name = null >> sasl.kerberos.ticket.renew.jitter = 0.05 >> sasl.kerberos.ticket.renew.window.factor = 0.8 >> sasl.mechanism = GSSAPI >> security.protocol = PLAINTEXT >> send.buffer.bytes = 131072 >> session.timeout.ms = 1 >> ssl.cipher.suites = null >> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >> ssl.endpoint.identification.algorithm = null >> ssl.key.password = null >> ssl.keymanager.algorithm = SunX509 >> ssl.keystore.location = null >> ssl.keystore.password = null >> ssl.keystore.type = JKS >> ssl.protocol = TLS >> ssl.provider = null >> ssl.secure.random.implementation = null >> ssl.trustmanager.algorithm = PKIX >> ssl.truststore.location = null >> ssl.truststore.password = null >> ssl.truststore.type = JKS >> value.deserializer = class >> org.apache.kafka.common.serialization.ByteArrayDeserializer >> 2018-07-02 21:38:38,705 INFO >> org.apache.kafka.common.utils.AppInfoParser - Kafka >> version : 1.1.0 >> 2018-07-02 21:38:38,705 INFO >> org.apache.kafka.common.utils.AppInfoParser - Kafka >> commitId : fdcf75ea326b8e07 >> 2018-07-02 21:38:38,705 WARN >> org.apache.kafka.common.utils.AppInfoParser - Error >> registering AppInfo mbean >> javax.management.InstanceAlreadyExistsException: >> kafka.consumer:type=app-info,id=consumer-2 >> at >> com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) >> at >> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) >> at >> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) >> at >> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) >> at >> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) >> at >> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) >> at >> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) >> at >> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:785) >> at >> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:644) >> at >> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:624) >> at >> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:482) >> at >> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:171) >> 2018-07-02 21:38:38,709 WARN >> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - >> Error while closing Kafka consumer >> java.lang.NullPointerException >> at >> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:286) >> 2018-07-02 21:38:38,710 INFO >> org.apache.flink.runtime.taskmanager.Task - Source: >> Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b) >> switched from RUNNING to FAILED. >> java.lang.NoSuchMethodError: >> org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V >> at >> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42) >> at >> org.apache.flink.streaming.connectors.kafka.internal.KafkaCons
Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign
Hi Mich, FlinkKafkaConsumer09 is the connector for Kafka 0.9.x. Have you tried to use FlinkKafkaConsumer011 instead of FlinkKafkaConsumer09? Best, Fabian 2018-07-02 22:57 GMT+02:00 Mich Talebzadeh : > This is becoming very tedious. > > As suggested I changed the kafka dependency from > > ibraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0" > to > > libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0" > > and compiled and ran the job again anf failed. This is the log file > > 2018-07-02 21:38:38,656 INFO org.apache.kafka.common.utils. > AppInfoParser - Kafka version : 1.1.0 > 2018-07-02 21:38:38,656 INFO org.apache.kafka.common.utils. > AppInfoParser - Kafka commitId : fdcf75ea326b8e07 > 2018-07-02 21:38:38,696 INFO org.apache.kafka.clients. > Metadata - Cluster ID: 3SqEt4DcTruOr_SlQ6fqTQ > 2018-07-02 21:38:38,698 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase > - Consumer subtask 0 will start reading the following 1 partitions from the > committed group offsets in Kafka: [KafkaTopicPartition{topic='md', > partition=0}] > 2018-07-02 21:38:38,702 INFO org.apache.kafka.clients. > consumer.ConsumerConfig - ConsumerConfig values: > auto.commit.interval.ms = 5000 > auto.offset.reset = latest > bootstrap.servers = [rhes75:9092] > check.crcs = true > client.id = > connections.max.idle.ms = 54 > enable.auto.commit = true > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = md_streaming > heartbeat.interval.ms = 3000 > interceptor.classes = [] > internal.leave.group.on.close = true > isolation.level = read_uncommitted > key.deserializer = class org.apache.kafka.common.serialization. > ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 30 > max.poll.records = 500 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partition.assignment.strategy = [class org.apache.kafka.clients. > consumer.RangeAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 305000 > retry.backoff.ms = 100 > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > session.timeout.ms = 1 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = null > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > value.deserializer = class org.apache.kafka.common.serialization. > ByteArrayDeserializer > 2018-07-02 21:38:38,705 INFO org.apache.kafka.common.utils. > AppInfoParser - Kafka version : 1.1.0 > 2018-07-02 21:38:38,705 INFO org.apache.kafka.common.utils. > AppInfoParser - Kafka commitId : fdcf75ea326b8e07 > 2018-07-02 21:38:38,705 WARN org.apache.kafka.common.utils. > AppInfoParser - Error registering AppInfo mbean > javax.management.InstanceAlreadyExistsException: > kafka.consumer:type=app-info,id=consumer-2 > at com.sun.jmx.mbeanserver.Repository.addMBean( > Repository.java:437) > at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor. > registerWithRepository(DefaultMBeanServerInterceptor.java:1898) > at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor. > registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) > at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor. > registerObject(DefaultMBeanServerInterceptor.java:900) > at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor. > registerMBean(DefaultMBeanServerInterceptor.java:324) > at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean( > JmxMBeanServer.java:522) > at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo( > AppInfoParser.java:62) >
Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign
KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:482) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:171) 2018-07-02 21:38:38,709 WARN org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - Error while closing Kafka consumer java.lang.NullPointerException at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:286) 2018-07-02 21:38:38,710 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b) switched from RUNNING to FAILED. java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243) 2018-07-02 21:38:38,713 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b). 2018-07-02 21:38:38,713 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160d Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Mon, 2 Jul 2018 at 20:59, Ted Yu wrote: > From flink-connector-kafka-0.11 dependency, we know the version of Kafka > used is (flink-connectors/flink-connector-kafka-0.11/pom.xml): > > 0.11.0.2 > From Kafka side, you specified 1.1.0 > > I think these versions produce what you experienced. > If you use Kafka 0.11, this problem should go away. > > On Mon, Jul 2, 2018 at 11:24 AM, Mich Talebzadeh < > mich.talebza...@gmail.com> wrote: > >> Hi, >> >> This is the code >> >> import java.util.Properties >> import java.util.Arrays >> import org.apache.flink.api.common.functions.MapFunction >> import org.apache.flink.api.java.utils.ParameterTool >> import org.apache.flink.streaming.api.datastream.DataStream >> import >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment >> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 >> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >> import org.apache.flink.streaming.util.serialization.DeserializationSchema >> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >> import org.apache.kafka.clients.consumer.ConsumerConfig; >> import org.apache.kafka.clients.consumer.ConsumerRecord; >> import org.apache.kafka.clients.consumer.ConsumerRecords; >> import org.apache.kafka.clients.consumer.KafkaConsumer; >> >> object md_streaming >> { >> def main(args: Array[String]) >> { >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> val properties = new Properties() >> properties.setProperty("bootstrap.servers", "rhes75:9092") >> properties.setProperty("zookeeper.connect", "rhes75:2181") >> properties.setProperty("group.id", "md_streaming") >> val stream = env >> .addSource(new FlinkKafkaConsumer09[String]("md", new >> SimpleStringSchema(), properties)) >> .writeAsText("/tmp/md_streaming.txt") >> env.execute("Flink Kafka Example") >> } >> >> and this is the sbt dependencies >> >> libraryDependencies += "org.apache.flink" %% >> "flink-connector-kafka-0.11" % "1.5.0" >> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" >> % "1.5.0" >> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" >> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0" >> libraryDependencies += "org.apache.flink&q
Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign
Hi, This is the code import java.util.Properties import java.util.Arrays import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.flink.streaming.util.serialization.DeserializationSchema import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; object md_streaming { def main(args: Array[String]) { val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "rhes75:9092") properties.setProperty("zookeeper.connect", "rhes75:2181") properties.setProperty("group.id", "md_streaming") val stream = env .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties)) .writeAsText("/tmp/md_streaming.txt") env.execute("Flink Kafka Example") } and this is the sbt dependencies libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0" libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0" libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0" libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0" libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0" Thanks Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Mon, 2 Jul 2018 at 17:45, Ted Yu wrote: > Here is the signature of assign : > > public void assign(Collection partitions) { > > Looks like RestClusterClient was built against one version of Kafka but > runs against a different version. > > Please check the sbt dependency and the version of Kafka jar on the > classpath. > > Thanks > > On Mon, Jul 2, 2018 at 9:35 AM, Mich Talebzadeh > wrote: > >> Have you seen this error by any chance in flink streaming with Kafka >> please? >> >> org.apache.flink.client.program.ProgramInvocationException: >> java.lang.NoSuchMethodError: >> org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V >> at >> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) >> at >> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464) >> at >> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) >> at md_streaming$.main(md_streaming.scala:30) >> at md_streaming.main(md_streaming.scala) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) >> at >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420) >> at >> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404) >> at >> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781) >> at >> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275) >>
Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign
Looks like a version issue , have you made sure that the Kafka version is compatible? > On 2. Jul 2018, at 18:35, Mich Talebzadeh wrote: > > Have you seen this error by any chance in flink streaming with Kafka please? > > org.apache.flink.client.program.ProgramInvocationException: > java.lang.NoSuchMethodError: > org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) > at md_streaming$.main(md_streaming.scala:30) > at md_streaming.main(md_streaming.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096) > Caused by: java.lang.NoSuchMethodError: > org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243) > > thanks > > Dr Mich Talebzadeh > > LinkedIn > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > > http://talebzadehmich.wordpress.com > > Disclaimer: Use it at your own risk. Any and all responsibility for any loss, > damage or destruction of data or any other property which may arise from > relying on this email's technical content is explicitly disclaimed. The > author will in no case be liable for any monetary damages arising from such > loss, damage or destruction. >
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign
Have you seen this error by any chance in flink streaming with Kafka please? org.apache.flink.client.program.ProgramInvocationException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) at md_streaming$.main(md_streaming.scala:30) at md_streaming.main(md_streaming.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020) at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096) Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243) thanks Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.