This is becoming very tedious. As suggested, I changed the Kafka dependency from
libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0" to libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0", then compiled and ran the job again, and it failed. This is the log file: 2018-07-02 21:38:38,656 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.1.0 2018-07-02 21:38:38,656 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fdcf75ea326b8e07 2018-07-02 21:38:38,696 INFO org.apache.kafka.clients.Metadata - Cluster ID: 3SqEt4DcTruOr_SlQ6fqTQ 2018-07-02 21:38:38,698 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='md', partition=0}] 2018-07-02 21:38:38,702 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [rhes75:9092] check.crcs = true client.id = connections.max.idle.ms = 540000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = md_streaming heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 305000 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null 
sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer 2018-07-02 21:38:38,705 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.1.0 2018-07-02 21:38:38,705 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fdcf75ea326b8e07 2018-07-02 21:38:38,705 WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-2 at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:785) at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:482) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:171) 2018-07-02 21:38:38,709 WARN org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - Error while closing Kafka consumer java.lang.NullPointerException at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:286) 2018-07-02 21:38:38,710 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b) switched from RUNNING to FAILED. java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243) 2018-07-02 21:38:38,713 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b). 2018-07-02 21:38:38,713 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160d Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. 
Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Mon, 2 Jul 2018 at 20:59, Ted Yu <yuzhih...@gmail.com> wrote: > From flink-connector-kafka-0.11 dependency, we know the version of Kafka > used is (flink-connectors/flink-connector-kafka-0.11/pom.xml): > > <kafka.version>0.11.0.2</kafka.version> > From Kafka side, you specified 1.1.0 > > I think these versions produce what you experienced. > If you use Kafka 0.11, this problem should go away. > > On Mon, Jul 2, 2018 at 11:24 AM, Mich Talebzadeh < > mich.talebza...@gmail.com> wrote: > >> Hi, >> >> This is the code >> >> import java.util.Properties >> import java.util.Arrays >> import org.apache.flink.api.common.functions.MapFunction >> import org.apache.flink.api.java.utils.ParameterTool >> import org.apache.flink.streaming.api.datastream.DataStream >> import >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment >> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 >> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >> import org.apache.flink.streaming.util.serialization.DeserializationSchema >> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >> import org.apache.kafka.clients.consumer.ConsumerConfig; >> import org.apache.kafka.clients.consumer.ConsumerRecord; >> import org.apache.kafka.clients.consumer.ConsumerRecords; >> import org.apache.kafka.clients.consumer.KafkaConsumer; >> >> object md_streaming >> { >> def main(args: Array[String]) >> { >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> val properties = new Properties() >> properties.setProperty("bootstrap.servers", "rhes75:9092") >> properties.setProperty("zookeeper.connect", "rhes75:2181") >> 
properties.setProperty("group.id", "md_streaming") >> val stream = env >> .addSource(new FlinkKafkaConsumer09[String]("md", new >> SimpleStringSchema(), properties)) >> .writeAsText("/tmp/md_streaming.txt") >> env.execute("Flink Kafka Example") >> } >> >> and this is the sbt dependencies >> >> libraryDependencies += "org.apache.flink" %% >> "flink-connector-kafka-0.11" % "1.5.0" >> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" >> % "1.5.0" >> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" >> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0" >> libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % >> "1.5.0" >> libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0" >> >> >> Thanks >> >> >> Dr Mich Talebzadeh >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> >> http://talebzadehmich.wordpress.com >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >> >> On Mon, 2 Jul 2018 at 17:45, Ted Yu <yuzhih...@gmail.com> wrote: >> >>> Here is the signature of assign : >>> >>> public void assign(Collection<TopicPartition> partitions) { >>> >>> Looks like RestClusterClient was built against one version of Kafka but >>> runs against a different version. >>> >>> Please check the sbt dependency and the version of Kafka jar on the >>> classpath. 
>>> >>> Thanks >>> >>> On Mon, Jul 2, 2018 at 9:35 AM, Mich Talebzadeh < >>> mich.talebza...@gmail.com> wrote: >>> >>>> Have you seen this error by any chance in flink streaming with Kafka >>>> please? >>>> >>>> org.apache.flink.client.program.ProgramInvocationException: >>>> java.lang.NoSuchMethodError: >>>> org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V >>>> at >>>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) >>>> at >>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464) >>>> at >>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) >>>> at md_streaming$.main(md_streaming.scala:30) >>>> at md_streaming.main(md_streaming.scala) >>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>> at >>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>> at >>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>> at >>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) >>>> at >>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420) >>>> at >>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404) >>>> at >>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781) >>>> at >>>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275) >>>> at >>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210) >>>> at >>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020) >>>> at >>>> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096) >>>> at java.security.AccessController.doPrivileged(Native Method) >>>> at javax.security.auth.Subject.doAs(Subject.java:422) >>>> at >>>> 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) >>>> at >>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >>>> at >>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096) >>>> Caused by: java.lang.NoSuchMethodError: >>>> org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V >>>> at >>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42) >>>> at >>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405) >>>> at >>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243) >>>> >>>> >>>> thanks >>>> >>>> >>>> Dr Mich Talebzadeh >>>> >>>> >>>> >>>> LinkedIn * >>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>> >>>> >>>> >>>> http://talebzadehmich.wordpress.com >>>> >>>> >>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>>> any loss, damage or destruction of data or any other property which may >>>> arise from relying on this email's technical content is explicitly >>>> disclaimed. The author will in no case be liable for any monetary damages >>>> arising from such loss, damage or destruction. >>>> >>>> >>>> >>> >>> >