Re: Timestamp from Kafka record and watermark generation

2018-02-23 Thread Federico D'Ambrosio
Thank you very much Aljoscha!

2018-02-23 14:45 GMT+01:00 Aljoscha Krettek :

> Hi,
>
> This is a known issue: https://issues.apache.org/jira/browse/FLINK-8500.
> And yes, the workaround is to write an assigner from scratch but you can
> start by copying the code of AscendingTimestampExtractor.
>
> Sorry for the inconvenience.
>
> --
> Aljoscha
>
> On 22. Feb 2018, at 12:05, Federico D'Ambrosio  wrote:
>
> Hello everyone,
>
> I'm consuming from a Kafka topic, on which I'm writing with a
> FlinkKafkaProducer, with the timestamp relative flag set to true.
>
> From what I gather from the documentation [1], Flink is aware of Kafka
> Record's timestamp and only the watermark should be set with an appropriate
> TimestampExtractor, still I'm failing to understand how to implement it in
> the right way.
>
> I thought that it would be possible to use the already existent
> AscendingTimestampExtractor, overriding the extractTimestamp method, but
> it's marked final.
>
> new FlinkKafkaConsumer010[Event](
>     ingestion_topic, new JSONDeserializationSchema(), consumerConfig)
>   .setStartFromLatest()
>   .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
>     def extractAscendingTimestamp(element: Event): Long = ???
>   })
>
> Should I need to implement my own TimestampExtractor (with the appropriate
> getCurrentWatermark and extractTimestamp methods) ?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>
> Thank you,
> Federico
>
>
>


-- 
Federico D'Ambrosio


Re: Timestamp from Kafka record and watermark generation

2018-02-23 Thread Aljoscha Krettek
Hi,

This is a known issue: https://issues.apache.org/jira/browse/FLINK-8500.
And yes, the workaround is to write an assigner from scratch, but you can
start by copying the code of AscendingTimestampExtractor.
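
To illustrate the workaround, here is a minimal sketch of such an assigner. It
assumes the Flink 1.4 AssignerWithPeriodicWatermarks interface and assumes that
the timestamp passed in as previousElementTimestamp is the Kafka record
timestamp attached by the 0.10 consumer. The class name
KafkaRecordTimestampAssigner is hypothetical and not part of Flink; the
watermark logic mirrors the "highest timestamp seen minus one" approach of
AscendingTimestampExtractor, without its handling of out-of-order timestamps.

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical helper, not part of Flink: keeps the timestamp that is already
// attached to each record and derives ascending watermarks from it.
class KafkaRecordTimestampAssigner[T] extends AssignerWithPeriodicWatermarks[T] {

  // Highest timestamp seen so far; Long.MinValue until the first record arrives.
  private var currentMaxTimestamp: Long = Long.MinValue

  // previousElementTimestamp is the timestamp already assigned to the record
  // (assumed here to be the Kafka record timestamp). It is returned unchanged.
  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
    if (previousElementTimestamp > currentMaxTimestamp) {
      currentMaxTimestamp = previousElementTimestamp
    }
    previousElementTimestamp
  }

  // Called periodically by Flink; emits a watermark one millisecond below the
  // highest timestamp seen, or Long.MinValue if nothing has been seen yet.
  override def getCurrentWatermark(): Watermark = {
    val ts =
      if (currentMaxTimestamp == Long.MinValue) Long.MinValue
      else currentMaxTimestamp - 1
    new Watermark(ts)
  }
}
```

An instance of it would then be passed to assignTimestampsAndWatermarks on the
consumer in place of the AscendingTimestampExtractor, for example
new KafkaRecordTimestampAssigner[Event].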

Sorry for the inconvenience.

--
Aljoscha

> On 22. Feb 2018, at 12:05, Federico D'Ambrosio  wrote:
> 
> Hello everyone,
> 
> I'm consuming from a Kafka topic, on which I'm writing with a 
> FlinkKafkaProducer, with the timestamp relative flag set to true.
> 
> From what I gather from the documentation [1], Flink is aware of Kafka 
> Record's timestamp and only the watermark should be set with an appropriate 
> TimestampExtractor, still I'm failing to understand how to implement it in 
> the right way.
> 
> I thought that it would be possible to use the already existent 
> AscendingTimestampExtractor, overriding the extractTimestamp method, but it's 
> marked final. 
> 
> new FlinkKafkaConsumer010[Event](
>     ingestion_topic, new JSONDeserializationSchema(), consumerConfig)
>   .setStartFromLatest()
>   .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
>     def extractAscendingTimestamp(element: Event): Long = ???
>   })
> 
> Should I need to implement my own TimestampExtractor (with the appropriate 
> getCurrentWatermark and extractTimestamp methods) ? 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>   
> 
> 
> Thank you,
> Federico
> 



Timestamp from Kafka record and watermark generation

2018-02-22 Thread Federico D'Ambrosio
Hello everyone,

I'm consuming from a Kafka topic, on which I'm writing with a
FlinkKafkaProducer, with the timestamp relative flag set to true.

From what I gather from the documentation [1], Flink is aware of the Kafka
record's timestamp, so only the watermark needs to be set with an appropriate
TimestampExtractor. Still, I'm failing to understand how to implement it in
the right way.

I thought it would be possible to use the existing
AscendingTimestampExtractor and override its extractTimestamp method, but
that method is marked final.

new FlinkKafkaConsumer010[Event](
    ingestion_topic, new JSONDeserializationSchema(), consumerConfig)
  .setStartFromLatest()
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
    def extractAscendingTimestamp(element: Event): Long = ???
  })

Do I need to implement my own TimestampExtractor (with the appropriate
getCurrentWatermark and extractTimestamp methods)?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010


Thank you,
Federico