Hi Dan,

could you check which records are missing? My suspicion is that they are
records emitted from an otherwise idling partition right before the bucket
strategy rolls over.

If so, it could indeed be connected to idleness. Idleness tells Flink not to
wait for that particular partition before advancing the watermark. If a record
then appears in a previously idle partition with an event timestamp below the
watermark of the other partitions, that record is deemed late and discarded.
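
To illustrate the mechanism (this is a simplified plain-Java sketch, not
Flink's actual internals; all names such as `combinedWatermark` and `isLate`
are made up for the example): the combined watermark ignores partitions marked
idle, so a record that later arrives on an idle partition can land behind the
watermark and be treated as late.

```java
public class IdlenessSketch {
    // Simplified model: the combined watermark is the minimum over all
    // *active* partitions; idle partitions do not hold it back.
    static long combinedWatermark(long[] partitionWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, partitionWatermarks[i]);
            }
        }
        return min;
    }

    // A record is late if its timestamp is at or behind the current watermark.
    static boolean isLate(long eventTimestamp, long watermark) {
        return eventTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long[] watermarks = {1_000L, 5_000L}; // partition 0 stalled at 1_000...
        boolean[] idle = {true, false};       // ...and was marked idle

        long wm = combinedWatermark(watermarks, idle); // 5_000, not 1_000

        // A record finally arrives on the idle partition with timestamp 2_000:
        System.out.println(wm);                 // 5000
        System.out.println(isLate(2_000L, wm)); // true: it would be discarded
    }
}
```

Without idleness, partition 0 would have held the combined watermark at 1_000
and the record with timestamp 2_000 would not have been late.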

On Tue, Apr 27, 2021 at 2:42 AM Dan Hill <quietgol...@gmail.com> wrote:

> Hey Robert.
>
> Nothing weird.  I was trying to find recent records (not the latest).  No
> savepoints (the job had only been running for about a day).  No checkpoint
> issues (all successes).  I don't know how many records are missing.
>
> I removed the withIdleness. The other parts are very basic.  The text logs
> look pretty useless.
>
> On Mon, Apr 26, 2021 at 11:07 AM Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Dan,
>>
>> Can you describe the conditions under which you are missing records (after a
>> machine failure, after a Kafka failure, after taking and restoring from a
>> savepoint, ...)?
>> Are many records missing? Are the first records or the latest records
>> missing? Are individual records missing, or larger blocks of data?
>>
>> I don't think there's a bug in Flink or the Kafka connector. Maybe it's
>> just a configuration or systems-design issue.
>>
>>
>> On Sun, Apr 25, 2021 at 9:56 AM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> Have any other devs noticed issues with Flink missing Kafka records with
>>> long-running Flink jobs?  When I re-run my Flink job and start from the
>>> earliest Kafka offset, Flink processes the events correctly.  I'm using
>>> Flink v1.11.1.
>>>
>>> I have a simple job that takes records (Requests) from Kafka and
>>> serializes them to S3.  Pretty basic.  No related issues in the text logs.
>>> I'm hoping I just have a configuration issue.  I'm guessing idleness is
>>> working in a way that I'm not expecting.
>>>
>>> Any ideas?
>>> - Dan
>>>
>>>
>>> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
>>>   Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");
>>>
>>>   SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
>>>       new FlinkKafkaConsumer<>(getInputRequestTopic(),
>>>           getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
>>>     .uid("source-request")
>>>     .name("Request")
>>>     .assignTimestampsAndWatermarks(
>>>         WatermarkStrategy.<Request>forBoundedOutOfOrderness(maxOutOfOrderness)
>>>             .withIdleness(Duration.ofMinutes(1)));
>>>
>>>   executeLogRequest(rawRequestInput);
>>>   env.execute("log-request");
>>> }
>>>
>>> void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
>>>   AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);
>>>
>>>   rawRequestInput.addSink(StreamingFileSink
>>>       .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
>>>       .withBucketAssigner(new DateHourBucketAssigner<Request>(
>>>           request -> request.getTiming().getEventApiTimestamp()))
>>>       .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>       .withOutputFileConfig(createOutputFileConfig())
>>>       .build())
>>>     .uid("sink-s3-raw-request")
>>>     .name("S3 Raw Request");
>>> }
>>>
>>>