Hi again,

After a bit of experimentation (and actually reading the bug report I linked), 
I realized the issue was that the parallelism was higher than the number of 
Kafka partitions: reducing the parallelism allowed the checkpoints to work as 
expected.

Since this seems unsupported, maybe KafkaSource should refuse to even start if 
its configured parallelism is higher than the number of Kafka partitions? 
Otherwise, this error condition is rather difficult to interpret IMHO. I'm 
happy to open a Jira and work on that if that's desired.
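
To make the suggestion concrete, here is a rough sketch of the kind of check I 
have in mind. It is plain Java, not actual Flink code: the class and method 
names are hypothetical, and the partition count is assumed to have been looked 
up from Kafka beforehand.

```java
// Hypothetical sketch, not part of Flink: names and behavior are assumptions.
final class ParallelismGuard {

    private ParallelismGuard() {}

    /** Fails fast when the source parallelism exceeds the number of Kafka partitions. */
    static void checkParallelism(int sourceParallelism, int partitionCount) {
        if (sourceParallelism < 1 || partitionCount < 1) {
            throw new IllegalArgumentException(
                "parallelism and partition count must both be >= 1");
        }
        if (sourceParallelism > partitionCount) {
            // Mirrors what I observed: the surplus source subtasks finish right away,
            // and checkpoints then fail because not all tasks are running.
            throw new IllegalStateException(
                "Kafka source parallelism (" + sourceParallelism
                    + ") is higher than the number of partitions (" + partitionCount + ")");
        }
    }

    /** Softer alternative: cap the requested parallelism at the partition count. */
    static int capParallelism(int requestedParallelism, int partitionCount) {
        if (requestedParallelism < 1 || partitionCount < 1) {
            throw new IllegalArgumentException(
                "parallelism and partition count must both be >= 1");
        }
        return Math.min(requestedParallelism, partitionCount);
    }
}
```

On the application side, the second variant could be used as 
`env.setParallelism(ParallelismGuard.capParallelism(8, partitionCount))`, 
whereas the first one is closer to what a fail-fast check inside the source 
might do.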

Best regards,

Svend



On Wed, 4 Aug 2021, at 11:32 AM, Svend wrote:
> Hi Robert, 
> 
> Thanks for the feedback.
> 
> You are correct, the behavior is indeed different: when I make the source 
> bounded, the application eventually stops whereas without that setting it 
> runs forever.
> 
> In both cases neither checkpoints nor data is being written to the filesystem.
> 
> I re-ran the experiment to get more info:
> 
> * when making the kafka source unbounded (which is not what I want), I notice 
> log messages saying that tasks associated with the Kafka source get 
> unregistered at the beginning:
> 
> 2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.Task Source: Kafka Source 
> -> Sink: Unnamed (3/8)#0 – Freeing task resources for Source: Kafka Source 
> -> Sink: Unnamed (3/8)#0 (70e93520406e12a1e7480a3344f4064d).
> 2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.TaskExecutor 
> flink-akka.actor.default-dispatcher-6 – Un-registering task and sending 
> final execution state FINISHED to JobManager for task Source: Kafka Source -> 
> Sink: Unnamed (8/8)#0 f02843c05005daced3d5271832f25559.
> 
> 
> Later on, it seems each checkpoint fails while complaining that some tasks 
> are not running => would that be caused by the finished tasks above?
> 
> 2021-08-04 10:55:31,463 [INFO org.apa.fli.run.che.CheckpointCoordinator 
> Checkpoint Timer – Failed to trigger checkpoint for job 
> 321fc6243d8aa091bdb8b913b7c3a679 since some tasks of job 
> 321fc6243d8aa091bdb8b913b7c3a679 has been finished, abort the checkpoint 
> Failure reason: Not all required tasks are currently running.
> 
> 
> * when setting the kafka source back to bounded, I also notice the same 
> "unregistered" log messages, followed by one "Checkpoint Timer – Failed to 
> trigger c" log, and then the job finishes
> 
> 
> After digging a little, this seems similar to this bug: 
> https://issues.apache.org/jira/browse/FLINK-2491
> 
> I've tried to add a supplementary ProcessFunction in the pipeline just to 
> have one more stateful thing and hope to trigger the checkpoint, though 
> without success. 
> 
> 
> 
> 
> On Tue, 3 Aug 2021, at 1:33 PM, Robert Metzger wrote:
>> Hi Svend,
>> I'm a bit confused by this statement:
>> 
>>> * In streaming mode, with checkpointing but removing the `setBounded()` on the 
>>> kafka source yields the same result
>> 
>> My expectation would be that the source runs forever, if it is not bounded.
>> Are you sure this error message is not coming from another task?
>> 
>> 
>> On Sat, Jul 31, 2021 at 11:23 AM Svend <stream...@svend.xyz> wrote:
>>> Hi everyone,
>>> 
>>> I'm failing to write a simple Flink 1.13.1 application with the DataStream 
>>> API that reads from kafka and writes to parquet.
>>> 
>>> My intention is to run this as a job that reads what is currently in kafka, 
>>> shuts down when reaching the current end of each partition and picks up from 
>>> there next time it's started.
>>> 
>>> I've tried several variations, I can't get anything to work properly:
>>> 
>>> * In streaming mode, enabling checkpoint and setting the kafkaSource to 
>>> bounded (see code sample below), the application fails to perform 
>>> checkpoint complaining about:
>>> 
>>> "Checkpoint Timer Failed to trigger checkpoint for job ... since some tasks 
>>> of job ... has been finished"
>>> 
>>> => no parquet part gets written, no checkpoint gets written and no kafka 
>>> offset get committed
>>>  
>>> * In streaming mode, with checkpointing but removing the `setBounded()` on the 
>>> kafka source yields the same result
>>> 
>>> * I also tried in batch mode, removing the checkpoint, switching the 
>>> StreamingFileSink for a FileSink and using Kafka's 
>>> "auto.commit.interval.ms" => in that case I'm getting some parquet output 
>>> and kafka offsets are committed, but the application shuts down before 
>>> flushing the offset of what has been read, s.t. the latest kafka events 
>>> will be read again at the next start.
>>> 
>>> This all sounds very basic, I see other people do this kind of thing 
>>> (recently, [1]), and I was really expecting the combination of 
>>> KafkaSource with StreamingFileSink and checkpointing to work, since all 
>>> those are streaming concepts. Hopefully I'm doing something wrong?
>>> 
>>> [1] http://mail-archives.apache.org/mod_mbox/flink-user/202106.mbox/browser
>>> 
>>> Thanks a lot in advance,
>>> 
>>> Svend
>>> 
>>> ```
>>>     // I'm testing this by launching the app in an IDE
>>> 
>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>     env.getConfig().registerTypeWithKryoSerializer(
>>>         DynamicMessage.class, new DynamicProtobufKryoSerializer(params));
>>> 
>>>     env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>     env.enableCheckpointing(1000);
>>>     env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
>>>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>> 
>>>     KafkaSource<DynamicMessage> kafkaSource = KafkaSource.<DynamicMessage>builder()
>>>         .setBootstrapServers("localhost:9092")
>>>         .setTopics("some-topic")
>>>         .setGroupId("my-group")
>>>         .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
>>>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>>         .setBounded(OffsetsInitializer.latest())
>>>         .build();
>>> 
>>>     StreamingFileSink<DynamicMessage> parquetSink = StreamingFileSink
>>>         .forBulkFormat(
>>>             Path.fromLocalFile(new File("/tmp/job-output/some-topic.parquet")),
>>>             new ParquetWriterFactory<>(
>>>                 (out) -> new ParquetDynamicMessageWriterBuilder(out, params).build()))
>>>         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>         .build();
>>> 
>>>     env
>>>         .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
>>>         .addSink(parquetSink);
>>> 
>>>     env.execute("my-job");
>>> ```
> 
