Hello,

Recently, I decided to migrate one of my toy projects to Flink 1.14.0
and noticed that the JVM cluster's behaviour has changed: it no longer
respects event time. On Flink 1.12.5 I didn't experience any issues. I did
some debugging and it seems that InternalTimerServiceImpl's watermark is
not advanced at all. I don't see any issues with my code when running it
on a cluster.

I have provided a code sample that reproduces the issue. Just publish the
numbers 1 to 10 to a topic named `numbers` on a local Kafka cluster, then
try compiling the sample with Flink 1.12.5 / 1.13.3 / 1.14.0 (note: in
recent Flink versions KafkaRecordDeserializer became
KafkaRecordDeserializationSchema).

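// Imports for the Flink 1.14.0 variant of the sample
import org.apache.flink.api.common.eventtime._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord

object Example extends App {
  import org.apache.flink.api.scala._
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val kafkaSource = KafkaSource.builder[Int]()
    .setTopics("numbers")
    .setBootstrapServers("localhost:9092")
    .setGroupId("flink-job")
    .setDeserializer(new KafkaRecordDeserializationSchema[Int] {
      // Each record value is the number encoded as a string
      override def deserialize(
          record: ConsumerRecord[Array[Byte], Array[Byte]],
          out: Collector[Int]): Unit = {
        out.collect(new String(record.value()).toInt)
      }

      override def getProducedType: TypeInformation[Int] =
        TypeInformation.of(classOf[Int])
    })
    .build()

  env.fromSource(kafkaSource, new WatermarkStrategy[Int] {
      override def createWatermarkGenerator(
          context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Int] =
        new AscendingTimestampsWatermarks[Int]

      // The number itself is used as the event timestamp (in milliseconds)
      override def createTimestampAssigner(
          context: TimestampAssignerSupplier.Context): TimestampAssigner[Int] = {
        (element: Int, _: Long) => element
      }
    }, "kafka-source")
    .map(Integer.valueOf(_))
    .keyBy(_ => 1)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
    .process(new ProcessWindowFunction[Integer, Int, Int, TimeWindow] {
      // Emit the last element of each fired window
      override def process(
          key: Int,
          context: Context,
          elements: Iterable[Integer],
          out: Collector[Int]): Unit = {
        out.collect(elements.last)
      }
    })
    .print()
  env.execute()
}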

I see no output with Flink 1.13.3 and 1.14.0. The issue doesn't happen
with a `fromElements` source, from which I concluded that for finite
sources all windows are eventually fired. I wonder what could be causing
this; maybe I just missed some mandatory configuration?
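
For reference, here is a small dependency-free sketch of the output I would
expect from the Kafka variant once all ten numbers have been read. It is only
my mental model, not Flink code: `WindowExpectation` and its methods are
hypothetical names, and it assumes that tumbling windows are aligned to
multiples of the window size, that the ascending-timestamps watermark trails
the largest timestamp seen by 1 ms, that a window fires once the watermark
reaches its end minus 1 ms, and that elements within a window keep their
arrival order.

```scala
// Hypothetical helper, not part of Flink: models which 5 ms tumbling
// windows should have fired after the numbers 1 to 10 were consumed.
object WindowExpectation {
  val windowSizeMs = 5L

  // Start of the tumbling window containing `ts`
  // (assumes zero offset and a non-negative timestamp).
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // Last element of every window that has fired once all of `input` was
  // seen. Assumption: watermark = max timestamp - 1, and a window fires
  // when watermark >= window end - 1.
  def firedOutputs(input: Seq[Int]): Seq[Int] = {
    val watermark = input.max.toLong - 1
    input
      .groupBy(n => windowStart(n.toLong))
      .toSeq
      .sortBy(_._1)
      .collect {
        case (start, elems) if watermark >= start + windowSizeMs - 1 =>
          elems.last
      }
  }

  def main(args: Array[String]): Unit =
    println(firedOutputs(1 to 10).mkString(", ")) // prints "4, 9"
}
```

Under these assumptions the windows [0, 5) and [5, 10) should fire and print
4 and 9, while the window holding 10 stays open until a later watermark (or
the end of a finite source) closes it. So I would expect at least two lines
of output from the job above, not none.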
