Hey Robert. Nothing weird. I was trying to find recent records (not the latest). No savepoints (the job had only been running for about a day). No checkpoint issues (all checkpoints succeeded). I don't know how many records are missing.
I removed the withIdleness. The other parts are very basic. The text logs look pretty useless.

On Mon, Apr 26, 2021 at 11:07 AM Robert Metzger <rmetz...@apache.org> wrote:

> Hi Dan,
>
> Can you describe under which conditions you are missing records (after a
> machine failure, after a Kafka failure, after taking and restoring from a
> savepoint, ...).
> Are many records missing? Are "the first records" or the "latest records"
> missing? Any individual records missing, or larger blocks of data?
>
> I don't think that there's a bug in Flink or the Kafka connector. Maybe
> it's just a configuration or systems design issue.
>
>
> On Sun, Apr 25, 2021 at 9:56 AM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Hi!
>>
>> Have any other devs noticed issues with Flink missing Kafka records with
>> long-running Flink jobs? When I re-run my Flink job and start from the
>> earliest Kafka offset, Flink processes the events correctly. I'm using
>> Flink v1.11.1.
>>
>> I have a simple job that takes records (Requests) from Kafka and
>> serializes them to S3. Pretty basic. No related issues in the text logs.
>> I'm hoping I just have a configuration issue. I'm guessing idleness is
>> working in a way that I'm not expecting.
>>
>> Any ideas?
>> - Dan
>>
>> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
>>   Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");
>>
>>   SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
>>       new FlinkKafkaConsumer(getInputRequestTopic(),
>>           getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
>>     .uid("source-request")
>>     .name("Request")
>>     .assignTimestampsAndWatermarks(
>>         WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness)
>>             .withIdleness(Duration.ofMinutes(1)));
>>
>>   executeLogRequest(rawRequestInput);
>>   env.execute("log-request");
>> }
>>
>> void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
>>   AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);
>>
>>   rawRequestInput.addSink(StreamingFileSink
>>       .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
>>       .withBucketAssigner(new DateHourBucketAssigner<Request>(
>>           request -> request.getTiming().getEventApiTimestamp()))
>>       .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>       .withOutputFileConfig(createOutputFileConfig())
>>       .build())
>>     .uid("sink-s3-raw-request")
>>     .name("S3 Raw Request");
>> }
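Since the thread centers on the withIdleness suspicion, here is a minimal plain-Java sketch of the bounded-out-of-orderness rule that the job's WatermarkStrategy is based on: the watermark trails the maximum seen timestamp by the out-of-orderness bound, and a record at or behind the watermark counts as late. This is not Flink's actual implementation, and the timestamps and 5-second bound are invented for illustration; it only shows how one partition jumping far ahead (e.g. after another partition was marked idle) can make a subsequent out-of-order record late.

```java
import java.util.Arrays;

public class WatermarkSketch {

    // For each event, report whether it arrived at or behind the watermark
    // (watermark = max seen timestamp - maxOutOfOrderness).
    static boolean[] lateFlags(long[] eventTimestamps, long maxOutOfOrderness) {
        boolean[] late = new boolean[eventTimestamps.length];
        long maxSeen = 0; // simplification; Flink starts lower than any real timestamp
        for (int i = 0; i < eventTimestamps.length; i++) {
            long watermark = maxSeen - maxOutOfOrderness;
            late[i] = eventTimestamps[i] <= watermark;
            maxSeen = Math.max(maxSeen, eventTimestamps[i]);
        }
        return late;
    }

    public static void main(String[] args) {
        // A big jump to 60s pushes the watermark past the 3s record,
        // so only the last event is flagged late.
        long[] ts = {1_000, 2_000, 60_000, 3_000};
        System.out.println(Arrays.toString(lateFlags(ts, 5_000)));
    }
}
```

Note that lateness by itself only matters to event-time operators such as windows; a pass-through sink like the one above writes every record it receives, which is one reason the idleness theory is worth testing rather than assuming.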