Re: Getting java.lang.Exception when try to fetch data from Kafka
Hi Prateek,

sorry for the late response.

Can you try implementing your own DeserializationSchema, where you deserialize the String key manually (just call the "new String(byte[])" constructor)?

The TypeInformationKeyValueSerializationSchema[String, Array[Byte]] generates its deserializers from Flink's internal serializer stack, and these assume that the data was serialized by Flink as well. I think Flink's StringSerializer does some fancy optimizations and is not compatible with the standard String() format.

On Tue, Apr 26, 2016 at 6:34 PM, prateek arora wrote:

> Hi Robert,
>
> I have java program to send data into kafka topic. below is code for this:
> [...]
> then trying to fetch data in Apache flink.
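A minimal sketch of the manual key decoding suggested in this reply, using only the Java standard library. The class and method names (`RawKeyDecoder`, `decode`) are hypothetical, and the sketch assumes the producer wrote the key as UTF-8 bytes. In a real job this logic would sit inside the `deserialize` method of a custom schema passed to `FlinkKafkaConsumer08`; that wiring is not shown here.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

public class RawKeyDecoder {

    // Hypothetical helper: turns the raw key/value bytes of one Kafka record
    // into a (String, byte[]) pair without going through Flink's serializers.
    // Assumption: the key was produced as plain UTF-8 text.
    static Map.Entry<String, byte[]> decode(byte[] messageKey, byte[] message) {
        // Kafka records may carry no key, so guard against null.
        String key = (messageKey == null)
                ? null
                : new String(messageKey, StandardCharsets.UTF_8);
        // The value is passed through untouched (e.g. image bytes).
        return new SimpleImmutableEntry<>(key, message);
    }

    public static void main(String[] args) {
        // Mimics the producer in this thread: key = Integer.toString(messageKey).
        byte[] key = Integer.toString(42).getBytes(StandardCharsets.UTF_8);
        byte[] value = {1, 2, 3};

        Map.Entry<String, byte[]> record = decode(key, value);
        // prints "42 -> 3 bytes"
        System.out.println(record.getKey() + " -> " + record.getValue().length + " bytes");
    }
}
```

Passing the charset explicitly avoids depending on the JVM default encoding, which may differ between the producer machine and the Flink task managers.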
Re: Getting java.lang.Exception when try to fetch data from Kafka
Hi Robert,

I have a Java program that sends data into a Kafka topic. Below is the code for this:

    private Producer<String, byte[]> producer = null;

    Serializer<String> keySerializer = new StringSerializer();
    Serializer<byte[]> valueSerializer = new ByteArraySerializer();
    producer = new KafkaProducer<String, byte[]>(props, keySerializer, valueSerializer);

    ProducerRecord<String, byte[]> imageRecord;
    imageRecord = new ProducerRecord<String, byte[]>(streamInfo.topic, Integer.toString(messageKey), imageData);

    producer.send(imageRecord);

I am then trying to fetch the data in Apache Flink.

Regards
Prateek

On Mon, Apr 25, 2016 at 2:42 AM, Robert Metzger wrote:

> Hi Prateek,
>
> were the messages written to the Kafka topic by Flink, using the TypeInformationKeyValueSerializationSchema? If not, maybe the Flink deserializers expect a different data format of the messages in the topic.
>
> How are the messages written into the topic?
Re: Getting java.lang.Exception when try to fetch data from Kafka
Hi

I have a Java program that sends data into a Kafka topic using the Kafka client API (0.8.2). Here is a sample of the code used to send data to the Kafka topic:

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    ProducerRecord<String, byte[]> imageRecord;
    imageRecord = new ProducerRecord<String, byte[]>(streamInfo.topic, Integer.toString(messageKey), imageData);
    producer.send(imageRecord);

Regards
Prateek
Re: Getting java.lang.Exception when try to fetch data from Kafka
Hi Prateek,

were the messages written to the Kafka topic by Flink, using the TypeInformationKeyValueSerializationSchema? If not, maybe the Flink deserializers expect a different data format for the messages in the topic.

How are the messages written into the topic?

On Fri, Apr 22, 2016 at 10:21 PM, prateekarora wrote:

> I am sending data using kafkaProducer API
> [...]
> And in flink program try to fetch data using FlinkKafkaConsumer08.
> [...]
> Caused by: java.io.EOFException
> [...]
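To illustrate the kind of format mismatch this reply asks about, here is a small sketch of how a reader that expects a length prefix can run out of input when handed plain UTF-8 bytes. The reader below is a deliberately simplified stand-in (one length byte followed by single-byte characters); it is an assumption for illustration and not Flink's actual string encoding. All names (`FormatMismatchDemo`, `readLengthPrefixed`, `failsWithEof`) are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class FormatMismatchDemo {

    // Simplified stand-in for a length-prefixed string reader:
    // first byte = number of characters, then one byte per character.
    // NOT Flink's real format; it only mirrors the failure mode.
    static String readLengthPrefixed(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int length = in.readUnsignedByte();
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            // Throws EOFException as soon as the input is exhausted.
            sb.append((char) in.readUnsignedByte());
        }
        return sb.toString();
    }

    // Returns true if reading the bytes ends in an EOFException.
    static boolean failsWithEof(byte[] data) {
        try {
            readLengthPrefixed(data);
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // What a plain UTF-8 producer writes for the key "7": the single byte 0x37.
        byte[] rawKey = "7".getBytes(StandardCharsets.UTF_8);
        // The same key in the stand-in length-prefixed format.
        byte[] prefixedKey = {1, '7'};

        // 0x37 is read as "55 characters follow", but nothing follows.
        System.out.println(failsWithEof(rawKey));            // prints "true"
        System.out.println(readLengthPrefixed(prefixedKey)); // prints "7"
    }
}
```

The stack trace in this thread fails in `readUnsignedByte` called from `StringValue.readString`, which is consistent with a reader expecting more bytes than the record key actually contains.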
Getting java.lang.Exception when try to fetch data from Kafka
Hi

I am sending data using the KafkaProducer API:

    imageRecord = new ProducerRecord<String, byte[]>(topic, messageKey, imageData);
    producer.send(imageRecord);

In my Flink program I try to fetch the data using FlinkKafkaConsumer08. Below is the sample code:

    def main(args: Array[String]) {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      val properties = new Properties()
      properties.setProperty("bootstrap.servers", ":9092")
      properties.setProperty("zookeeper.connect", ":2181")
      properties.setProperty("group.id", "test")

      val readSchema = new TypeInformationKeyValueSerializationSchema[String, Array[Byte]](
        classOf[String], classOf[Array[Byte]], env.getConfig)
        .asInstanceOf[KeyedDeserializationSchema[(String, Array[Byte])]]

      val stream: DataStream[(String, Array[Byte])] =
        env.addSource(new FlinkKafkaConsumer08[(String, Array[Byte])]("a-0", readSchema, properties))

      stream.print
      env.execute("Flink Kafka Example")
    }

But I am getting the error below:

    16/04/22 13:43:39 INFO ExecutionGraph: Source: Custom Source -> Sink: Unnamed (1/4) (d7a151560f6eabdc587a23dc0975cb84) switched from RUNNING to FAILED
    16/04/22 13:43:39 INFO ExecutionGraph: Source: Custom Source -> Sink: Unnamed (2/4) (d43754a27e402ed5b02a73d1c9aa3125) switched from RUNNING to CANCELING

    java.lang.Exception
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.io.EOFException
        at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:298)
        at org.apache.flink.types.StringValue.readString(StringValue.java:771)
        at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
        at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
        at org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:105)
        at org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:39)
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:657)

Regards
Prateek

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-java-lang-Exception-when-try-to-fetch-data-from-Kafka-tp6365.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.