This is becoming very tedious.

As suggested I changed the kafka dependency from

ibraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"
to

libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

and compiled and ran the job again anf failed. This is the log file

2018-07-02 21:38:38,656 INFO
org.apache.kafka.common.utils.AppInfoParser                   - Kafka
version : 1.1.0
2018-07-02 21:38:38,656 INFO
org.apache.kafka.common.utils.AppInfoParser                   - Kafka
commitId : fdcf75ea326b8e07
2018-07-02 21:38:38,696 INFO
org.apache.kafka.clients.Metadata                             - Cluster ID:
3SqEt4DcTruOr_SlQ6fqTQ
2018-07-02 21:38:38,698 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
Consumer subtask 0 will start reading the following 1 partitions from the
committed group offsets in Kafka: [KafkaTopicPartition{topic='md',
partition=0}]
2018-07-02 21:38:38,702 INFO
org.apache.kafka.clients.consumer.ConsumerConfig              -
ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [rhes75:9092]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = md_streaming
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
2018-07-02 21:38:38,705 INFO
org.apache.kafka.common.utils.AppInfoParser                   - Kafka
version : 1.1.0
2018-07-02 21:38:38,705 INFO
org.apache.kafka.common.utils.AppInfoParser                   - Kafka
commitId : fdcf75ea326b8e07
2018-07-02 21:38:38,705 WARN
org.apache.kafka.common.utils.AppInfoParser                   - Error
registering AppInfo mbean
javax.management.InstanceAlreadyExistsException:
kafka.consumer:type=app-info,id=consumer-2
        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
        at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
        at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
        at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
        at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
        at
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
        at
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:785)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
        at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:482)
        at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:171)
2018-07-02 21:38:38,709 WARN
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  -
Error while closing Kafka consumer
java.lang.NullPointerException
        at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:286)
2018-07-02 21:38:38,710 INFO
org.apache.flink.runtime.taskmanager.Task                     - Source:
Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b)
switched from RUNNING to FAILED.
java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
        at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
        at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
        at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)
2018-07-02 21:38:38,713 INFO
org.apache.flink.runtime.taskmanager.Task                     - Freeing
task resources for Source: Custom Source -> Sink: Unnamed (1/1)
(bcb46879e709768c9160dd11e09ba05b).
2018-07-02 21:38:38,713 INFO
org.apache.flink.runtime.taskmanager.Task                     - Ensuring
all FileSystem streams are closed for task Source: Custom Source -> Sink:
Unnamed (1/1) (bcb46879e709768c9160d


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 2 Jul 2018 at 20:59, Ted Yu <yuzhih...@gmail.com> wrote:

> From flink-connector-kafka-0.11 dependency, we know the version of Kafka
> used is (flink-connectors/flink-connector-kafka-0.11/pom.xml):
>
>     <kafka.version>0.11.0.2</kafka.version>
> From Kafka side, you specified 1.1.0
>
> I think these versions produce what you experienced.
> If you use Kafka 0.11, this problem should go away.
>
> On Mon, Jul 2, 2018 at 11:24 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> This is the code
>>
>> import java.util.Properties
>> import java.util.Arrays
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.java.utils.ParameterTool
>> import org.apache.flink.streaming.api.datastream.DataStream
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>> import org.apache.flink.streaming.util.serialization.DeserializationSchema
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>> import org.apache.kafka.clients.consumer.ConsumerRecords;
>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>>
>> object md_streaming
>> {
>>   def main(args: Array[String])
>>   {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     val properties = new Properties()
>>     properties.setProperty("bootstrap.servers", "rhes75:9092")
>>     properties.setProperty("zookeeper.connect", "rhes75:2181")
>>     properties.setProperty("group.id", "md_streaming")
>>     val stream = env
>>                  .addSource(new FlinkKafkaConsumer09[String]("md", new
>> SimpleStringSchema(), properties))
>>                  .writeAsText("/tmp/md_streaming.txt")
>>     env.execute("Flink Kafka Example")
>>   }
>>
>> and this is the sbt dependencies
>>
>> libraryDependencies += "org.apache.flink" %%
>> "flink-connector-kafka-0.11" % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base"
>> % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
>> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0"
>> libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" %
>> "1.5.0"
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"
>>
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 2 Jul 2018 at 17:45, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Here is the signature of assign :
>>>
>>>     public void assign(Collection<TopicPartition> partitions) {
>>>
>>> Looks like RestClusterClient was built against one version of Kafka but
>>> runs against a different version.
>>>
>>> Please check the sbt dependency and the version of Kafka jar on the
>>> classpath.
>>>
>>> Thanks
>>>
>>> On Mon, Jul 2, 2018 at 9:35 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Have you seen this error by any chance in flink streaming with Kafka
>>>> please?
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException:
>>>> java.lang.NoSuchMethodError:
>>>> org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
>>>>         at
>>>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>>>>         at
>>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>>>>         at
>>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>>>         at md_streaming$.main(md_streaming.scala:30)
>>>>         at md_streaming.main(md_streaming.scala)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>         at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>>>         at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>>>>         at
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>>>>         at
>>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>>>>         at
>>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
>>>>         at
>>>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
>>>>         at
>>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>>>>         at
>>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
>>>>         at
>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>         at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>         at
>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>>>>         at
>>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>>         at
>>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
>>>> Caused by: java.lang.NoSuchMethodError:
>>>> org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
>>>>         at
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
>>>>         at
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
>>>>         at
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)
>>>>
>>>>
>>>> thanks
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>
>>>
>

Reply via email to