It looks like the event time I've specified in the consumer is not being
respected. Does the timestamp assigner actually work with the Kafka consumer?
  .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
    override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
      order.getTimestamp
    }
  })
On Tue, Nov 3, 2020 at 12:01 AM Kevin Kwon <[email protected]> wrote:
> Hi guys, I've recently been experimenting with an end-to-end testing
> environment with Kafka and Flink (1.11).
>
> I've set up the infrastructure with Docker Compose, consisting of a single
> Kafka broker, Flink (1.11), and MinIO for saving checkpoints.
>
> Here's the test scenario:
>
> 1. Send 1000 messages, manually assigning each event a timestamp that
> increases by 100 milliseconds per loop iteration (the first and last
> messages are 99.9 seconds, i.e. roughly 100 seconds, apart). The topic I'm
> writing to has 3 partitions. The code below is the test message producer,
> using Confluent's Python SDK:
>
> from time import time
>
> order_producer = get_order_producer()  # my own helper (not shown)
> current_timestamp = int(round(time() * 1000))  # epoch milliseconds
> for i in range(0, 1000):
>     order_producer.produce(
>         topic="order",
>         key={"key": i % 100},
>         value={
>             "id": 1000,
>             "customerId": i % 10,
>             "timestamp": current_timestamp + i * 100  # +100 ms per event
>         }
>     )
>     order_producer.flush()  # flush after every message
>
>
> 2. Flink runs an SQL query on this stream and publishes the result back to a
> Kafka topic that also has 3 partitions. Below is the SQL code:
>
> | SELECT
> | o.id,
> | COUNT(*),
> | TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
> | FROM
> | order o
> | GROUP BY
> | o.id,
> | TUMBLE(o.ts, INTERVAL '5' SECONDS)
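The GROUP BY above assigns each row to an epoch-aligned 5-second tumbling window. A minimal plain-Python sketch of that assignment for the generated test data (assuming TUMBLE with no offset, and using a hypothetical base timestamp in place of `current_timestamp`) shows what the query should produce if every window eventually fires: 21 windows, partial at both ends, whose counts add up to 1000.

```python
from collections import Counter

WINDOW_MS = 5_000  # TUMBLE(o.ts, INTERVAL '5' SECONDS)
EVENT_ID = 1000    # every test event carries "id": 1000


def tumble_start(ts_ms, size_ms=WINDOW_MS):
    """Start of the epoch-aligned tumbling window (no offset) containing ts_ms."""
    return ts_ms - ts_ms % size_ms


def expected_counts(base_ts, n=1000, step_ms=100):
    """COUNT(*) per (id, window end) for the generated events, assuming every window fires."""
    counts = Counter()
    for i in range(n):
        start = tumble_start(base_ts + i * step_ms)
        counts[(EVENT_ID, start + WINDOW_MS)] += 1
    return counts


counts = expected_counts(base_ts=1_604_360_123_456)  # hypothetical current_timestamp
print(len(counts), sum(counts.values()))  # 21 1000
```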
>
> So I expect the counts in the result to sum to 1000, but a lot of messages
> seem to be missing (797, as shown below). I can't figure out why. I'm
> using event time as the environment's time characteristic.
>
> [image: Screenshot 2020-11-02 at 23.35.23.png]
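A rough single-input model of how an event-time window decides when to emit may help when reading a result like this. It is a simplified sketch, not Flink's implementation: it assumes the watermark advances after every event (Flink emits watermarks periodically, every 200 ms by default in event-time mode), uses the bounded-out-of-orderness rule watermark = max timestamp - bound - 1, fires a window once end - 1 <= watermark, and counts events for an already-fired window as dropped. The base timestamp is hypothetical. Even perfectly ordered input leaves the final window pending, because no later event pushes the watermark past its end.

```python
import math
from collections import Counter

WINDOW_MS = 5_000          # TUMBLE(..., INTERVAL '5' SECONDS)
OUT_OF_ORDERNESS_MS = 100  # Duration.ofMillis(100) in the consumer


def run_windows(timestamps, size_ms=WINDOW_MS, bound_ms=OUT_OF_ORDERNESS_MS):
    """Simulate one event-time tumbling-window operator fed by a single input.

    Returns (fired, pending, dropped):
      fired   - {window_end: count} for windows the watermark has already closed
      pending - {window_end: count} for windows still waiting for the watermark
      dropped - number of events that arrived after their window had fired
    """
    max_ts = -math.inf
    watermark = -math.inf
    open_windows = Counter()
    fired = {}
    dropped = 0
    for ts in timestamps:
        end = ts - ts % size_ms + size_ms
        if end - 1 <= watermark:  # window already fired: the late event is discarded
            dropped += 1
        else:
            open_windows[end] += 1
        # bounded out-of-orderness: watermark = max timestamp seen - bound - 1
        max_ts = max(max_ts, ts)
        watermark = max_ts - bound_ms - 1
        for w_end in sorted(open_windows):
            if w_end - 1 <= watermark:  # the window's last millisecond is covered
                fired[w_end] = open_windows.pop(w_end)
    return fired, dict(open_windows), dropped


base = 1_604_360_123_456  # hypothetical stand-in for current_timestamp
in_order = [base + i * 100 for i in range(1000)]
fired, pending, dropped = run_windows(in_order)
print(sum(fired.values()), sum(pending.values()), dropped)  # 966 34 0

# An event 350 ms behind the newest one exceeds the 100 ms bound and is dropped.
print(run_windows([4_900, 5_300, 4_950]))  # ({5000: 1}, {10000: 1}, 1)
```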
>
> *Below is the configuration code*
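> Here's the code for the Kafka consumer settings:
>
> import java.time.Duration
> import java.util.Properties
>
> import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
> import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>
> private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
>   val properties = new Properties()
>   properties.setProperty("bootstrap.servers", kafkaBrokers)
>   properties.setProperty("group.id", "awesome_order")
>
>   val kafkaConsumer = new FlinkKafkaConsumer[Order](
>     "order",
>     // Avro specific records; schemas are looked up in the Confluent Schema Registry
>     ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
>       classOf[Order],
>       kafkaSchemaRegistry
>     ),
>     properties
>   )
>   kafkaConsumer.setCommitOffsetsOnCheckpoints(true)  // commit offsets to Kafka on checkpoints
>   kafkaConsumer.setStartFromGroupOffsets()           // resume from the group's committed offsets
>   // Assigned on the consumer itself, so watermarks are generated per Kafka partition
>   kafkaConsumer.assignTimestampsAndWatermarks {
>     WatermarkStrategy
>       .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
>       .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>         // use the event's own "timestamp" field as its event time
>         override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>           order.getTimestamp
>         }
>       })
>   }
>   kafkaConsumer
> }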
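Because the strategy is assigned on the FlinkKafkaConsumer itself, Flink generates watermarks per Kafka partition inside the source and merges them by taking the minimum, instead of deriving one watermark from the interleaved stream. The plain-Python sketch below contrasts the two on the test data. It rests on two hypothetical assumptions: event i lands in partition i % 3 (real placement depends on the key hash), and the source reads partition 0 completely, then 1, then 2. It uses a simplified rule: an event counts as late when its window's last millisecond (end - 1) is at or before the current watermark.

```python
import math
from collections import defaultdict

BOUND_MS = 100     # Duration.ofMillis(100)
WINDOW_MS = 5_000  # 5-second tumbling windows


def late_events(records, per_partition, bound_ms=BOUND_MS, size_ms=WINDOW_MS):
    """Count events whose tumbling window the watermark has already closed.

    records: (partition, timestamp_ms) pairs in the order the source reads them.
    per_partition=True  -> one bounded-out-of-orderness generator per partition,
                           combined with min() across partitions.
    per_partition=False -> a single generator over the interleaved stream.
    """
    partitions = {p for p, _ in records}
    max_ts = defaultdict(lambda: -math.inf)
    watermark = -math.inf
    late = 0
    for partition, ts in records:
        window_last_ms = ts - ts % size_ms + size_ms - 1
        if window_last_ms <= watermark:
            late += 1
        key = partition if per_partition else "all"
        max_ts[key] = max(max_ts[key], ts)
        if per_partition:
            # a partition that has produced nothing yet holds the minimum back
            candidate = min(max_ts[p] for p in partitions) - bound_ms - 1
        else:
            candidate = max_ts["all"] - bound_ms - 1
        watermark = max(watermark, candidate)  # watermarks never move backwards
    return late


base = 1_604_360_123_456  # hypothetical stand-in for current_timestamp
# Assumed layout and read order: partition = i % 3; read p0 fully, then p1, then p2.
records = sorted(((i % 3, base + i * 100) for i in range(1000)), key=lambda r: r[0])
print(late_events(records, per_partition=True))   # 0
print(late_events(records, per_partition=False))  # 644
```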
>
> Afterwards, I
> 1. create a temporary view from this source data stream,
> 2. run the SQL query on it,
> 3. convert the result back into an append-only DataStream of processed records, and
> 4. attach that stream to the Kafka sink.
>
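> Here's the code for the Kafka producer settings:
>
> import java.util.Properties
>
> import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
>
> private def initProcessedModelProducer(): FlinkKafkaProducer[ProcessedModel] = {
>   val properties: Properties = new Properties()
>   properties.put("bootstrap.servers", kafkaBrokers)
>   // must not exceed the broker's transaction.max.timeout.ms
>   properties.put("transaction.timeout.ms", "60000")
>
>   val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
>     "processed_model",
>     ConfluentRegistryAvroSerializationSchema.forSpecific(
>       classOf[ProcessedModel],
>       "processed_model-value",  // schema subject: <topic>-value
>       kafkaSchemaRegistry
>     ),
>     properties,
>     null,                   // no custom partitioner
>     Semantic.EXACTLY_ONCE,  // transactional writes, committed when a checkpoint completes
>     5                       // Kafka producers pool size
>   )
>   kafkaProducer
> }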
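With Semantic.EXACTLY_ONCE, the sink writes inside Kafka transactions that Flink commits only when a checkpoint completes, so a consumer reading the output topic with isolation.level=read_committed sees nothing from a transaction until then. A toy model of that visibility rule, assuming as a simplification that a record is committed by the first checkpoint completing at or after the time it was written (the job's actual checkpoint interval isn't shown here, and the times below are made up):

```python
def visible_records(write_times_ms, checkpoint_completions_ms, now_ms,
                    isolation="read_committed"):
    """Count sink records a downstream Kafka consumer can see at now_ms.

    Toy model of a transactional (EXACTLY_ONCE) sink: a record written at time t is
    committed once any checkpoint completes at or after t. read_uncommitted
    consumers see every record as soon as it is written.
    """
    written = [t for t in write_times_ms if t <= now_ms]
    if isolation == "read_uncommitted":
        return len(written)
    completed = [c for c in checkpoint_completions_ms if c <= now_ms]
    if not completed:
        return 0  # no checkpoint has completed yet, so nothing is committed
    last_commit = max(completed)
    return sum(1 for t in written if t <= last_commit)


writes = [100, 200, 900]
print(visible_records(writes, [], now_ms=1_000))                            # 0
print(visible_records(writes, [500], now_ms=1_000))                         # 2
print(visible_records(writes, [500], 1_000, isolation="read_uncommitted"))  # 3
```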
>
>
>
> *Side Note*
> Another interesting thing is that, if I flush only "after" publishing all
> events, no processed events seem to arrive at the sink at all, even though
> the source is still populated normally in Flink. It's as if nothing
> progresses after the messages arrive at the source.
>
> order_producer = get_order_producer()
> current_timestamp = int(round(time() * 1000))
> for i in range(0, 1000):
>     order_producer.produce(
>         topic="order",
>         key={"key": i % 100},
>         value={
>             "id": 1000,
>             "customerId": i % 10,
>             "timestamp": current_timestamp + i * 100
>         }
>     )
> # If I flush "AFTER" the loop, there is no processed data in Flink's sink,
> # although the events themselves arrive at Flink's source without any problem.
> order_producer.flush()
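No output at all is what an event-time window shows when its input watermark never advances. Downstream operators take the minimum watermark over all of their inputs, so a single input that never produces a watermark (for example, a partition or channel that receives no records) holds every window open; Flink 1.11's WatermarkStrategy offers withIdleness(Duration) for marking such inputs idle. Whether this is what happens in the side-note scenario is not established in this thread. The sketch below uses hypothetical watermark values and only illustrates the min-combination rule and the effect of excluding an idle input.

```python
import math

WINDOW_MS = 5_000


def combined_watermark(inputs, idle=frozenset()):
    """Minimum watermark over all non-idle inputs (-inf for an input with none yet)."""
    active = [wm for name, wm in inputs.items() if name not in idle]
    return min(active) if active else -math.inf


def fired_window_ends(window_ends, watermark):
    """Windows fire once their last millisecond (end - 1) is at or before the watermark."""
    return [end for end in sorted(window_ends) if end - 1 <= watermark]


# Hypothetical watermarks; input 3 stands for an input that never saw a record.
inputs = {0: 103_255, 1: 103_055, 2: 103_155, 3: -math.inf}
window_ends = range(WINDOW_MS, 105_001, WINDOW_MS)  # 21 windows ending at 5 s .. 105 s

print(len(fired_window_ends(window_ends, combined_watermark(inputs))))            # 0
print(len(fired_window_ends(window_ends, combined_watermark(inputs, idle={3}))))  # 20
```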
>
>