It looks like the event time I've specified in the consumer is not being
respected. Does the timestamp assigner actually work in Kafka consumers?

      .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
        override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
          order.getTimestamp
        }
      })
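
For reference, a crude way to double-check this (just a sketch, not the actual
job code) would be to log from inside the assigner, to confirm it is invoked
and see what it extracts for each record:

      .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
        override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
          // recordTimestamp is the timestamp already attached to the Kafka record;
          // order.getTimestamp is the event-time field written by the producer
          val ts = order.getTimestamp
          println(s"extracted ts=$ts, kafka record ts=$recordTimestamp")
          ts
        }
      })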


On Tue, Nov 3, 2020 at 12:01 AM Kevin Kwon <fsw0...@gmail.com> wrote:

> Hi guys, I've recently been experimenting with an end-to-end testing
> environment with Kafka and Flink (1.11).
>
> I've set up the infrastructure with Docker Compose: a single Kafka broker,
> Flink (1.11), and MinIO for checkpoint saves.
>
> Here's the test scenario:
>
> 1. Send 1000 messages, with a manually assigned timestamp on each event that
> increases by 100 milliseconds per loop iteration (the first and last message
> have a difference of 100 seconds). There are 3 partitions for the topic I'm
> writing to. The code below is the test message producer, using Confluent's
> Python SDK:
>
> order_producer = get_order_producer()
> current_timestamp = int(round(time() * 1000))
> for i in range(0, 1000):
>     order_producer.produce(
>         topic="order",
>         key={"key": i % 100},
>         value={
>             "id": 1000,
>             "customerId": i % 10,
>             "timestamp": current_timestamp + i * 100
>         }
>     )
>     order_producer.flush()
>
>
> 2. Flink performs an SQL query on this stream and publishes the result back
> to a Kafka topic that has 3 partitions. Below is the SQL code:
>
> | SELECT
> |   o.id,
> |   COUNT(*),
> |   TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
> | FROM
> |   order o
> | GROUP BY
> |   o.id,
> |   TUMBLE(o.ts, INTERVAL '5' SECONDS)
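>
> (For clarity, o.ts above is meant to be the event-time attribute. Below is a
> minimal sketch of how I understand the view registration is supposed to
> declare it; names like env / tableEnv and the exact field expressions are
> simplified assumptions, the point being that 'ts.rowtime is meant to pick up
> the timestamps set by the consumer's assigner.)
>
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api._                // Scala expression DSL ('field, .rowtime)
> import org.apache.flink.table.api.bridge.scala._   // StreamTableEnvironment for DataStreams
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  // must be explicit in 1.11
> val tableEnv = StreamTableEnvironment.create(env)
>
> // 'ts.rowtime exposes the record timestamps assigned in the consumer as the
> // rowtime column that TUMBLE(o.ts, ...) operates on
> val orders: DataStream[Order] = env.addSource(initOrderConsumer())  // consumer shown further down
> tableEnv.createTemporaryView("order", orders, 'id, 'customerId, 'ts.rowtime)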
>
> So I expect the sum of all the counts in the result to be 1000 (the events
> span 100 seconds, i.e. 20 tumbling windows of 5 seconds), but a lot of
> messages seem to be missing (797 in total, as shown below). I can't figure
> out why. I'm using event time for the environment.
>
> [image: Screenshot 2020-11-02 at 23.35.23.png]
>
> *Below is the configuration code*
> Here's the code for the consumer settings for Kafka
>
> private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
>   val properties = new Properties()
>   properties.setProperty("bootstrap.servers", kafkaBrokers)
>   properties.setProperty("group.id", "awesome_order")
>
>   val kafkaConsumer = new FlinkKafkaConsumer[Order](
>     "order",
>     ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
>       classOf[Order],
>       kafkaSchemaRegistry
>     ),
>     properties
>   )
>   kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
>   kafkaConsumer.setStartFromGroupOffsets()
>   kafkaConsumer.assignTimestampsAndWatermarks {
>     WatermarkStrategy
>       .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
>       .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>         override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>           order.getTimestamp
>         }
>       })
>   }
>   kafkaConsumer
> }
>
> Afterwards,
> 1. I create a temp view from this source data stream
> 2. perform the SQL query on it
> 3. append the result back to a processed data stream
> 4. attach that stream to the Kafka sink (roughly as in the sketch below)
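>
> A simplified sketch of steps 2-4 (not the exact job code; toProcessedModel is
> a hypothetical mapper from the result Row into the Avro ProcessedModel):
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.bridge.scala._
> import org.apache.flink.types.Row
>
> val result: Table = tableEnv.sqlQuery(
>   """SELECT o.id, COUNT(*) AS cnt, TUMBLE_END(o.ts, INTERVAL '5' SECOND) AS windowEnd
>     |FROM `order` o
>     |GROUP BY o.id, TUMBLE(o.ts, INTERVAL '5' SECOND)""".stripMargin)
>
> // a tumbling-window GROUP BY produces an append-only result, so toAppendStream works
> val processed: DataStream[ProcessedModel] = result
>   .toAppendStream[Row]
>   .map(row => toProcessedModel(row))  // toProcessedModel: hypothetical Row -> ProcessedModel mapper
>
> processed.addSink(initProcessedModelProducer())  // producer defined below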
>
> Here's the code for the producer settings for Kafka
>
> private def initProcessedModelProducer(): FlinkKafkaProducer[ProcessedModel] = {
>   val properties: Properties = new Properties()
>   properties.put("bootstrap.servers", kafkaBrokers)
>   properties.put("transaction.timeout.ms", "60000")
>
>   val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
>     "processed_model",
>     ConfluentRegistryAvroSerializationSchema.forSpecific(
>       classOf[ProcessedModel],
>       "procssed_model-value",
>       kafkaSchemaRegistry
>     ),
>     properties,
>     null,
>     Semantic.EXACTLY_ONCE,
>     5
>   )
>   kafkaProducer
> }
>
>
>
> *Side Note*
> Another interesting part is that, if I flush "after" publishing all the
> events, the processed events don't seem to arrive at the sink at all. The
> source is still populated normally in Flink. It's as if there is no progress
> after the messages arrive at the source.
>
> order_producer = get_order_producer()
> current_timestamp = int(round(time() * 1000))
> for i in range(0, 1000):
>     order_producer.produce(
>         topic="order",
>         key={"key": i % 100},
>         value={
>             "id": 1000,
>             "customerId": i % 10,
>             "timestamp": current_timestamp + i * 100
>         }
>     )
>     order_producer.flush()  # if I flush "AFTER" the loop instead, there is no
>                             # processed data in the Flink sink, although the events
>                             # themselves still arrive at the Flink source fine
>
>
