Hi Aakash,

Responding to your question from a separate KIP-610 discussion thread here,
with my comments:

> 4. Great point about the addition of the extra configuration properties. By
> "If we decide to include these properties, we should also update the
> "Synchrony" section to be agnostic about what the error reporter is doing
> under the hood since there won't necessarily be a Kafka producer involved
> in handling records given to the error reporter," are you referring to the
> fact that if people only choose to enable logging and not sending to a DLQ,
> there won't necessarily be a producer involved?

Yep, exactly :)
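
To make that concrete, here is a rough sketch (not taken from the KIP) of why
the task-facing contract shouldn't mention a producer. Everything in it is an
assumption for illustration: the class and method names are hypothetical, a
plain String stands in for SinkRecord, in-memory lists stand in for the worker
log and the DLQ topic, and the reporter is typed as a BiConsumer only because
the "Synchrony" text talks about accept(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class ErrorReporterSketch {

    // Hypothetical log-only reporter: no Kafka producer is involved at all.
    static BiConsumer<String, Throwable> logOnly(List<String> log) {
        return (record, error) ->
                log.add("errant record " + record + ": " + error.getMessage());
    }

    // Hypothetical DLQ reporter: the list stands in for a blocking producer send.
    static BiConsumer<String, Throwable> dlq(List<String> dlqTopic) {
        return (record, error) -> dlqTopic.add(record);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<String> dlqTopic = new ArrayList<>();

        // The task only ever sees a BiConsumer; which destinations sit behind
        // it (logging, a DLQ, or both) is decided by configuration.
        BiConsumer<String, Throwable> reporter = logOnly(log).andThen(dlq(dlqTopic));
        reporter.accept("record-1", new IllegalArgumentException("bad payload"));

        System.out.println(log);
        System.out.println(dlqTopic);
    }
}
```

With only logOnly(...) configured, accept(...) returns without anything being
sent to Kafka, which is why the wording should describe what the task can rely
on rather than what a producer does.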

Cheers,

Chris

On Thu, May 14, 2020 at 4:34 PM Christopher Egerton <chr...@confluent.io>
wrote:

> Hi Randall,
>
> I think I prefer the method originally specified in the KIP. A separate
> method can come with a contract about if/when it's called so that tasks can
> assume that it's only invoked once over their lifetime, and allows
> connector developers to separate the logic for storing (and possibly doing
> other things with) the errant record reporter from processing records,
> which are generally accomplished differently. It also seems strange to
> repeatedly pass in the same argument to that method; there's never going to
> be a case where the framework might want to change what it passes to the
> task for the errant record reporter, is there? Oh, and connector developers
> would still have to manually implement the now-deprecated variant of
> "put(Collection<SinkRecord> records)" to call something like "put(records,
> null)", or there would be compatibility risks on older workers if they only
> implemented the non-deprecated method.
>
> That said, if others like this idea I think it works well enough to not
> block the rest of the KIP.
>
> Cheers,
>
> Chris
>
> On Thu, May 14, 2020 at 3:52 PM Randall Hauch <rha...@gmail.com> wrote:
>
>> Hi, Aakash.
>>
>> Thanks for the KIP. Connect does need an improved ability for sink
>> connectors to report individual records as being problematic, and this
>> integrates nicely with the existing DLQ feature.
>>
>> I also appreciate the desire to maintain compatibility so that connectors
>> can take advantage of this feature when deployed in a runtime that supports
>> it, but can safely and easily do without it when deployed to an older
>> runtime. But I do understand Andrew's concern about the aesthetics. Have you
>> considered overloading the `put(...)` method and adding the `reporter` as a
>> second parameter? Essentially it would add the one method (with proper
>> JavaDoc) to `SinkTask` only:
>>
>> ```
>>     public void put(Collection<SinkRecord> records,
>>                     BiFunction<SinkRecord, Throwable> reporter) {
>>         put(records);
>>     }
>> ```
>> and the WorkerSinkTask would be changed to call `put(Collection,
>> BiFunction)` instead.
>>
>> Sink connector implementations that don't do anything different can still
>> override `put(Collection)`, and it still works as before. Developers that
>> want to change their sink connector implementations to support this new
>> feature would do the following, which would work in older and newer Connect
>> runtimes:
>> ```
>>     public void put(Collection<SinkRecord> records) {
>>         put(records, null);
>>     }
>>     public void put(Collection<SinkRecord> records,
>>                     BiFunction<SinkRecord, Throwable> reporter) {
>>         // the normal `put(Collection)` logic goes here, but can optionally
>>         // use `reporter` if non-null
>>     }
>> ```
>>
>> I think this has all the same benefits of the current KIP, but
>> it's noticeably simpler and hopefully more aesthetically pleasing.
>>
>> As for Andrew's second concern, that "the task can send errant records to
>> it within put(...)" is too restrictive: my guess is that this was more an
>> attempt at describing the basic behavior, and less about requiring that the
>> reporter only be called within the `put(...)` method and not by methods to
>> which `put(...)` synchronously or asynchronously delegates. Can you confirm
>> whether my assumption is correct? If so, then perhaps my suggestion helps
>> work around this issue as well, since there would be no restriction on when
>> the reporter is called, and the whole "Order of Operations" section could
>> potentially be removed.
>>
>> Third, it's not clear to me why the "Error Reporter Object" subsection in
>> the "Proposal" section lists the worker configuration properties that were
>> previously introduced with KIP-298:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
>>
>> Maybe it's worth mentioning that the error reporter functionality will
>> reuse or build upon KIP-298, including reusing the configuration properties
>> defined in KIP-298. But IIUC, the KIP does not propose changing any
>> technical or semantic aspect of these configuration properties, and
>> therefore the KIP would be clearer and more succinct without them. *That*
>> the error reporter will use these properties is part of the UX and therefore
>> necessary to mention, but *how* it uses those properties is really up to
>> the implementation.
>>
>> Fourth, the "Synchrony" section has a sentence that is confusing, or not as
>> clear as it could be.
>>
>>     "If a record is sent to the error reporter, processing of the next
>>     errant record in accept(...) will not begin until the producer
>>     successfully sends the errant record to Kafka."
>>
>> This sentence is a bit difficult to understand, but IIUC this really just
>> means that "accept(...)" will be synchronous and will block until the
>> errant record has been successfully written to Kafka. If so, let's say
>> that. The rest of the paragraph is fine.
>>
>> Finally, is this KIP proposing new metrics, or that existing metrics would
>> be used to track the error reporter usage? If the former, then please fully
>> specify what these metrics will be, similarly to how metrics are specified
>> in KIP-196:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework
>>
>> Thoughts?
>>
>> Best regards,
>>
>> Randall
>>
>> On Mon, May 11, 2020 at 4:49 PM Andrew Schofield <andrew_schofi...@live.com>
>> wrote:
>>
>> > Hi Aakash,
>> > Thanks for sorting out the replies to the mailing list.
>> >
>> > First, I do like the idea of improving error reporting in sink connectors.
>> > I'd like a simple way to put bad records onto the DLQ.
>> >
>> > I think this KIP is considerably more complicated than it seems. The
>> > guidance on the SinkTask.put() method is that it should send the records
>> > asynchronously and immediately return, so the task is likely to want to
>> > report errors asynchronously too. Currently the KIP states that "the task
>> > can send errant records to it within put(...)" and that's too restrictive.
>> > The task ought to be able to report any unflushed records, but the
>> > synchronisation of this is going to be tricky. I suppose the connector
>> > author needs to make sure that all errant records have been reported before
>> > returning control from SinkTask.flush(...) or perhaps
>> > SinkTask.preCommit(...).
>> >
>> > I think the interface is a little strange too. I can see that this was done
>> > so it's possible to deliver a connector that supports error reporting but
>> > can also work in earlier versions of the KC runtime. But the pattern so far
>> > is that the task uses the methods of SinkTaskContext to access utilities in
>> > the Kafka Connect runtime, and I suggest that reporting a bad record is
>> > such a utility. SinkTaskContext has changed before, when the configs()
>> > method was added, so I think there is precedent for adding a method. The
>> > way the KIP adds a method to SinkTask that the KC runtime calls to provide
>> > the error reporting utility seems not to match what has gone before.
>> >
>> > Thanks,
>> > Andrew
>> >
>> > On 11/05/2020, 19:05, "Aakash Shah" <as...@confluent.io> wrote:
>> >
>> >     I wasn't previously added to the dev mailing list, so I'd like to post
>> >     my discussion with Andrew Schofield below for visibility and further
>> >     discussion:
>> >
>> >     Hi Andrew,
>> >
>> >     Thanks for the reply. The main concern with this approach would be its
>> >     backward compatibility. I’ve highlighted the thoughts around the
>> >     backward compatibility of the initial approach; please let me know what
>> >     you think.
>> >
>> >     Thanks,
>> >     Aakash
>> >
>> >
>> >
>> >     ____________________________________________________________________
>> >
>> >     Hi,
>> >     By adding a new method to the SinkTaskContext interface in, say, Kafka
>> >     2.6, a connector that calls it would require a Kafka 2.6 Connect
>> >     runtime. I don't quite see how that's a backward compatibility problem.
>> >     It's just that new connectors need the latest interface. I might not
>> >     quite be understanding, but I think it would be fine.
>> >
>> >     Thanks,
>> >     Andrew
>> >
>> >
>> >
>> >     ____________________________________________________________________
>> >
>> >     Hi Andrew,
>> >
>> >     I apologize for the way the reply was sent. I just subscribed to the dev
>> >     mailing list so it should be resolved now.
>> >
>> >     You are correct, new connectors would simply require the latest
>> >     interface. However, we want to remove that requirement. In other words,
>> >     we want to allow the possibility that someone wants the latest
>> >     connector, or to upgrade to its latest version, but deploys it on an
>> >     older version of AK. Basically, we don't want to enforce the necessity
>> >     of upgrading AK to get the latest interface. In the current approach,
>> >     there would be no issue with deploying a new connector on an older
>> >     version of AK, as the Connect framework would simply not invoke the new
>> >     method.
>> >
>> >     Please let me know what you think and if I need to clarify anything.
>> >
>> >     Thanks,
>> >     Aakash
>> >
>> >
>>
>
