Hi Kevin,
thanks a lot for posting this problem.
I'm adding Jark to the thread; he or another committer working on Flink SQL
may be able to provide some insights.

On Tue, Nov 3, 2020 at 4:58 PM Kevin Kwon <fsw0...@gmail.com> wrote:

> Looks like the event time that I've specified in the consumer is not being
> respected. Does the timestamp assigner actually work in Kafka consumers?
>
>       .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>         override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>           order.getTimestamp
>         }
>       })
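>
> For reference, this is how that strategy gets attached to the consumer (a
> trimmed-down sketch of the full config further below):
>
>   kafkaConsumer.assignTimestampsAndWatermarks(
>     WatermarkStrategy
>       // tolerate events arriving up to 100 ms out of order
>       .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
>       .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>         // use the timestamp field from the payload, not the Kafka record timestamp
>         override def extractTimestamp(order: Order, recordTimestamp: Long): Long =
>           order.getTimestamp
>       })
>   )
>
> If I understand correctly, when the strategy is assigned on the consumer
> itself (rather than on the DataStream), watermarks are generated per Kafka
> partition.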
>
>
> On Tue, Nov 3, 2020 at 12:01 AM Kevin Kwon <fsw0...@gmail.com> wrote:
>
>> Hi guys, I've recently been experimenting with an end-to-end testing
>> environment with Kafka and Flink (1.11).
>>
>> I've set up the infrastructure with Docker Compose: a single Kafka broker,
>> Flink (1.11), and MinIO for checkpoint storage.
>>
>> Here's the test scenario
>>
>> 1. Send 1000 messages, assigning each event a manual timestamp that
>> increases by 100 milliseconds per iteration (the first and the last message
>> differ by roughly 100 seconds). The topic I'm writing to has 3 partitions.
>> The code below is the test message producer, using Confluent's Python SDK:
>>
>> order_producer = get_order_producer()
>> current_timestamp = int(round(time() * 1000))
>> for i in range(0, 1000):
>>     order_producer.produce(
>>         topic="order",
>>         key={"key": i % 100},
>>         value={
>>             "id": 1000,
>>             "customerId": i % 10,
>>             "timestamp": current_timestamp + i * 100
>>         }
>>     )
>>     order_producer.flush()
>>
>>
>> 2. Flink runs an SQL query on this stream and publishes the result back to
>> a Kafka topic that has 3 partitions. Below is the SQL code:
>>
>> | SELECT
>> |   o.id,
>> |   COUNT(*),
>> |   TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
>> | FROM
>> |   order o
>> | GROUP BY
>> |   o.id,
>> |   TUMBLE(o.ts, INTERVAL '5' SECONDS)
>>
>> So I expect the counts in the result to sum up to 1000, but a lot of
>> messages seem to be missing (the total is only 797, as shown below). I
>> can't figure out why. I'm using event time for the environment.
>>
>> [image: Screenshot 2020-11-02 at 23.35.23.png]
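>>
>> For completeness, event time is enabled on the environment roughly like
>> this (senv being my StreamExecutionEnvironment):
>>
>> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)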
>>
>> *Below is the configuration code*
>> Here's the code for the consumer settings for Kafka
>>
>> private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
>>   val properties = new Properties()
>>   properties.setProperty("bootstrap.servers", kafkaBrokers)
>>   properties.setProperty("group.id", "awesome_order")
>>
>>   val kafkaConsumer = new FlinkKafkaConsumer[Order](
>>     "order",
>>     ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
>>       classOf[Order],
>>       kafkaSchemaRegistry
>>     ),
>>     properties
>>   )
>>   kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
>>   kafkaConsumer.setStartFromGroupOffsets()
>>   kafkaConsumer.assignTimestampsAndWatermarks {
>>     WatermarkStrategy
>>       .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
>>       .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>>         override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>>           order.getTimestamp
>>         }
>>       })
>>   }
>>   kafkaConsumer
>> }
>>
>> Afterwards, I:
>> 1. create a temporary view from this source data stream,
>> 2. run the SQL query above on it,
>> 3. convert the result back into a processed (append) data stream, and
>> 4. attach that stream to the Kafka sink, roughly as sketched below.
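>>
>> A simplified sketch of that glue code (senv, tableEnv, and tumbleQuery are
>> placeholders for my actual environment and the SQL string shown earlier;
>> the field mapping is simplified):
>>
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.table.api._
>> import org.apache.flink.table.api.bridge.scala._
>>
>> val orderStream: DataStream[Order] = senv.addSource(initOrderConsumer())
>>
>> // 1. register the stream as a temporary view, exposing an event-time
>> //    (rowtime) attribute "ts" for the TUMBLE window to group on
>> tableEnv.createTemporaryView("order", orderStream, 'id, 'customerId, 'ts.rowtime)
>>
>> // 2. run the windowed aggregation (the SQL shown earlier)
>> val resultTable = tableEnv.sqlQuery(tumbleQuery)
>>
>> // 3. convert the result back into an append-only DataStream
>> //    (assuming the result columns line up with ProcessedModel's fields)
>> val processedStream: DataStream[ProcessedModel] =
>>   tableEnv.toAppendStream[ProcessedModel](resultTable)
>>
>> // 4. attach the Kafka sink
>> processedStream.addSink(initProcessedModelProducer())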
>>
>> Here's the code for the producer settings for Kafka
>>
>> private def initProcessedModelProducer(): FlinkKafkaProducer[ProcessedModel] = {
>>   val properties: Properties = new Properties()
>>   properties.put("bootstrap.servers", kafkaBrokers)
>>   properties.put("transaction.timeout.ms", "60000")
>>
>>   val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
>>     "processed_model",
>>     ConfluentRegistryAvroSerializationSchema.forSpecific(
>>       classOf[ProcessedModel],
>>       "processed_model-value",
>>       kafkaSchemaRegistry
>>     ),
>>     properties,
>>     null,
>>     Semantic.EXACTLY_ONCE,
>>     5
>>   )
>>   kafkaProducer
>> }
>>
>>
>>
>> *Side Note*
>> Another interesting observation: if I flush only "after" publishing all the
>> events, the processed events don't seem to arrive at the sink at all. The
>> source is still populated normally in Flink. It's as if there is no
>> progress after the messages arrive at the source.
>>
>> order_producer = get_order_producer()
>> current_timestamp = int(round(time() * 1000))
>> for i in range(0, 1000):
>>     order_producer.produce(
>>         topic="order",
>>         key={"key": i % 100},
>>         value={
>>             "id": 1000,
>>             "customerId": i % 10,
>>             "timestamp": current_timestamp + i * 100
>>         }
>>     )
>>     order_producer.flush()  # if I flush "AFTER" the loop, there is no
>>     # processed data in the sink of Flink; the events themselves arrive
>>     # without any problem in the Flink source, though
>>
>>
