Generalizing the pattern would be great. I also wonder whether there are
other commonalities between sources that would benefit from a shared
framework. From a consumer perspective, Kafka and Kinesis don't look all
that different: both are replayable sources, the concepts map directly
(topic -> stream, partition -> shard, offset -> sequence number), and both
need dynamic discovery and state saving. Shouldn't there be more common
code?
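
To make the mapping concrete, here is a rough sketch of what a shared
abstraction could look like. None of these names (Split, PositionState)
exist in Flink; they are made up for illustration. The position is kept as
a String on the assumption that it has to cover both Kafka's numeric
offsets and Kinesis sequence numbers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ReplayableSourceSketch {

    /** Hypothetical common view of a Kafka partition or a Kinesis shard. */
    static final class Split {
        final String container; // Kafka topic or Kinesis stream
        final String id;        // partition number or shard id

        Split(String container, String id) {
            this.container = container;
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Split)) {
                return false;
            }
            Split other = (Split) o;
            return container.equals(other.container) && id.equals(other.id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(container, id);
        }
    }

    /** Hypothetical per-subtask state: last read position for each split. */
    static final class PositionState {
        private final Map<Split, String> positions = new HashMap<>();

        /** Record the offset / sequence number reached in a split. */
        void advance(Split split, String position) {
            positions.put(split, position);
        }

        String positionOf(Split split) {
            return positions.get(split);
        }

        /** Copy of the state, as it would be written into a checkpoint. */
        Map<Split, String> snapshot() {
            return new HashMap<>(positions);
        }

        /** Rebuild the state from a checkpointed copy. */
        static PositionState restore(Map<Split, String> snapshot) {
            PositionState state = new PositionState();
            state.positions.putAll(snapshot);
            return state;
        }
    }

    public static void main(String[] args) {
        Split shard = new Split("my-stream", "shardId-000000000000");
        PositionState state = new PositionState();
        state.advance(shard, "100");

        // After a restore, reading would resume from the saved position.
        PositionState restored = PositionState.restore(state.snapshot());
        System.out.println(restored.positionOf(shard));
    }
}
```

The same two types would describe a Kafka topic partition with its offset;
only discovery and the fetch call would remain connector specific.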

Meanwhile, we need to find a way to address shortcomings in the current
Kinesis connector to enable the use case. I would prefer to do that without
permanently forking the connector code, so here are some more thoughts:

   - Provide a hook to override discovery (so that not every subtask has
   to hit Kinesis)
   - Provide a hook to support custom watermark generation (somewhere
   around KinesisDataFetcher.emitRecordAndUpdateState)
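
As a rough illustration of the two hooks, here is a standalone sketch. The
interfaces ShardDiscoverer and ShardWatermarkTracker are hypothetical and
not part of the connector; how they would be wired into KinesisDataFetcher
is left open. The tracker shows one possible policy: keep the highest
timestamp seen per shard and report the minimum across all shards assigned
to the subtask.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KinesisHooksSketch {

    /** Hypothetical hook: replaces the per-subtask shard discovery. */
    interface ShardDiscoverer {
        List<String> discoverShards(String streamName);
    }

    /** Hypothetical hook: called per record, returns the current watermark candidate. */
    interface ShardWatermarkTracker {
        long onRecord(String shardId, long eventTimestamp);
    }

    /** Watermark = minimum over shards of the maximum timestamp seen in each shard. */
    static class MinAcrossShardsTracker implements ShardWatermarkTracker {
        private final Map<String, Long> maxTimestampPerShard = new HashMap<>();

        MinAcrossShardsTracker(List<String> assignedShards) {
            // A shard without records yet holds the watermark at Long.MIN_VALUE.
            for (String shard : assignedShards) {
                maxTimestampPerShard.put(shard, Long.MIN_VALUE);
            }
        }

        @Override
        public long onRecord(String shardId, long eventTimestamp) {
            maxTimestampPerShard.merge(shardId, eventTimestamp, Long::max);
            long min = Long.MAX_VALUE;
            for (long ts : maxTimestampPerShard.values()) {
                min = Math.min(min, ts);
            }
            return min;
        }
    }

    public static void main(String[] args) {
        // A fixed shard list stands in for a discovery result obtained elsewhere.
        ShardDiscoverer discoverer = stream -> Arrays.asList("shard-0", "shard-1");
        ShardWatermarkTracker tracker =
                new MinAcrossShardsTracker(discoverer.discoverShards("my-stream"));

        System.out.println(tracker.onRecord("shard-0", 100L)); // shard-1 has no data yet
        System.out.println(tracker.onRecord("shard-1", 40L));  // slowest shard is at 40
        System.out.println(tracker.onRecord("shard-1", 120L)); // slowest shard is now shard-0 at 100
    }
}
```

Note that with this policy a shard that never receives records holds the
watermark back indefinitely, which is the idle shard question from the
first mail.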

If we can accomplish these in short order, it would be great. The current
implementation makes it hard/impossible to override certain behaviors
(final protected methods and the like). If there is agreement then I would
like to address those as a quick PR.

Thanks,
Thomas


On Wed, Feb 7, 2018 at 7:59 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> That last point is very valid. For a while now I've wanted to generalise
> the pattern of our file source to other sources. (This is related to how
> Beam sources are being refactored to use Splittable DoFn.)
>
> I'm very eager for design work to start on this once 1.5 is out the door.
> There are some other folks (cc'ed) who have also talked/thought about this
> before.
>
> Best,
> Aljoscha
>
> > On 7. Feb 2018, at 01:44, Thomas Weise <t...@apache.org> wrote:
> >
> > In addition to lack of watermark support, the Kinesis consumer suffers
> > from a discovery related issue that also needs to be resolved. Shard
> > discovery runs periodically in all subtasks. That's not just inefficient
> > but becomes a real problem when there is a large number of subtasks due
> > to rate limiting
> > (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
> > The discovery interval should be minimized to cap latency (new shards not
> > consumed until discovered).
> >
> > How about moving discovery out of the fetcher into a separate singleton
> > source and then broadcast the result to the parallel fetchers, following
> > the pattern applied to file input?
> >
> > https://github.com/apache/flink/blob/5f523e6ab31afeab5b1d9bbf62c6d4ef726ffe1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1336
> >
> > This would also ensure that all subtasks consistently see the same shard
> > list.
> >
> > Thoughts?
> >
> > Thanks,
> > Thomas
> >
> >
> > On Mon, Feb 5, 2018 at 5:31 PM, Thomas Weise <t...@apache.org> wrote:
> >
> >> Hi,
> >>
> >> The Kinesis consumer currently does not emit watermarks, and this can
> >> lead to problems when a single subtask reads from multiple shards and
> >> offsets are not closely aligned with respect to the event time.
> >>
> >> The Kafka consumer has support for periodic and punctuated watermarks,
> >> although there is also the unresolved issue
> >> https://issues.apache.org/jira/browse/FLINK-5479 that would equally apply
> >> for Kinesis.
> >>
> >> I propose adding support for timestamp assigner and watermark generator
> >> to the Kinesis consumer.
> >>
> >> As for handling of idle shards, is there a preference? Perhaps a
> >> customization point on the assigner that defers the decision to the user
> >> would be appropriate?
> >>
> >> Thanks,
> >> Thomas
> >>
> >>
>
>
