Hi Mich, FlinkKafkaConsumer09 is the connector for Kafka 0.9.x. Have you tried to use FlinkKafkaConsumer011 instead of FlinkKafkaConsumer09?
Best, Fabian 2018-07-02 22:57 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>: > This is becoming very tedious. > > As suggested I changed the kafka dependency from > > ibraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0" > to > > libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0" > > and compiled and ran the job again anf failed. This is the log file > > 2018-07-02 21:38:38,656 INFO org.apache.kafka.common.utils. > AppInfoParser - Kafka version : 1.1.0 > 2018-07-02 21:38:38,656 INFO org.apache.kafka.common.utils. > AppInfoParser - Kafka commitId : fdcf75ea326b8e07 > 2018-07-02 21:38:38,696 INFO org.apache.kafka.clients. > Metadata - Cluster ID: 3SqEt4DcTruOr_SlQ6fqTQ > 2018-07-02 21:38:38,698 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase > - Consumer subtask 0 will start reading the following 1 partitions from the > committed group offsets in Kafka: [KafkaTopicPartition{topic='md', > partition=0}] > 2018-07-02 21:38:38,702 INFO org.apache.kafka.clients. > consumer.ConsumerConfig - ConsumerConfig values: > auto.commit.interval.ms = 5000 > auto.offset.reset = latest > bootstrap.servers = [rhes75:9092] > check.crcs = true > client.id = > connections.max.idle.ms = 540000 > enable.auto.commit = true > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = md_streaming > heartbeat.interval.ms = 3000 > interceptor.classes = [] > internal.leave.group.on.close = true > isolation.level = read_uncommitted > key.deserializer = class org.apache.kafka.common.serialization. > ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 300000 > max.poll.records = 500 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partition.assignment.strategy = [class org.apache.kafka.clients. > consumer.RangeAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 305000 > retry.backoff.ms = 100 > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > session.timeout.ms = 10000 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = null > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > value.deserializer = class org.apache.kafka.common.serialization. > ByteArrayDeserializer > 2018-07-02 21:38:38,705 INFO org.apache.kafka.common.utils. > AppInfoParser - Kafka version : 1.1.0 > 2018-07-02 21:38:38,705 INFO org.apache.kafka.common.utils. > AppInfoParser - Kafka commitId : fdcf75ea326b8e07 > 2018-07-02 21:38:38,705 WARN org.apache.kafka.common.utils. > AppInfoParser - Error registering AppInfo mbean > javax.management.InstanceAlreadyExistsException: > kafka.consumer:type=app-info,id=consumer-2 > at com.sun.jmx.mbeanserver.Repository.addMBean( > Repository.java:437) > at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor. > registerWithRepository(DefaultMBeanServerInterceptor.java:1898) > at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor. > registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) > at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor. > registerObject(DefaultMBeanServerInterceptor.java:900) > at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor. > registerMBean(DefaultMBeanServerInterceptor.java:324) > at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean( > JmxMBeanServer.java:522) > at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo( > AppInfoParser.java:62) > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>( > KafkaConsumer.java:785) > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>( > KafkaConsumer.java:644) > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>( > KafkaConsumer.java:624) > at org.apache.flink.streaming.connectors.kafka.internal. > KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:482) > at org.apache.flink.streaming.connectors.kafka.internal. > KafkaConsumerThread.run(KafkaConsumerThread.java:171) > 2018-07-02 21:38:38,709 WARN org.apache.flink.streaming. > connectors.kafka.internal.Kafka09Fetcher - Error while closing Kafka > consumer > java.lang.NullPointerException > at org.apache.flink.streaming.connectors.kafka.internal. > KafkaConsumerThread.run(KafkaConsumerThread.java:286) > 2018-07-02 21:38:38,710 INFO org.apache.flink.runtime. > taskmanager.Task - Source: Custom Source -> Sink: > Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b) switched from RUNNING to > FAILED. > java.lang.NoSuchMethodError: org.apache.kafka.clients. > consumer.KafkaConsumer.assign(Ljava/util/List;)V > at org.apache.flink.streaming.connectors.kafka.internal. > KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42) > at org.apache.flink.streaming.connectors.kafka.internal. > KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405) > at org.apache.flink.streaming.connectors.kafka.internal. > KafkaConsumerThread.run(KafkaConsumerThread.java:243) > 2018-07-02 21:38:38,713 INFO org.apache.flink.runtime. > taskmanager.Task - Freeing task resources for Source: > Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b). > 2018-07-02 21:38:38,713 INFO org.apache.flink.runtime. > taskmanager.Task - Ensuring all FileSystem streams > are closed for task Source: Custom Source -> Sink: Unnamed (1/1) > (bcb46879e709768c9160d > > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Mon, 2 Jul 2018 at 20:59, Ted Yu <yuzhih...@gmail.com> wrote: > >> From flink-connector-kafka-0.11 dependency, we know the version of Kafka >> used is (flink-connectors/flink-connector-kafka-0.11/pom.xml): >> >> <kafka.version>0.11.0.2</kafka.version> >> From Kafka side, you specified 1.1.0 >> >> I think these versions produce what you experienced. >> If you use Kafka 0.11, this problem should go away. >> >> On Mon, Jul 2, 2018 at 11:24 AM, Mich Talebzadeh < >> mich.talebza...@gmail.com> wrote: >> >>> Hi, >>> >>> This is the code >>> >>> import java.util.Properties >>> import java.util.Arrays >>> import org.apache.flink.api.common.functions.MapFunction >>> import org.apache.flink.api.java.utils.ParameterTool >>> import org.apache.flink.streaming.api.datastream.DataStream >>> import org.apache.flink.streaming.api.environment. >>> StreamExecutionEnvironment >>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 >>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >>> import org.apache.flink.streaming.util.serialization. >>> DeserializationSchema >>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >>> import org.apache.kafka.clients.consumer.ConsumerConfig; >>> import org.apache.kafka.clients.consumer.ConsumerRecord; >>> import org.apache.kafka.clients.consumer.ConsumerRecords; >>> import org.apache.kafka.clients.consumer.KafkaConsumer; >>> >>> object md_streaming >>> { >>> def main(args: Array[String]) >>> { >>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>> val properties = new Properties() >>> properties.setProperty("bootstrap.servers", "rhes75:9092") >>> properties.setProperty("zookeeper.connect", "rhes75:2181") >>> properties.setProperty("group.id", "md_streaming") >>> val stream = env >>> .addSource(new FlinkKafkaConsumer09[String]("md", new >>> SimpleStringSchema(), properties)) >>> .writeAsText("/tmp/md_streaming.txt") >>> env.execute("Flink Kafka Example") >>> } >>> >>> and this is the sbt dependencies >>> >>> libraryDependencies += "org.apache.flink" %% >>> "flink-connector-kafka-0.11" % "1.5.0" >>> libraryDependencies += "org.apache.flink" %% >>> "flink-connector-kafka-base" % "1.5.0" >>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" >>> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0" >>> libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % >>> "1.5.0" >>> libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0" >>> >>> >>> Thanks >>> >>> >>> Dr Mich Talebzadeh >>> >>> >>> >>> LinkedIn * >>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>> >>> >>> >>> http://talebzadehmich.wordpress.com >>> >>> >>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>> any loss, damage or destruction of data or any other property which may >>> arise from relying on this email's technical content is explicitly >>> disclaimed. The author will in no case be liable for any monetary damages >>> arising from such loss, damage or destruction. >>> >>> >>> >>> >>> On Mon, 2 Jul 2018 at 17:45, Ted Yu <yuzhih...@gmail.com> wrote: >>> >>>> Here is the signature of assign : >>>> >>>> public void assign(Collection<TopicPartition> partitions) { >>>> >>>> Looks like RestClusterClient was built against one version of Kafka >>>> but runs against a different version. >>>> >>>> Please check the sbt dependency and the version of Kafka jar on the >>>> classpath. >>>> >>>> Thanks >>>> >>>> On Mon, Jul 2, 2018 at 9:35 AM, Mich Talebzadeh < >>>> mich.talebza...@gmail.com> wrote: >>>> >>>>> Have you seen this error by any chance in flink streaming with Kafka >>>>> please? >>>>> >>>>> org.apache.flink.client.program.ProgramInvocationException: >>>>> java.lang.NoSuchMethodError: org.apache.kafka.clients. >>>>> consumer.KafkaConsumer.assign(Ljava/util/List;)V >>>>> at org.apache.flink.client.program.rest. >>>>> RestClusterClient.submitJob(RestClusterClient.java:265) >>>>> at org.apache.flink.client.program.ClusterClient.run( >>>>> ClusterClient.java:464) >>>>> at org.apache.flink.streaming.api.environment. >>>>> StreamContextEnvironment.execute(StreamContextEnvironment.java:66) >>>>> at md_streaming$.main(md_streaming.scala:30) >>>>> at md_streaming.main(md_streaming.scala) >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke( >>>>> NativeMethodAccessorImpl.java:62) >>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke( >>>>> DelegatingMethodAccessorImpl.java:43) >>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>> at org.apache.flink.client.program.PackagedProgram. >>>>> callMainMethod(PackagedProgram.java:528) >>>>> at org.apache.flink.client.program.PackagedProgram. >>>>> invokeInteractiveModeForExecution(PackagedProgram.java:420) >>>>> at org.apache.flink.client.program.ClusterClient.run( >>>>> ClusterClient.java:404) >>>>> at org.apache.flink.client.cli.CliFrontend.executeProgram( >>>>> CliFrontend.java:781) >>>>> at org.apache.flink.client.cli.CliFrontend.runProgram( >>>>> CliFrontend.java:275) >>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend. >>>>> java:210) >>>>> at org.apache.flink.client.cli.CliFrontend.parseParameters( >>>>> CliFrontend.java:1020) >>>>> at org.apache.flink.client.cli.CliFrontend.lambda$main$9( >>>>> CliFrontend.java:1096) >>>>> at java.security.AccessController.doPrivileged(Native Method) >>>>> at javax.security.auth.Subject.doAs(Subject.java:422) >>>>> at org.apache.hadoop.security.UserGroupInformation.doAs( >>>>> UserGroupInformation.java:1836) >>>>> at org.apache.flink.runtime.security.HadoopSecurityContext. >>>>> runSecured(HadoopSecurityContext.java:41) >>>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend. >>>>> java:1096) >>>>> Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients. >>>>> consumer.KafkaConsumer.assign(Ljava/util/List;)V >>>>> at org.apache.flink.streaming.connectors.kafka.internal. >>>>> KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java: >>>>> 42) >>>>> at org.apache.flink.streaming.connectors.kafka.internal. >>>>> KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405) >>>>> at org.apache.flink.streaming.connectors.kafka.internal. >>>>> KafkaConsumerThread.run(KafkaConsumerThread.java:243) >>>>> >>>>> >>>>> thanks >>>>> >>>>> >>>>> Dr Mich Talebzadeh >>>>> >>>>> >>>>> >>>>> LinkedIn * >>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>>> >>>>> >>>>> >>>>> http://talebzadehmich.wordpress.com >>>>> >>>>> >>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>>>> any loss, damage or destruction of data or any other property which may >>>>> arise from relying on this email's technical content is explicitly >>>>> disclaimed. The author will in no case be liable for any monetary damages >>>>> arising from such loss, damage or destruction. >>>>> >>>>> >>>>> >>>> >>>> >>