Small correction. I didn't mean to declare the new method `abstract`. I agree with Randall's suggestion to give it a default implementation that will call the old `put` and at the same time deprecate the old `put`.
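To make that concrete, here is a rough sketch of the pattern we seem to be converging on: a new `put` overload with a default implementation that delegates to the old (now deprecated) `put`. The reporter is typed as a `BiConsumer` here, and the Connect types are reduced to minimal stand-ins, so treat this as an illustration rather than the final API:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

// Minimal stand-in; the real org.apache.kafka.connect.sink.SinkRecord
// carries topic, partition, schema, key, offset, etc.
class SinkRecord {
    final String value;
    SinkRecord(String value) { this.value = value; }
}

abstract class SinkTask {
    /** Old method, deprecated once the overload lands. */
    @Deprecated
    public abstract void put(Collection<SinkRecord> records);

    /**
     * New overload with a default implementation delegating to the old
     * put(...), so existing task implementations keep working unchanged.
     * The runtime would call this two-argument variant.
     */
    public void put(Collection<SinkRecord> records,
                    BiConsumer<SinkRecord, Throwable> reporter) {
        put(records);
    }
}

// A task that opts in to the reporter, falling back gracefully when no
// reporter is available (e.g. on an older Connect worker).
class ExampleTask extends SinkTask {
    final List<String> written = new ArrayList<>();
    final List<String> reported = new ArrayList<>();

    @Override
    public void put(Collection<SinkRecord> records) {
        put(records, null); // older runtimes enter through here
    }

    @Override
    public void put(Collection<SinkRecord> records,
                    BiConsumer<SinkRecord, Throwable> reporter) {
        for (SinkRecord r : records) {
            try {
                write(r);
            } catch (RuntimeException e) {
                if (reporter != null) {
                    reporter.accept(r, e); // report instead of failing the task
                } else {
                    throw e; // pre-reporter behavior
                }
            }
        }
    }

    // Toy sink: records whose value starts with "bad" are unwritable.
    private void write(SinkRecord r) {
        if (r.value.startsWith("bad")) {
            throw new RuntimeException("unwritable record");
        }
        written.add(r.value);
    }
}

public class ReporterOverloadSketch {
    public static void main(String[] args) {
        ExampleTask task = new ExampleTask();
        List<String> dlq = new ArrayList<>();
        task.put(List.of(new SinkRecord("good"), new SinkRecord("bad-row")),
                 (rec, err) -> dlq.add(rec.value));
        System.out.println("written=" + task.written + " dlq=" + dlq);
        // prints: written=[good] dlq=[bad-row]
    }
}
```

Note how backward compatibility falls out: an old runtime calls `put(Collection)`, which forwards a null reporter; a new runtime calls the overload directly with a real reporter.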
Konstantine

On Fri, May 15, 2020 at 10:19 AM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> I was on the fence between the various overload options myself, liking
> `start(...)` the least.
>
> Initially, I thought we were interested in offering the ability to call
> the reporter out of band, outside `put`. But after your replies I
> understand you don't think that's the case, and I also agree that keeping
> the reporter in `put(...)` makes the intended use case more clear. In
> most cases it won't even require storing it as a member variable in the
> task class.
>
> So, I'm also happy with
> `public abstract void put(Collection<SinkRecord> records, BiFunction<...>
> failedRecordReporter)`
>
> Konstantine
>
> On Fri, May 15, 2020 at 9:10 AM Randall Hauch <rha...@gmail.com> wrote:
>
>> Konstantine said:
>>
>> > I notice Randall also used BiFunction in his example, I wonder if it's
>> > for similar reasons.
>>
>> Nope. Just a typo on my part.
>>
>> There appear to be three outstanding questions.
>>
>> First, Konstantine suggested calling this "failedRecordReporter". I
>> think this is minor, but using this new name may be a bit more precise
>> and I'd be fine with it.
>>
>> Second, should the reporter method be synchronous? I think the two
>> options are:
>>
>> 2a. Use `BiConsumer<SinkRecord, Throwable>`, which returns nothing and
>> blocks (at this time).
>> 2b. Use `BiFunction<SinkRecord, Throwable, Future<Void>>`, which returns
>> a future that the user can optionally use to be synchronous.
>>
>> I do agree with Konstantine that option 2b gives us more room for future
>> semantic changes, and since the producer write is already asynchronous
>> this should be straightforward to implement. I think the concern here is
>> that if the sink task does not *use* the future to make this
>> synchronous, it is very possible that the error records could be written
>> out of order (due to retries).
>> But this won't be an issue if the implementation uses
>> `max.in.flight.requests.per.connection=1` for writing the error records.
>> It's a little less clear, but honestly IMO passing the reporter in the
>> `put(...)` method helps make this lambda easier to understand, for some
>> strange reason. So unless there are good reasons to avoid this, I'd be
>> in favor of 2b and returning a Future.
>>
>> Third, how do we pass the reporter lambda / method reference to the
>> task? My proposal to pass the reporter via an overloaded `put(...)` is
>> still the most attractive to me, for several reasons:
>>
>> 3a. There's no need to pass the reporter separately *and* to describe
>> the changes in method call ordering.
>> 3b. As mentioned above, for some reason passing it via `put(...)` makes
>> the intent more clear that it be used when processing the SinkRecord,
>> and that it shouldn't be used in `start(...)`, `preCommit(...)`,
>> `onPartitionsAssigned(...)`, or any of the other task methods. As Andrew
>> pointed out earlier, *describing* this in the KIP and in JavaDoc will be
>> tough to make exact yet succinct.
>> 3c. There is already precedent for evolving
>> `SourceTask.commitRecord(...)`, and the pattern is identical.
>> 3d. Backward compatibility is easy to understand, and at the same time
>> it's pretty easy to describe what implementations that want to take
>> advantage of this feature should do.
>> 3e. Minimal changes to the interface: we're just *adding* one default
>> method that calls the existing method and deprecating the existing
>> `put(...)`.
>> 3f. Deprecating the existing `put(...)` makes it more clear in a
>> programmatic sense that new sink implementations should use the
>> reporter, and that we recommend old sinks evolve to use it.
>>
>> Some of these benefits apply to some of the other suggestions, but I
>> think none of the other suggestions have all of these benefits.
>> For example, overloading `initialize(...)` is more difficult since most
>> sink connectors don't override it and therefore would be less subject to
>> deprecation warnings. Overloading `start(...)` is less attractive.
>> Adding a new method IMO shares the fewest of these benefits.
>>
>> The one disadvantage of this approach is that sink task implementations
>> can't rely upon the reporter at startup. IMO that's an acceptable
>> tradeoff to get the cleaner and more explicit API, especially if the API
>> contract is that Connect will pass the same reporter instance to each
>> call to `put(...)` on a single task instance.
>>
>> Best regards,
>>
>> Randall
>>
>> On Fri, May 15, 2020 at 6:59 AM Andrew Schofield <
>> andrew_schofi...@live.com> wrote:
>>
>> > Hi,
>> > Randall's suggestion is really good. I think it gives the flexibility
>> > required and also keeps the interface the right way round.
>> >
>> > Thanks,
>> > Andrew Schofield
>> >
>> > On 15/05/2020, 02:07, "Aakash Shah" <as...@confluent.io> wrote:
>> >
>> > > Hi Randall,
>> > >
>> > > Thanks for the feedback.
>> > >
>> > > 1. This is a great suggestion, but I find that adding an overloaded
>> > > put(...) that essentially deprecates the old put(...), leaving the
>> > > old one to be used only when a connector is deployed on older
>> > > versions of Connect, adds enough complication that connectors could
>> > > break if the old put(...) doesn't correctly invoke the overloaded
>> > > put(...); either that, or it will add duplication of functionality
>> > > across the two put(...) methods. I think the older method simplifies
>> > > things with the idea that a DLQ/error reporter will or will not be
>> > > passed into the method depending on the version of AK. However, I
>> > > also understand the aesthetic advantage of this method vs the setter
>> > > method, so I am okay with going in this direction if others agree
>> > > with adding the overloaded put(...).
>> > >
>> > > 2.
>> > > Yes, your assumption is correct. Yes, we can remove the "Order of
>> > > Operations" section if we go with the overloaded put(...) direction.
>> > >
>> > > 3. Great point, I will remove them from the KIP.
>> > >
>> > > 4. Yeah, accept(...) will be synchronous. I will change it to be
>> > > clearer, thanks.
>> > >
>> > > 5. This KIP will use existing metrics as well as introduce new
>> > > metrics. I will update this section to fully specify the metrics.
>> > >
>> > > Please let me know what you think.
>> > >
>> > > Thanks,
>> > > Aakash
>> > >
>> > > On Thu, May 14, 2020 at 3:52 PM Randall Hauch <rha...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi, Aakash.
>> > > >
>> > > > Thanks for the KIP. Connect does need an improved ability for sink
>> > > > connectors to report individual records as being problematic, and
>> > > > this integrates nicely with the existing DLQ feature.
>> > > >
>> > > > I also appreciate the desire to maintain compatibility so that
>> > > > connectors can take advantage of this feature when deployed in a
>> > > > runtime that supports this feature, but can safely and easily do
>> > > > without the feature when deployed to an older runtime. But I do
>> > > > understand Andrew's concern about the aesthetics. Have you
>> > > > considered overloading the `put(...)` method and adding the
>> > > > `reporter` as a second parameter? Essentially it would add the one
>> > > > method (with proper JavaDoc) to `SinkTask` only:
>> > > >
>> > > > ```
>> > > > public void put(Collection<SinkRecord> records,
>> > > >     BiFunction<SinkRecord, Throwable> reporter) {
>> > > >   put(records);
>> > > > }
>> > > > ```
>> > > > and the WorkerSinkTask would be changed to call `put(Collection,
>> > > > BiFunction)` instead.
>> > > >
>> > > > Sink connector implementations that don't do anything different
>> > > > can still override `put(Collection)`, and it still works as
>> > > > before.
>> > > > Developers that want to change their sink connector
>> > > > implementations to support this new feature would do the
>> > > > following, which would work in older and newer Connect runtimes:
>> > > > ```
>> > > > public void put(Collection<SinkRecord> records) {
>> > > >   put(records, null);
>> > > > }
>> > > > public void put(Collection<SinkRecord> records,
>> > > >     BiFunction<SinkRecord, Throwable> reporter) {
>> > > >   // the normal `put(Collection)` logic goes here, but can
>> > > >   // optionally use `reporter` if non-null
>> > > > }
>> > > > ```
>> > > >
>> > > > I think this has all the same benefits of the current KIP, but
>> > > > it's noticeably simpler and hopefully more aesthetically pleasing.
>> > > >
>> > > > As for Andrew's second concern about "the task can send errant
>> > > > records to it within put(...)" being too restrictive: my guess is
>> > > > that this was more an attempt at describing the basic behavior,
>> > > > and less about requiring that the reporter only be called within
>> > > > the `put(...)` method and not by methods to which `put(...)`
>> > > > synchronously or asynchronously delegates. Can you confirm whether
>> > > > my assumption is correct? If so, then perhaps my suggestion helps
>> > > > work around this issue as well, since there would be no
>> > > > restriction on when the reporter is called, and the whole "Order
>> > > > of Operations" section could potentially be removed.
>> > > >
>> > > > Third, it's not clear to me why the "Error Reporter Object"
>> > > > subsection in the "Proposal" section lists the worker
>> > > > configuration properties that were previously introduced with
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
>> > > > .
>> > > > Maybe it's worth mentioning that the error reporter functionality
>> > > > will reuse or build upon KIP-298, including reusing the
>> > > > configuration properties defined in KIP-298. But IIUC, the KIP
>> > > > does not propose changing any technical or semantic aspect of
>> > > > these configuration properties, and therefore the KIP would be
>> > > > more clear and succinct without them. *That* the error reporter
>> > > > will use these properties is part of the UX and therefore
>> > > > necessary to mention, but *how* it uses those properties is
>> > > > really up to the implementation.
>> > > >
>> > > > Fourth, the "Synchrony" section has a sentence that is confusing,
>> > > > or not as clear as it could be:
>> > > >
>> > > >     "If a record is sent to the error reporter, processing of the
>> > > >     next errant record in accept(...) will not begin until the
>> > > >     producer successfully sends the errant record to Kafka."
>> > > >
>> > > > This sentence is a bit difficult to understand, but IIUC this
>> > > > really just means that "accept(...)" will be synchronous and will
>> > > > block until the errant record has been successfully written to
>> > > > Kafka. If so, let's say that. The rest of the paragraph is fine.
>> > > >
>> > > > Finally, is this KIP proposing new metrics, or that existing
>> > > > metrics would be used to track the error reporter usage? If the
>> > > > former, then please fully specify what these metrics will be,
>> > > > similarly to how metrics are specified in
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework
>> > > > .
>> > > >
>> > > > Thoughts?
>> > > > Best regards,
>> > > >
>> > > > Randall
>> > > >
>> > > > On Mon, May 11, 2020 at 4:49 PM Andrew Schofield <
>> > > > andrew_schofi...@live.com> wrote:
>> > > >
>> > > > > Hi Aakash,
>> > > > > Thanks for sorting out the replies to the mailing list.
>> > > > >
>> > > > > First, I do like the idea of improving error reporting in sink
>> > > > > connectors. I'd like a simple way to put bad records onto the
>> > > > > DLQ.
>> > > > >
>> > > > > I think this KIP is considerably more complicated than it
>> > > > > seems. The guidance on the SinkTask.put() method is that it
>> > > > > should send the records asynchronously and immediately return,
>> > > > > so the task is likely to want to report errors asynchronously
>> > > > > too. Currently the KIP states that "the task can send errant
>> > > > > records to it within put(...)" and that's too restrictive. The
>> > > > > task ought to be able to report any unflushed records, but the
>> > > > > synchronisation of this is going to be tricky. I suppose the
>> > > > > connector author needs to make sure that all errant records
>> > > > > have been reported before returning control from
>> > > > > SinkTask.flush(...) or perhaps SinkTask.preCommit(...).
>> > > > >
>> > > > > I think the interface is a little strange too. I can see that
>> > > > > this was done so it's possible to deliver a connector that
>> > > > > supports error reporting but can also work in earlier versions
>> > > > > of the KC runtime. But the pattern so far is that the task uses
>> > > > > the methods of SinkTaskContext to access utilities in the Kafka
>> > > > > Connect runtime, and I suggest that reporting a bad record is
>> > > > > such a utility. SinkTaskContext has changed before, when the
>> > > > > configs() method was added, so I think there is precedent for
>> > > > > adding a method.
>> > > > > The way the KIP adds a method to SinkTask that the KC runtime
>> > > > > calls to provide the error reporting utility seems not to match
>> > > > > what has gone before.
>> > > > >
>> > > > > Thanks,
>> > > > > Andrew
>> > > > >
>> > > > > On 11/05/2020, 19:05, "Aakash Shah" <as...@confluent.io> wrote:
>> > > > >
>> > > > >     I wasn't previously added to the dev mailing list, so I'd
>> > > > >     like to post my discussion with Andrew Schofield below for
>> > > > >     visibility and further discussion:
>> > > > >
>> > > > >     Hi Andrew,
>> > > > >
>> > > > >     Thanks for the reply. The main concern with this approach
>> > > > >     would be its backward compatibility. I've highlighted the
>> > > > >     thoughts around the backwards compatibility of the initial
>> > > > >     approach; please let me know what you think.
>> > > > >
>> > > > >     Thanks,
>> > > > >     Aakash
>> > > > >
>> > > > >     ____________________________________________________________
>> > > > >
>> > > > >     Hi,
>> > > > >     By adding a new method to the SinkContext interface in, say,
>> > > > >     Kafka 2.6, a connector that calls it would require a Kafka
>> > > > >     2.6 Connect runtime. I don't quite see how that's a backward
>> > > > >     compatibility problem. It's just that new connectors need
>> > > > >     the latest interface. I might not quite be understanding,
>> > > > >     but I think it would be fine.
>> > > > >
>> > > > >     Thanks,
>> > > > >     Andrew
>> > > > >
>> > > > >     ____________________________________________________________
>> > > > >
>> > > > >     Hi Andrew,
>> > > > >
>> > > > >     I apologize for the way the reply was sent. I just
>> > > > >     subscribed to the dev mailing list, so it should be resolved
>> > > > >     now.
>> > > > >
>> > > > >     You are correct, new connectors would simply require the
>> > > > >     latest interface. However, we want to remove that
>> > > > >     requirement - in other words, we want to allow for the
>> > > > >     possibility that someone wants the latest connector, or
>> > > > >     wants to upgrade to the latest version, but deploys it on an
>> > > > >     older version of AK. Basically, we don't want to enforce the
>> > > > >     necessity of upgrading AK to get the latest interface. In
>> > > > >     the current approach, there would be no issue with deploying
>> > > > >     a new connector on an older version of AK, as the Connect
>> > > > >     framework would simply not invoke the new method.
>> > > > >
>> > > > >     Please let me know what you think and if I need to clarify
>> > > > >     anything.
>> > > > >
>> > > > >     Thanks,
>> > > > >     Aakash
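Postscript for anyone following option 2b in the quoted thread: a reporter that returns a `Future<Void>` lets the task decide per record whether reporting is fire-and-forget or blocking, and a single in-flight write (Randall's `max.in.flight.requests.per.connection=1` point) keeps the DLQ ordered either way. A rough sketch, with a single-threaded executor standing in for the producer and `String` standing in for `SinkRecord` (names illustrative, not the KIP API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

public class FutureReporterSketch {

    // Stand-in for the framework-provided reporter: appends the errant
    // record to a DLQ list on a single-threaded executor, mimicking a
    // producer whose writes are asynchronous but strictly ordered
    // (the max.in.flight.requests.per.connection=1 case).
    public static BiFunction<String, Throwable, Future<Void>> reporter(
            ExecutorService producerThread, List<String> dlq) {
        return (rec, err) -> CompletableFuture.runAsync(
                () -> dlq.add(rec + " (" + err.getMessage() + ")"),
                producerThread);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> dlq = new ArrayList<>();
        BiFunction<String, Throwable, Future<Void>> report =
                reporter(pool, dlq);

        // Fire-and-forget: the task ignores the returned future.
        report.apply("errant-1", new RuntimeException("schema mismatch"));

        // Synchronous: block until this record (and, because the executor
        // is single-threaded and FIFO, every earlier one) is reported.
        report.apply("errant-2", new RuntimeException("null key")).get();

        System.out.println(dlq); // both errant records, in submission order
        pool.shutdown();
    }
}
```

This is why 2b subsumes 2a: a task that always calls `.get()` gets the blocking `BiConsumer`-like behavior, while a task that ignores the future gets the asynchronous behavior, without any further interface change.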