Hi Aakash,

> If "errors.tolerance=none", should it not be the case that the error reporter does not even report any error; rather, the task just fails after throwing the error? I do understand the point you are saying about duplicates, though.

I believe the "errors.tolerance" property dictates whether a task should fail after a record that causes problems during conversion or transformation is encountered and reported (for example, by writing to a DLQ). If it is set to "none", then the task will fail immediately; if it is set to "all", then the task will continue running normally. So if we want to preserve that behavior, we might want to immediately throw an exception when an errant record is reported by a "SinkTask" instance and the user has configured "errors.tolerance = none", which unless caught will cause the task to cease writing records to the sink. In addition to throwing that exception, we should also still fail the task; the exception is just a way to (hopefully) interrupt the task's processing of records in order to prevent duplicates if/when the task is restarted later on.

> Lastly, why do you say we should always provide an errant record reporter? Doesn't that change the contract of what functionality it is providing?

I'm just thinking that instead of returning "null" when no errant record reporter is configured, we could return one that always fails the task and throws an exception. This seems in line with the default behavior of the framework when no error handling configuration properties are specified and a record causes problems during conversion or transformation. We could leave the choice in the hands of developers but this might make things confusing for users who get different behavior from different connectors under the same circumstances.

Hope this helps!

Cheers,

Chris

On Tue, May 19, 2020 at 1:50 PM Aakash Shah <as...@confluent.io> wrote:

> Hi Arjun,
>
> I am not very familiar with how the potential heartbeat failure would cause
> more failures when consuming subsequent records. Can you elaborate on this?
>
> Thanks,
> Aakash
>
> On Tue, May 19, 2020 at 10:03 AM Arjun Satish <arjun.sat...@gmail.com> wrote:
>
> > One more concern with the connector blocking on the Future's get() is that
> > it may cause the task's consumer to fail to heartbeat (since there is no
> > independent thread to do this). That would then cause failures when we
> > eventually try to consume more records after returning from put(). The
> > developer would need to be cognizant of these bits before waiting on the
> > future, which adds a reasonable amount of complexity.
> >
> > Even with preCommit() returning incomplete offsets, I suppose the concern
> > would be that the put() method keeps giving the task more records, and to
> > truly pause the "firehose", the task needs to pause all partitions?
> >
> > On Tue, May 19, 2020 at 9:26 AM Arjun Satish <arjun.sat...@gmail.com> wrote:
> >
> > > Can we get a couple of examples that show the utility of waiting on the
> > > Future<>? Also, in preCommit() we would report back on the incomplete
> > > offsets. So that feedback mechanism will already exist for developers who
> > > want to manually manage this.
> > >
> > > On Tue, May 19, 2020 at 8:03 AM Randall Hauch <rha...@gmail.com> wrote:
> > >
> > >> Thanks, Aakash, for updating the KIP.
> > >>
> > >> On Tue, May 19, 2020 at 2:18 AM Arjun Satish <arjun.sat...@gmail.com> wrote:
> > >>
> > >> > Hi Randall,
> > >> >
> > >> > Thanks for the explanation! Excellent point about guaranteeing offsets in
> > >> > the async case.
> > >> >
> > >> > If we can guarantee that the offsets will be advanced only after the bad
> > >> > records are reported, then is there any value in the Future<> return type?
> > >> > I feel we can declare the function with a void return type:
> > >> >
> > >> > void report(SinkRecord failedRecord, Throwable error)
> > >> >
> > >> > that works asynchronously, and advances offsets only after the DLQ producer
> > >> > (and other reporters) complete successfully (as you explained).
> > >> >
> > >> > This actually alleviates my concern of what this Future<> actually means.
> > >> > Since a failure to report should kill the tasks, there is no reason for the
> > >> > connector to ever wait on the get().
> > >>
> > >> We should not say "there is no reason", because we don't know all of the
> > >> requirements that might exist for sending records to external systems. The
> > >> additional guarantee regarding error records being fully recorded before
> > >> `preCommit(...)` is called is a minimal guarantee that Connect provides the
> > >> sink task, and returning a Future allows a sink task to have *stronger*
> > >> guarantees than what Connect provides by default.
> > >>
> > >> Once again:
> > >> 1. we need an async API to allow the sink task to report problem records
> > >> and then immediately continue doing more work.
> > >> 2. Connect should guarantee to the sink task that all reported records will
> > >> actually be recorded before `preCommit(...)` is called
> > >> 3. a sink task *might* need stronger guarantees, and may need to block on
> > >> the reported records some time before `preCommit(...)`, and we should allow
> > >> them to do this.
> > >> 4. Future and callbacks are common techniques, but there are significant
> > >> runtime risks of using callbacks, whereas Future is a common/standard
> > >> pattern that is straightforward to use.
> > >>
> > >> This *exactly* matches the current KIP, which is why I plan to vote for
> > >> this valuable and well-thought-out KIP.
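Randall's points about an async, Future-returning reporter can be illustrated with a small Java sketch. Everything named here is an assumption made for illustration: `SketchReporter` is a hypothetical stand-in for the reporter under discussion (records are plain strings rather than `SinkRecord`), and `SketchSinkTask` is a hypothetical task, not a Connect class. The task reports bad records and keeps working (point 1), and only a task that wants the stronger guarantee calls `awaitReports` with a bounded wait (point 3), which also limits how long it blocks given the heartbeat concern raised earlier in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the reporter discussed in this thread; not the Connect API.
interface SketchReporter {
    Future<Void> report(String failedRecord, Throwable error);
}

// Hypothetical sink task: reports bad records asynchronously, optionally waits on them.
class SketchSinkTask {
    private final SketchReporter reporter;
    private final List<Future<Void>> pending = new ArrayList<>();
    final List<String> written = new ArrayList<>();

    SketchSinkTask(SketchReporter reporter) {
        this.reporter = reporter;
    }

    // Report a problem record and immediately continue with the next one.
    void put(List<String> records) {
        for (String record : records) {
            if (record.isEmpty()) {
                // Treating an empty record as "bad" is an assumption of this sketch.
                pending.add(reporter.report(record, new IllegalArgumentException("empty record")));
            } else {
                written.add(record);
            }
        }
    }

    // Optional stronger guarantee: block until every report completes, within a total budget.
    void awaitReports(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            for (Future<Void> future : pending) {
                long remaining = Math.max(deadline - System.nanoTime(), 0L);
                future.get(remaining, TimeUnit.NANOSECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for reports", e);
        } catch (ExecutionException | TimeoutException e) {
            // A failed or overdue report is surfaced so the caller can fail the task.
            throw new IllegalStateException("Errant record was not reported in time", e);
        }
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A task that is happy with the default guarantee would simply never call `awaitReports` and discard the returned futures.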
> > >>
> > >> > And if we are guaranteeing that the
> > >> > offsets are only advanced when the errors are reported, then this becomes a
> > >> > double win:
> > >> >
> > >> > 1. connector developers can literally fire and forget failed records.
> > >> > 2. offsets are correctly advanced on errors being reported. Failure to
> > >> > report error will kill the task, and the last committed offset will be the
> > >> > correct one.
> > >> >
> > >> > The main contract would simply be to call report() before preCommit() or
> > >> > before put() returns in the task, so the framework knows that there
> > >> > are error records reported, and those need to finish before the offsets can
> > >> > be advanced.
> > >> >
> > >> > I think I'd be pretty excited about this API. And if we all agree, then
> > >> > let's go ahead with this?
> > >> >
> > >> > Best,
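The contract Arjun describes above (offsets advance only after reported records have been recorded, and a failed report kills the task) could be sketched on the framework side roughly as follows. This is only an illustration under stated assumptions: `PendingReports` and `OffsetCommitter` are hypothetical names, not Connect internals, and the real framework's bookkeeping may look quite different.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical framework-side bookkeeping of in-flight error reports.
class PendingReports {
    private final List<CompletableFuture<Void>> outstanding = new ArrayList<>();

    // Called each time the task reports a record; the future completes
    // once the DLQ write (or other reporter) has finished.
    synchronized void track(CompletableFuture<Void> reportFuture) {
        outstanding.add(reportFuture);
    }

    // Blocks until every tracked report has finished; throws if any report failed.
    void awaitAllBeforeCommit() {
        List<CompletableFuture<Void>> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(outstanding);
            outstanding.clear();
        }
        try {
            CompletableFuture.allOf(snapshot.toArray(new CompletableFuture<?>[0])).join();
        } catch (CompletionException e) {
            // A failure to report is treated as fatal for the task.
            throw new IllegalStateException("Failed to report errant record; failing task", e.getCause());
        }
    }
}

// Hypothetical committer: offsets move forward only after reports are done.
class OffsetCommitter {
    private final PendingReports pending;
    long committedOffset = -1L;

    OffsetCommitter(PendingReports pending) {
        this.pending = pending;
    }

    void commit(long offset) {
        pending.awaitAllBeforeCommit(); // throws before the offset is advanced
        committedOffset = offset;
    }
}
```

Because the wait happens before `committedOffset` is updated, a failed report leaves the last committed offset untouched, which matches the "last committed offset will be the correct one" point in the quoted message.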