Re: Timestamp/watermark support in Kinesis consumer
Another nice thing is that readers can potentially also read from different sources (historic/latest). To arrive at a general connector pattern, it will also be necessary to consider the ordering relationship between restrictions/splits/blocks/segments when it is important for the processing logic - which is what Jamie refers to. For Kinesis, the most obvious case is reading parent shards before child shards, but also throttling unrelated shards when they are unbalanced with respect to event time. We are now implementing a stopgap watermark solution in custom code, because the Kinesis consumer really needs a revamp as part of a general connector overhaul.

On Thu, Feb 22, 2018 at 2:28 AM, Aljoscha Krettek wrote:
Re: Timestamp/watermark support in Kinesis consumer
While we're on this: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

This is a concrete way of separating partition/shard/split discovery from their reading. The nice thing about this is that you can mix-and-match "discovery components" and "reader components". For example, for Kafka we would have a TopicReader and I can envision different discovery implementations: one very simple, no-frills, but rock solid, another one that does automatic discovery of new partitions, regex matching, etc.

> On 22. Feb 2018, at 01:49, Jamie Grier wrote:
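The mix-and-match idea above boils down to two small interfaces that can be combined freely. A rough sketch - the names `SplitDiscoverer` and `SplitReader` are invented for illustration and are not an actual Flink or Beam API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of separating split discovery from split reading,
// so any "discovery component" can be paired with any "reader component".
public class SourceComponents {
    interface SplitDiscoverer {
        List<String> discoverSplits();
    }

    interface SplitReader {
        List<String> read(String split);
    }

    // Mix and match: iterate discovered splits, read each with the reader.
    static List<String> consumeAll(SplitDiscoverer discoverer, SplitReader reader) {
        List<String> out = new ArrayList<>();
        for (String split : discoverer.discoverSplits()) {
            out.addAll(reader.read(split));
        }
        return out;
    }
}
```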
Re: Timestamp/watermark support in Kinesis consumer
I know this is a very simplistic idea but...

In general the issue Eron is describing occurs whenever two (or more) parallel partitions are assigned to the same Flink sub-task and there is a large time delta between them. This problem exists largely because we are not making any decisions about which of these partitions to read and when, but rather just treating them all the same. However, this isn't the only way to approach the problem.

Think instead of each partition as a "roughly time sorted" file and of the function of the connector as roughly a merge-sort type process. In other words, just read the older data first by peeking at each partition and deciding what to read next. The output of the connector would be a roughly time-ordered stream that way.

However, to really solve the whole problem you'd have to carry this idea throughout Flink and be more selective about which data you read and when throughout the whole data flow graph. Similar problem, I think, and just something I've been thinking a bit about lately.

On Mon, Feb 12, 2018 at 7:12 PM, Eron Wright wrote:
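The "merge sort" reading strategy described above can be sketched outside of any connector: peek at the head of each roughly time-sorted partition and always emit the oldest record next. This is an illustrative toy (records reduced to bare timestamps), not connector code:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

// Illustrative sketch: merge several roughly time-sorted partitions by
// always emitting the record with the oldest head timestamp, yielding a
// roughly time-ordered combined stream.
public class MergingReader {
    public static List<Long> readMerged(List<Queue<Long>> partitions) {
        PriorityQueue<Queue<Long>> heads = new PriorityQueue<>(
                Comparator.comparingLong((Queue<Long> q) -> q.peek()));
        for (Queue<Long> p : partitions) {
            if (!p.isEmpty()) {
                heads.add(p);
            }
        }
        List<Long> out = new ArrayList<>();
        while (!heads.isEmpty()) {
            Queue<Long> oldest = heads.poll(); // partition with the oldest head
            out.add(oldest.poll());            // emit that record
            if (!oldest.isEmpty()) {
                heads.add(oldest);             // re-insert under its new head
            }
        }
        return out;
    }
}
```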
Re: Timestamp/watermark support in Kinesis consumer
Big +1 on trying to come up with a common framework for partition-based, replayable sources. There is so much common code to be written that makes it possible to write correct connectors, and Gordon's bullet points are exactly those -- and it's not just Kinesis and Kafka. It's also true for reading data out of something like S3. If your data is organized as a bunch of parallel, roughly time-ordered files, there really isn't much difference in the kind of code you have to write for all the hard bits mentioned above. The good news is that the potential outcome of this sort of effort could be that production-quality, correct, parallel connectors are much easier to implement. Ideally everything other than the code you write to discover partitions and the code to consume data from a single simple partition could be mostly common.

On Thu, Feb 8, 2018 at 2:01 AM, Tzu-Li (Gordon) Tai wrote:
> Hi Thomas,
>
> It's great that you've brought out these issues, which IMO are all very
> valid. They have also been in my head for a while.
>
> Here's a list of things, off the top of my head, that I would really like
> to improve as part of a major Kafka / Kinesis connector rework. Some have
> JIRAs for them already, or were discussed in some other indirectly related
> JIRA. It might make sense to open an umbrella ticket and consolidate all of
> them there.
>
> - Common abstraction for partition-based, replayable sources, which
> handles 1) position checkpointing, 2) partition discovery / topic
> subscription (using the file source pattern), 3) custom startup positions,
> 4) per-partition watermarks, and 5) partition idleness.
> - Configuration for the connectors is not well-defined. Some go through
> provided properties, some require using setter methods, etc. Moreover, it
> is actually very confusing for some users that we share the Properties to
> carry Flink connector-specific configurations as well as the internally
> used client configuration [1]. I think in this aspect, Beam's KafkaIO has a
> nice API [2] when it comes to this.
> - Some default behaviors of the current connectors, such as partitioning
> and flushing on the producer side, and offset committing for the Kafka
> consumer, do not make sense [3] [4].
> - The deserialization / serialization schema together with the partitioner
> interfaces don't really play well together. For example, the
> `getTargetTopic` method should really be part of the partitioner [5].
>
> I think we are now in a good position to try making this happen for 1.6.
> Once 1.5 is out of the way, I can try opening an umbrella JIRA and collect
> everything there so we can discuss more there.
>
> Cheers,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-4280?focusedCommentId=15399648&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15399648
> [2] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L171
> [3] https://issues.apache.org/jira/browse/FLINK-5728
> [4] https://issues.apache.org/jira/browse/FLINK-5704
> [5] https://issues.apache.org/jira/browse/FLINK-6288
>
> On 8 February 2018 at 1:48:23 AM, Thomas Weise (t...@apache.org) wrote:
>
> Generalizing the pattern would be great. I was also wondering if there
> aren't other commonalities between sources that would benefit from a shared
> framework. Kafka and Kinesis don't look all that different from a consumer
> perspective: replayable source, topic -> stream, partition -> shard, offset
> -> sequence, dynamic discovery, state saving - shouldn't there be more
> common code?
>
> Meanwhile, we need to find a way to address shortcomings in the current
> Kinesis connector to enable the use case. I would prefer to do that without
> permanently forking the connector code, so here are some more thoughts:
>
> - Provide hook to override discovery (not to hit Kinesis from every subtask)
> - Provide hook to support custom watermark generation (somewhere around
> KinesisDataFetcher.emitRecordAndUpdateState)
>
> If we can accomplish these in short order, it would be great. The current
> implementation makes it hard/impossible to override certain behaviors
> (final protected methods and the like). If there is agreement then I would
> like to address those as a quick PR.
>
> Thanks,
> Thomas
>
> On Wed, Feb 7, 2018 at 7:59 AM, Aljoscha Krettek wrote:
> Hi,
>
> That last point is very valid. For a while now I've wanted to generalise
> the pattern of our file source to other sources. (This is related to how
> Beam sources are being refactored to use Splittable DoFn.)
>
> I'm very eager for design work to start on this once 1.5 is out the door.
> There are some other folks (cc'ed) who have also talked/thought about this
> before.
>
> Best,
> Aljoscha
>
>> On 7. Feb 2018, at 01:44, Thomas Weise wrote:
>>
>> In addition to lack of watermark support, the Kinesis consumer suffers from
>> a discovery related issue th
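The surface area implied by Gordon's first bullet - position checkpointing, partition discovery, startup positions, per-partition watermarks, and idleness - might look roughly like the following. This is purely a sketch; every name is invented for illustration and none of it is an actual Flink interface:

```java
import java.util.List;

// Hypothetical sketch of a common abstraction for partition-based,
// replayable sources; all names here are invented for illustration.
public interface ReplayableSource<T, PartitionT, PositionT> {
    // 2) partition discovery / topic subscription
    List<PartitionT> discoverPartitions();

    // 3) custom startup positions (earliest, latest, at-timestamp, ...)
    PositionT startupPosition(PartitionT partition);

    // consume the next record from a partition, replayable from a position
    T poll(PartitionT partition);

    // 1) position checkpointing snapshots the positions returned here
    PositionT currentPosition(PartitionT partition);

    // 4) per-partition watermarks and 5) partition idleness
    long currentWatermark(PartitionT partition);
    boolean isIdle(PartitionT partition);
}
```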
Re: Timestamp/watermark support in Kinesis consumer
It is valuable to consider the behavior of a consumer in both a real-time processing context, which consists mostly of tail reads, and a historical processing context, where there's an abundance of backlogged data. In the historical processing context, system internals (e.g. shard selection logic) have an outsized influence on the order of observation and potentially the progression of the event time clock. In a real-time context, the order of observation is, by definition, mostly determined by the order in which events are produced.

My point is, it would be good to explore the efficacy of these improvements in both contexts.

On Mon, Feb 12, 2018 at 5:10 PM, Thomas Weise wrote:
Re: Timestamp/watermark support in Kinesis consumer
I don't think there is a generic solution to the problem you are describing; we don't know how long it will take for resharding to take effect and those changes to become visible to the connector. Depending on how latency sensitive the pipeline is, possibly a configurable watermark hold period could be used to cushion the event time chaos introduced by resharding.

This isn't the primary motivation for the connector customization I'm working on though. We face issues with restart from older checkpoints where parent and child shards are consumed in parallel.

--
sent from mobile

On Feb 12, 2018 4:36 PM, "Eron Wright" wrote:
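The configurable watermark hold described above could be as simple as emitting a watermark that trails the maximum observed event timestamp by a hold interval, while never moving backwards. A rough sketch under that assumption - `HeldWatermark` is hypothetical, not connector code:

```java
// Hypothetical sketch of a watermark hold: the emitted watermark trails
// the maximum observed event timestamp by a configurable period, giving
// shards that appear late (e.g. after a reshard) time to catch up.
public class HeldWatermark {
    private final long holdMillis;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    public HeldWatermark(long holdMillis) {
        this.holdMillis = holdMillis;
    }

    public void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    // Watermark = max timestamp minus the hold, and never regressing.
    public long currentWatermark() {
        long candidate = maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - holdMillis;
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }
}
```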
Re: Timestamp/watermark support in Kinesis consumer
I'd like to know how you envision dealing with resharding in relation to the watermark state. Imagine that a given shard S1 has a watermark of T1, and is then split into two shards S2 and S3. The new shards are assigned to subtasks according to a hash function. The current watermarks of those subtasks could be far ahead of T1, and thus the events in S2/S3 will be considered late. The problem of a chaotic event time clock is exacerbated by any source that uses dynamic partitioning. Would a per-shard watermark generator really solve the problem that is motivating you? Thanks, Eron On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise wrote: > Based on my draft implementation, the changes that are needed in the Flink > connector are as follows: > > I need to be able to override the following to track last record timestamp > and idle time per shard. > > protected final void emitRecordAndUpdateState(T record, long > recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) { > synchronized (checkpointLock) { > sourceContext.collectWithTimestamp(record, recordTimestamp); > updateState(shardStateIndex, lastSequenceNumber); > } > } > > Any objection removing final from it? > > Also, why is sourceContext.collectWithTimestamp in the synchronized block? > My custom class will need to emit watermarks - I assume there is no need to > acquire checkpointLock for that? Otherwise I would also need to add > emitWatermark() to the base class. > > Let me know if anything else should be considered, I will open a JIRA and > PR otherwise. 
> > Thanks, > Thomas > > > On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise wrote: > > > --> > > > > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai > > > wrote: > > > >> Regarding the two hooks you would like to be available: > >> > >> > >>- Provide hook to override discovery (not to hit Kinesis from every > >>subtask) > >> > >> Yes, I think we can easily provide a way, for example setting -1 for > >> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery. > >> Though, the user would then have to savepoint and restore in order to > >> pick up new shards after a Kinesis stream reshard (which is in practice > the > >> best way to by-pass the Kinesis API rate limitations). > >> +1 to provide that. > >> > > > > I'm considering a customization of KinesisDataFetcher with override for > > discoverNewShardsToSubscribe. We still want shards to be discovered, just > > not by hitting Kinesis from every subtask. > > > > > >> > >> > >>- Provide hook to support custom watermark generation (somewhere > >>around KinesisDataFetcher.emitRecordAndUpdateState) > >> > >> Per-partition watermark generation on the Kinesis side is slightly more > >> complex than Kafka, due to how Kinesis’s dynamic resharding works. > >> I think we need to additionally allow new shards to be consumed only > >> after its parent shard is fully read, otherwise “per-shard time > >> characteristics” can be broken because of this out-of-orderness > consumption > >> across the boundaries of a closed parent shard and its child. > >> There theses JIRAs [1][2] which has a bit more details on the topic. > >> Otherwise, in general I’m also +1 to providing this also in the Kinesis > >> consumer. > >> > > > > Here I'm thinking to customize emitRecordAndUpdateState (method would > need > > to be made non-final). Using getSubscribedShardsState with additional > > transient state to keep track of watermark per shard and emit watermark > as > > appropriate. 
> > > > That's the idea - haven't written any code for it yet. > > > > Thanks, > > Thomas > > > > >
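The approach discussed in this exchange - tracking the last record timestamp and idle time per shard, and deriving the subtask watermark from the minimum across non-idle shards - could be modeled roughly as below. This is a standalone illustration with made-up names, not actual Flink connector code; a customized KinesisDataFetcher could maintain something like it and emit the result from emitRecordAndUpdateState.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-shard watermark tracking; not actual Flink connector code.
class PerShardWatermarkTracker {

    private final long idleTimeoutMillis;
    // Last record timestamp observed per shard (event time).
    private final Map<String, Long> lastRecordTimestamp = new HashMap<>();
    // Last time a record arrived per shard (processing time), to detect idle shards.
    private final Map<String, Long> lastUpdateTime = new HashMap<>();

    PerShardWatermarkTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Called for every emitted record, analogous to a hook in emitRecordAndUpdateState.
    void onRecord(String shardId, long recordTimestamp, long processingTime) {
        lastRecordTimestamp.put(shardId, recordTimestamp);
        lastUpdateTime.put(shardId, processingTime);
    }

    // Candidate watermark: minimum event time across shards that are not idle.
    // Idle shards are excluded so they do not hold the watermark back; how exactly
    // to treat them is the open question discussed in this thread.
    long currentWatermark(long processingTime) {
        long watermark = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> e : lastRecordTimestamp.entrySet()) {
            if (processingTime - lastUpdateTime.get(e.getKey()) > idleTimeoutMillis) {
                continue; // idle shard
            }
            anyActive = true;
            watermark = Math.min(watermark, e.getValue());
        }
        return anyActive ? watermark : Long.MIN_VALUE;
    }
}
```

In a real fetcher one would presumably only emit when the computed watermark advances, and resharding (Eron's S1 split into S2/S3 scenario) still needs separate handling.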
Re: Timestamp/watermark support in Kinesis consumer
Based on my draft implementation, the changes that are needed in the Flink connector are as follows: I need to be able to override the following to track the last record timestamp and idle time per shard.

protected final void emitRecordAndUpdateState(T record, long recordTimestamp,
        int shardStateIndex, SequenceNumber lastSequenceNumber) {
    synchronized (checkpointLock) {
        sourceContext.collectWithTimestamp(record, recordTimestamp);
        updateState(shardStateIndex, lastSequenceNumber);
    }
}

Any objection to removing final from it? Also, why is sourceContext.collectWithTimestamp in the synchronized block? My custom class will need to emit watermarks - I assume there is no need to acquire checkpointLock for that? Otherwise I would also need to add emitWatermark() to the base class. Let me know if anything else should be considered; otherwise I will open a JIRA and PR. Thanks, Thomas On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise wrote: > --> > > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai > wrote: > >> Regarding the two hooks you would like to be available: >> >> >>- Provide hook to override discovery (not to hit Kinesis from every >>subtask) >> >> Yes, I think we can easily provide a way, for example setting -1 for >> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery. >> Though, the user would then have to savepoint and restore in order to >> pick up new shards after a Kinesis stream reshard (which is in practice the >> best way to by-pass the Kinesis API rate limitations). >> +1 to provide that. >> > > I'm considering a customization of KinesisDataFetcher with override for > discoverNewShardsToSubscribe. We still want shards to be discovered, just > not by hitting Kinesis from every subtask. 
> > >> >> >>- Provide hook to support custom watermark generation (somewhere >>around KinesisDataFetcher.emitRecordAndUpdateState) >> >> Per-partition watermark generation on the Kinesis side is slightly more >> complex than Kafka, due to how Kinesis’s dynamic resharding works. >> I think we need to additionally allow new shards to be consumed only >> after its parent shard is fully read, otherwise “per-shard time >> characteristics” can be broken because of this out-of-orderness consumption >> across the boundaries of a closed parent shard and its child. >> There theses JIRAs [1][2] which has a bit more details on the topic. >> Otherwise, in general I’m also +1 to providing this also in the Kinesis >> consumer. >> > > Here I'm thinking to customize emitRecordAndUpdateState (method would need > to be made non-final). Using getSubscribedShardsState with additional > transient state to keep track of watermark per shard and emit watermark as > appropriate. > > That's the idea - haven't written any code for it yet. > > Thanks, > Thomas > >
Re: Timestamp/watermark support in Kinesis consumer
--> On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai wrote: > Regarding the two hooks you would like to be available: > > >- Provide hook to override discovery (not to hit Kinesis from every >subtask) > > Yes, I think we can easily provide a way, for example setting -1 for > SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery. > Though, the user would then have to savepoint and restore in order to pick > up new shards after a Kinesis stream reshard (which is in practice the best > way to by-pass the Kinesis API rate limitations). > +1 to provide that. > I'm considering a customization of KinesisDataFetcher with override for discoverNewShardsToSubscribe. We still want shards to be discovered, just not by hitting Kinesis from every subtask. > > >- Provide hook to support custom watermark generation (somewhere >around KinesisDataFetcher.emitRecordAndUpdateState) > > Per-partition watermark generation on the Kinesis side is slightly more > complex than Kafka, due to how Kinesis’s dynamic resharding works. > I think we need to additionally allow new shards to be consumed only after > its parent shard is fully read, otherwise “per-shard time characteristics” > can be broken because of this out-of-orderness consumption across the > boundaries of a closed parent shard and its child. > There theses JIRAs [1][2] which has a bit more details on the topic. > Otherwise, in general I’m also +1 to providing this also in the Kinesis > consumer. > Here I'm thinking to customize emitRecordAndUpdateState (method would need to be made non-final). Using getSubscribedShardsState with additional transient state to keep track of watermark per shard and emit watermark as appropriate. That's the idea - haven't written any code for it yet. Thanks, Thomas
Re: Timestamp/watermark support in Kinesis consumer
Regarding the two hooks you would like to be available: Provide hook to override discovery (not to hit Kinesis from every subtask) Yes, I think we can easily provide a way, for example setting -1 for SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery. Though, the user would then have to savepoint and restore in order to pick up new shards after a Kinesis stream reshard (which is in practice the best way to bypass the Kinesis API rate limitations). +1 to provide that. Provide hook to support custom watermark generation (somewhere around KinesisDataFetcher.emitRecordAndUpdateState) Per-partition watermark generation on the Kinesis side is slightly more complex than for Kafka, due to how Kinesis’s dynamic resharding works. I think we need to additionally allow a new shard to be consumed only after its parent shard is fully read, otherwise “per-shard time characteristics” can be broken by out-of-order consumption across the boundary between a closed parent shard and its child. There are these JIRAs [1][2] with a bit more detail on the topic. Otherwise, in general I’m also +1 to providing this in the Kinesis consumer. Cheers, Gordon [1] https://issues.apache.org/jira/browse/FLINK-5697 [2] https://issues.apache.org/jira/browse/FLINK-6349 On 8 February 2018 at 1:48:23 AM, Thomas Weise (t...@apache.org) wrote: Provide hook to override discovery (not to hit Kinesis from every subtask) Provide hook to support custom watermark generation (somewhere around KinesisDataFetcher.emitRecordAndUpdateState)
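The ordering constraint Gordon describes - consume a child shard only after its parent shard has been fully read - could be enforced with a small gate along these lines. This is a hypothetical sketch, not part of the actual consumer; for simplicity it tracks a single parent per shard, whereas a shard produced by a Kinesis merge actually has two parents.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical gate enforcing parent-before-child shard consumption after a reshard.
// Not actual Flink code; shard IDs and parentage would come from the Kinesis API.
class ShardConsumptionGate {

    // childShardId -> parentShardId, as reported when the shard is discovered.
    private final Map<String, String> parentOf = new HashMap<>();
    // Shards that have been fully read (closed and drained).
    private final Set<String> completed = new HashSet<>();

    void registerChild(String childShardId, String parentShardId) {
        parentOf.put(childShardId, parentShardId);
    }

    void markComplete(String shardId) {
        completed.add(shardId);
    }

    // A shard may be consumed once its parent (if it has one) is fully read,
    // preserving per-shard time characteristics across the reshard boundary.
    boolean canConsume(String shardId) {
        String parent = parentOf.get(shardId);
        return parent == null || completed.contains(parent);
    }
}
```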
Re: Timestamp/watermark support in Kinesis consumer
Hi Thomas, It’s great that you’ve brought up these issues, which IMO are all very valid. They have also been in my head for a while. Here’s a list of things, off the top of my head, that I would really like to improve as part of a major Kafka / Kinesis connector rework. Some have JIRAs for them already, or were discussed in some other indirectly related JIRA. It might make sense to open an umbrella ticket and consolidate all of them there.

- Common abstraction for partition-based, replayable sources, which handles 1) position checkpointing, 2) partition discovery / topic subscription (using the file source pattern), 3) custom startup positions, 4) per-partition watermarks, and 5) partition idleness.
- Configuration for the connectors is not well-defined. Some goes through provided properties, some requires setter methods, etc. Moreover, it is actually very confusing for some users that we use the same Properties object to carry both Flink connector-specific configuration and the internally used client configuration [1]. In this respect, I think Beam’s KafkaIO has a nice API [2].
- Some default behaviors of the current connectors, such as partitioning and flushing on the producer side, and offset committing for the Kafka consumer, do not make sense [3] [4].
- The deserialization / serialization schema and the partitioner interfaces don’t really play well together. For example, the `getTargetTopic` method should really be part of the partitioner [5].

I think we are now in a good position to try making this happen for 1.6. Once 1.5 is out of the way, I can try opening an umbrella JIRA and collecting everything there so we can discuss further. 
Cheers, Gordon [1] https://issues.apache.org/jira/browse/FLINK-4280?focusedCommentId=15399648&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15399648 [2] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L171 [3] https://issues.apache.org/jira/browse/FLINK-5728 [4] https://issues.apache.org/jira/browse/FLINK-5704 [5] https://issues.apache.org/jira/browse/FLINK-6288 On 8 February 2018 at 1:48:23 AM, Thomas Weise (t...@apache.org) wrote: Generalizing the pattern would be great. I was also wondering if there aren't other commonalities between sources that would benefit from a shared framework. Kafka and Kinesis don't look all that different from a consumer perspective: replayable source, topic -> stream, partition -> shard, offset -> sequence, dynamic discovery, state saving - shouldn't there be more common code? Meanwhile, we need to find a way to address shortcomings in the current Kinesis connector to enable the use case. I would prefer to do that without permanently forking the connector code, so here are some more thoughts: Provide hook to override discovery (not to hit Kinesis from every subtask) Provide hook to support custom watermark generation (somewhere around KinesisDataFetcher.emitRecordAndUpdateState) If we can accomplish these in short order, it would be great. The current implementation makes it hard/impossible to override certain behaviors (final protected methods and the like). If there is agreement then I would like to address those as a quick PR. Thanks, Thomas On Wed, Feb 7, 2018 at 7:59 AM, Aljoscha Krettek wrote: Hi, That last point is very valid. For a while now I've wanted to generalise the pattern of our file source to other sources. (This is related to how Beam sources are being refactored to use Splittable DoFn.) I'm very eager for design work to start on this once 1.5 is out the door. 
There are some other folks (cc'ed) who have also talked/thought about this before. Best, Aljoscha > On 7. Feb 2018, at 01:44, Thomas Weise wrote: > > In addition to lack of watermark support, the Kinesis consumer suffers from > a discovery related issue that also needs to be resolved. Shard discovery > runs periodically in all subtasks. That's not just inefficient but becomes > a real problem when there is a large number of subtasks due to rate > limiting ( > https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html). > The discovery interval should be minimized to cap latency (new shards not > consumed until discovered). > > How about moving discovery out of the fetcher into a separate singleton > source and then broadcast the result to the parallel fetchers, following > the pattern applied to file input? > > https://github.com/apache/flink/blob/5f523e6ab31afeab5b1d9bbf62c6d4ef726ffe1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1336 > > This would also ensure that all subtasks consistently see the same shard > list. > > Thoughts? > > Thanks, > Thomas > > > On Mon, Feb 5, 2018 at 5:31 PM, Thomas Weise wrote: > >> Hi, >> >> The Kinesis consumer currently does not emit watermarks, and this can lead
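The configuration point in Gordon's list - replacing the mixed Properties/setter style with an explicit API in the spirit of Beam's KafkaIO - could look roughly like this. Every class and method name here is invented for illustration; none of this is an actual Flink API.

```java
// Hypothetical builder-style source configuration, loosely in the spirit of
// Beam's KafkaIO API. All names are illustrative.
class KinesisSourceConfig {

    private final String streamName;
    private final long discoveryIntervalMillis;
    private final String initialPosition;

    private KinesisSourceConfig(Builder b) {
        this.streamName = b.streamName;
        this.discoveryIntervalMillis = b.discoveryIntervalMillis;
        this.initialPosition = b.initialPosition;
    }

    static Builder forStream(String streamName) {
        return new Builder(streamName);
    }

    String streamName() { return streamName; }
    long discoveryIntervalMillis() { return discoveryIntervalMillis; }
    String initialPosition() { return initialPosition; }

    static class Builder {
        private final String streamName;
        private long discoveryIntervalMillis = 10_000L; // illustrative default
        private String initialPosition = "LATEST";      // illustrative default

        private Builder(String streamName) {
            this.streamName = streamName;
        }

        // -1 could mean "discovery disabled", matching the convention floated
        // earlier in the thread for SHARD_DISCOVERY_INTERVAL_MILLIS.
        Builder withDiscoveryInterval(long millis) {
            this.discoveryIntervalMillis = millis;
            return this;
        }

        Builder withInitialPosition(String position) {
            this.initialPosition = position;
            return this;
        }

        KinesisSourceConfig build() {
            return new KinesisSourceConfig(this);
        }
    }
}
```

The appeal over a raw Properties bag is that connector options become discoverable and type-checked, and cannot be confused with the underlying client's own configuration keys.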
Re: Timestamp/watermark support in Kinesis consumer
Generalizing the pattern would be great. I was also wondering if there aren't other commonalities between sources that would benefit from a shared framework. Kafka and Kinesis don't look all that different from a consumer perspective: replayable source, topic -> stream, partition -> shard, offset -> sequence, dynamic discovery, state saving - shouldn't there be more common code? Meanwhile, we need to find a way to address shortcomings in the current Kinesis connector to enable the use case. I would prefer to do that without permanently forking the connector code, so here are some more thoughts: - Provide hook to override discovery (not to hit Kinesis from every subtask) - Provide hook to support custom watermark generation (somewhere around KinesisDataFetcher.emitRecordAndUpdateState) If we can accomplish these in short order, it would be great. The current implementation makes it hard/impossible to override certain behaviors (final protected methods and the like). If there is agreement then I would like to address those as a quick PR. Thanks, Thomas On Wed, Feb 7, 2018 at 7:59 AM, Aljoscha Krettek wrote: > Hi, > > That last point is very valid. For a while now I've wanted to generalise > the pattern of our file source to other sources. (This is related to how > Beam sources are being refactored to use Splittable DoFn.) > > I'm very eager for design work to start on this once 1.5 is out the door. > There are some other folks (cc'ed) who have also talked/thought about this > before. > > Best, > Aljoscha > > > On 7. Feb 2018, at 01:44, Thomas Weise wrote: > > > > In addition to lack of watermark support, the Kinesis consumer suffers > from > > a discovery related issue that also needs to be resolved. Shard discovery > > runs periodically in all subtasks. That's not just inefficient but > becomes > > a real problem when there is a large number of subtasks due to rate > > limiting ( > > https://docs.aws.amazon.com/streams/latest/dev/service- > sizes-and-limits.html). 
> > The discovery interval should be minimized to cap latency (new shards not > > consumed until discovered). > > > > How about moving discovery out of the fetcher into a separate singleton > > source and then broadcast the result to the parallel fetchers, following > > the pattern applied to file input? > > > > https://github.com/apache/flink/blob/5f523e6ab31afeab5b1d9bbf62c6d4 > ef726ffe1b/flink-streaming-java/src/main/java/org/apache/ > flink/streaming/api/environment/StreamExecutionEnvironment.java#L1336 > > > > This would also ensure that all subtasks consistently see the same shard > > list. > > > > Thoughts? > > > > Thanks, > > Thomas > > > > > > On Mon, Feb 5, 2018 at 5:31 PM, Thomas Weise wrote: > > > >> Hi, > >> > >> The Kinesis consumer currently does not emit watermarks, and this can > lead > >> to problems when a single subtask reads from multiple shards and offsets > >> are not closely aligned with respect to the event time. > >> > >> The Kafka consumer has support for periodic and punctuated watermarks, > >> although there is also the unresolved issue https://issues.apache.org/ > >> jira/browse/FLINK-5479 that would equally apply for Kinesis. > >> > >> I propose adding support for timestamp assigner and watermark generator > to > >> the Kinesis consumer. > >> > >> As for handling of idle shards, is there a preference? Perhaps a > >> customization point on the assigner that defers the decision to the user > >> would be appropriate? > >> > >> Thanks, > >> Thomas > >> > >> > >
Re: Timestamp/watermark support in Kinesis consumer
Hi, That last point is very valid. For a while now I've wanted to generalise the pattern of our file source to other sources. (This is related to how Beam sources are being refactored to use Splittable DoFn.) I'm very eager for design work to start on this once 1.5 is out the door. There are some other folks (cc'ed) who have also talked/thought about this before. Best, Aljoscha > On 7. Feb 2018, at 01:44, Thomas Weise wrote: > > In addition to lack of watermark support, the Kinesis consumer suffers from > a discovery related issue that also needs to be resolved. Shard discovery > runs periodically in all subtasks. That's not just inefficient but becomes > a real problem when there is a large number of subtasks due to rate > limiting ( > https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html). > The discovery interval should be minimized to cap latency (new shards not > consumed until discovered). > > How about moving discovery out of the fetcher into a separate singleton > source and then broadcast the result to the parallel fetchers, following > the pattern applied to file input? > > https://github.com/apache/flink/blob/5f523e6ab31afeab5b1d9bbf62c6d4ef726ffe1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1336 > > This would also ensure that all subtasks consistently see the same shard > list. > > Thoughts? > > Thanks, > Thomas > > > On Mon, Feb 5, 2018 at 5:31 PM, Thomas Weise wrote: > >> Hi, >> >> The Kinesis consumer currently does not emit watermarks, and this can lead >> to problems when a single subtask reads from multiple shards and offsets >> are not closely aligned with respect to the event time. >> >> The Kafka consumer has support for periodic and punctuated watermarks, >> although there is also the unresolved issue https://issues.apache.org/ >> jira/browse/FLINK-5479 that would equally apply for Kinesis. 
>> >> I propose adding support for timestamp assigner and watermark generator to >> the Kinesis consumer. >> >> As for handling of idle shards, is there a preference? Perhaps a >> customization point on the assigner that defers the decision to the user >> would be appropriate? >> >> Thanks, >> Thomas >> >>
Re: Timestamp/watermark support in Kinesis consumer
In addition to lack of watermark support, the Kinesis consumer suffers from a discovery related issue that also needs to be resolved. Shard discovery runs periodically in all subtasks. That's not just inefficient but becomes a real problem when there is a large number of subtasks due to rate limiting ( https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html). The discovery interval should be minimized to cap latency (new shards not consumed until discovered). How about moving discovery out of the fetcher into a separate singleton source and then broadcast the result to the parallel fetchers, following the pattern applied to file input? https://github.com/apache/flink/blob/5f523e6ab31afeab5b1d9bbf62c6d4ef726ffe1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1336 This would also ensure that all subtasks consistently see the same shard list. Thoughts? Thanks, Thomas On Mon, Feb 5, 2018 at 5:31 PM, Thomas Weise wrote: > Hi, > > The Kinesis consumer currently does not emit watermarks, and this can lead > to problems when a single subtask reads from multiple shards and offsets > are not closely aligned with respect to the event time. > > The Kafka consumer has support for periodic and punctuated watermarks, > although there is also the unresolved issue https://issues.apache.org/ > jira/browse/FLINK-5479 that would equally apply for Kinesis. > > I propose adding support for timestamp assigner and watermark generator to > the Kinesis consumer. > > As for handling of idle shards, is there a preference? Perhaps a > customization point on the assigner that defers the decision to the user > would be appropriate? > > Thanks, > Thomas > >
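For the singleton-discovery-plus-broadcast idea to work, every parallel fetcher must derive the same, disjoint shard assignment from the one broadcast shard list. A deterministic modulo assignment is one way to sketch that; this is illustrative only, and a real connector would additionally need to handle checkpointed shard state and rescaling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical deterministic shard assignment: a single discovery source broadcasts
// the full shard list, and each parallel subtask filters out its own share locally.
// Because the function is deterministic, all subtasks agree on who owns what.
class BroadcastShardAssignment {

    static List<String> shardsForSubtask(List<String> discoveredShards,
                                         int subtaskIndex, int parallelism) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < discoveredShards.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                assigned.add(discoveredShards.get(i));
            }
        }
        return assigned;
    }
}
```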
Timestamp/watermark support in Kinesis consumer
Hi, The Kinesis consumer currently does not emit watermarks, and this can lead to problems when a single subtask reads from multiple shards and their offsets are not closely aligned with respect to event time. The Kafka consumer has support for periodic and punctuated watermarks, although there is also the unresolved issue https://issues.apache.org/jira/browse/FLINK-5479 that would equally apply to Kinesis. I propose adding support for a timestamp assigner and watermark generator to the Kinesis consumer. As for handling of idle shards, is there a preference? Perhaps a customization point on the assigner that defers the decision to the user would be appropriate? Thanks, Thomas
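The customization point proposed here - deferring the idle-shard decision to the user - might be a small policy interface along these lines. The names are invented for illustration; nothing here is an actual Flink interface.

```java
// Hypothetical customization point that defers the idle-shard decision to the user.
interface ShardIdlenessPolicy {

    // Decide whether a shard should be treated as idle, i.e. excluded from
    // watermark calculation so it does not hold the event time clock back.
    boolean isIdle(String shardId, long lastRecordProcessingTime, long currentProcessingTime);

    // A simple timeout-based default a user could start from.
    static ShardIdlenessPolicy timeoutBased(long timeoutMillis) {
        return (shardId, last, now) -> now - last > timeoutMillis;
    }
}
```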