I am processing data and then sending it to kafka by kafka sink . this is method where I am producing the data
nudgeDetailsDataStream.keyBy(NudgeDetails::getCarSugarID).addSink(NudgeCarLevelProducer.getProducer(config)) .name("nudge-details-producer") .uid("nudge-details-producer"); its my producer public class NudgeCarLevelProducer { static Logger logger = LoggerFactory.getLogger(PeakLocationFinder.class); public static FlinkKafkaProducer010<NudgeDetails> getProducer(PeakLocationFinderGlobalConfig config) { return new FlinkKafkaProducer010(config.getFabricIncentiveTopic(), new NudgeCarLevelSchema(config), FlinkKafkaProducerBase.getPropertiesFromBrokerList(config.getInstrumentationBrokers())); } } class NudgeCarLevelSchema implements SerializationSchema<NudgeDetails> { Logger logger = LoggerFactory.getLogger(NudgeCarLevelSchema.class); ObjectMapper mapper = new ObjectMapper(); PeakLocationFinderGlobalConfig config; public NudgeCarLevelSchema(PeakLocationFinderGlobalConfig config) { this.config = config; } @Override public byte[] serialize(NudgeDetails element) { byte [] bytes = null; Document document = new Document(); document.setId(UUID.randomUUID().toString()); Metadata metadata = new Metadata(); metadata.setSchema(config.getFabricCarLevelDataStream()); metadata.setSchemaVersion(1); metadata.setTenant(config.getTenantId()); metadata.setTimestamp(System.currentTimeMillis()); metadata.setType(Type.EVENT); metadata.setSender("nudge"); metadata.setStream(config.getFabricCarLevelDataStream()); document.setMetadata(metadata); document.setData(element); try { bytes = mapper.writeValueAsString(document).getBytes(); } catch (Exception e) { logger.error("error while serializing nudge car level Schema"); } return bytes; } } On Mon, Sep 24, 2018 at 12:24 PM miki haiat <miko5...@gmail.com> wrote: > What are you trying to do , can you share some code ? > This is the reason for the exeption > Proceeding to force close the producer since pending requests could not be > completed within timeout 9223372036854775807 ms. > > > > On Mon, 24 Sep 2018, 9:23 yuvraj singh, <19yuvrajsing...@gmail.com> wrote: > >> Hi all , >> >> >> I am getting this error with flink 1.6.0 , please help me . >> >> >> >> >> >> >> 2018-09-23 07:15:08,846 ERROR >> org.apache.kafka.clients.producer.KafkaProducer - >> Interrupted while joining ioThread >> >> java.lang.InterruptedException >> >> at java.lang.Object.wait(Native Method) >> >> at java.lang.Thread.join(Thread.java:1257) >> >> at >> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703) >> >> at >> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682) >> >> at >> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661) >> >> at >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319) >> >> at >> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) >> >> at >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) >> >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477) >> >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378) >> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) >> >> at java.lang.Thread.run(Thread.java:745) >> >> 2018-09-23 07:15:08,847 INFO org.apache.kafka.clients.producer.KafkaProducer >> - Proceeding to force close the producer since pending >> requests could not be completed within timeout 9223372036854775807 ms. >> >> 2018-09-23 07:15:08,860 ERROR >> org.apache.flink.streaming.runtime.tasks.StreamTask - Error >> during disposal of stream operator. >> >> org.apache.kafka.common.KafkaException: Failed to close kafka producer >> >> at >> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734) >> >> at >> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682) >> >> at >> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661) >> >> at >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319) >> >> at >> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) >> >> at >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) >> >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477) >> >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378) >> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) >> >> at java.lang.Thread.run(Thread.java:745) >> >> Caused by: java.lang.InterruptedException >> >> at java.lang.Object.wait(Native Method) >> >> at java.lang.Thread.join(Thread.java:1257) >> >> at >> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703) >> >> ... 9 more >> >> >> Thanks >> >> Yubraj Singh >> >