Hi Mich,

FlinkKafkaConsumer09 is the connector for Kafka 0.9.x.
Have you tried to use FlinkKafkaConsumer011 instead of FlinkKafkaConsumer09?

Best, Fabian



2018-07-02 22:57 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:

> This is becoming very tedious.
>
> As suggested I changed the kafka dependency from
>
> ibraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"
> to
>
> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
>
> and compiled and ran the job again anf failed. This is the log file
>
> 2018-07-02 21:38:38,656 INFO  org.apache.kafka.common.utils.
> AppInfoParser                   - Kafka version : 1.1.0
> 2018-07-02 21:38:38,656 INFO  org.apache.kafka.common.utils.
> AppInfoParser                   - Kafka commitId : fdcf75ea326b8e07
> 2018-07-02 21:38:38,696 INFO  org.apache.kafka.clients.
> Metadata                             - Cluster ID: 3SqEt4DcTruOr_SlQ6fqTQ
> 2018-07-02 21:38:38,698 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
> - Consumer subtask 0 will start reading the following 1 partitions from the
> committed group offsets in Kafka: [KafkaTopicPartition{topic='md',
> partition=0}]
> 2018-07-02 21:38:38,702 INFO  org.apache.kafka.clients.
> consumer.ConsumerConfig              - ConsumerConfig values:
>         auto.commit.interval.ms = 5000
>         auto.offset.reset = latest
>         bootstrap.servers = [rhes75:9092]
>         check.crcs = true
>         client.id =
>         connections.max.idle.ms = 540000
>         enable.auto.commit = true
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 500
>         fetch.min.bytes = 1
>         group.id = md_streaming
>         heartbeat.interval.ms = 3000
>         interceptor.classes = []
>         internal.leave.group.on.close = true
>         isolation.level = read_uncommitted
>         key.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
>         max.partition.fetch.bytes = 1048576
>         max.poll.interval.ms = 300000
>         max.poll.records = 500
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 30000
>         partition.assignment.strategy = [class org.apache.kafka.clients.
> consumer.RangeAssignor]
>         receive.buffer.bytes = 65536
>         reconnect.backoff.max.ms = 1000
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 305000
>         retry.backoff.ms = 100
>         sasl.jaas.config = null
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.mechanism = GSSAPI
>         security.protocol = PLAINTEXT
>         send.buffer.bytes = 131072
>         session.timeout.ms = 10000
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.endpoint.identification.algorithm = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.location = null
>         ssl.keystore.password = null
>         ssl.keystore.type = JKS
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.secure.random.implementation = null
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.location = null
>         ssl.truststore.password = null
>         ssl.truststore.type = JKS
>         value.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
> 2018-07-02 21:38:38,705 INFO  org.apache.kafka.common.utils.
> AppInfoParser                   - Kafka version : 1.1.0
> 2018-07-02 21:38:38,705 INFO  org.apache.kafka.common.utils.
> AppInfoParser                   - Kafka commitId : fdcf75ea326b8e07
> 2018-07-02 21:38:38,705 WARN  org.apache.kafka.common.utils.
> AppInfoParser                   - Error registering AppInfo mbean
> javax.management.InstanceAlreadyExistsException:
> kafka.consumer:type=app-info,id=consumer-2
>         at com.sun.jmx.mbeanserver.Repository.addMBean(
> Repository.java:437)
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
> registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
> registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
> registerObject(DefaultMBeanServerInterceptor.java:900)
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
> registerMBean(DefaultMBeanServerInterceptor.java:324)
>         at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(
> JmxMBeanServer.java:522)
>         at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(
> AppInfoParser.java:62)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(
> KafkaConsumer.java:785)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(
> KafkaConsumer.java:644)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(
> KafkaConsumer.java:624)
>         at org.apache.flink.streaming.connectors.kafka.internal.
> KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:482)
>         at org.apache.flink.streaming.connectors.kafka.internal.
> KafkaConsumerThread.run(KafkaConsumerThread.java:171)
> 2018-07-02 21:38:38,709 WARN  org.apache.flink.streaming.
> connectors.kafka.internal.Kafka09Fetcher  - Error while closing Kafka
> consumer
> java.lang.NullPointerException
>         at org.apache.flink.streaming.connectors.kafka.internal.
> KafkaConsumerThread.run(KafkaConsumerThread.java:286)
> 2018-07-02 21:38:38,710 INFO  org.apache.flink.runtime.
> taskmanager.Task                     - Source: Custom Source -> Sink:
> Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b) switched from RUNNING to
> FAILED.
> java.lang.NoSuchMethodError: org.apache.kafka.clients.
> consumer.KafkaConsumer.assign(Ljava/util/List;)V
>         at org.apache.flink.streaming.connectors.kafka.internal.
> KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
>         at org.apache.flink.streaming.connectors.kafka.internal.
> KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
>         at org.apache.flink.streaming.connectors.kafka.internal.
> KafkaConsumerThread.run(KafkaConsumerThread.java:243)
> 2018-07-02 21:38:38,713 INFO  org.apache.flink.runtime.
> taskmanager.Task                     - Freeing task resources for Source:
> Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b).
> 2018-07-02 21:38:38,713 INFO  org.apache.flink.runtime.
> taskmanager.Task                     - Ensuring all FileSystem streams
> are closed for task Source: Custom Source -> Sink: Unnamed (1/1)
> (bcb46879e709768c9160d
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 2 Jul 2018 at 20:59, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> From flink-connector-kafka-0.11 dependency, we know the version of Kafka
>> used is (flink-connectors/flink-connector-kafka-0.11/pom.xml):
>>
>>     <kafka.version>0.11.0.2</kafka.version>
>> From Kafka side, you specified 1.1.0
>>
>> I think these versions produce what you experienced.
>> If you use Kafka 0.11, this problem should go away.
>>
>> On Mon, Jul 2, 2018 at 11:24 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> This is the code
>>>
>>> import java.util.Properties
>>> import java.util.Arrays
>>> import org.apache.flink.api.common.functions.MapFunction
>>> import org.apache.flink.api.java.utils.ParameterTool
>>> import org.apache.flink.streaming.api.datastream.DataStream
>>> import org.apache.flink.streaming.api.environment.
>>> StreamExecutionEnvironment
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>> import org.apache.flink.streaming.util.serialization.
>>> DeserializationSchema
>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>>> import org.apache.kafka.clients.consumer.ConsumerRecords;
>>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>>>
>>> object md_streaming
>>> {
>>>   def main(args: Array[String])
>>>   {
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     val properties = new Properties()
>>>     properties.setProperty("bootstrap.servers", "rhes75:9092")
>>>     properties.setProperty("zookeeper.connect", "rhes75:2181")
>>>     properties.setProperty("group.id", "md_streaming")
>>>     val stream = env
>>>                  .addSource(new FlinkKafkaConsumer09[String]("md", new
>>> SimpleStringSchema(), properties))
>>>                  .writeAsText("/tmp/md_streaming.txt")
>>>     env.execute("Flink Kafka Example")
>>>   }
>>>
>>> and this is the sbt dependencies
>>>
>>> libraryDependencies += "org.apache.flink" %%
>>> "flink-connector-kafka-0.11" % "1.5.0"
>>> libraryDependencies += "org.apache.flink" %%
>>> "flink-connector-kafka-base" % "1.5.0"
>>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
>>> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0"
>>> libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" %
>>> "1.5.0"
>>> libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"
>>>
>>>
>>> Thanks
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Mon, 2 Jul 2018 at 17:45, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Here is the signature of assign :
>>>>
>>>>     public void assign(Collection<TopicPartition> partitions) {
>>>>
>>>> Looks like RestClusterClient was built against one version of Kafka
>>>> but runs against a different version.
>>>>
>>>> Please check the sbt dependency and the version of Kafka jar on the
>>>> classpath.
>>>>
>>>> Thanks
>>>>
>>>> On Mon, Jul 2, 2018 at 9:35 AM, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Have you seen this error by any chance in flink streaming with Kafka
>>>>> please?
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException:
>>>>> java.lang.NoSuchMethodError: org.apache.kafka.clients.
>>>>> consumer.KafkaConsumer.assign(Ljava/util/List;)V
>>>>>         at org.apache.flink.client.program.rest.
>>>>> RestClusterClient.submitJob(RestClusterClient.java:265)
>>>>>         at org.apache.flink.client.program.ClusterClient.run(
>>>>> ClusterClient.java:464)
>>>>>         at org.apache.flink.streaming.api.environment.
>>>>> StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>>>>         at md_streaming$.main(md_streaming.scala:30)
>>>>>         at md_streaming.main(md_streaming.scala)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(
>>>>> NativeMethodAccessorImpl.java:62)
>>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>>>> DelegatingMethodAccessorImpl.java:43)
>>>>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>         at org.apache.flink.client.program.PackagedProgram.
>>>>> callMainMethod(PackagedProgram.java:528)
>>>>>         at org.apache.flink.client.program.PackagedProgram.
>>>>> invokeInteractiveModeForExecution(PackagedProgram.java:420)
>>>>>         at org.apache.flink.client.program.ClusterClient.run(
>>>>> ClusterClient.java:404)
>>>>>         at org.apache.flink.client.cli.CliFrontend.executeProgram(
>>>>> CliFrontend.java:781)
>>>>>         at org.apache.flink.client.cli.CliFrontend.runProgram(
>>>>> CliFrontend.java:275)
>>>>>         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.
>>>>> java:210)
>>>>>         at org.apache.flink.client.cli.CliFrontend.parseParameters(
>>>>> CliFrontend.java:1020)
>>>>>         at org.apache.flink.client.cli.CliFrontend.lambda$main$9(
>>>>> CliFrontend.java:1096)
>>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>>         at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>         at org.apache.hadoop.security.UserGroupInformation.doAs(
>>>>> UserGroupInformation.java:1836)
>>>>>         at org.apache.flink.runtime.security.HadoopSecurityContext.
>>>>> runSecured(HadoopSecurityContext.java:41)
>>>>>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.
>>>>> java:1096)
>>>>> Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.
>>>>> consumer.KafkaConsumer.assign(Ljava/util/List;)V
>>>>>         at org.apache.flink.streaming.connectors.kafka.internal.
>>>>> KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:
>>>>> 42)
>>>>>         at org.apache.flink.streaming.connectors.kafka.internal.
>>>>> KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
>>>>>         at org.apache.flink.streaming.connectors.kafka.internal.
>>>>> KafkaConsumerThread.run(KafkaConsumerThread.java:243)
>>>>>
>>>>>
>>>>> thanks
>>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn * 
>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>

Reply via email to