Hi Evan,

1. I'd recommend supporting DeserializationSchema in any case similar to
KafkaRecordDeserializationSchema.
First, it aligns with other sources and user expectations.
Second, it's a tad faster and the plan looks easier if you omit a chained
task.
Third, you can avoid quite a bit of boilerplate code on user side by having
adapters such that a user can use any existing Flink DeserializationSchema
to deserialize the payload; so without writing any UDF in 80% of the use
cases, the user gets the value that he wants (see
KafkaValueOnlyDeserializationSchemaWrapper).
Lastly, we also plan to have first class support for invalid record
handling at some point and it might be connected to DeserializationSchema.

2. It's any reassignment while there is still data flowing in the execution
graph. It's always a matter if there are parallel roads from source to
sink. As long as there is an old record on the road, sending new records on
a different road has always the potential of new record overtaking old
record.
If you could drain all data (currently not possible) without restarting,
then dynamic reassignment would be safe.

Note that without backpressure, it would certainly be enough to wait a
couple of seconds after unassigning a partition before reassigning it to
avoid any reordering issue. Maybe you could offer a configuration option
and the user has to take some responsibility.

I could also see that we could piggyback on aligned checkpoint barriers to
not emit any data until the checkpoint has been completed and do the
reassignment then. But that's certainly something that the framework should
support and that you don't want to implement on your own.

3. Yes if you throw an IOException (or any other exception), the checkpoint
would not complete and the task gets restarted (could be in an inconsistent
state).

On Tue, Jun 8, 2021 at 10:51 PM Evan Palmer <palm...@google.com> wrote:

