Hi Svend,
I'm a bit confused by this statement:

> * In streaming mode with checkpointing enabled, but with `setBounded()`
> removed from the Kafka source, I get the same result


My expectation would be that the source runs forever if it is not bounded.
Are you sure this error message is not coming from another task?
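
For reference, here is roughly the difference I would expect between the two
variants, reusing the classes from the code you posted below (just a sketch):

```
    // Bounded: the source stops once every partition reaches the given
    // offsets, so all source tasks eventually finish.
    KafkaSource<DynamicMessage> bounded = KafkaSource.<DynamicMessage>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("some-topic")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
        .setBounded(OffsetsInitializer.latest())
        .build();

    // Without setBounded() the source is CONTINUOUS_UNBOUNDED (the default):
    // it keeps polling forever and its tasks never finish on their own.
    KafkaSource<DynamicMessage> unbounded = KafkaSource.<DynamicMessage>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("some-topic")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
        .build();
```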


On Sat, Jul 31, 2021 at 11:23 AM Svend <stream...@svend.xyz> wrote:

> Hi everyone,
>
> I'm failing to write a simple Flink 1.13.1 application with the DataStream
> API that reads from Kafka and writes to Parquet.
>
> My intention is to run this as a job that reads what is currently in
> Kafka, shuts down when reaching the current end of each partition, and
> picks up from there the next time it's started.
>
> I've tried several variations, but I can't get anything to work properly:
>
> * In streaming mode, enabling checkpointing and setting the KafkaSource to
> bounded (see code sample below), the application fails to perform
> checkpoints, complaining about:
>
> "Checkpoint Timer Failed to trigger checkpoint for job ... since some
> tasks of job ... has been finished"
>
> => no Parquet part file gets written, no checkpoint gets written and no
> Kafka offsets get committed
>
> * In streaming mode with checkpointing enabled, but with `setBounded()`
> removed from the Kafka source, I get the same result
>
> * I also tried batch mode: removing the checkpointing, switching the
> StreamingFileSink for a FileSink and using Kafka's
> "auto.commit.interval.ms" (both changes are sketched below) => in that
> case I get some Parquet output and Kafka offsets are committed, but the
> application shuts down before committing the offsets of everything it has
> read, such that the latest Kafka events are read again at the next start.
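>
> (The auto-commit variant wires the consumer properties into the source
> builder roughly like this; a sketch rather than my exact code, and the
> interval value is just illustrative:)
>
> ```
>     // Batch mode has no checkpoints, so fall back to the Kafka client's
>     // own periodic auto-commit for the group offsets.
>     KafkaSource<DynamicMessage> kafkaSource = KafkaSource.<DynamicMessage>builder()
>         .setBootstrapServers("localhost:9092")
>         .setTopics("some-topic")
>         .setGroupId("my-group")
>         .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
>         .setProperty("enable.auto.commit", "true")
>         .setProperty("auto.commit.interval.ms", "1000")  // illustrative value
>         .setBounded(OffsetsInitializer.latest())
>         .build();
> ```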
>
> This all sounds very basic, and I see other people doing this kind of thing
> (recently, [1]). I was really expecting the combination of KafkaSource,
> StreamingFileSink and checkpointing to work, since all of those are
> streaming concepts. Hopefully I'm just doing something wrong?
>
> [1]
> http://mail-archives.apache.org/mod_mbox/flink-user/202106.mbox/browser
>
> Thanks a lot in advance,
>
> Svend
>
> ```
>     // I'm testing this by launching the app from an IDE
>
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.getConfig().registerTypeWithKryoSerializer(DynamicMessage.class,
>         new DynamicProtobufKryoSerializer(params));
>
>     env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>     env.enableCheckpointing(1000);
>     env.getCheckpointConfig()
>         .setCheckpointStorage("file:///tmp/flink-checkpoints");
>     env.getCheckpointConfig()
>         .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
>     KafkaSource<DynamicMessage> kafkaSource =
>         KafkaSource.<DynamicMessage>builder()
>             .setBootstrapServers("localhost:9092")
>             .setTopics("some-topic")
>             .setGroupId("my-group")
>             .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
>             .setStartingOffsets(
>                 OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>             // stop at the offsets that are current when the job starts
>             .setBounded(OffsetsInitializer.latest())
>             .build();
>
>     StreamingFileSink<DynamicMessage> parquetSink = StreamingFileSink
>         .forBulkFormat(
>             Path.fromLocalFile(new File("/tmp/job-output/some-topic.parquet")),
>             new ParquetWriterFactory<>((out) ->
>                 new ParquetDynamicMessageWriterBuilder(out, params).build()))
>         // bulk formats can only roll part files when a checkpoint completes
>         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>         .build();
>
>     env
>         .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
>         // StreamingFileSink is a SinkFunction, so it is attached with addSink();
>         // sinkTo() only accepts the new unified Sink interface (e.g. FileSink)
>         .addSink(parquetSink);
>
>     env.execute("my-job");
> ```
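>
> (And the batch-mode attempt swaps the sink side for a FileSink, reusing
> the same writer factory; again just a sketch:)
>
> ```
>     // FileSink implements the unified Sink interface, so it works with
>     // sinkTo() and in batch execution mode, where pending part files are
>     // finalized when the job ends.
>     FileSink<DynamicMessage> parquetSink = FileSink
>         .forBulkFormat(
>             Path.fromLocalFile(new File("/tmp/job-output/some-topic.parquet")),
>             new ParquetWriterFactory<>((out) ->
>                 new ParquetDynamicMessageWriterBuilder(out, params).build()))
>         .build();
> ```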
>
