Hi everyone,

I'm failing to write a simple Flink 1.13.1 application with the DataStream API that 
reads from Kafka and writes to Parquet.

My intention is to run this as a job that reads what is currently in Kafka, 
shuts down when reaching the current end of each partition and picks up from 
there the next time it's started.
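
Concretely, I express this with these two settings on the KafkaSource builder 
(the full code sample is at the end of this mail):

```
    // start from the last committed group offsets, or earliest on the first run
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    // stop once the offsets that are current at job start-up have been reached
    .setBounded(OffsetsInitializer.latest())
```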

I've tried several variations, but I can't get anything to work properly:

* In streaming mode, enabling checkpointing and setting the KafkaSource to 
bounded (see code sample below), the application fails to perform checkpoints, 
complaining:

"Checkpoint Timer Failed to trigger checkpoint for job ... since some tasks of 
job ... has been finished"

=> no Parquet part file gets written, no checkpoint gets written and no Kafka 
offsets get committed
 
* In streaming mode, with checkpointing but removing the `setBounded()` on the 
Kafka source, the result is the same

* I also tried batch mode: removing the checkpointing, switching the 
StreamingFileSink for a FileSink and relying on Kafka's "auto.commit.interval.ms" 
(a sketch of this variation follows this list) => in that case I do get some 
Parquet output and Kafka offsets are committed, but the application shuts down 
before committing the offsets of the last events read, such that the latest 
Kafka events are read again at the next start.
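
Here is a minimal sketch of that batch variation (assuming the same custom 
classes and `params` as in the full sample below; the "1000" ms auto-commit 
interval is just an illustrative value, not necessarily what I ran):

```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().registerTypeWithKryoSerializer(DynamicMessage.class,
        new DynamicProtobufKryoSerializer(params));

    // batch mode, no checkpointing
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    KafkaSource<DynamicMessage> kafkaSource = KafkaSource.<DynamicMessage>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("some-topic")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setBounded(OffsetsInitializer.latest())
        // rely on the Kafka client itself to commit offsets periodically
        .setProperty("enable.auto.commit", "true")
        .setProperty("auto.commit.interval.ms", "1000")
        .build();

    // FileSink instead of StreamingFileSink: it finalizes part files when the
    // bounded job ends, without requiring checkpoints
    FileSink<DynamicMessage> parquetSink = FileSink
        .forBulkFormat(
            Path.fromLocalFile(new File("/tmp/job-output/some-topic.parquet")),
            new ParquetWriterFactory<>(
                (out) -> new ParquetDynamicMessageWriterBuilder(out, params).build()))
        .build();

    env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .sinkTo(parquetSink);

    env.execute("my-job");
```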

This all sounds very basic, and I see other people do this kind of thing 
(recently, [1]). I was really expecting the combination of KafkaSource, 
StreamingFileSink and checkpointing to work, since all of those are streaming 
concepts. Hopefully I'm just doing something wrong?

[1] http://mail-archives.apache.org/mod_mbox/flink-user/202106.mbox/browser

Thanks a lot in advance,

Svend

```
    // I'm testing this by launching the app from an IDE

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().registerTypeWithKryoSerializer(DynamicMessage.class,
        new DynamicProtobufKryoSerializer(params));

    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    env.enableCheckpointing(1000);
    env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

    KafkaSource<DynamicMessage> kafkaSource = KafkaSource.<DynamicMessage>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("some-topic")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
        // start from the last committed group offsets, or earliest on the first run
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        // stop once the offsets that are current at job start-up have been reached
        .setBounded(OffsetsInitializer.latest())
        .build();

    StreamingFileSink<DynamicMessage> parquetSink = StreamingFileSink
        .forBulkFormat(
            Path.fromLocalFile(new File("/tmp/job-output/some-topic.parquet")),
            new ParquetWriterFactory<>(
                (out) -> new ParquetDynamicMessageWriterBuilder(out, params).build()))
        // with bulk formats, part files roll on every checkpoint
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();

    env
        .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .sinkTo(parquetSink);

    env.execute("my-job");
```
