Hello there.

So we have some Postgres tables that are mutable, and we want to create a
snapshot of them in S3 every X minutes. So we plan to use Debezium to send a
CDC log of every row change into a Kafka topic, and then have Flink keep the
latest state of each row to save that data into S3 subsequently.

Our current job looks like this and works somehow well in most cases:

   // checkpoint interval is set to run every 10 minutes

    kafkaSource
      .keyB { it.id }
      .window(GlobalWindows.create())
      .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.minutes(5)))
      .reduce { left, right ->
        if (left.timestamp() > right.timestamp()) {
          left
        } else {
          right
        }
      }
      .addSink(StreamingFileSink
        .forBulkFormat(Path(outputDir),
ParquetAvroWriters.forGenericRecord(avroSchema))
       
.withBucketAssigner(DateTimeBucketAssignerr("'date='yyyy-MM-dd/'hour='HH/'minute='mm"))
        .build())

We use `GlobalWindows.create()` because we want to hold in Flink's state ALL
the changes send into Kafka (the reduce function, according to the docs,
will make sure to evict all events except the last one).

This works, but we know there could be some edge cases. For instance, if the
trigger fires around the same time that a checkpoint, we could get into a
position where StreamingFileSink rolls an incomplete set of all the events
triggered.

So a couple of questions:

1. Is there a way to mark the events with the timestamp of the trigger that
fired them?
2. Is the approach we took fine? (keep in mind that we will deal with giant
tables, so a batch job that queries them every N seconds is not an option).
3. Do you foresee any other edge cases?

Thanks for taking a look at this.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to