> Hello again,
>
> Thank you for all of your help so far, I have a few more questions if you
> have the time :)
>
> 1. Deserialization Schema
>
> There's been some debate within my team about whether we should offer a
> DeserializationSchema and SerializationSchema in our source and sink.
>
> If we include don't include the schemas, our source and sink would be
> implement Source<...pubsublite.Message> and Sink<...pubsublite.Message>,
> which is the type our client library currently returns (this type is
> serializable), and users could transform the messages in a map function
> after the source. This would make implementing the source somewhat easier,
> and it doesn't seem like it would be much more difficult for users. On the
> other hand, I looked around and didn't find any flink sources implemented
> without a deserialization/serialization schema, so I'm worried that this
> choice might make our source/sink confusing for users, or that we're
> missing something. What are your thoughts on this?
>
> 2. Order aware rebalancing.
>
> I want to make sure I understand the problem with rebalancing partitions
> to different SourceReaders. Does any reassignment of a pub/sub
> partition between SourceReaders have the potential to cause disorder, or
> can order be guaranteed by some variant of ensuring that the partition is
> assigned to only one source reader at a time?
>
> I read through
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/overview/#parallel-dataflows,
> which made me think that if the user wanted a pipeline like
>
> env.fromSource(source).keyBy("Message Partition", ...).sinkTo(sink)
>
> Then if two different source tasks had messages from a single pub/sub
> partition, there could be disorder. We're not planning to implement any
> rebalancing of partitions in our source, but I wanted to make sure I can
> document this correctly :)
>
> 3. Reporting permanent failures in the Sink
>
> Is it sufficient to throw an exception from Committer.commit() in the case
> where our sink has permanently failed in some way (e.g. the configured
> topic has been deleted, or the user doesn't have permissions to publish),
> or is there something else we should be doing?
>
> Evan
>
>
> On Mon, May 10, 2021 at 9:57 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Evan,
>>
>> A few replies / questions inline. Somewhat relatedly, I'm also wondering
>>> where this connector should live. I saw that there's already a pubsub
>>> connector in
>>> https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
>>> so if flink is willing to host it, perhaps it could live near there?
>>> Alternatively, it could live alongside our client library in
>>> https://github.com/googleapis/java-pubsublite.
>>>
>>
>> For a long time, the community has been thinking of moving (most)
>> connectors out of the repository. Especially now with the new source/sink
>> interface, the need to decouple Flink release cycle and connector release
>> cycle is bigger than ever as we do not backport features in our bugfix
>> branches. Thus, Pubsub Lite would only be available in Flink 1.14 and many
>> users would need to wait up to a year to effectively use the source
>> (adaption of new Flink versions is usually slow).
>> Therefore, I'd definitely encourage you to have the connector along your
>> client library - where the release cycles probably also much better align.
>> I will soon present an idea on how to list all available connectors on
>> Flink's connector page such that from a user's perspective, it wouldn't
>> matter if it's internal and external. If it turns out that the community
>> rather wants to have all connectors still in the main repo, we can look at
>> contributing it at a later point in time.
>>
>
> Okay, thanks for the context! We will host the connector in our repository.
>
>>
>> I read through SourceReaderBase and
>>> SingleThreadMultiplexSourceReaderBase. It seems like these base
>>> implementations are mostly designed to help in cases where the client
>>> library uses a synchronous pull based approach. Our client library is async
>>> - we use a bidirectional stream to pull messages from our brokers and we
>>> have some flow control settings to limit the number of bytes and messages
>>> outstanding to the client. I'm wondering if because of this, we should just
>>> implement the SourceReader interface. In particular, we have a per
>>> partition subscriber class which buffers messages up to the flow control
>>> limit and exposes an API almost identical to SourceReader's pollNext and
>>> IsAvailable. What do you think?
>>>
>>
>> Good catch. Yes, the implementation is more or less simulating the async
>> fetching that your library apparently offers already. So feel free to skip
>> it. Of course, if it turns out that you still need certain building blocks,
>> such as record handover, we can also discuss pulling up a common base class
>> to the async sources and the
>>
> SingleThreadMultiplexSourceReaderBase.
>>
>>> Ah, okay, this helped a lot. I'd missed that rebalancing dynamically
>>> would break ordering guarantees, so when I read through the Kafka source, I
>>> was really confused by the lack of rebalancing.
>>
>>
>> We have some ideas on how to make it more dynamic but they are very far
>> down the road and we can hopefully implement them in a transparent way to
>> the sources.
>>
>> On Fri, May 7, 2021 at 11:23 PM Evan Palmer <palm...@google.com> wrote:
>>
>>> Hi Arvid, thank you so much for the detailed reply!
>>>
>>> A few replies / questions inline. Somewhat relatedly, I'm also wondering
>>> where this connector should live. I saw that there's already a pubsub
>>> connector in
>>> https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
>>> so if flink is willing to host it, perhaps it could live near there?
>>> Alternatively, it could live alongside our client library in
>>> https://github.com/googleapis/java-pubsublite.
>>>
>>> On Mon, May 3, 2021 at 1:54 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Evan,
>>>>
>>>> 1) You are absolutely correct that we would urge users to add new
>>>> sources as FLIP-27 and new sinks as FLIP-143. I can provide guidance in
>>>> both cases.
>>>> For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource
>>>> [2] as a starting point. Especially basing the reader implementation on
>>>> SingleThreadMultiplexSourceReaderBase will give you some performance boost
>>>> over naive implementations.
>>>> It is probably initially overwhelming but there is lots of thought
>>>> behind the Source interface. We plan on having better documentation and
>>>> more examples in the next months to ease the ramp up but it's also kind of
>>>> a hen-egg problem.
>>>>
>>>
>>> Okay, great, the Source interface seems much easier to work with. I
>>> haven't gotten around to thinking about our Sink yet, but I'm sure I'll
>>> have some questions when I do :)
>>>
>>> I read through SourceReaderBase and
>>> SingleThreadMultiplexSourceReaderBase. It seems like these base
>>> implementations are mostly designed to help in cases where the client
>>> library uses a synchronous pull based approach. Our client library is async
>>> - we use a bidirectional stream to pull messages from our brokers and we
>>> have some flow control settings to limit the number of bytes and messages
>>> outstanding to the client. I'm wondering if because of this, we should just
>>> implement the SourceReader interface. In particular, we have a per
>>> partition subscriber class which buffers messages up to the flow control
>>> limit and exposes an API almost identical to SourceReader's pollNext and
>>> IsAvailable. What do you think?
>>>
>>>>
>>>> I can also provide guidance outside of the ML if it's easier.
>>>>
>>>> 2) You are right, the currentParallelism is static in respect to the
>>>> creation of the SourceReaders. Any change to the parallelism would also
>>>> cause a recreation of the readers.
>>>> Splits are usually checkpointed alongside the readers. On recovery, the
>>>> readers are restored with their old splits. Only when splits cannot be
>>>> recovered in the context of a reader (for example downscaling), the splits
>>>> would be re-added to the enumerator.
>>>>
>>>
>>>> Rebalancing can happen in SplitEnumerator#addReader or
>>>> #handleSplitRequest. The Kafka and File source use even different
>>>> approaches with eager and lazy initialization respectively. Further, you
>>>> can send arbitrary events between the enumerator and readers to work out
>>>> the rebalancing. In theory, you can also dynamically rebalance splits,
>>>> however, you lose ordering guarantees of the messages at the moment (if you
>>>> have records r1, r2 in this order in split s and you reassign s, then you
>>>> may end up with r2, r1 in the sink).
>>>>
>>>
>>> Ah, okay, this helped a lot. I'd missed that rebalancing dynamically
>>> would break ordering guarantees, so when I read through the Kafka source, I
>>> was really confused by the lack of rebalancing.
>>>
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75-L75
>>>> [2]
>>>> https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java#L99-L99
>>>>
>>>> On Mon, May 3, 2021 at 1:40 AM Evan Palmer <palm...@google.com> wrote:
>>>>
>>>>> Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite
>>>>> which is a partition based Pub/Sub product, and I have a few questions.
>>>>>
>>>>> 1.
>>>>>
>>>>> I saw that there are two sets of interfaces used in existing sources:
>>>>> The RichSourceFunction, and the set of interfaces from FLIP-27. It seems
>>>>> like the Source interfaces are preferred for new sources, but I wanted to
>>>>> be sure.
>>>>>
>>>>> 2.
>>>>>
>>>>> I’m having a little bit of trouble working out how when the
>>>>> currentParallelism returned by the SplitEnumeratorContext [1] can change,
>>>>> and how a source should react to that.
>>>>>
>>>>> For context, I’m currently thinking about single partitions as
>>>>> “splits”, so a source would have an approximately constant number of 
>>>>> splits
>>>>> which each has an potentially unbounded amount of work (at least in
>>>>> continuous mode). Each split will be assigned to some SourceReader by the
>>>>> split enumerator. If the value of currentParallelism changes, it seems 
>>>>> like
>>>>> I’ll need to find a way to redistribute my partitions over SourceReaders,
>>>>> or else I'll end up with an unbalanced distribution of partitions to
>>>>> SourceReaders.
>>>>>
>>>>> I looked at the docs on elastic scaling [2], and it seems like when
>>>>> the parallelism of the source changes, the source will be checkpointed and
>>>>> restored. I think this would mean all the SourceReaders get restarted, and
>>>>> their splits are returned to the SplitEnumerator for reassignment. Is this
>>>>> approximately correct?
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#currentParallelism--
>>>>>
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/
>>>>>
>>>>>

Reply via email to