Hi Chris and others,

Yes, you are correct; I looked through KIP-298 to understand it better. I agree with your idea to handle "errors.tolerance=none".
I see: you are basically saying you are in favor of standardizing what the reporter is set to when it is not configured. I am on board with this proposal, especially if it is in line with previous behavior as you mentioned. I will add both of these suggestions to the KIP.

Lastly, unless anyone has any issues with Chris's suggestions, I believe the last part we have to come to a consensus on is using a Future as the return type. I am for giving extra guarantees to the user if they wish; however, I am not very familiar with the potential issues with the consumer heartbeat that Arjun pointed out. Does anyone have any thoughts on this?

Thanks,
Aakash

On Tue, May 19, 2020 at 2:10 PM Chris Egerton <chr...@confluent.io> wrote:

> Hi Aakash,
>
> > If "errors.tolerance=none", should it not be the case that the error
> > reporter does not even report any error; rather, the task just fails
> > after throwing the error? I do understand the point you are saying
> > about duplicates, though.
>
> I believe the "errors.tolerance" property dictates whether a task should
> fail after a record that causes problems during conversion or
> transformation is encountered and reported (for example, by writing to a
> DLQ). If it is set to "none", then the task will fail immediately; if it
> is set to "all", then the task will continue running normally. So if we
> want to preserve that behavior, we might want to immediately throw an
> exception when an errant record is reported by a "SinkTask" instance and
> the user has configured "errors.tolerance = none", which unless caught
> will cause the task to cease writing records to the sink. In addition to
> throwing that exception, we should also still fail the task; the
> exception is just a way to (hopefully) interrupt the task's processing
> of records in order to prevent duplicates if/when the task is restarted
> later on.
>
> > Lastly, why do you say we should always provide an errant record
> > reporter?
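The "errors.tolerance" behavior described above can be sketched in plain Java. This is a hypothetical illustration, not the Connect framework's actual classes: `ToleranceSketch`, `reporterFor`, and the `String` stand-in for `SinkRecord` are all invented for the example. With tolerance `NONE`, the first reported record throws (interrupting the task's processing, as Chris suggests); with `ALL`, the record is routed to a stand-in DLQ and processing continues.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how "errors.tolerance" could shape the reporter
// handed to a sink task. Not the actual Connect API; names are invented.
public class ToleranceSketch {
    public enum Tolerance { NONE, ALL }

    // Stand-in for report(SinkRecord, Throwable); a String replaces SinkRecord
    // to keep the example self-contained.
    public interface ErrantRecordReporter {
        void report(String record, Throwable error);
    }

    public static ErrantRecordReporter reporterFor(Tolerance tolerance, List<String> dlq) {
        if (tolerance == Tolerance.NONE) {
            return (record, error) -> {
                // Fail fast: the thrown exception, unless caught, stops the task
                // from writing further records, limiting duplicates on restart.
                throw new RuntimeException(
                        "Errant record with errors.tolerance=none: " + record, error);
            };
        }
        // Tolerate the error: record it (stand-in for a DLQ write) and continue.
        return (record, error) -> dlq.add(record);
    }

    public static void main(String[] args) {
        List<String> dlq = new ArrayList<>();
        reporterFor(Tolerance.ALL, dlq)
                .report("bad-record-1", new IllegalArgumentException("conversion failed"));
        System.out.println("DLQ size: " + dlq.size());

        boolean threw = false;
        try {
            reporterFor(Tolerance.NONE, dlq)
                    .report("bad-record-2", new IllegalArgumentException("conversion failed"));
        } catch (RuntimeException e) {
            threw = true;
        }
        System.out.println("Strict reporter threw: " + threw);
    }
}
```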
> > Doesn't that change the contract of what functionality it is providing?
>
> I'm just thinking that instead of returning "null" when no errant record
> reporter is configured, we could return one that always fails the task
> and throws an exception. This seems in line with the default behavior of
> the framework when no error handling configuration properties are
> specified and a record causes problems during conversion or
> transformation. We could leave the choice in the hands of developers,
> but this might make things confusing for users who get different
> behavior from different connectors under the same circumstances.
>
> Hope this helps!
>
> Cheers,
>
> Chris
>
> On Tue, May 19, 2020 at 1:50 PM Aakash Shah <as...@confluent.io> wrote:
>
> > Hi Arjun,
> >
> > I am not very familiar with how the potential heartbeat failure would
> > cause more failures when consuming subsequent records. Can you
> > elaborate on this?
> >
> > Thanks,
> > Aakash
> >
> > On Tue, May 19, 2020 at 10:03 AM Arjun Satish <arjun.sat...@gmail.com>
> > wrote:
> >
> > > One more concern with the connector blocking on the Future's get()
> > > is that it may cause the task's consumer to fail to heartbeat (since
> > > there is no independent thread to do this). That would then cause
> > > failures when we eventually try to consume more records after
> > > returning from put(). The developer would need to be cognizant of
> > > these bits before waiting on the future, which adds a reasonable
> > > amount of complexity.
> > >
> > > Even with preCommit() returning incomplete offsets, I suppose the
> > > concern would be that the put() method keeps giving the task more
> > > records, and to truly pause the "firehose", the task needs to pause
> > > all partitions?
> > >
> > > On Tue, May 19, 2020 at 9:26 AM Arjun Satish <arjun.sat...@gmail.com>
> > > wrote:
> > >
> > > > Can we get a couple of examples that show the utility of waiting
> > > > on the Future<>?
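Chris's "never return null" suggestion can be sketched with a small hypothetical helper (names invented for illustration; not the Connect API). Instead of handing the task `null` when no reporter is configured, the framework would hand it a reporter that always fails, matching the framework's default behavior for records that break during conversion or transformation:

```java
// Hypothetical sketch of a framework-side helper: when no errant-record
// reporter is configured, substitute one that always fails the task, so
// connectors can call report() unconditionally without null checks and
// users see uniform behavior across connectors.
public class DefaultReporterSketch {
    // Stand-in for report(SinkRecord, Throwable), with a String in place of
    // SinkRecord to keep the example self-contained.
    public interface ErrantRecordReporter {
        void report(String record, Throwable error);
    }

    public static ErrantRecordReporter orFailing(ErrantRecordReporter configured) {
        if (configured != null) {
            return configured;
        }
        return (record, error) -> {
            // Mirrors the framework default: no tolerance configured, so a
            // problematic record fails the task immediately.
            throw new IllegalStateException(
                    "No errant-record reporter configured; failing task for record: " + record,
                    error);
        };
    }
}
```

A connector would then simply call `orFailing(reporter).report(record, error)` and get consistent semantics whether or not the user configured error handling.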
> > > > Also, in preCommit() we would report back on the incomplete
> > > > offsets, so that feedback mechanism will already exist for
> > > > developers who want to manually manage this.
> > > >
> > > > On Tue, May 19, 2020 at 8:03 AM Randall Hauch <rha...@gmail.com>
> > > > wrote:
> > > >
> > > >> Thanks, Aakash, for updating the KIP.
> > > >>
> > > >> On Tue, May 19, 2020 at 2:18 AM Arjun Satish <arjun.sat...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hi Randall,
> > > >> >
> > > >> > Thanks for the explanation! Excellent point about guaranteeing
> > > >> > offsets in the async case.
> > > >> >
> > > >> > If we can guarantee that the offsets will be advanced only after
> > > >> > the bad records are reported, then is there any value in the
> > > >> > Future<> return type? I feel we can declare the function with a
> > > >> > void return type:
> > > >> >
> > > >> > void report(SinkRecord failedRecord, Throwable error)
> > > >> >
> > > >> > that works asynchronously, and advances offsets only after the
> > > >> > DLQ producer (and other reporters) complete successfully (as you
> > > >> > explained).
> > > >> >
> > > >> > This actually alleviates my concern about what this Future<>
> > > >> > actually means. Since a failure to report should kill the task,
> > > >> > there is no reason for the connector to ever wait on the get().
> > > >>
> > > >> We should not say "there is no reason", because we don't know all
> > > >> of the requirements that might exist for sending records to
> > > >> external systems. The additional guarantee regarding error records
> > > >> being fully recorded before `preCommit(...)` is called is a minimal
> > > >> guarantee that Connect provides the sink task, and returning a
> > > >> Future allows a sink task to have *stronger* guarantees than what
> > > >> Connect provides by default.
> > > >>
> > > >> Once again:
> > > >> 1.
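The Future-returning variant under discussion can be sketched with plain `java.util.concurrent` (this is an illustration, not Connect code: `FutureReporterSketch`, the single-thread executor, and the `String` stand-in for `SinkRecord` are all invented). `report()` hands the failed record to an asynchronous writer standing in for the DLQ producer and returns a Future; a task may ignore the Future (fire and forget) or block on `get()` when it needs the record durably recorded before proceeding:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of an asynchronous errant-record reporter that
// returns a Future, letting the caller choose between fire-and-forget
// and blocking for a stronger guarantee.
public class FutureReporterSketch {
    private final ExecutorService dlqWriter = Executors.newSingleThreadExecutor();
    private final ConcurrentLinkedQueue<String> dlq = new ConcurrentLinkedQueue<>();

    public Future<Void> report(String record, Throwable error) {
        return dlqWriter.submit(() -> {
            dlq.add(record); // stand-in for the DLQ producer send
            return null;
        });
    }

    public int dlqSize() {
        return dlq.size();
    }

    public void shutdown() {
        dlqWriter.shutdown();
    }

    public static void main(String[] args) throws Exception {
        FutureReporterSketch reporter = new FutureReporterSketch();
        // Fire and forget: continue processing immediately.
        reporter.report("bad-record-1", new RuntimeException("conversion failed"));
        // Stronger guarantee: block until this record is recorded.
        reporter.report("bad-record-2", new RuntimeException("conversion failed")).get();
        System.out.println("Recorded at least: " + reporter.dlqSize());
        reporter.shutdown();
    }
}
```

Arjun's heartbeat concern applies to the blocking path: a long `get()` inside `put()` delays the next consumer poll, so a task choosing the stronger guarantee has to stay within the poll interval.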
> > > >> we need an async API to allow the sink task to report problem
> > > >> records and then immediately continue doing more work.
> > > >> 2. Connect should guarantee to the sink task that all reported
> > > >> records will actually be recorded before `preCommit(...)` is
> > > >> called.
> > > >> 3. a sink task *might* need stronger guarantees, and may need to
> > > >> block on the reported records some time before `preCommit(...)`,
> > > >> and we should allow them to do this.
> > > >> 4. Futures and callbacks are common techniques, but there are
> > > >> significant runtime risks to using callbacks, whereas a Future is
> > > >> a common/standard pattern that is straightforward to use.
> > > >>
> > > >> This *exactly* matches the current KIP, which is why I plan to
> > > >> vote for this valuable and well-thought-out KIP.
> > > >>
> > > >> > And if we are guaranteeing that the offsets are only advanced
> > > >> > when the errors are reported, then this becomes a double win:
> > > >> >
> > > >> > 1. connector developers can literally fire and forget failed
> > > >> > records.
> > > >> > 2. offsets are correctly advanced on errors being reported.
> > > >> > Failure to report an error will kill the task, and the last
> > > >> > committed offset will be the correct one.
> > > >> >
> > > >> > The main contract would simply be to call report() before
> > > >> > preCommit() or before put() returns in the task, so the
> > > >> > framework knows that there are error records reported, and those
> > > >> > need to finish before the offsets can be advanced.
> > > >> >
> > > >> > I think I'd be pretty excited about this API, and if we all
> > > >> > agree, then let's go ahead with this?
> > > >> >
> > > >> > Best,
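The guarantee discussed in the thread can be sketched as follows (illustrative names, not the Connect framework): pending `report()` futures are tracked, and offsets are only treated as committable once every reported record has been fully recorded. In the framework this wait would happen before `preCommit(...)` is invoked; a task wanting stronger guarantees could perform the same wait itself earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical sketch of the "all reports recorded before preCommit()"
// contract: track in-flight report futures and block on them before
// allowing offsets to advance.
public class PreCommitGuardSketch {
    private final List<Future<Void>> pendingReports = new ArrayList<>();

    public void track(Future<Void> reported) {
        pendingReports.add(reported);
    }

    // Called before offsets are committed: block until every reported
    // record is recorded. A failure here should fail the task rather than
    // let offsets advance past an unrecorded errant record.
    public void awaitPendingReports() throws Exception {
        for (Future<Void> pending : pendingReports) {
            pending.get();
        }
        pendingReports.clear();
    }

    public static void main(String[] args) throws Exception {
        PreCommitGuardSketch guard = new PreCommitGuardSketch();
        guard.track(CompletableFuture.completedFuture(null));
        guard.awaitPendingReports(); // returns once all reports are recorded
        System.out.println("Safe to advance offsets");
    }
}
```

Under this contract the fire-and-forget style in point 1 stays safe: even if the task never touches the returned Future, a report that fails kills the task and the last committed offset remains correct.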