Hi Kevin, thanks a lot for posting this problem. I'm adding Jark to the thread; he or another committer working on Flink SQL can maybe provide some insights.
On Tue, Nov 3, 2020 at 4:58 PM Kevin Kwon <fsw0...@gmail.com> wrote:

> Looks like the event time that I've specified in the consumer is not being
> respected. Does the timestamp assigner actually work in Kafka consumers?
>
>     .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>       override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>         order.getTimestamp
>       }
>     })
>
> On Tue, Nov 3, 2020 at 12:01 AM Kevin Kwon <fsw0...@gmail.com> wrote:
>
>> Hi guys, I've recently been experimenting with an end-to-end testing
>> environment with Kafka and Flink (1.11).
>>
>> I've set up an infrastructure with Docker Compose consisting of a single
>> Kafka broker, Flink (1.11), and MinIO for checkpoint saves.
>>
>> Here's the test scenario:
>>
>> 1. Send 1000 messages, manually assigning each event a timestamp that
>> increases by 100 milliseconds per loop iteration (the first and last
>> messages are roughly 100 seconds apart). The topic I'm writing to has
>> 3 partitions. The code below is the test message producer, using
>> Confluent's Python SDK:
>>
>>     order_producer = get_order_producer()
>>     current_timestamp = int(round(time() * 1000))
>>     for i in range(0, 1000):
>>         order_producer.produce(
>>             topic="order",
>>             key={"key": i % 100},
>>             value={
>>                 "id": 1000,
>>                 "customerId": i % 10,
>>                 "timestamp": current_timestamp + i * 100
>>             }
>>         )
>>         order_producer.flush()
>>
>> 2. Flink performs a SQL query on this stream and publishes the result back
>> to a Kafka topic that also has 3 partitions. Below is the SQL code:
>>
>>     | SELECT
>>     |   o.id,
>>     |   COUNT(*),
>>     |   TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
>>     | FROM
>>     |   order o
>>     | GROUP BY
>>     |   o.id,
>>     |   TUMBLE(o.ts, INTERVAL '5' SECONDS)
>>
>> So I expect the sum of all the counts in the result to equal 1000, but it
>> seems that a lot of messages are missing (797, as shown below). I can't
>> figure out why. I'm using event time for the environment.
>>
>> [image: Screenshot 2020-11-02 at 23.35.23.png]
>>
>> *Below is the configuration code*
>>
>> Here's the code for the consumer settings for Kafka:
>>
>>     private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
>>       val properties = new Properties()
>>       properties.setProperty("bootstrap.servers", kafkaBrokers)
>>       properties.setProperty("group.id", "awesome_order")
>>
>>       val kafkaConsumer = new FlinkKafkaConsumer[Order](
>>         "order",
>>         ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
>>           classOf[Order],
>>           kafkaSchemaRegistry
>>         ),
>>         properties
>>       )
>>       kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
>>       kafkaConsumer.setStartFromGroupOffsets()
>>       kafkaConsumer.assignTimestampsAndWatermarks {
>>         WatermarkStrategy
>>           .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
>>           .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>>             override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>>               order.getTimestamp
>>             }
>>           })
>>       }
>>       kafkaConsumer
>>     }
>>
>> Afterwards,
>> 1. I create a temp view from this source data stream
>> 2. perform SQL queries on it
>> 3. append the result to a processed data stream
>> 4. attach that stream to the Kafka sink
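For reference, steps 1-4 above would typically be wired up roughly as in the following sketch with Flink 1.11's Scala Table API. This is only an assumption about code that isn't shown in the thread: the names tableEnv, order_view, orders, and processed are placeholders, Order and ProcessedModel are the Avro classes from the snippets, and a ts rowtime attribute is declared when registering the view so that the TUMBLE query can reference it.

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tableEnv = StreamTableEnvironment.create(env)

    // Kafka source with the watermark strategy from initOrderConsumer() above
    val orders: DataStream[Order] = env.addSource(initOrderConsumer())

    // 1. register the stream as a temporary view; $"ts".rowtime exposes the
    //    record timestamps assigned in the consumer as the event-time column
    tableEnv.createTemporaryView("order_view", orders, $"id", $"customerId", $"ts".rowtime)

    // 2. run the windowed aggregation (the same query as above, against the view)
    val result = tableEnv.sqlQuery(
      """
        | SELECT o.id, COUNT(*) AS cnt, TUMBLE_END(o.ts, INTERVAL '5' SECOND) AS window_end
        | FROM order_view o
        | GROUP BY o.id, TUMBLE(o.ts, INTERVAL '5' SECOND)
      """.stripMargin)

    // 3. a tumbling-window aggregation emits final results only, so the table can be
    //    converted to an append-only stream (field names/types must line up with ProcessedModel)
    val processed: DataStream[ProcessedModel] = result.toAppendStream[ProcessedModel]

    // 4. attach the Kafka sink defined in initProcessedModelProducer() below
    processed.addSink(initProcessedModelProducer())

    env.execute("order-aggregation")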
>>
>> Here's the code for the producer settings for Kafka:
>>
>>     private def initProcessedModelProducer(): FlinkKafkaProducer[ProcessedModel] = {
>>       val properties: Properties = new Properties()
>>       properties.put("bootstrap.servers", kafkaBrokers)
>>       properties.put("transaction.timeout.ms", "60000")
>>
>>       val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
>>         "processed_model",
>>         ConfluentRegistryAvroSerializationSchema.forSpecific(
>>           classOf[ProcessedModel],
>>           "procssed_model-value",
>>           kafkaSchemaRegistry
>>         ),
>>         properties,
>>         null,
>>         Semantic.EXACTLY_ONCE,
>>         5
>>       )
>>       kafkaProducer
>>     }
>>
>> *Side Note*
>> Another interesting part is that if I flush "after" publishing all events,
>> the processed events don't seem to arrive at the sink at all. The source is
>> still populated normally in Flink. It's as if there is no progress after
>> the messages arrive at the source.
>>
>>     order_producer = get_order_producer()
>>     current_timestamp = int(round(time() * 1000))
>>     for i in range(0, 1000):
>>         order_producer.produce(
>>             topic="order",
>>             key={"key": i % 100},
>>             value={
>>                 "id": 1000,
>>                 "customerId": i % 10,
>>                 "timestamp": current_timestamp + i * 100
>>             }
>>         )
>>     # if I flush "AFTER" the loop, there is no processed data in the Flink sink;
>>     # the events themselves arrive in the Flink source without any problem though
>>     order_producer.flush()
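On the numbers themselves, here is a quick standalone sanity check (just an illustration, not part of the job, and assuming the producer parameters above: 1000 events, 100 ms apart, a single id). Bucketing those timestamps into 5-second tumbling windows always yields per-window counts that sum to exactly 1000, spread over about 20-21 windows of roughly 50 rows each, which is why a sink total of 797 looks like missing rows.

    // Standalone sketch: assigns the synthetic timestamps from the test producer
    // to 5-second tumbling windows and prints the per-window counts.
    object ExpectedWindowCounts extends App {
      val start = System.currentTimeMillis()
      // 1000 events, 100 ms apart (mirrors the Python producer above)
      val timestamps = (0 until 1000).map(i => start + i * 100L)

      // tumbling window assignment: window start = timestamp - (timestamp % 5000)
      val perWindow = timestamps.groupBy(ts => ts - ts % 5000).toSeq.sortBy(_._1)

      perWindow.foreach { case (windowStart, events) =>
        println(s"window [$windowStart, ${windowStart + 5000}) -> ${events.size} rows")
      }
      // the per-window counts always add up to 1000
      println(s"total = ${perWindow.map(_._2.size).sum}")
    }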