Apologies for the late entry -- I entirely missed this KIP and discussion. :-(
Thanks for creating the KIP and proposing this change. I do think it's useful for source connector tasks to get more information about the acknowledgement after the record was written.

However, given the KIP's suggestion that the two `commitRecord(...)` method variants are disjoint, I'm a bit surprised that the WorkerSourceTask would do the following:

    task.commitRecord(preTransformRecord);
    if (recordMetadata != null)
        task.commitRecord(preTransformRecord, recordMetadata);

rather than:

    if (recordMetadata != null)
        task.commitRecord(preTransformRecord, recordMetadata);
    else
        task.commitRecord(preTransformRecord);

But if the first form is the intended behavior, I would argue that it is better to simply have one `commitRecord(SourceRecord record, RecordMetadata metadata)` method that clearly states that the metadata may be null if the record was not written (e.g., an SMT caused it to be dropped) or could not be written (after giving up on retries following failures in the SMTs and/or the converter), and let the implementation deal with the differences. Essentially, we'd be deprecating the existing `commitRecord(SourceRecord)` method, changing the framework to always use the new method, and having the new method delegate to the existing method by default. (This is what Jun also suggested on the pull request, https://github.com/apache/kafka/pull/6295#discussion_r330097541.) This is backward compatible for connector implementations that only override the old method, yet it gives connectors that implement the new API a way to override the new method without also having to implement the old one.

IOW:

    @Deprecated
    public void commitRecord(SourceRecord record) throws InterruptedException {
        // nop
    }

    /**
     * <p>
     * Commit an individual {@link SourceRecord} when the callback from the producer client is received,
     * or if a record is filtered by a transformation and not sent to the producer.
     * By default, this method delegates to the {@link #commitRecord(SourceRecord)} method to maintain
     * backward compatibility. Tasks can choose to override this method, override the
     * {@link #commitRecord(SourceRecord)} method, or not override either one.
     * </p>
     * <p>
     * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
     * automatically. This hook is provided for systems that also need to store offsets internally
     * in their own system.
     * </p>
     *
     * @param record the {@link SourceRecord} that was sent via the producer, or that was dropped before being sent
     * @param metadata the metadata from the producer's write acknowledgement, or null if the record was not
     *                 sent to the producer because it was filtered by an SMT or could not be transformed
     *                 and/or converted
     * @throws InterruptedException
     */
    public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
        commitRecord(record);
    }

Best regards,
Randall

On Thu, Jan 31, 2019 at 9:02 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:

> Andrew, I have considered this, but I think passing null for RecordMetadata
> would be surprising and error prone for anyone implementing SourceTask. I
> figure the only use-case for overriding this variant (and not the existing
> one) is to capture the RecordMetadata. If that's the case, every
> implementation would need to check for null. What worries me is that an
> implementation that does not check for null will seem to work until an SMT
> is configured to filter records, which I believe would be exceedingly rare.
> Moreover, the presence of the RecordMetadata parameter strongly implies
> that the record has been sent and ACK'd, and it would be surprising to
> discover otherwise.
>
> On the other hand, the current PR makes it difficult to distinguish between
> records that are filtered vs ACK'd. The implementing class would need to
> correlate across poll() and the two commitRecord() invocations in order to
> find records that were poll()'d but not ACK'd. In contrast, if we passed
> null to commitRecord, the method would trivially know that the record was
> filtered.
> I think this is probably not a common use-case, so I don't think
> we should worry about it. In fact, the existing commitRecord callback seems
> to purposefully hide this detail from the implementing class, and I don't
> know why we'd try to expose it in the new method.
>
> This sort of confusion is why I originally proposed a new method name for
> this callback, as does the similar KIP-381. I agree that overloading the
> existing method is all-around easier, and I think a casual reader would
> make the correct assumption that RecordMetadata in the parameter list
> implies that the record was sent and ACK'd.
>
> > the connector implementor would want to provide only a single variant of
> > commitRecord()
>
> I think this would be true either way. The only reason you'd implement both
> variants is to detect that a record has _not_ been ACK'd, which again I
> believe is a non-requirement.
>
> Would love to hear if you disagree.
>
> Thanks!
> Ryanne
>
>
> On Thu, Jan 31, 2019 at 3:47 AM Andrew Schofield <andrew_schofi...@live.com>
> wrote:
>
> > As you might expect, I like the overloaded commitRecord() but I think the
> > overloaded method should be called in exactly the same situations as the
> > previous method. When it does not reflect an ACK, the second parameter
> > could be null. The text of the KIP says that the overloaded method is only
> > called when a record is ACKed and I would have thought that the connector
> > implementor would want to provide only a single variant of commitRecord().
> >
> > Andrew Schofield
> > IBM Event Streams
> >
> > On 31/01/2019, 03:00, "Ryanne Dolan" <ryannedo...@gmail.com> wrote:
> >
> > I've updated the KIP and PR to overload commitRecord instead of adding a
> > new method.
> > Here's the PR:
> >
> > https://github.com/apache/kafka/pull/6171
> >
> > Ryanne
> >
> > On Mon, Jan 21, 2019 at 6:29 PM Ryanne Dolan <ryannedo...@gmail.com>
> > wrote:
> >
> > > Andrew Schofield suggested we overload the commitRecord method instead of
> > > adding a new one. Thoughts?
> > >
> > > Ryanne
> > >
> > > On Thu, Jan 17, 2019, 5:34 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > >
> > >> I had to change the KIP number (concurrency is hard!) so the link is now:
> > >>
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-416%3A+Notify+SourceTask+of+ACK%27d+offsets%2C+metadata
> > >>
> > >> Ryanne
> > >>
> > >> On Fri, Jan 11, 2019 at 2:43 PM Ryanne Dolan <ryannedo...@gmail.com>
> > >> wrote:
> > >>
> > >>> Hey y'all,
> > >>>
> > >>> Please review the following small KIP:
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-414%3A+Notify+SourceTask+of+ACK%27d+offsets%2C+metadata
> > >>>
> > >>> Thanks!
> > >>> Ryanne
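The delegation Randall proposes can be sketched in Java roughly as follows. This is a minimal, self-contained illustration, not Kafka's code: `SimpleSourceRecord`, `SimpleRecordMetadata`, `SketchSourceTask`, `LegacyTask`, `MetadataAwareTask`, and `onRecordDone` are hypothetical stand-ins for `SourceRecord`, `RecordMetadata`, `SourceTask`, two kinds of connector task, and the `WorkerSourceTask` callback. It assumes the framework always calls the two-argument `commitRecord` and passes `null` metadata for records that were not written.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed commitRecord delegation.
// All types here are hypothetical stand-ins, NOT the real Kafka Connect API.
public class CommitRecordSketch {

    // Stand-in for SourceRecord: carries only a key for identification.
    static final class SimpleSourceRecord {
        final String key;
        SimpleSourceRecord(String key) { this.key = key; }
    }

    // Stand-in for RecordMetadata: the producer's write acknowledgement.
    static final class SimpleRecordMetadata {
        final String topic;
        final int partition;
        final long offset;
        SimpleRecordMetadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Stand-in for SourceTask with both commitRecord variants.
    abstract static class SketchSourceTask {
        // Old hook, kept for compatibility; does nothing by default.
        @Deprecated
        public void commitRecord(SimpleSourceRecord record) {
            // nop
        }

        // New hook: metadata is null when the record was not written
        // (e.g. dropped by an SMT). By default it delegates to the old hook.
        public void commitRecord(SimpleSourceRecord record, SimpleRecordMetadata metadata) {
            commitRecord(record);
        }
    }

    // A task written before the change: overrides only the old hook.
    static class LegacyTask extends SketchSourceTask {
        final List<String> committed = new ArrayList<>();

        @Override
        @Deprecated
        public void commitRecord(SimpleSourceRecord record) {
            committed.add(record.key);
        }
    }

    // A task that overrides only the new hook and must null-check metadata.
    static class MetadataAwareTask extends SketchSourceTask {
        final List<String> acked = new ArrayList<>();
        final List<String> notWritten = new ArrayList<>();

        @Override
        public void commitRecord(SimpleSourceRecord record, SimpleRecordMetadata metadata) {
            if (metadata == null) {
                notWritten.add(record.key); // filtered, or failed before sending
            } else {
                acked.add(record.key + "@" + metadata.offset);
            }
        }
    }

    // Framework side (the role WorkerSourceTask plays): always call the
    // two-argument variant, passing null when nothing was written.
    static void onRecordDone(SketchSourceTask task, SimpleSourceRecord record,
                             SimpleRecordMetadata metadata) {
        task.commitRecord(record, metadata);
    }

    public static void main(String[] args) {
        SimpleSourceRecord sent = new SimpleSourceRecord("a");
        SimpleSourceRecord dropped = new SimpleSourceRecord("b");
        SimpleRecordMetadata ack = new SimpleRecordMetadata("topic", 0, 42L);

        LegacyTask legacy = new LegacyTask();
        onRecordDone(legacy, sent, ack);
        onRecordDone(legacy, dropped, null);
        System.out.println("legacy committed: " + legacy.committed);
        // prints "legacy committed: [a, b]"

        MetadataAwareTask aware = new MetadataAwareTask();
        onRecordDone(aware, sent, ack);
        onRecordDone(aware, dropped, null);
        System.out.println("acked: " + aware.acked + ", not written: " + aware.notWritten);
        // prints "acked: [a@42], not written: [b]"
    }
}
```

With this shape, a task that only overrides the old method still sees every record, including dropped ones. A task that overrides only the new method can separate ACK'd records from dropped ones with a single null check, which is also the check Ryanne worries implementers might forget.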