Hey Robert.

Nothing weird. I was looking for recent records (not the latest ones). No
savepoints were involved (the job had only been running for about a day). No
checkpoint issues (all checkpoints succeeded). I don't know how many records
are missing.

I removed the withIdleness call. The other parts of the job are very basic.
The text logs look pretty useless.
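
Concretely, the source now looks roughly like this (the same code as in
createLogRequestJob quoted below, just without the idleness timeout;
maxOutOfOrderness is the same Duration field as before):

SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
    new FlinkKafkaConsumer(getInputRequestTopic(),
        getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
  .uid("source-request")
  .name("Request")
  .assignTimestampsAndWatermarks(
      // Same bounded-out-of-orderness strategy as before, minus the
      // .withIdleness(Duration.ofMinutes(1)) call. Without idleness, a
      // partition that stops receiving records holds back the overall
      // watermark instead of being excluded from the watermark calculation.
      WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness));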

On Mon, Apr 26, 2021 at 11:07 AM Robert Metzger <rmetz...@apache.org> wrote:

> Hi Dan,
>
> Can you describe under which conditions you are missing records (after a
> machine failure, after a Kafka failure, after taking and restoring from a
> savepoint, ...)?
> Are many records missing? Are the "first records" or the "latest records"
> missing? Are individual records missing, or larger blocks of data?
>
> I don't think there's a bug in Flink or the Kafka connector. Maybe it's
> just a configuration or systems design issue.
>
>
> On Sun, Apr 25, 2021 at 9:56 AM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Hi!
>>
>> Have any other devs noticed Flink missing Kafka records in long-running
>> Flink jobs? When I re-run my Flink job and start from the earliest Kafka
>> offset, Flink processes the events correctly. I'm using Flink v1.11.1.
>>
>> I have a simple job that takes records (Requests) from Kafka and
>> serializes them to S3.  Pretty basic.  No related issues in the text logs.
>> I'm hoping I just have a configuration issue.  I'm guessing idleness is
>> working in a way that I'm not expecting.
>>
>> Any ideas?
>> - Dan
>>
>>
>> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
>>   Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");
>>
>>   SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
>>       new FlinkKafkaConsumer(getInputRequestTopic(),
>>           getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
>>     .uid("source-request")
>>     .name("Request")
>>     .assignTimestampsAndWatermarks(
>>         WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness)
>>             .withIdleness(Duration.ofMinutes(1)));
>>
>>   executeLogRequest(rawRequestInput);
>>   env.execute("log-request");
>> }
>>
>>
>> void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
>>   AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);
>>
>>   rawRequestInput.addSink(StreamingFileSink
>>       .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
>>       .withBucketAssigner(new DateHourBucketAssigner<Request>(
>>           request -> request.getTiming().getEventApiTimestamp()))
>>       .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>       .withOutputFileConfig(createOutputFileConfig())
>>       .build())
>>     .uid("sink-s3-raw-request")
>>     .name("S3 Raw Request");
>> }
>>
