Hi guys, I've recently been experimenting with an end-to-end testing
environment built around Kafka and Flink (1.11)

I've set up the infrastructure with Docker Compose: a single Kafka
broker, Flink (1.11), and MinIO for checkpoint storage

Here's the test scenario

1. Send 1000 messages, manually assigning a timestamp to each event and
increasing it by 100 milliseconds per loop iteration (so the first and
last messages are roughly 100 seconds apart). The topic I'm writing to
has 3 partitions. Below is the test message producer, using Confluent's
Python SDK:

from time import time

# get_order_producer() builds the Confluent producer (schema registry etc.)
order_producer = get_order_producer()
current_timestamp = int(round(time() * 1000))
for i in range(0, 1000):
    order_producer.produce(
        topic="order",
        key={"key": i % 100},
        value={
            "id": 1000,
            "customerId": i % 10,
            "timestamp": current_timestamp + i * 100
        }
    )
    order_producer.flush()  # flushing after every single message here


2. Flink runs a SQL query on this stream and publishes the result back
to a Kafka topic that also has 3 partitions. Below is the SQL:

| SELECT
|   o.id,
|   COUNT(*),
|   TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
| FROM
|   order o
| GROUP BY
|   o.id,
|   TUMBLE(o.ts, INTERVAL '5' SECONDS)

Since the 1000 events span roughly 100 seconds of event time (999 x 100
ms), the query should produce about 20 five-second windows whose counts
sum to 1000. Instead, a lot of messages seem to be missing (only 797, as
shown below), and I can't figure out why. The environment uses event time.

[image: Screenshot 2020-11-02 at 23.35.23.png]
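
For completeness, the streaming environment is configured for event time
with checkpoints going to MinIO, roughly like the sketch below (the
checkpoint interval and bucket path are simplified placeholders for what
the Docker Compose setup actually uses):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// event time is the time characteristic for the whole job
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// checkpoints are written to MinIO via the S3 filesystem plugin
env.enableCheckpointing(10000L) // placeholder interval (10 s)
env.setStateBackend(new FsStateBackend("s3://flink/checkpoints")) // placeholder path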

*Below is the configuration code*
Here's the code for the consumer settings for Kafka

private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", kafkaBrokers)
  properties.setProperty("group.id", "awesome_order")

  val kafkaConsumer = new FlinkKafkaConsumer[Order](
    "order",
    ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
      classOf[Order],
      kafkaSchemaRegistry
    ),
    properties
  )
  kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
  kafkaConsumer.setStartFromGroupOffsets()
  kafkaConsumer.assignTimestampsAndWatermarks {
    WatermarkStrategy
      .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
      .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
        override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
          order.getTimestamp
        }
      })
  }
  kafkaConsumer
}

Afterwards,
1. I create a temp view from this source data stream,
2. run the SQL query on it,
3. convert the result back to a processed DataStream, and
4. attach that stream to the Kafka sink (rough wiring sketch below).
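
Roughly, the wiring between the source stream, the SQL query, and the
sink looks like this (windowedCountSql is the query above kept as a
stripMargin string, and rowToProcessedModel is a stand-in for my
Row -> ProcessedModel mapping):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

val tableEnv = StreamTableEnvironment.create(env)

// 1. register the Kafka source stream as a temp view; 'ts becomes the
//    rowtime attribute backed by the timestamps/watermarks assigned above
//    ("order" is a reserved word, hence the backticks)
val orders: DataStream[Order] = env.addSource(initOrderConsumer())
tableEnv.createTemporaryView("`order`", orders, 'id, 'customerId, 'ts.rowtime)

// 2. run the tumbling-window query shown earlier
val result: Table = tableEnv.sqlQuery(windowedCountSql)

// 3. convert the result back to an append-only stream of ProcessedModel
val processed: DataStream[ProcessedModel] =
  tableEnv.toAppendStream[Row](result).map(rowToProcessedModel _)

// 4. attach the exactly-once Kafka sink
processed.addSink(initProcessedModelProducer())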

Here's the code for the producer settings for Kafka

private def initProcessedModelProducer(): FlinkKafkaProducer[ProcessedModel] = {
  val properties: Properties = new Properties()
  properties.put("bootstrap.servers", kafkaBrokers)
  properties.put("transaction.timeout.ms", "60000")

  val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
    "processed_model",
    ConfluentRegistryAvroSerializationSchema.forSpecific(
      classOf[ProcessedModel],
      "procssed_model-value",
      kafkaSchemaRegistry
    ),
    properties,
    null,
    Semantic.EXACTLY_ONCE,
    5
  )
  kafkaProducer
}



*Side Note*
Another interesting thing: if I flush only *after* publishing all the
events, the processed events don't seem to arrive at the sink at all,
while the source is still populated normally in Flink. It's as if there
is no progress once the messages reach the source.

order_producer = get_order_producer()
current_timestamp = int(round(time() * 1000))
for i in range(0, 1000):
    order_producer.produce(
        topic="order",
        key={"key": i % 100},
        value={
            "id": 1000,
            "customerId": i % 10,
            "timestamp": current_timestamp + i * 100
        }
    )
# flushing once here, AFTER the loop: no processed data shows up at the
# Flink sink, even though the events themselves arrive at the source
# without any problem
order_producer.flush()
