JVM cluster not firing event time window

2021-11-09 Thread Carlos Downey
Hello,

Recently, I've decided to migrate one of my toy projects to Flink 1.14.0
and I realized that the JVM cluster behaviour changed: it no longer respects
event time. On Flink 1.12.5 I didn't experience any issues. I tried some
debugging and it seems that InternalTimerServiceImpl's watermark is never
advanced. I don't see any issues with my code when running it on a real
cluster.

I have provided a code sample that reproduces the issue. Just write the
numbers 1 to 10 to a topic named `numbers` on a local Kafka cluster. Try
compiling it with Flink 1.12.5 / 1.13.3 / 1.14.0 (note: in recent Flink
versions, KafkaRecordDeserializer became KafkaRecordDeserializationSchema).

object Example extends App {
  import org.apache.flink.api.common.eventtime._
  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.connector.kafka.source.KafkaSource
  import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
  import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
  import org.apache.flink.streaming.api.windowing.time.Time
  import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  import org.apache.flink.util.Collector
  import org.apache.kafka.clients.consumer.ConsumerRecord

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val kafkaSource = KafkaSource.builder[Int]()
    .setTopics("numbers")
    .setBootstrapServers("localhost:9092")
    .setGroupId("flink-job")
    .setDeserializer(new KafkaRecordDeserializationSchema[Int] {
      override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]],
                               out: Collector[Int]): Unit =
        out.collect(new String(record.value()).toInt)

      override def getProducedType: TypeInformation[Int] =
        TypeInformation.of(classOf[Int])
    })
    .build()

  env.fromSource(kafkaSource, new WatermarkStrategy[Int] {
      override def createWatermarkGenerator(
          context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Int] =
        new AscendingTimestampsWatermarks[Int]

      override def createTimestampAssigner(
          context: TimestampAssignerSupplier.Context): TimestampAssigner[Int] =
        (element: Int, _: Long) => element
    }, "kafka-source")
    .map(Integer.valueOf(_))
    .keyBy(_ => 1)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
    .process(new ProcessWindowFunction[Integer, Int, Int, TimeWindow] {
      override def process(key: Int, context: Context,
                           elements: Iterable[Integer], out: Collector[Int]): Unit =
        out.collect(elements.last)
    })
    .print()

  env.execute()
}

I see no output on Flink 1.13.3 and 1.14.0. The issue doesn't happen with a
`fromElements` source, so I concluded that for finite sources all windows
are eventually fired. I wonder what could be causing this - maybe I just
missed some mandatory configuration?
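One suspicion I have, sketched here in plain Scala (no Flink code, the names are mine): an operator's event-time watermark is the minimum of the watermarks of all its input subtasks, and on the JVM cluster the default parallelism is the number of cores, so with a single-partition topic most source subtasks may sit idle at Long.MinValue and hold the watermark back forever.

```scala
// Sketch of the watermark semantics as I understand them: a downstream
// operator's watermark is the minimum over all of its input subtasks.
def operatorWatermark(subtaskWatermarks: Seq[Long]): Long =
  subtaskWatermarks.min

// With one active subtask (the one assigned the only Kafka partition)
// and three idle ones, the operator watermark never advances, so
// event-time windows never fire.
val active = 42L          // subtask reading the only partition
val idle = Long.MinValue  // subtasks with no partition assigned
operatorWatermark(Seq(active, idle, idle, idle)) // == Long.MinValue
```

If that is indeed the cause, something like `WatermarkStrategy#withIdleness` on the strategy should work around it, though I haven't confirmed that yet.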


HDFS streaming source concerns

2022-04-06 Thread Carlos Downey
Hi,

We have an in-house platform that we want to integrate with external
clients via HDFS. They have lots of existing files and they continuously
write more data to HDFS. Ideally, we would like to have a Flink job that
takes care of ingesting the data, as one of the requirements is to execute
SQL on top of these files. We looked at the existing FileSource
implementation, but we believe it is not well suited to this use case:
- firstly, we'd have to ingest all files initially present on HDFS before
completing the first checkpoint - this is unacceptable for us, as we would
have to reprocess all the files again in case of an early job failure, not
to mention the state blowing up for aggregations.
- secondly, we see no way to establish a valid watermark strategy. This is a
major pain point that we can't find the right answer for. We don't want to
assume too much about the data itself. In general, the only solutions we
see require some sort of synchronization across subtasks. On the other
hand, the simplest strategy is to delay the watermark, but in that case
we are afraid of accidentally dropping events.
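To illustrate the second concern, here is a plain-Scala sketch (no Flink code, the names are ours) of the delayed-watermark idea and why we are afraid of losing data:

```scala
// Sketch: a bounded-out-of-orderness watermark is the maximum timestamp
// seen so far minus a fixed delay. A record whose timestamp is at or
// below the current watermark when it arrives is "late" and, for a plain
// event-time window, silently dropped.
final class BoundedOutOfOrderness(delayMs: Long) {
  private var maxTs = Long.MinValue
  def observe(ts: Long): Unit = maxTs = math.max(maxTs, ts)
  def watermark: Long =
    if (maxTs == Long.MinValue) Long.MinValue else maxTs - delayMs
  def isLate(ts: Long): Boolean = ts <= watermark
}
```

With a 5-second delay, once one file yields a record at t=10000 the watermark is 5000, so a record from another file arriving with t=4000 is already late. Since we don't control the timestamp skew between clients' files, we see no safe value for the delay.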

Given this, we are thinking about implementing our own file source. Has
someone in the community already tried solving a similar problem? If not,
any suggestions about the concerns we raised would be valuable.