Re: Questions about implementing a flink source

2021-06-09 Thread Arvid Heise
Hi Evan,

1. I'd recommend supporting DeserializationSchema in any case, similar to
KafkaRecordDeserializationSchema.
First, it aligns with other sources and with user expectations.
Second, it's a tad faster, and the execution plan looks simpler, since you
omit the chained deserialization task.
Third, you can avoid quite a bit of boilerplate code on the user side by
providing adapters, so that a user can plug in any existing Flink
DeserializationSchema to deserialize the payload; in 80% of the use cases,
the user gets the value they want without writing any UDF (see
KafkaValueOnlyDeserializationSchemaWrapper).
Lastly, we also plan to have first-class support for invalid-record
handling at some point, and it might be connected to DeserializationSchema.
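
To make the adapter idea concrete, here is an untested sketch; the
PubsubLiteDeserializationSchema interface and the Message#getData() accessor
are placeholders for whatever your connector ends up defining:

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;

    // Placeholder connector-side interface; yours may look different.
    interface PubsubLiteDeserializationSchema<T> {
        void deserialize(Message message, Collector<T> out) throws Exception;
        TypeInformation<T> getProducedType();
    }

    // Wraps any existing Flink DeserializationSchema so that users get the
    // payload value without writing a UDF, analogous to
    // KafkaValueOnlyDeserializationSchemaWrapper.
    class ValueOnlyWrapper<T> implements PubsubLiteDeserializationSchema<T> {
        private final DeserializationSchema<T> valueSchema;

        ValueOnlyWrapper(DeserializationSchema<T> valueSchema) {
            this.valueSchema = valueSchema;
        }

        @Override
        public void deserialize(Message message, Collector<T> out) throws Exception {
            // Message#getData() is assumed to return the payload bytes.
            T value = valueSchema.deserialize(message.getData());
            if (value != null) {
                out.collect(value);
            }
        }

        @Override
        public TypeInformation<T> getProducedType() {
            return valueSchema.getProducedType();
        }
    }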

2. It's any reassignment while there is still data flowing through the
execution graph. It's always an issue when there are parallel paths from
source to sink: as long as an old record is still on one path, sending new
records down a different path has the potential of a new record overtaking
an old one.
If you could drain all in-flight data (currently not possible) without
restarting, then dynamic reassignment would be safe.

Note that without backpressure, it would certainly be enough to wait a
couple of seconds after unassigning a partition before reassigning it, to
avoid any reordering issue. Maybe you could offer a configuration option
for this and let the user take on some of the responsibility.
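
Purely as an illustration (these builder options are made up, not an existing
API), such an option could look like:

    // Hypothetical opt-in: the user accepts possible reordering in exchange
    // for dynamic reassignment, with a quiet period between unassigning and
    // reassigning a partition. Duration is java.time.Duration.
    PubsubLiteSource<Message> source =
        PubsubLiteSource.<Message>builder()
            .setAllowDynamicPartitionReassignment(true)
            .setReassignmentQuietPeriod(Duration.ofSeconds(10))
            .build();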

I could also see us piggybacking on aligned checkpoint barriers: emit no
data until the checkpoint has completed and do the reassignment then. But
that's certainly something the framework would have to support, not
something you want to implement on your own.

3. Yes, if you throw an IOException (or any other exception), the checkpoint
will not complete and the task gets restarted (it could be in an
inconsistent state).
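
Sketched out (untested; the committable type, client, and exception below are
placeholders, not an actual API):

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import org.apache.flink.api.connector.sink.Committer;

    class PubsubLiteCommitter implements Committer<Committable> {
        private final PublisherClient client; // placeholder client

        PubsubLiteCommitter(PublisherClient client) {
            this.client = client;
        }

        @Override
        public List<Committable> commit(List<Committable> committables) throws IOException {
            for (Committable committable : committables) {
                try {
                    client.commit(committable); // placeholder call
                } catch (PermanentFailureException e) { // placeholder exception type
                    // Throwing here fails the checkpoint and restarts the task.
                    throw new IOException("Sink permanently failed", e);
                }
            }
            // Returning an empty list means there is nothing to retry.
            return Collections.emptyList();
        }

        @Override
        public void close() {}
    }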

On Tue, Jun 8, 2021 at 10:51 PM Evan Palmer  wrote:

> Hello again,
>
> Thank you for all of your help so far! I have a few more questions if you
> have the time :)
>
> 1. Deserialization Schema
>
> There's been some debate within my team about whether we should offer a
> DeserializationSchema and SerializationSchema in our source and sink.
>
> If we don't include the schemas, our source and sink would implement
> Source<...pubsublite.Message> and Sink<...pubsublite.Message>,
> which is the type our client library currently returns (this type is
> serializable), and users could transform the messages in a map function
> after the source. This would make implementing the source somewhat easier,
> and it doesn't seem like it would be much more difficult for users. On the
> other hand, I looked around and didn't find any Flink sources implemented
> without a deserialization/serialization schema, so I'm worried that this
> choice might make our source/sink confusing for users, or that we're
> missing something. What are your thoughts on this?
>
> 2. Order aware rebalancing.
>
> I want to make sure I understand the problem with rebalancing partitions
> to different SourceReaders. Does any reassignment of a pub/sub
> partition between SourceReaders have the potential to cause disorder, or
> can order be guaranteed by some variant of ensuring that the partition is
> assigned to only one source reader at a time?
>
> I read through
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/overview/#parallel-dataflows,
> which made me think that if the user wanted a pipeline like
>
> env.fromSource(source).keyBy("Message Partition", ...).sinkTo(sink)
>
> Then if two different source tasks had messages from a single pub/sub
> partition, there could be disorder. We're not planning to implement any
> rebalancing of partitions in our source, but I wanted to make sure I can
> document this correctly :)
>
> 3. Reporting permanent failures in the Sink
>
> Is it sufficient to throw an exception from Committer.commit() in the case
> where our sink has permanently failed in some way (e.g. the configured
> topic has been deleted, or the user doesn't have permissions to publish),
> or is there something else we should be doing?
>
> Evan
>
>
> On Mon, May 10, 2021 at 9:57 AM Arvid Heise  wrote:
>
>> Hi Evan,
>>
>>> A few replies / questions inline. Somewhat relatedly, I'm also wondering
>>> where this connector should live. I saw that there's already a pubsub
>>> connector in
>>> https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
>>> so if Flink is willing to host it, perhaps it could live near there?
>>> Alternatively, it could live alongside our client library in
>>> https://github.com/googleapis/java-pubsublite.
>>>
>>
>> For a long time, the community has been thinking of moving (most)
>> connectors out of the repository. Especially now with the new source/sink
>> interface, the need to decouple the Flink release cycle and the connector
>> release cycle is bigger 

Re: Questions about implementing a flink source

2021-06-08 Thread Evan Palmer
Hello again,

Thank you for all of your help so far! I have a few more questions if you
have the time :)

1. Deserialization Schema

There's been some debate within my team about whether we should offer a
DeserializationSchema and SerializationSchema in our source and sink.

If we don't include the schemas, our source and sink would implement
Source<...pubsublite.Message> and Sink<...pubsublite.Message>,
which is the type our client library currently returns (this type is
serializable), and users could transform the messages in a map function
after the source. This would make implementing the source somewhat easier,
and it doesn't seem like it would be much more difficult for users. On the
other hand, I looked around and didn't find any Flink sources implemented
without a deserialization/serialization schema, so I'm worried that this
choice might make our source/sink confusing for users, or that we're
missing something. What are your thoughts on this?

2. Order aware rebalancing.

I want to make sure I understand the problem with rebalancing partitions to
different SourceReaders. Does any reassignment of a pub/sub
partition between SourceReaders have the potential to cause disorder, or
can order be guaranteed by some variant of ensuring that the partition is
assigned to only one source reader at a time?

I read through
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/overview/#parallel-dataflows,
which made me think that if the user wanted a pipeline like

env.fromSource(source).keyBy("Message Partition", ...).sinkTo(sink)

Then if two different source tasks had messages from a single pub/sub
partition, there could be disorder. We're not planning to implement any
rebalancing of partitions in our source, but I wanted to make sure I can
document this correctly :)

3. Reporting permanent failures in the Sink

Is it sufficient to throw an exception from Committer.commit() in the case
where our sink has permanently failed in some way (e.g. the configured
topic has been deleted, or the user doesn't have permissions to publish),
or is there something else we should be doing?

Evan


On Mon, May 10, 2021 at 9:57 AM Arvid Heise  wrote:

> Hi Evan,
>
>> A few replies / questions inline. Somewhat relatedly, I'm also wondering
>> where this connector should live. I saw that there's already a pubsub
>> connector in
>> https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
>> so if Flink is willing to host it, perhaps it could live near there?
>> Alternatively, it could live alongside our client library in
>> https://github.com/googleapis/java-pubsublite.
>>
>
> For a long time, the community has been thinking of moving (most)
> connectors out of the repository. Especially now with the new source/sink
> interface, the need to decouple the Flink release cycle and the connector
> release cycle is bigger than ever, as we do not backport features in our
> bugfix branches. Thus, Pub/Sub Lite would only be available in Flink 1.14,
> and many users would need to wait up to a year to effectively use the source
> (adoption of new Flink versions is usually slow).
> Therefore, I'd definitely encourage you to host the connector alongside your
> client library, where the release cycles probably also align much better.
> I will soon present an idea on how to list all available connectors on
> Flink's connector page such that, from a user's perspective, it wouldn't
> matter whether it's internal or external. If it turns out that the community
> would rather keep all connectors in the main repo, we can look at
> contributing it at a later point in time.
>

Okay, thanks for the context! We will host the connector in our repository.

>
>> I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase.
>> It seems like these base implementations are mostly designed to help in
>> cases where the client library uses a synchronous, pull-based approach. Our
>> client library is async - we use a bidirectional stream to pull
>> messages from our brokers, and we have some flow-control settings to limit
>> the number of bytes and messages outstanding to the client. I'm wondering
>> if, because of this, we should just implement the SourceReader interface
>> directly. In particular, we have a per-partition subscriber class which
>> buffers messages up to the flow-control limit and exposes an API almost
>> identical to SourceReader's pollNext and isAvailable. What do you think?
>>
>
> Good catch. Yes, the implementation is more or less simulating the async
> fetching that your library apparently offers already. So feel free to skip
> it. Of course, if it turns out that you still need certain building blocks,
> such as record handover, we can also discuss pulling up a common base class
> shared by the async sources and the SingleThreadMultiplexSourceReaderBase.
>
>> Ah, okay, this helped a lot. I'd 

Re: Questions about implementing a flink source

2021-05-10 Thread Arvid Heise
Hi Evan,

> A few replies / questions inline. Somewhat relatedly, I'm also wondering
> where this connector should live. I saw that there's already a pubsub
> connector in
> https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
> so if Flink is willing to host it, perhaps it could live near there?
> Alternatively, it could live alongside our client library in
> https://github.com/googleapis/java-pubsublite.
>

For a long time, the community has been thinking of moving (most)
connectors out of the repository. Especially now with the new source/sink
interface, the need to decouple the Flink release cycle and the connector
release cycle is bigger than ever, as we do not backport features in our
bugfix branches. Thus, Pub/Sub Lite would only be available in Flink 1.14,
and many users would need to wait up to a year to effectively use the source
(adoption of new Flink versions is usually slow).
Therefore, I'd definitely encourage you to host the connector alongside your
client library, where the release cycles probably also align much better.
I will soon present an idea on how to list all available connectors on
Flink's connector page such that, from a user's perspective, it wouldn't
matter whether it's internal or external. If it turns out that the community
would rather keep all connectors in the main repo, we can look at
contributing it at a later point in time.

> I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase.
> It seems like these base implementations are mostly designed to help in
> cases where the client library uses a synchronous, pull-based approach. Our
> client library is async - we use a bidirectional stream to pull
> messages from our brokers, and we have some flow-control settings to limit
> the number of bytes and messages outstanding to the client. I'm wondering
> if, because of this, we should just implement the SourceReader interface
> directly. In particular, we have a per-partition subscriber class which
> buffers messages up to the flow-control limit and exposes an API almost
> identical to SourceReader's pollNext and isAvailable. What do you think?
>

Good catch. Yes, the implementation is more or less simulating the async
fetching that your library apparently offers already. So feel free to skip
it. Of course, if it turns out that you still need certain building blocks,
such as record handover, we can also discuss pulling up a common base class
shared by the async sources and the SingleThreadMultiplexSourceReaderBase.
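
For reference, a rough, untested skeleton of such a direct implementation;
Message, PartitionSplit, and the per-partition Subscriber are placeholders
for your types:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;
    import org.apache.flink.api.connector.source.ReaderOutput;
    import org.apache.flink.api.connector.source.SourceReader;
    import org.apache.flink.core.io.InputStatus;

    class PubsubLiteSourceReader implements SourceReader<Message, PartitionSplit> {
        // One subscriber per assigned partition, each buffering messages up
        // to its flow-control limit (your existing class).
        private final List<Subscriber> subscribers = new ArrayList<>();

        @Override
        public void start() {}

        @Override
        public InputStatus pollNext(ReaderOutput<Message> output) {
            boolean emitted = false;
            for (Subscriber subscriber : subscribers) {
                Message message = subscriber.poll(); // assumed non-blocking poll
                if (message != null) {
                    output.collect(message);
                    emitted = true;
                }
            }
            return emitted ? InputStatus.MORE_AVAILABLE : InputStatus.NOTHING_AVAILABLE;
        }

        @Override
        public CompletableFuture<Void> isAvailable() {
            // Completes when any subscriber has buffered data; Subscriber#onData()
            // returning a future is an assumed API. A real implementation must
            // also refresh this future when new splits arrive.
            return CompletableFuture
                    .anyOf(subscribers.stream()
                            .map(Subscriber::onData)
                            .toArray(CompletableFuture[]::new))
                    .thenApply(ignored -> null);
        }

        @Override
        public void addSplits(List<PartitionSplit> splits) {
            // Start one streaming-pull subscriber per partition split.
            splits.forEach(split -> subscribers.add(new Subscriber(split)));
        }

        @Override
        public List<PartitionSplit> snapshotState(long checkpointId) {
            // Checkpoint the current offset of each partition as a split.
            return subscribers.stream()
                    .map(Subscriber::checkpointSplit)
                    .collect(Collectors.toList());
        }

        @Override
        public void notifyNoMoreSplits() {}

        @Override
        public void close() throws Exception {
            for (Subscriber subscriber : subscribers) {
                subscriber.close();
            }
        }
    }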

> Ah, okay, this helped a lot. I'd missed that rebalancing dynamically would
> break ordering guarantees, so when I read through the Kafka source, I was
> really confused by the lack of rebalancing.


We have some ideas on how to make it more dynamic, but they are very far
down the road, and we can hopefully implement them in a way that is
transparent to the sources.

On Fri, May 7, 2021 at 11:23 PM Evan Palmer  wrote:

> Hi Arvid, thank you so much for the detailed reply!
>
> A few replies / questions inline. Somewhat relatedly, I'm also wondering
> where this connector should live. I saw that there's already a pubsub
> connector in
> https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
> so if Flink is willing to host it, perhaps it could live near there?
> Alternatively, it could live alongside our client library in
> https://github.com/googleapis/java-pubsublite.
>
> On Mon, May 3, 2021 at 1:54 PM Arvid Heise  wrote:
>
>> Hi Evan,
>>
>> 1) You are absolutely correct that we would urge users to add new sources
>> as FLIP-27 and new sinks as FLIP-143. I can provide guidance in both cases.
>> For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource
>> [2] as a starting point. Especially basing the reader implementation on
>> SingleThreadMultiplexSourceReaderBase will give you some performance boost
>> over naive implementations.
>> It is probably initially overwhelming but there is lots of thought behind
>> the Source interface. We plan on having better documentation and more
>> examples in the next months to ease the ramp-up, but it's also kind of a
>> chicken-and-egg problem.
>>
>
> Okay, great, the Source interface seems much easier to work with. I
> haven't gotten around to thinking about our Sink yet, but I'm sure I'll
> have some questions when I do :)
>
> I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase.
> It seems like these base implementations are mostly designed to help in
> cases where the client library uses a synchronous, pull-based approach. Our
> client library is async - we use a bidirectional stream to pull
> messages from our brokers, and we have some flow-control settings to limit
> the number of bytes and messages outstanding to the client. I'm wondering
> if, because of this, 

Re: Questions about implementing a flink source

2021-05-07 Thread Evan Palmer
Hi Arvid, thank you so much for the detailed reply!

A few replies / questions inline. Somewhat relatedly, I'm also wondering
where this connector should live. I saw that there's already a pubsub
connector in
https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
so if Flink is willing to host it, perhaps it could live near there?
Alternatively, it could live alongside our client library in
https://github.com/googleapis/java-pubsublite.

On Mon, May 3, 2021 at 1:54 PM Arvid Heise  wrote:

> Hi Evan,
>
> 1) You are absolutely correct that we would urge users to add new sources
> as FLIP-27 and new sinks as FLIP-143. I can provide guidance in both cases.
> For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource
> [2] as a starting point. Especially basing the reader implementation on
> SingleThreadMultiplexSourceReaderBase will give you some performance boost
> over naive implementations.
> It is probably initially overwhelming but there is lots of thought behind
> the Source interface. We plan on having better documentation and more
> examples in the next months to ease the ramp-up, but it's also kind of a
> chicken-and-egg problem.
>

Okay, great, the Source interface seems much easier to work with. I haven't
gotten around to thinking about our Sink yet, but I'm sure I'll have some
questions when I do :)

I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase.
It seems like these base implementations are mostly designed to help in
cases where the client library uses a synchronous, pull-based approach. Our
client library is async - we use a bidirectional stream to pull
messages from our brokers, and we have some flow-control settings to limit
the number of bytes and messages outstanding to the client. I'm wondering
if, because of this, we should just implement the SourceReader interface
directly. In particular, we have a per-partition subscriber class which
buffers messages up to the flow-control limit and exposes an API almost
identical to SourceReader's pollNext and isAvailable. What do you think?

>
> I can also provide guidance outside of the ML if it's easier.
>
> 2) You are right, the currentParallelism is static with respect to the
> creation of the SourceReaders. Any change to the parallelism would also
> cause a recreation of the readers.
> Splits are usually checkpointed alongside the readers. On recovery, the
> readers are restored with their old splits. Only when splits cannot be
> recovered in the context of a reader (for example, after downscaling) are
> the splits re-added to the enumerator.
>

> Rebalancing can happen in SplitEnumerator#addReader or
> #handleSplitRequest. The Kafka and File sources even use different
> approaches, with eager and lazy initialization respectively. Further, you
> can send arbitrary events between the enumerator and readers to work out
> the rebalancing. In theory, you can also rebalance splits dynamically;
> however, you lose ordering guarantees of the messages at the moment (if you
> have records r1, r2 in this order in split s and you reassign s, then you
> may end up with r2, r1 in the sink).
>

Ah, okay, this helped a lot. I'd missed that rebalancing dynamically would
break ordering guarantees, so when I read through the Kafka source, I was
really confused by the lack of rebalancing.

>
> [1]
> https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75-L75
> [2]
> https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java#L99-L99
>
> On Mon, May 3, 2021 at 1:40 AM Evan Palmer  wrote:
>
>> Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite,
>> which is a partition-based Pub/Sub product, and I have a few questions.
>>
>> 1.
>>
>> I saw that there are two sets of interfaces used in existing sources: The
>> RichSourceFunction, and the set of interfaces from FLIP-27. It seems like
>> the Source interfaces are preferred for new sources, but I wanted to be
>> sure.
>>
>> 2.
>>
>> I’m having a little bit of trouble working out how and when the
>> currentParallelism returned by the SplitEnumeratorContext [1] can change,
>> and how a source should react to that.
>>
>> For context, I’m currently thinking about single partitions as “splits”,
>> so a source would have an approximately constant number of splits, each of
>> which has a potentially unbounded amount of work (at least in continuous
>> mode). Each split will be assigned to some SourceReader by the split
>> enumerator. If the value of currentParallelism changes, it seems like I’ll
>> need to find a way to redistribute my partitions over SourceReaders, or
>> else I'll end up with an unbalanced distribution of 

Re: Questions about implementing a flink source

2021-05-03 Thread Arvid Heise
Hi Evan,

1) You are absolutely correct that we would urge users to add new sources
as FLIP-27 and new sinks as FLIP-143. I can provide guidance in both cases.
For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource [2]
as a starting point. Especially basing the reader implementation on
SingleThreadMultiplexSourceReaderBase will give you some performance boost
over naive implementations.
It is probably initially overwhelming but there is lots of thought behind
the Source interface. We plan on having better documentation and more
examples in the next months to ease the ramp-up, but it's also kind of a
chicken-and-egg problem.

I can also provide guidance outside of the ML if it's easier.

2) You are right, the currentParallelism is static with respect to the
creation of the SourceReaders. Any change to the parallelism would also
cause a recreation of the readers.
Splits are usually checkpointed alongside the readers. On recovery, the
readers are restored with their old splits. Only when splits cannot be
recovered in the context of a reader (for example, after downscaling) are
the splits re-added to the enumerator.

Rebalancing can happen in SplitEnumerator#addReader or #handleSplitRequest.
The Kafka and File sources even use different approaches, with eager and lazy
initialization respectively. Further, you can send arbitrary events between
the enumerator and readers to work out the rebalancing. In theory, you can
also rebalance splits dynamically; however, you lose ordering guarantees of
the messages at the moment (if you have records r1, r2 in this order in
split s and you reassign s, then you may end up with r2, r1 in the sink).
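
To make the eager variant a bit more concrete, an untested sketch
(PartitionSplit and its partition() accessor are placeholders):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.flink.api.connector.source.SplitEnumerator;
    import org.apache.flink.api.connector.source.SplitEnumeratorContext;

    class PubsubLiteEnumerator
            implements SplitEnumerator<PartitionSplit, List<PartitionSplit>> {
        private final SplitEnumeratorContext<PartitionSplit> context;
        private final List<PartitionSplit> unassigned; // not yet owned by a reader

        PubsubLiteEnumerator(
                SplitEnumeratorContext<PartitionSplit> context,
                List<PartitionSplit> splits) {
            this.context = context;
            this.unassigned = new ArrayList<>(splits);
        }

        @Override
        public void start() {}

        @Override
        public void addReader(int subtaskId) {
            // Eager style: assign every split whose partition maps to this
            // subtask as soon as the reader registers.
            Iterator<PartitionSplit> it = unassigned.iterator();
            while (it.hasNext()) {
                PartitionSplit split = it.next();
                if (split.partition() % context.currentParallelism() == subtaskId) {
                    context.assignSplit(split, subtaskId);
                    it.remove();
                }
            }
        }

        @Override
        public void handleSplitRequest(int subtaskId, String requesterHostname) {
            // Unused in the eager style; the lazy style would hand out a split here.
        }

        @Override
        public void addSplitsBack(List<PartitionSplit> splits, int subtaskId) {
            // On recovery or downscaling, splits come back here and are
            // re-assigned when the readers re-register.
            unassigned.addAll(splits);
        }

        @Override
        public List<PartitionSplit> snapshotState(long checkpointId) {
            return new ArrayList<>(unassigned);
        }

        @Override
        public void close() {}
    }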

[1]
https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75-L75
[2]
https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java#L99-L99

On Mon, May 3, 2021 at 1:40 AM Evan Palmer  wrote:

> Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite,
> which is a partition-based Pub/Sub product, and I have a few questions.
>
> 1.
>
> I saw that there are two sets of interfaces used in existing sources: The
> RichSourceFunction, and the set of interfaces from FLIP-27. It seems like
> the Source interfaces are preferred for new sources, but I wanted to be
> sure.
>
> 2.
>
> I’m having a little bit of trouble working out how and when the
> currentParallelism returned by the SplitEnumeratorContext [1] can change,
> and how a source should react to that.
>
> For context, I’m currently thinking about single partitions as “splits”,
> so a source would have an approximately constant number of splits, each of
> which has a potentially unbounded amount of work (at least in continuous
> mode). Each split will be assigned to some SourceReader by the split
> enumerator. If the value of currentParallelism changes, it seems like I’ll
> need to find a way to redistribute my partitions over SourceReaders, or
> else I'll end up with an unbalanced distribution of partitions to
> SourceReaders.
>
> I looked at the docs on elastic scaling [2], and it seems like when the
> parallelism of the source changes, the source will be checkpointed and
> restored. I think this would mean all the SourceReaders get restarted, and
> their splits are returned to the SplitEnumerator for reassignment. Is this
> approximately correct?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#currentParallelism--
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/
>
>


Questions about implementing a flink source

2021-05-02 Thread Evan Palmer
Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite,
which is a partition-based Pub/Sub product, and I have a few questions.

1.

I saw that there are two sets of interfaces used in existing sources: The
RichSourceFunction, and the set of interfaces from FLIP-27. It seems like
the Source interfaces are preferred for new sources, but I wanted to be
sure.

2.

I’m having a little bit of trouble working out how and when the
currentParallelism returned by the SplitEnumeratorContext [1] can change,
and how a source should react to that.

For context, I’m currently thinking about single partitions as “splits”, so
a source would have an approximately constant number of splits, each of which
has a potentially unbounded amount of work (at least in continuous mode).
Each split will be assigned to some SourceReader by the split enumerator.
If the value of currentParallelism changes, it seems like I’ll need to find
a way to redistribute my partitions over SourceReaders, or else I'll end up
with an unbalanced distribution of partitions to SourceReaders.
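
Concretely, I’m imagining something like the following for the split (all
field names here are just illustrative):

    import org.apache.flink.api.connector.source.SourceSplit;

    // One split per Pub/Sub Lite partition, carrying the offset to resume
    // from after recovery.
    class PartitionSplit implements SourceSplit {
        private final long partition;
        private final long startOffset;

        PartitionSplit(long partition, long startOffset) {
            this.partition = partition;
            this.startOffset = startOffset;
        }

        long partition() {
            return partition;
        }

        long startOffset() {
            return startOffset;
        }

        @Override
        public String splitId() {
            return String.valueOf(partition);
        }
    }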

I looked at the docs on elastic scaling [2], and it seems like when the
parallelism of the source changes, the source will be checkpointed and
restored. I think this would mean all the SourceReaders get restarted, and
their splits are returned to the SplitEnumerator for reassignment. Is this
approximately correct?

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#currentParallelism--

[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/