Hi again,

After a bit of experimentation (and actually reading the bug report I linked), I realized the issue was that the source parallelism was higher than the number of Kafka partitions. Reducing the parallelism made the checkpoints work as expected.
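
To make the condition concrete, here is a minimal, self-contained sketch. It is not Flink code: `SourceParallelismCheck`, `idleSubtasks` and `validate` are hypothetical names, the round-robin assignment is a simplification of how partitions are actually handed to source subtasks, and the figures of 8 subtasks and 3 partitions are made up for the example. It only shows that subtasks beyond the partition count end up with nothing to read, and what a fail-fast check on that condition could look like.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only, not part of Flink. All names here are hypothetical. */
final class SourceParallelismCheck {

    /**
     * Returns the indices of the subtasks that receive no partition when
     * partitions are dealt out round-robin (a simplified assignment model).
     */
    static List<Integer> idleSubtasks(int parallelism, int partitionCount) {
        boolean[] hasSplit = new boolean[parallelism];
        for (int partition = 0; partition < partitionCount; partition++) {
            hasSplit[partition % parallelism] = true;
        }
        List<Integer> idle = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (!hasSplit[subtask]) {
                idle.add(subtask);
            }
        }
        return idle;
    }

    /** Fail-fast check: rejects a parallelism that leaves some subtask without a partition. */
    static void validate(int parallelism, int partitionCount) {
        if (parallelism < 1 || partitionCount < 0) {
            throw new IllegalArgumentException(
                "parallelism must be >= 1 and partitionCount must be >= 0");
        }
        List<Integer> idle = idleSubtasks(parallelism, partitionCount);
        if (!idle.isEmpty()) {
            throw new IllegalStateException(
                "Parallelism " + parallelism + " exceeds partition count "
                    + partitionCount + "; idle subtasks: " + idle);
        }
    }

    public static void main(String[] args) {
        // 8 subtasks reading 3 partitions: five subtasks have nothing to read.
        System.out.println(idleSubtasks(8, 3)); // prints [3, 4, 5, 6, 7]

        validate(3, 3); // passes: every subtask owns a partition

        try {
            validate(8, 3);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a parallelism of 3 the check passes. In a real job the equivalent remedy is the one that worked above: keep the source parallelism at or below the partition count.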

Since running with more subtasks than partitions seems unsupported, maybe KafkaSource should refuse to even start if its configured parallelism is higher than the number of partitions in the Kafka topic? Otherwise, this error condition is rather difficult to interpret IMHO.

I'm happy to open a Jira and work on that if that's desired.

Best regards,

Svend

On Wed, 4 Aug 2021, at 11:32 AM, Svend wrote:
> Hi Robert,
>
> Thanks for the feedback.
>
> You are correct, the behavior is indeed different: when I make the source
> bounded, the application eventually stops, whereas without that setting it
> runs forever.
>
> In both cases, neither checkpoints nor data are written to the filesystem.
>
> I re-ran the experiment to get more info:
>
> * when making the Kafka source unbounded (which is not what I want), I notice
> log messages at the beginning saying that tasks associated with the Kafka
> source get unregistered:
>
> 2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.Task Source: Kafka Source
> -> Sink: Unnamed (3/8)#0 - Freeing task resources for Source: Kafka Source
> -> Sink: Unnamed (3/8)#0 (70e93520406e12a1e7480a3344f4064d).
> 2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.TaskExecutor
> flink-akka.actor.default-dispatcher-6 - Un-registering task and sending
> final execution state FINISHED to JobManager for task Source: Kafka Source ->
> Sink: Unnamed (8/8)#0 f02843c05005daced3d5271832f25559.
>
> Later on, it seems each checkpoint fails while complaining that some tasks
> are not running => would that be caused by the finished tasks above?
>
> 2021-08-04 10:55:31,463 [INFO org.apa.fli.run.che.CheckpointCoordinator
> Checkpoint Timer - Failed to trigger checkpoint for job
> 321fc6243d8aa091bdb8b913b7c3a679 since some tasks of job
> 321fc6243d8aa091bdb8b913b7c3a679 has been finished, abort the checkpoint
> Failure reason: Not all required tasks are currently running.
>
> * when setting the Kafka source back to bounded, I also notice such
> "unregistered" log messages, followed by one "Checkpoint Timer - Failed to
> trigger c" log, and then the job finishes
>
> By digging a little, this seems similar to this bug:
> https://issues.apache.org/jira/browse/FLINK-2491
>
> I've tried adding an extra ProcessFunction to the pipeline, just to have one
> more stateful operator in the hope of triggering the checkpoint, though
> without success.
>
> On Tue, 3 Aug 2021, at 1:33 PM, Robert Metzger wrote:
>> Hi Svend,
>> I'm a bit confused by this statement:
>>
>>> * In streaming mode with checkpointing, removing the `setBounded()` on
>>> the Kafka source yields the same result
>>
>> My expectation would be that the source runs forever if it is not bounded.
>> Are you sure this error message is not coming from another task?
>>
>> On Sat, Jul 31, 2021 at 11:23 AM Svend <stream...@svend.xyz> wrote:
>>> Hi everyone,
>>>
>>> I'm failing to write a simple Flink 1.13.1 application with the DataStream
>>> API that reads from Kafka and writes to Parquet.
>>>
>>> My intention is to run this as a job that reads what is currently in Kafka,
>>> shuts down when reaching the current end of each partition, and picks up
>>> from there the next time it's started.
>>>
>>> I've tried several variations, but I can't get anything to work properly:
>>>
>>> * In streaming mode, enabling checkpointing and setting the KafkaSource to
>>> bounded (see code sample below), the application fails to perform
>>> checkpoints, complaining:
>>>
>>> "Checkpoint Timer Failed to trigger checkpoint for job ... since some tasks
>>> of job ... has been finished"
>>>
>>> => no Parquet part gets written, no checkpoint gets written, and no Kafka
>>> offsets get committed
>>>
>>> * In streaming mode with checkpointing, removing the `setBounded()` on
>>> the Kafka source yields the same result
>>>
>>> * I also tried batch mode, removing the checkpointing, switching the
>>> StreamingFileSink for a FileSink and using Kafka's
>>> "auto.commit.interval.ms" => in that case I'm getting some Parquet output
>>> and Kafka offsets are committed, but the application shuts down before
>>> flushing the offsets of what has been read, so the latest Kafka events
>>> will be read again at the next start.
>>>
>>> This all sounds very basic, I see other people do this kind of thing
>>> (recently, [1]), and I was really expecting the combination of
>>> KafkaSource with StreamingFileSink and checkpointing to work, since all of
>>> those are streaming concepts. Hopefully I'm doing something wrong?
>>>
>>> [1] http://mail-archives.apache.org/mod_mbox/flink-user/202106.mbox/browser
>>>
>>> Thanks a lot in advance,
>>>
>>> Svend
>>>
>>> ```
>>> // I'm testing this by launching the app in an IDE.
>>> // DynamicProtobufKryoSerializer, DynamicProtobufFlinkSchema,
>>> // ParquetDynamicMessageWriterBuilder and `params` are application-specific
>>> // and not shown here.
>>>
>>> import com.google.protobuf.DynamicMessage;
>>> import java.io.File;
>>> import org.apache.flink.api.common.RuntimeExecutionMode;
>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>>> import org.apache.flink.connector.kafka.source.KafkaSource;
>>> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
>>> import org.apache.flink.core.fs.Path;
>>> import org.apache.flink.formats.parquet.ParquetWriterFactory;
>>> import org.apache.flink.streaming.api.CheckpointingMode;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>>> import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
>>> import org.apache.kafka.clients.consumer.OffsetResetStrategy;
>>>
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.getConfig().registerTypeWithKryoSerializer(
>>>     DynamicMessage.class, new DynamicProtobufKryoSerializer(params));
>>>
>>> // Streaming mode with exactly-once checkpoints every 1000 ms.
>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>> env.enableCheckpointing(1000);
>>> env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>
>>> // Start from the committed offsets (earliest if none) and stop at the
>>> // offsets that are the latest when the job starts.
>>> KafkaSource<DynamicMessage> kafkaSource =
>>>     KafkaSource.<DynamicMessage>builder()
>>>         .setBootstrapServers("localhost:9092")
>>>         .setTopics("some-topic")
>>>         .setGroupId("my-group")
>>>         .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
>>>         .setStartingOffsets(
>>>             OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>>         .setBounded(OffsetsInitializer.latest())
>>>         .build();
>>>
>>> // Bulk (Parquet) part files are rolled on every checkpoint.
>>> StreamingFileSink<DynamicMessage> parquetSink = StreamingFileSink
>>>     .forBulkFormat(
>>>         Path.fromLocalFile(new File("/tmp/job-output/some-topic.parquet")),
>>>         new ParquetWriterFactory<>(
>>>             (out) -> new ParquetDynamicMessageWriterBuilder(out, params).build()))
>>>     .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>     .build();
>>>
>>> // StreamingFileSink is a SinkFunction, so it is attached with addSink();
>>> // sinkTo() expects the unified Sink interface (e.g. FileSink).
>>> env
>>>     .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
>>>     .addSink(parquetSink);
>>>
>>> env.execute("my-job");
>>> ```