I am processing data and then sending it to Kafka through a Kafka sink.

This is the method where I produce the data:

nudgeDetailsDataStream.keyBy(NudgeDetails::getCarSugarID).addSink(NudgeCarLevelProducer.getProducer(config))
        .name("nudge-details-producer")
        .uid("nudge-details-producer");




This is my producer:

public class NudgeCarLevelProducer {

    // Logger bound to this class (the original pointed at PeakLocationFinder).
    static Logger logger = LoggerFactory.getLogger(NudgeCarLevelProducer.class);

    // Builds the Kafka 0.10 sink for NudgeDetails records.
    public static FlinkKafkaProducer010<NudgeDetails> getProducer(PeakLocationFinderGlobalConfig config) {
        return new FlinkKafkaProducer010<>(
                config.getFabricIncentiveTopic(),
                new NudgeCarLevelSchema(config),
                FlinkKafkaProducerBase.getPropertiesFromBrokerList(config.getInstrumentationBrokers()));
    }
}


class NudgeCarLevelSchema implements SerializationSchema<NudgeDetails>
{
    private static final Logger logger = LoggerFactory.getLogger(NudgeCarLevelSchema.class);
    ObjectMapper mapper = new ObjectMapper();
    PeakLocationFinderGlobalConfig config;

    public NudgeCarLevelSchema(PeakLocationFinderGlobalConfig config)
    {
       this.config = config;
    }

    @Override
    public byte[] serialize(NudgeDetails element) {
        byte[] bytes = null;
        // Wrap the record in a Document with routing metadata.
        Document document = new Document();
        document.setId(UUID.randomUUID().toString());
        Metadata metadata = new Metadata();
        metadata.setSchema(config.getFabricCarLevelDataStream());
        metadata.setSchemaVersion(1);
        metadata.setTenant(config.getTenantId());
        metadata.setTimestamp(System.currentTimeMillis());
        metadata.setType(Type.EVENT);
        metadata.setSender("nudge");
        metadata.setStream(config.getFabricCarLevelDataStream());
        document.setMetadata(metadata);
        document.setData(element);
        try {
            // writeValueAsBytes encodes as UTF-8, independent of the platform default charset.
            bytes = mapper.writeValueAsBytes(document);
        } catch (Exception e) {
            // Pass the exception so the cause and stack trace are logged.
            logger.error("error while serializing nudge car level Schema", e);
        }
        // Note: still null if serialization failed, as in the original code.
        return bytes;
    }
}
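
A note on the quoted log below: the timeout 9223372036854775807 ms is Long.MAX_VALUE, so the close is effectively waiting without a limit, and the stack trace shows that wait inside Thread.join ending with an InterruptedException. The following is only a minimal standalone sketch of that mechanism. It does not use Kafka or Flink, and the class, method, and thread names (JoinInterruptDemo, joinWasInterrupted, "io-thread", "task-thread") are made up for illustration. The trace does not show what interrupts the task thread in the real job, so the interrupt here is just a stand-in.

```java
import java.util.concurrent.CountDownLatch;

public class JoinInterruptDemo {

    // Returns true if the "task" thread was interrupted while joining the "io" thread.
    static boolean joinWasInterrupted() {
        CountDownLatch stopIo = new CountDownLatch(1);

        // Stand-in for a producer I/O thread that does not finish on its own.
        Thread ioThread = new Thread(() -> {
            try {
                stopIo.await();
            } catch (InterruptedException ignored) {
                // nothing to do, just exit
            }
        }, "io-thread");
        ioThread.setDaemon(true);
        ioThread.start();

        final boolean[] interrupted = {false};

        // Stand-in for the thread that closes the producer and waits for the I/O thread.
        Thread taskThread = new Thread(() -> {
            try {
                ioThread.join(Long.MAX_VALUE); // same timeout value as in the log
            } catch (InterruptedException e) {
                interrupted[0] = true;         // this is the situation in the stack trace
            }
        }, "task-thread");
        taskThread.start();

        // Stand-in for whatever interrupts the task thread in the real job.
        taskThread.interrupt();
        try {
            taskThread.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo itself was interrupted", e);
        } finally {
            stopIo.countDown(); // let the fake I/O thread exit
        }
        return interrupted[0];
    }

    public static void main(String[] args) {
        System.out.println(Long.MAX_VALUE);       // prints 9223372036854775807
        System.out.println(joinWasInterrupted()); // prints true
    }
}
```

If this is what happens in the job, the exception is raised while the sink is being closed, not while records are being serialized.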




On Mon, Sep 24, 2018 at 12:24 PM miki haiat <miko5...@gmail.com> wrote:

> What are you trying to do? Can you share some code?
> This is the reason for the exception:
> Proceeding to force close the producer since pending requests could not be
> completed within timeout 9223372036854775807 ms.
>
>
>
> On Mon, 24 Sep 2018, 9:23 yuvraj singh, <19yuvrajsing...@gmail.com> wrote:
>
>> Hi all,
>>
>> I am getting this error with Flink 1.6.0, please help me.
>>
>> 2018-09-23 07:15:08,846 ERROR org.apache.kafka.clients.producer.KafkaProducer - Interrupted while joining ioThread
>> java.lang.InterruptedException
>>         at java.lang.Object.wait(Native Method)
>>         at java.lang.Thread.join(Thread.java:1257)
>>         at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>>         at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>>         at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> 2018-09-23 07:15:08,847 INFO  org.apache.kafka.clients.producer.KafkaProducer - Proceeding to force close the producer since pending requests could not be completed within timeout 9223372036854775807 ms.
>>
>> 2018-09-23 07:15:08,860 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
>> org.apache.kafka.common.KafkaException: Failed to close kafka producer
>>         at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
>>         at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>>         at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.InterruptedException
>>         at java.lang.Object.wait(Native Method)
>>         at java.lang.Thread.join(Thread.java:1257)
>>         at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>>         ... 9 more
>>
>>
>> Thanks
>>
>> Yubraj Singh
>>
>
