Hi,

I'm creating kafka producer with timestamps enabled following
instructions at
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producer


        Optional<FlinkKafkaPartitioner<T>> customPartitioner = Optional.empty();

        FlinkKafkaProducer011<T> result = new
FlinkKafkaProducer011<>(defaultTopic, serializationSchema, properties,
customPartitioner);

        result.setWriteTimestampToKafka(true);



but getting an exception:


java.lang.IllegalArgumentException: Invalid timestamp: -1. Timestamp
should always be non-negative or null.
        at 
org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:70)
        at 
org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:93)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:642)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
        at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
        at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
        at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)




Is there anything else I need to configure to embed timestamp
information into resulting kafka message?


Thank you,

Alex

Reply via email to