Thank you for your reply,

Some clarification:

We have configured the BucketAssigner to use the *Kafka record timestamp*.
Exact bucketing behavior as follows:
private static final DateTimeFormatter formatter = DateTimeFormatter
.ofPattern("yyyy-MM-dd'T'HH");

@Override
public String getBucketId(KafkaRecord record, BucketAssigner.Context context)
{
return String.format(
"%s/dt=%s/partition_%s",
record.getTopic(),
Instant.ofEpochMilli(record.getTimestamp()).atZone(ZoneOffset.UTC).format
(formatter),
record.getPartition());
}

For each record, we write only its offset to the S3 object as a sanity
check. It is easy to detect missing or duplicate offsets. To answer your
questions:


*Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02are
entirely skipped?*
No, because even if the producer were idle during these datetimes, we would
expect no missing offsets. We observed both *millions of missing records*,
in addition to missing partitions (2019-11-24T01 and 2019-11-24T02).
Further, the producer was very active during this time.
I want to emphasize that we noticed that the consumer for this exact
TopicPartition was falling behind (>1 hour lag); this degree of lag was
only observed for this partition. (The consumer eventually caught up). It's
normal for the consumer to fall behind the producer for short bursts, but
we definitely do not expect missing records as a result. There were
millions of records whose timestamps fall into (dt 2019-11-24T01 and
2019-11-24T02) - they were entirely skipped by the writer.


*what does TT stand for?*
It's simply convention for datetime serialization as string.



*Can it be that there are a lot of events for partition 4 that fill up2
part files for that duration?*
We are using the BulkWriter. I am under the impression that this writer
should only produce one file per checkpoint interval, which we have
configured to be 5 minutes. You see that the preceding commits follow this
pattern of one commit per checkpoint interval, which is what we expect.
It's very strange that two files for the same TopicPartition (same
TaskManager) are committed.


I am eager to hear your reply and understand what we're seeing.

Thanks,
Harrison

On Thu, Nov 28, 2019 at 6:43 AM Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi Harrison,
>
> One thing to keep in mind is that Flink will only write files if there
> is data to write. If, for example, your partition is not active for a
> period of time, then no files will be written.
> Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
> are entirely skipped?
>
> In addition, for the "duplicates", it would help if you could share a
> bit more information about your BucketAssigner.
> How are these names assigned to the files and what does TT stand for?
> Can it be that there are a lot of events for partition 4 that fill up
> 2 part files for that duration? I am
> asking because the counter of the 2 part files differ.
>
> Cheers,
> Kostas
>
> On Tue, Nov 26, 2019 at 1:09 AM Harrison Xu <h...@quora.com> wrote:
> >
> > Hello,
> >
> > We're seeing some strange behavior with flink's KafkaConnector010 (Kafka
> 0.10.1.1) arbitrarily skipping data.
> >
> > Context
> > KafkaConnector010 is used as source, and
> StreamingFileSink/BulkPartWriter (S3) as sink with no intermediate
> operators. Recently, we noticed that millions of Kafka records were missing
> for one topic partition (this job is running for 100+ topic partitions, and
> such behavior was only observed for one). This job is run on YARN, and
> hosts were healthy with no hardware faults observed. No exceptions in
> jobmanager or taskmanager logs at this time.
> >
> > How was this detected?
> > As a sanity check, we dual-write Kafka metadata (offsets) to a separate
> location in S3, and have monitoring to ensure that written offsets are
> contiguous with no duplicates.
> > Each Kafka record is bucketed into hourly datetime partitions (UTC) in
> S3.
> >
> > (Condensed) Taskmanager logs
> > 2019-11-24 02:36:50,140 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252
> with MPU ID 3XG...
> > 2019-11-24 02:41:27,966 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253
> with MPU ID 9MW...
> > 2019-11-24 02:46:29,153 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254
> with MPU ID 7AP...
> > 2019-11-24 02:51:32,602 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255
> with MPU ID xQU...
> > 2019-11-24 02:56:35,183 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256
> with MPU ID pDL...
> > 2019-11-24 03:01:26,059 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257
> with MPU ID Itf...
> > 2019-11-24 03:01:26,510 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263
> with MPU ID e3l...
> > 2019-11-24 03:06:26,230 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264
> with MPU ID 5z4...
> > 2019-11-24 03:11:22,711 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5265
> with MPU ID NfP...
> >
> > Two observations stand out from the above logs:
> > - Datetime 2019-11-24T01 and 2019-11-24T02 are entirely skipped,
> resulting in millions of missing offsets. They are never written in future
> commits (and data in S3 shows this).
> > - Two commits for the same topic partition ("digest_features", partition
> 4), happened nearly simultaneously on 2019-11-24 03:03, despite our commit
> interval being set at 5 minutes. Why was the same TopicPartition read from
> and committed twice in such a short interval?
> >
> > Would greatly appreciate if anyone is able to shed light on this issue.
> Happy to provide full logs if needed.
> > Thanks
> >
> >
> >
> >
> >
> >
> >
> >
>

Reply via email to