[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644987#comment-16644987 ]

Per Steffensen commented on KAFKA-5716:
---

Created KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback

> Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
> ---
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Per Steffensen
>            Assignee: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected recently. If so, I apologize. I found the "problem" by code inspection alone, so I may be wrong. I have not had the time to write tests to confirm.
> According to the java-doc on SourceTask.commit
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}. This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets automatically. This hook is provided for systems that also need to store offsets internally in their own system.
> {quote}
> As I read this, when the commit-method is called, the SourceTask-developer is "told" that everything returned from poll up until "now" has been sent/stored - both the outgoing messages and the associated connect-offsets. Looking at the implementation, it also seems that this is what it tries to "guarantee/achieve".
> But as I read the code, it is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled to call WorkerSourceTask.commitOffsets (from a different thread)
> The two threads synchronize (on the WorkerSourceTask-object) in sendRecords and commitOffsets respectively, preventing the task-thread from adding to outstandingMessages and offsetWriter while the committer-thread is marking what has to be flushed in the offsetWriter and waiting for outstandingMessages to be empty. This means that the offsets committed will be consistent with what has been sent out, but not necessarily with what has been polled. At least I do not see why the following is not possible:
> * Task-thread polls something from task.poll
> * Before the task-thread gets to add (all) the polled records to outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks in and does its committing, while preventing the task-thread from adding the polled records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but the committer-thread will end up calling task.commit (via WorkerSourceTask.commitSourceTask) without the records just polled from task.poll having been sent or the corresponding connector-offsets flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the java-doc of SourceTask.commit to something a la (which I do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets automatically. This hook is provided for systems that also need to store offsets internally in their own system.
> {quote}
> * or, fix the "problem" so that it actually does what the java-doc says :-)
> If I am not right, of course I apologize for the inconvenience. I would appreciate an explanation of where my code inspection is not correct, and why it works even though I cannot see it. I will not expect such an explanation, though.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
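The interleaving described in the issue can be replayed deterministically as a rough sketch. Everything below is a hypothetical stand-in written for illustration (RaceSketchTask, RaceSketchWorker, CommitRaceSketch); it is not the real WorkerSourceTask or SourceTask code, it runs on a single thread, and it simulates the producer acknowledgement as immediate. It only shows the ordering in which commit() can be reached while a polled record has not yet been sent.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a source task; NOT the real Kafka Connect SourceTask. */
class RaceSketchTask {
    final List<String> polled = new ArrayList<>();  // everything poll() has returned so far
    final List<String> acked = new ArrayList<>();   // everything confirmed via commitRecord()
    final List<List<String>> pendingAtCommit = new ArrayList<>(); // one snapshot per commit()
    private int next = 0;

    List<String> poll() {
        List<String> batch = new ArrayList<>();
        batch.add("record-" + next++);
        polled.addAll(batch);
        return batch;
    }

    void commitRecord(String record) {
        acked.add(record);
    }

    void commit() {
        // Snapshot what was polled but not yet acknowledged at the moment commit() runs.
        List<String> pending = new ArrayList<>(polled);
        pending.removeAll(acked);
        pendingAtCommit.add(pending);
    }
}

/** Hypothetical, heavily simplified worker that replays one interleaving on a single thread. */
class RaceSketchWorker {
    private final RaceSketchTask task;
    private final List<String> outstandingMessages = new ArrayList<>();

    RaceSketchWorker(RaceSketchTask task) {
        this.task = task;
    }

    // Task-thread, step 1: poll, without yet registering anything as outstanding.
    List<String> poll() {
        return task.poll();
    }

    // Task-thread, step 2: register and send. The producer ack is simulated as immediate.
    void sendRecords(List<String> batch) {
        for (String record : batch) {
            outstandingMessages.add(record);
            task.commitRecord(record);
            outstandingMessages.remove(record);
        }
    }

    // Committer-thread: nothing is outstanding, so the flush goes ahead and the task is told.
    void commitOffsets() {
        if (!outstandingMessages.isEmpty()) {
            throw new IllegalStateException("this sketch never leaves messages outstanding");
        }
        task.commit();
    }
}

class CommitRaceSketch {
    /** Replays the interleaving and returns what was still unsent when commit() was called. */
    static List<String> replay() {
        RaceSketchTask task = new RaceSketchTask();
        RaceSketchWorker worker = new RaceSketchWorker(task);
        worker.sendRecords(worker.poll());   // record-0 is polled and sent
        List<String> batch = worker.poll();  // record-1 is polled ...
        worker.commitOffsets();              // ... and the committer runs before it is sent
        worker.sendRecords(batch);           // record-1 is sent only afterwards
        return task.pendingAtCommit.get(0);
    }

    public static void main(String[] args) {
        System.out.println("polled but unsent when commit() ran: " + replay());
    }
}
```

In this replay the snapshot taken inside commit() contains record-1, which is the case the current java-doc wording does not cover.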
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480763#comment-16480763 ]

Per Steffensen commented on KAFKA-5716:
---

{quote}First of all, if we're going to change the public API we need to have a KIP
{quote}
Ok, I will open a KIP when I get to it.
{quote}... much more minimalistic ...
{quote}
I think it is almost as minimalistic as it gets (please see the diffs on the PR). I did not call the new method _{{committed}}_. I called it _offsetsFlushedAndAcknowledged_. I think that name is better, and it does not make the PR less minimalistic. Because I am "correcting" the name of one of the methods (_commit_) in _SourceTask_, by giving the necessary new method (_offsetsFlushedAndAcknowledged_) a name not in line with existing naming, I thought that I had better "correct" the name of the other method (_commitRecord_) also. Therefore I also took the liberty of introducing an additional new method called _recordSentAndAcknowledged_ (just calling _commitRecord_ out-of-the-box). Introduction of _recordSentAndAcknowledged_ is not strictly necessary, but that is about the only thing preventing the PR from being the absolute minimum, besides renaming of a few private methods in _WorkerSourceTask_. I think naming is important, so I would like to call the new method _offsetsFlushedAndAcknowledged_ instead of _committed_. I also think consistency is important, so I would like to introduce _recordSentAndAcknowledged_ as well.
{quote}As for renaming/refactoring, we should avoid doing this 1) because it breaks backward compatibility
{quote}
All public methods that existed before still exist and have the same semantics. I only introduced NEW methods. Please spell out for me why you think I break backwards compatibility.
{quote}and 2) it makes it harder to merge changes to multiple branches. Even if the names aren't ideal, their JavaDoc should adequately explain the expected behavior and when they are called and by what thread.
{quote}
That is a valid argument, but as I stated I also think naming and consistency are important. So what is most important - naming and consistency, or absolute minimum trouble merging?
{quote}WDYT?
{quote}
Well, since you asked what I think, I gave you my opinion (above) :) That said, I am not in a position to insist on anything here. So I will get rid of the additional renaming/refactoring, if you insist. But I would really want you to recognize first that I do not break backwards compatibility, and then tell me that, even though I do not, you would still prefer not to "fix" naming.
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462890#comment-16462890 ]

Randall Hauch commented on KAFKA-5716:
---

[~steff1193], thanks for all the work on this, and sorry about the delay.

First of all, if we're going to change the public API we need to have a KIP. Second, it would be preferable to have a KIP and a PR that are much more minimalistic and contain only the essential changes we have to make to enable the correct behavior. From the previous conversations, this might include:
# Adding a {{committed(Map)}} method that defaults to calling {{commit()}}, and that is clearly described as being called with the latest offsets for records that were both written and whose offsets were committed.
# Deprecating the {{commit()}} method, and briefly describing in the JavaDoc why it is deprecated and that {{committed(Map)}} is the preferred approach.
# Changing the {{WorkerSourceTask}} to call {{committed(Map)}} instead of {{commit()}}; this will require tracking offsets a bit more carefully. (This is where this PR will get complicated, but it doesn't have to be in the KIP.)

The important thing is that we don't want to break backward compatibility, but we should provide a path forward for source connectors that need to rely upon this. WDYT?
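The {{committed(Map)}} suggestion in the comment above could look roughly like the following sketch. The classes are hypothetical stand-ins (HookSketchSourceTask is not the real org.apache.kafka.connect.source.SourceTask), the Map<String, Long> parameter type is an assumption because the comment only says {{committed(Map)}}, and the checked exceptions of the real API are left out for brevity. It shows a new hook whose default delegates to the old one, so a task that only overrides commit() keeps being called.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in; NOT the real org.apache.kafka.connect.source.SourceTask. */
abstract class HookSketchSourceTask {
    /** Existing hook, kept (and deprecated) so that current connectors keep working. */
    @Deprecated
    public void commit() {
    }

    /**
     * Proposed hook: called with the latest offsets for records that were both written
     * and had their offsets committed. The Map type parameters are an assumption.
     */
    public void committed(Map<String, Long> offsets) {
        commit(); // default behavior: fall back to the old hook
    }
}

/** A connector written against the old API: it only knows about commit(). */
class LegacyTask extends HookSketchSourceTask {
    int commitCalls = 0;

    @Override
    public void commit() {
        commitCalls++;
    }
}

/** A connector written against the new hook: it receives the committed offsets. */
class ModernTask extends HookSketchSourceTask {
    Map<String, Long> lastOffsets = Collections.emptyMap();

    @Override
    public void committed(Map<String, Long> offsets) {
        lastOffsets = offsets;
    }
}

class CommittedHookSketch {
    /** What a worker would do once an offset flush has succeeded (hypothetical helper). */
    static void notifyCommitted(HookSketchSourceTask task, Map<String, Long> offsets) {
        task.committed(offsets);
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = Collections.singletonMap("partition-a", 42L);
        LegacyTask legacy = new LegacyTask();
        ModernTask modern = new ModernTask();
        notifyCommitted(legacy, offsets);
        notifyCommitted(modern, offsets);
        System.out.println("legacy commit() calls: " + legacy.commitCalls);
        System.out.println("modern offsets: " + modern.lastOffsets);
    }
}
```

The worker only ever calls the new hook; whether a given task sees it as commit() or committed(Map) depends on which method the task overrides.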
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16306217#comment-16306217 ]

Per Steffensen commented on KAFKA-5716:
---

No one? Really?
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16243639#comment-16243639 ]

Per Steffensen commented on KAFKA-5716:
---

Please comment! Is the PR OK, or do you want me to do more? Anyone willing to help get this committed? Does it need a KIP?
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16214047#comment-16214047 ]

Per Steffensen commented on KAFKA-5716:
---

Ahhh, I realized that I was doing more than this ticket is about, and that this part grew. I was trying to make naming consistent with respect to calling it "committing offsets" or "flushing offsets". I tried to do that for the sink-connector side also. But that involves a lot of changes, including public ones. It is a shame that naming seems to be inconsistent across the code, but that should be corrected in another ticket. I do not care much whether it is called "committing offsets" or "flushing offsets", but it should be consistent.

But I do care about having "commit" and "commitRecord" on SourceTask, because the names sound like they tell you what to do, and a commit is not necessarily what you want to do when offsets have been committed/flushed or when records have been sent and acknowledged, respectively. Let the methods on SourceTask reflect what has just happened and let the SourceTask decide its action, instead of trying to tell the SourceTask what it should do.

With all the semi-unrelated naming changes reverted, the code changes in the PR are comprehensible again.
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212416#comment-16212416 ]

Per Steffensen commented on KAFKA-5716:
---

I am having a hard time figuring out exactly what you would like me to do and not do. Therefore I just made the changes that I find reasonable. Please comment! I was fairly eager about doing related stuff, like deprecating, renaming related methods, being consistent on related stuff etc. Please let me know if I was too eager.

The only "public" changes, as I see it, are
* SourceTask: offsetsFlushedAndAcknowledged and recordSentAndAcknowledged added, but commit and commitRecord still exist with the same out-of-the-box semantics
* WorkerConfig: OFFSET_COMMIT_XXX constants renamed to OFFSET_FLUSH_XXX

I probably ought to write more here, but I am in a hurry right now.
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16178406#comment-16178406 ] Ewen Cheslack-Postava commented on KAFKA-5716: -- Sorry, I was just doing a round of cleanup on the JIRAs to help track in-progress JIRAs. Submitting PRs doesn't automatically mark these as in progress so I was just updating status + assignee for all Connect JIRAs based on what had been submitted so far as it helps us find JIRAs that could use review, especially as releases are nearing. I think in retrospect the naming of commitRecord is probably unfortunate. commit() would tell you when the data is flushed to Kafka + offsets committed, whereas commitRecord really only guarantees it was written to Kafka, but then you might restart the task and read committed offsets that are earlier than that. Probably flushedRecord or ackedRecord for what commitRecord currently does + commitRecord behavior that would have the framework save the list of records and then invoke callbacks when the offset commit succeeded would be better. The motivating use case for the commitRecord was really systems that only have individual message ack anyway, e.g. message queues. In this case, any sort of bulk ack from the connect framework, i.e. commit(), isn't all that helpful and in fact requires more state tracking on the part of the connector. Because of the current behavior, it also allows you to avoid duplicates in the periods between offset commits since you can selectively destroy data that you know has made it to Kafka even if the offsets haven't been committed yet. Even if we just removed commit(), whether you'd want something other than commitRecord() would depend on how fine-grained you are ok with acking data and whether you need a collective/bulk ack. 
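The alternative behavior described in the comment above (the framework saves the list of records and invokes callbacks once the offset commit has succeeded) could look roughly like the following. This is only a sketch in plain Java: `FlushedRecordNotifier`, `recordAcked`, `beginFlush` and `flushSucceeded` are hypothetical names, not part of the Kafka Connect API, and this is not how WorkerSourceTask is implemented. The point it illustrates is that the set of records reported must be fixed when the flush starts, not when it ends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper, for illustration only; not part of Kafka Connect. */
final class FlushedRecordNotifier<R> {
    // Records acknowledged by Kafka but not yet part of any offset flush.
    private final List<R> acked = new ArrayList<>();
    // Records whose offsets belong to the flush currently in progress.
    private List<R> inFlight = new ArrayList<>();

    /** Called when a single record has been written to Kafka. */
    synchronized void recordAcked(R record) {
        acked.add(record);
    }

    /** Called when an offset flush starts: fixes the set of records it covers. */
    synchronized void beginFlush() {
        inFlight.addAll(acked);
        acked.clear();
    }

    /**
     * Called when the offset flush has succeeded: reports exactly the records
     * that were covered when the flush began. If a flush fails, this method is
     * simply not called, and the records are reported after a later success.
     */
    void flushSucceeded(Consumer<List<R>> callback) {
        List<R> done;
        synchronized (this) {
            done = inFlight;
            inFlight = new ArrayList<>();
        }
        callback.accept(done);
    }
}
```

A record acknowledged after `beginFlush()` is held back until the next flush, which is the guarantee the bulk commit() hook does not give according to this issue.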
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16177881#comment-16177881 ]

Randall Hauch commented on KAFKA-5716:
--------------------------------------

I agree that the current {{commit()}} method is incorrect and should be dealt with. However, my point was that you can do a lot with just {{commitRecord}} while ignoring the broken {{commit()}} method.

We do need to fix this, and I think [~ewencp] assigned this to you since you were already working on a fix, though obviously looking for guidance as to the correct route. I see [~ewencp]'s point, too, that *correcting the behavior* of the {{commit()}} method will not be trivial or inexpensive, and why he considers removing the method altogether (initially via deprecation) an attractive resolution.

As I mention above, the most important thing is that {{commitRecord}} still tells you when a record has been written to Kafka. Flushed offsets are almost certainly going to be behind what's written to Kafka, which is why I think source connectors shouldn't really have to deal with or be concerned with what has been flushed. WDYT?
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16177692#comment-16177692 ]

Per Steffensen commented on KAFKA-5716:
---------------------------------------

[~ewencp] assigned me to this task. Does that mean that I should add my fix-suggestion to the PR, and we discuss from there?
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16177688#comment-16177688 ]

Per Steffensen commented on KAFKA-5716:
---------------------------------------

bq. Yeah, this makes sense. The existing SourceTask.commitRecord(...) method is called after each source record has been written to Kafka – can you use this to keep track of the offsets that have been written?

Yeah, that is what I am doing today, because it calculates something that is closer to the truth than just assuming that everything polled has been written. But it is not necessarily true either. It can happen that (some of) the records that have been polled, but not included in the offsets-write/flush (which I have demonstrated can happen), have also been sent to outgoing Kafka AND been acknowledged, leading to the call to {{SourceTask.commitRecord(...)}}. So I claim that, at the time of {{SourceTask.commit()}}, it is possible that {{SourceTask.commitRecord(...)}} has been called for some records not included in the offsets-write/flush.

You suggest several things that I could do, and some of them I am doing. I believe I know very well how Kafka and Kafka-Connect work in this area. I can do a lot of things to "work around" the fact that I do not really know which offsets have been written/flushed. The thing about KAFKA-5716 is that I really ought to be told - why not? Or at least the JavaDoc should not tell me that I am told, when the truth is that I am not :-)
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16176582#comment-16176582 ]

Randall Hauch commented on KAFKA-5716:
--------------------------------------

{quote}
What I am using it for can in some cases be described as "transfer data from some temporary storage (source-system) to Kafka, and when it has been safely transferred it is deleted from the source-system
{quote}

Yeah, this makes sense. The existing {{SourceTask.commitRecord(...)}} method is called after each source record has been written to Kafka -- can you use this to keep track of the offsets that have been written? Yeah, you have to do more bookkeeping, and you'd likely want to accumulate a bunch of offsets. But if you know ahead of time the last offset in a particular file, you may also be able to react to those "last offsets" by removing the file.

Note that this allows you to detect that the records *have been written to Kafka*. With this approach, if Connect were to crash before the most recent offsets have been flushed/committed, then upon restart Connect might want to start with offsets that are too old. But your connector would know that if the file(s) described by the last persisted offsets were removed, they would have been cleaned up once all of their records were written to Kafka. Could the connector just skip those files?

{quote}
and in other cases as just "advanced Kafka-to-Kafka ETL
{quote}

I wonder if you could do something similar for this case by relying upon {{commitRecord(...)}} to know when particular records have been written to Kafka. If so, I think the difference in behavior is whether you believe the offsets for *written records* or the *last committed offsets* are the "most correct" information. Recall that the last committed offsets may not represent the most recently *written* records, whereas the offsets from {{commitRecord(...)}} are definitely the offsets for the most recently-written records.
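The bookkeeping suggested in the comment above (use the per-record acknowledgement to decide when a source file can be removed) might be sketched as follows. `FileAckTracker` and its methods are hypothetical names in plain Java, not part of Kafka Connect. Instead of watching for a known "last offset", this variant counts outstanding records per file, an assumption made here so that the result does not depend on the order in which acknowledgements arrive. Note that it only detects that records were written to Kafka, not that their offsets were flushed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, for illustration only; not part of Kafka Connect. */
final class FileAckTracker {
    // Number of records per file that still await a write acknowledgement.
    private final Map<String, Integer> pending = new HashMap<>();

    /** Call when all records of a file have been handed out from poll(). */
    synchronized void registerFile(String file, int recordCount) {
        if (recordCount <= 0) {
            throw new IllegalArgumentException("recordCount must be positive");
        }
        pending.put(file, recordCount);
    }

    /**
     * Call from a per-record acknowledgement hook such as commitRecord(...).
     * Returns true when this was the last outstanding record of the file,
     * i.e. every record of the file has been written to Kafka.
     */
    synchronized boolean recordWritten(String file) {
        Integer left = pending.get(file);
        if (left == null) {
            throw new IllegalStateException("unknown file: " + file);
        }
        if (left == 1) {
            pending.remove(file);
            return true;
        }
        pending.put(file, left - 1);
        return false;
    }

    /** True while at least one record of the file is unacknowledged. */
    synchronized boolean isPending(String file) {
        return pending.containsKey(file);
    }
}
```

On restart, a connector using this approach would still need the rule from the comment above: if the last persisted offsets point at a file that no longer exists, skip it.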
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172876#comment-16172876 ]

Per Steffensen commented on KAFKA-5716:
---------------------------------------

bq. The idiomatic pattern for source connectors is to rely upon the source partition and source offsets included in each source record, and to have Connect periodically commit those offsets to offset storage (e.g., Kafka in distributed mode) as well as during graceful shutdown of the connector. Connect then uses the committed offsets upon restart to inform the connector of the last persisted offsets the connector provided, and then the connector can restart from that point.

Yes, I completely get that.

bq. In this scenario, the connector doesn't need to acknowledge or be notified of the committed offsets since Connect is completely managing the offsets for the connector.

Guess it depends.

bq. I'd be interested in hearing more about your use case, where IIUC your connector needs to notify the source system that progress has completed. Can you describe the source system and why you're using this approach?

I believe it would be too much explanation here. I can give a few "hints", but maybe you will not be able to follow without more details.

First of all, I am starting to realize that I may be using Kafka-Connect for something that it was not completely intended for. I start to believe that it is intended to "expose data in some source-system via Kafka, but the data stays in the source-system". What I am using it for can in some cases be described as "transfer data from some temporary storage (source-system) to Kafka, and when it has been safely transferred it is deleted from the source-system", and in other cases as just "advanced Kafka-to-Kafka ETL". Even though Kafka-Connect (source-connectors) is intended for the first thing, it does a very nice job for me on the second thing :-) Maybe the intended use for Kafka-Connect could change to include both :-) :-) :-)

Briefly, about two of the source-connectors I have:

1. TCP streaming indirectly to Kafka

From a 3rd-party we receive data through TCP, over an unknown number of concurrent TCP-streams. We have only one chance to get this data, so we better make sure we "receive and store it". We could write the TCP-streams directly to Kafka, but for several good reasons we write each stream to local files. Each stream gets chopped into several files of a particular length, starting on the next file when the previous file has reached that length. Now, the job is to get each of those streams onto Kafka, using Kafka-Connect, which does the job nicely, and so on. I cannot afford to delete the original file before all the data in it has been written to target Kafka, and the connect-offsets (used to keep track of where in the files I got to) have been flushed, so that I will not be asked to replay from a non-existent file. Lots of details missing, but maybe you get the picture.

As a note, there may be significantly more TCP-streams than I have partitions in my target Kafka-topic, so the streams are "multiplexed" into the partitions, several streams sharing the same partition. Each stream has its own unique key (used as Kafka-message-key), so that one stream can be distinguished from the other, and so that all messages from the same stream go to the same partition (using the default key-hash-modulo partition-selector).

2. Parsing packets from streams multiplexed into the partitions of a Kafka topic

So we have multiplexed streams in the partitions of a Kafka topic. From here I use Kafka-Connect as an advanced ETL (including the translate) on Kafka-to-Kafka. No, Kafka-Streams will not do the job. But something based on Kafka-Connect will :-)

For this Kafka-Connect-ETL it is not enough to only use the normal Kafka offset on the ingoing Kafka topic, because it has to keep track of where each multiplexed sub-stream of each partition got to. It does so relative to the normal committed offset on the ingoing Kafka topic. In order for this to work properly, the order of things has to be
* Outgoing packets must have been sent and acknowledged by outgoing Kafka (commitRecord)
* Sub-stream offsets (connect-offsets) relative to the normal Kafka offset have to be safely stored (commit)
* Normal Kafka offset can be moved

Lots of details missing, but you will see that I have to know when connect-offsets have been written and flushed, in order to know when I can acknowledge the normal Kafka-offset, which they are relative to.

I consider "multiplexing more data-streams into fewer Kafka-partitions" a "problem" so general that I hope to get the time to open-source the system. I believe others will be able to benefit.

bq. Does the source not allow you to replay from any point? Are you using this mechanism to notify the source system that it can garbage collect events that your
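The "key-hash-modulo" multiplexing mentioned in the comment above can be illustrated with a few lines of plain Java. This is a sketch under assumptions: `StreamPartitioner` and `partitionFor` are hypothetical names, and `String.hashCode()` stands in for the murmur2 hash of the serialized key that Kafka's default partitioner uses for keyed records, so the partition numbers computed here will differ from the ones Kafka would pick. What carries over is the property being relied on: one key always maps to one partition, and several keys may share a partition.

```java
/** Hypothetical illustration of key-hash-modulo partition selection. */
final class StreamPartitioner {
    private StreamPartitioner() {
    }

    /**
     * Maps a stream key to a partition in [0, numPartitions).
     * The same key always yields the same partition.
     */
    static int partitionFor(String streamKey, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit so that the remainder is never negative.
        return (streamKey.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

With more streams than partitions, at least two stream keys necessarily land in the same partition, which is why a consumer of such a topic has to track progress per sub-stream rather than only per partition.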
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172057#comment-16172057 ]

Randall Hauch commented on KAFKA-5716:
--------------------------------------

{quote}
I am! And couldn't live without it. I cannot afford to acknowledge data going into my source-connector before the corresponding outgoing records AND their offset-changes have been written, flushed and acknowledged. Today I pretend that everything polled has been written to offset-storage when task.commit() is called, even though that is not always entirely true, but it is close. But not knowing anything about when offsets have been written to offset-storage would definitely leave me in the blind.
{quote}

The idiomatic pattern for source connectors is to rely upon the source partition and source offsets included in each source record, and to have Connect periodically commit those offsets to offset storage (e.g., Kafka in distributed mode) as well as during graceful shutdown of the connector. Connect then uses the committed offsets upon restart to inform the connector of the last persisted offsets the connector provided, and then the connector can restart from that point. In this scenario, the connector doesn't need to acknowledge or be notified of the committed offsets since Connect is completely managing the offsets for the connector.

I'd be interested in hearing more about your use case, where IIUC your connector needs to notify the source system that progress has completed. Can you describe the source system and why you're using this approach? Does the source not allow you to replay from any point? Are you using this mechanism to notify the source system that it can garbage collect events that your connector is reading?
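The idiomatic pattern described in the comment above (each record carries a source partition and a source offset, Connect persists the offsets, and the connector resumes from the last persisted offset) can be modelled with plain maps. The sketch below is a stand-in, not the Connect API: `OffsetStoreSketch`, `commit`, `resumePosition` and the `"position"` field are hypothetical, and in a real connector the lookup would go through the task context's offset storage reader rather than an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for Connect-managed offset storage; illustration only. */
final class OffsetStoreSketch {
    // Source partition (e.g. which file) -> last committed source offset.
    private final Map<Map<String, String>, Map<String, Long>> committed = new HashMap<>();

    /** What the framework does periodically and on graceful shutdown. */
    void commit(Map<String, String> sourcePartition, Map<String, Long> sourceOffset) {
        committed.put(sourcePartition, sourceOffset);
    }

    /**
     * What the connector does on restart: resume from the last committed
     * position, or from 0 when nothing was ever committed for the partition.
     */
    long resumePosition(Map<String, String> sourcePartition) {
        Map<String, Long> offset = committed.get(sourcePartition);
        return offset == null ? 0L : offset.getOrDefault("position", 0L);
    }
}
```

In this model the connector never learns when `commit` happened, which is fine as long as the source can be replayed from any position; the use case discussed in this issue is one where it cannot.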
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172056#comment-16172056 ] Randall Hauch commented on KAFKA-5716: -- {quote} I am! And couldn't live without it I cannot afford to acknowledge data going into my source-connector before the corresponding outgoing records AND their offset-changes has been written, flushed and acknowledged. Today I pretend that everything polled has been written to offset-storage when task.commit() is called, even though that is not always entirely true, but it is close. But not knowing anything about when offsets have been written to offset-storage would definitely leave me in the blind. {/quote} The idiomatic pattern for source connectors is to rely upon the source partition and source offsets included in each source record, and to have Connect periodically commit those offsets to offset storage (e.g., Kafka in distributed mode) as well as during graceful shutdown of the connector. Connect then uses the committed offsets upon restart to inform the connector of the last persisted offsets the connector provided, and then the connector can restart from that point. In this scenario, the connector doesn't need to acknowledge or be notified of the committed offsets since Connect is completely managing the offsets for the connector. I'd be interested in hearing more about your use case, where IIUC your connector needs to notify the source system that progress has completed. Can you describe the source system and why you're using this approach? Does the source not allow you to replay from any point? Are you using this mechanism to notify the source system that it can garbage collect events that your connector is reading? 
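The idiomatic pattern described in this comment (offsets carried on each record, persisted by the framework, and used again on restart) can be illustrated with a minimal, self-contained sketch. It deliberately does not use the Kafka Connect API: `ResumeFromOffsetsSketch`, `Rec`, `persist` and the `Map<String, Long>` offset store are hypothetical stand-ins chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of offset-driven restart. None of these types are Kafka Connect classes. */
final class ResumeFromOffsetsSketch {

    /** Hypothetical stand-in for a source record: a value plus the position it was read from. */
    static final class Rec {
        final String partition;
        final long offset;
        final String value;

        Rec(String partition, long offset, String value) {
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    private final String partition;
    private final List<String> source;
    private long next; // in-memory read position of this task instance

    /** On (re)start, resume right after the last offset the framework persisted. */
    ResumeFromOffsetsSketch(String partition, List<String> source, Map<String, Long> committed) {
        this.partition = partition;
        this.source = source;
        this.next = committed.getOrDefault(partition, -1L) + 1;
    }

    /** Returns up to max records, each tagged with its partition and offset. */
    List<Rec> poll(int max) {
        List<Rec> out = new ArrayList<>();
        while (next < source.size() && out.size() < max) {
            out.add(new Rec(partition, next, source.get((int) next)));
            next++;
        }
        return out;
    }

    /** Framework side of the model: persist the highest offset among delivered records. */
    static void persist(List<Rec> delivered, Map<String, Long> committed) {
        for (Rec r : delivered) {
            committed.merge(r.partition, r.offset, Long::max);
        }
    }

    public static void main(String[] args) {
        List<String> source = List.of("a", "b", "c", "d");
        Map<String, Long> committed = new HashMap<>();
        persist(new ResumeFromOffsetsSketch("file-1", source, committed).poll(2), committed);
        // A fresh instance models a restart: it continues after the committed offset.
        ResumeFromOffsetsSketch restarted = new ResumeFromOffsetsSketch("file-1", source, committed);
        System.out.println(restarted.poll(10).get(0).value);
    }
}
```

In this model the task never learns when `persist` ran, which is exactly why the pattern only works when the source can be replayed from an arbitrary offset.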
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171249#comment-16171249 ] Per Steffensen commented on KAFKA-5716: ---
FWIW

bq. WorkerSourceTask doesn't have synchronization around at least one call to commitSourceTask (which invokes the SourceTask.commit()) method so you may call this while the main thread is invoking some other method on the connector.

{{commitSourceTask}} is only called by {{commitOffsets}}, which is only (intended to be?) called by {{SourceTaskOffsetCommitter}}, so no synchronization is needed to prevent it from being called several times concurrently. ({{commitOffsets}} is also called by the main thread when {{execute}} finishes, but that aside.) I guess that, given the synchronization inside {{commitOffsets}}, it works nicely when called by {{SourceTaskOffsetCommitter}} while the main thread is calling other methods on the connector. It provides these semantics: {{task.commit()}} is called when offsets have been written and flushed, and the offsets that have been written and flushed always include everything from a {{task.poll}} (and all previous polls), BUT NOT NECESSARILY THE LAST (FEW) POLL(S). It is all or nothing per poll. The only problem I see is that the flushed offsets do not necessarily cover the records from the last (few) poll(s). That is what this KAFKA-5716 is about.

bq. Correcting the code would be tough

I think it is a matter of exposing to {{task.commit}} which polled records have had their offset-changes included in the offsets-write and which have not. I guess it could be done by just passing "the last record that has been included in the offset-write", indirectly indicating the last poll that has been included. Please see the attached patch. And then change {{SourceTask}} to:
{code}
public void commit(SourceRecord lastPolledRecordWithOffsetsWritten) throws InterruptedException {
    commit();
}

public void commit() throws InterruptedException {
    // This space intentionally left blank.
}
{code}
And some updated JavaDoc, maybe including deprecation of {{commit()}}, because it is ambiguous.

bq. I'd propose a different solution: let's just remove the method

Uhhh, please do not. Then the {{SourceTask}}-implementation will have absolutely no clue when offsets have been written and flushed. {{commitRecord}} will not help, because the moment Kafka acknowledges outgoing records (triggering {{commitRecord}}) is completely independent of the moment record-offsets are written to offset-storage (triggering {{commit}}).

bq. Given its current state, it seems unlikely anyone is actually using this functionality anyway

I am! And couldn't live without it :-) I cannot afford to acknowledge data going into my source-connector before the corresponding outgoing records AND their offset-changes have been written, flushed and acknowledged. Today I pretend that everything polled has been written to offset-storage when {{task.commit()}} is called, even though that is not always entirely true, but it is close. But not knowing anything about when offsets have been written to offset-storage would definitely leave me in the blind.

bq. If someone did want this functionality, we likely should just add a new {{commit(Map)}} variant

That is a good alternative to the {{commit(SourceRecord lastPolledRecordWithOffsetsWritten)}} variant I suggested.

bq. It'd be interesting to know if the method is overridden in any connectors that we know about in the wild.

I hope it is often the case, because {{commit()}} is the only clue you have about when offsets have been written to offset-storage. So if you are actually using connect-offsets (and you probably are, since you are using connect), you probably have an incorrect system if you do not take advantage of {{commit()}}. You cannot use {{commitRecord}} for that.

bq. If we just wanted a short term fix, we could definitely update the javadoc to make it clear what's actually happening and that this probably isn't what you want.

That would be easy :-) But it would not leave the {{SourceTask}}-implementor in any better position. It would still be undefined which polled records had their offsets written and which had not. This small change will help:
{code}
public void commit(SourceRecord lastPolledRecordWithOffsetsWritten) throws InterruptedException {
    commit();
}

public void commit() throws InterruptedException {
    // This space intentionally left blank.
}
{code}
Or the {{commit(Map)}} variant you suggest.
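To illustrate how a task could use the proposed {{commit(SourceRecord lastPolledRecordWithOffsetsWritten)}} callback, here is a minimal sketch. It is only a model under stated assumptions: records are represented by plain string ids rather than {{SourceRecord}} objects, and `AckUpToLastFlushedSketch`, `polled`, `acked` and `pending` are hypothetical names that exist neither in the attached patch nor in Kafka Connect.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Model of a task that acknowledges input only up to the last record whose offsets were flushed. */
final class AckUpToLastFlushedSketch {
    private final Deque<String> unacked = new ArrayDeque<>(); // polled, not yet acknowledged upstream
    private final List<String> acked = new ArrayList<>();     // acknowledged to the source system

    /** Called for every record handed out from poll(), in poll order. */
    void polled(String recordId) {
        unacked.addLast(recordId);
    }

    /** Models the proposed commit(SourceRecord lastPolledRecordWithOffsetsWritten). */
    void commit(String lastRecordWithOffsetsWritten) {
        if (!unacked.contains(lastRecordWithOffsetsWritten)) {
            return; // nothing new was flushed since the previous commit
        }
        while (!unacked.isEmpty()) {
            String id = unacked.removeFirst();
            acked.add(id); // a real task would acknowledge the input data here
            if (id.equals(lastRecordWithOffsetsWritten)) {
                break; // later records belong to polls that were not part of this flush
            }
        }
    }

    List<String> acked() {
        return acked;
    }

    int pending() {
        return unacked.size();
    }
}
```

The point of the design is that records from the last (few) polls simply stay pending until a later commit names them, instead of being acknowledged on a guess.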
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170270#comment-16170270 ] Ewen Cheslack-Postava commented on KAFKA-5716: --
The basic analysis sounds right. That javadoc dates back to the initial PR. I can't remember if we even had the separate thread for offset commits at that point, so it may have just grown outdated (that may already have happened during my development of the initial version and been missed in the initial review; given it was an [11k line patch|https://github.com/apache/kafka/pull/99], I'm not surprised some things were missed).

Reviewing this revealed another problem that is at best important but not clearly documented, and at worst another bug: WorkerSourceTask doesn't synchronize around at least one call to commitSourceTask (which invokes SourceTask.commit()), so the framework may call it while the main thread is invoking some other method on the connector. I think we document elsewhere that stop() has this behavior (to interrupt polling), so perhaps people would have handled synchronization properly, but more likely there would just be bugs.

Correcting the code would be tough. We could update the javadoc, but I'd propose a different solution: let's just remove the method. It's bad that it's not actually doing what it claims, and correcting it would require committing offsets synchronously, which we don't want to do either. Given its current state, it seems unlikely anyone is actually using this functionality anyway. The intent was to allow you to, e.g., delete or ack data once you know it has been committed, but it clearly isn't serving that purpose, and commitRecord() was added to give finer grained feedback. If someone did want this functionality, we likely should just add a new commit(Map) variant that can actually express to the user what was committed.

It'd be interesting to know if the method is overridden in any connectors that we know about in the wild. If we just wanted a short term fix, we could definitely update the javadoc to make it clear what's actually happening and that this probably isn't what you want.
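The commit(Map) idea mentioned in this comment is not specified further in the thread. As one possible reading, a task could receive the offsets that were actually flushed and acknowledge everything at or below them. The sketch below assumes a simplified `Map<String, Long>` (partition name to highest flushed offset); that shape, and the names `CommitWithOffsetsSketch`, `track` and `acknowledged`, are assumptions made for illustration, not an existing or proposed Connect signature.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Model of a task driven by a hypothetical commit(Map) callback. */
final class CommitWithOffsetsSketch {
    // partition -> (offset -> input handle) for data read but not yet acknowledged at the source
    private final Map<String, TreeMap<Long, String>> pending = new HashMap<>();
    private final List<String> acknowledged = new ArrayList<>();

    /** Remember which input handle produced the record at the given partition and offset. */
    void track(String partition, long offset, String handle) {
        pending.computeIfAbsent(partition, p -> new TreeMap<>()).put(offset, handle);
    }

    /** Hypothetical commit(Map): the framework states which offsets were actually flushed. */
    void commit(Map<String, Long> flushedOffsets) {
        for (Map.Entry<String, Long> e : flushedOffsets.entrySet()) {
            TreeMap<Long, String> byOffset = pending.get(e.getKey());
            if (byOffset == null) {
                continue; // nothing outstanding for this partition
            }
            // headMap(key, true) is a live view of all entries with offset <= flushed offset
            Map<Long, String> done = byOffset.headMap(e.getValue(), true);
            acknowledged.addAll(done.values());
            done.clear(); // clearing the view removes the entries from the backing TreeMap
        }
    }

    List<String> acknowledged() {
        return acknowledged;
    }
}
```

Compared with a no-argument commit(), the task no longer has to guess which polls were covered by the flush.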
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16167956#comment-16167956 ] Randall Hauch commented on KAFKA-5716: --
[~ewencp], if you wouldn't mind, would you take a look at this? In the issue description above, [~steff1193] suggests two possible fixes:
1) Changing the JavaDoc to say what the code *actually* does. This _may not_ require a KIP if the consensus is that this *clarifies* existing behavior rather than changing the behavior.
2) Correcting the code to implement the specified behavior. This definitely requires a KIP, as it would change the public API to add a method that does the correct thing (this could be done in a backward compatible manner).
I'm unable to think of another fix, but perhaps something is more obvious to you. Thoughts?

> Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
> ---
>
> Key: KAFKA-5716
> URL: https://issues.apache.org/jira/browse/KAFKA-5716
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Per Steffensen
> Priority: Minor
> Attachments: KAFKA-5716.patch
>
> Not looking at the very latest code, so the "problem" may have been corrected recently. If so, I apologize. I found the "problem" by code-inspection alone, so I may be wrong. Have not had the time to write tests to confirm.
> According to java-doc on SourceTask.commit
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}. This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets automatically. This hook is provided for systems that also need to store offsets internally in their own system.
> {quote}
> As I read this, when the commit-method is called, the SourceTask-developer is "told" that everything returned from poll up until "now" has been sent/stored - both the outgoing messages and the associated connect-offsets. Looking at the implementation it also seems that this is what it tries to "guarantee/achieve".
> But as I read the code, it is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled to call WorkerSourceTask.commitOffsets (from a different thread)
> The two threads synchronize (on the WorkerSourceTask-object) in sendRecord and commitOffsets respectively, preventing the task-thread from adding to outstandingMessages and offsetWriter while the committer-thread is marking what has to be flushed in the offsetWriter and waiting for outstandingMessages to be empty. This means that the offsets committed will be consistent with what has been sent out, but not necessarily with what has been polled. At least I do not see why the following is not possible:
> * Task-thread polls something from task.poll
> * Before the task-thread gets to add (all) the polled records to outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks in and does its committing, while preventing the task-thread from adding the polled records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but the committer-thread will end up calling task.commit (via WorkerSourceTask.commitSourceTask) without the records just polled from task.poll having been sent or the corresponding connector-offsets flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the java-doc of SourceTask.commit to something a-la (which I do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets automatically. This hook is provided for systems that also need to store offsets internally in their own system.
> {quote}
> * or, fix the "problem" so that it actually does what the java-doc says :-)
> If I am not right, of course I apologize for the inconvenience. I would appreciate an explanation where my code-inspection is not correct, and why it works even though I cannot see it. I will not expect such an explanation, though.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
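The interleaving suspected in the issue description can be replayed deterministically in a small model. The sketch below is not the WorkerSourceTask code: it borrows the names `sendRecords`, `commitOffsets` and `offsetWriter` from the description only as labels, represents records by integer offsets, and runs the two "threads" as explicit steps on one thread so that the schedule is fixed. `CommitRaceSketch` and `missingAtCommit` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Single-threaded replay of the interleaving; a model, not the WorkerSourceTask code. */
final class CommitRaceSketch {
    private final List<Integer> polled = new ArrayList<>();       // everything poll() has returned
    private final List<Integer> offsetWriter = new ArrayList<>(); // offsets handed over, not yet flushed
    private final List<Integer> flushed = new ArrayList<>();      // offsets written and flushed
    private int nextOffset = 0;

    /** Task thread, step 1: the task returns a batch from poll(). */
    List<Integer> poll(int n) {
        List<Integer> batch = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            batch.add(nextOffset++);
        }
        polled.addAll(batch);
        return batch;
    }

    /** Task thread, step 2: hand the batch to the producer and the offset writer. */
    synchronized void sendRecords(List<Integer> batch) {
        offsetWriter.addAll(batch);
    }

    /** Committer thread: flush what was handed over; task.commit() would be invoked right after. */
    synchronized void commitOffsets() {
        flushed.addAll(offsetWriter);
        offsetWriter.clear();
    }

    /** Replays the suspected schedule and returns the polled offsets not flushed at commit time. */
    static List<Integer> missingAtCommit() {
        CommitRaceSketch worker = new CommitRaceSketch();
        worker.sendRecords(worker.poll(2));    // poll #1 is fully handed over
        List<Integer> second = worker.poll(2); // poll #2 has returned...
        worker.commitOffsets();                // ...but the committer gets the lock first
        List<Integer> missing = new ArrayList<>(worker.polled);
        missing.removeAll(worker.flushed);
        worker.sendRecords(second);            // the task thread only proceeds afterwards
        return missing;
    }

    public static void main(String[] args) {
        System.out.println("polled but not flushed at commit: " + missingAtCommit());
    }
}
```

In this model the flushed offsets stay consistent with what was sent, while the second poll is absent at commit time, which matches the claim that consistency is not compromised but the javadoc promise is.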
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16167855#comment-16167855 ] ASF GitHub Bot commented on KAFKA-5716: ---
GitHub user steff1193 opened a pull request: https://github.com/apache/kafka/pull/3872
KAFKA-5716 [WIP]: Recent polled offsets may not be written/flushed at SourceTask.commit
@rhauch @tedyu @hachikuji For now, a test showing that the claimed problem is real. It should definitely not be committed, since it is a failing test. It fails when trying to assert what the SourceTask.commit javadoc states: Commit the offsets, up to the offsets that have been returned by {@link #poll()}
The contribution is my (@steff1193) original work and I license the work to the project under the project's open source license.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/TeletronicsDotAe/kafka KAFKA-5716
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3872.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3872
commit 9d093478b606f2defdc64ea64e306e46541126e6 Author: Per Steffensen Date: 2017-09-15T13:07:04Z KAFKA-5716 Added test showing that the claim in KAFKA-5716 is true. It is possible that the offsets of the records from very recent polls are not included in the offsets written and flushed at the time SourceTask.commit is called
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16163338#comment-16163338 ] Randall Hauch commented on KAFKA-5716: --
{quote} If you can point me in the direction of some existing tests testing stuff like this, I may get the time to help write such a test. I am not sure how you usually write tests - the style, mocking, unit vs integration etc., so it would be nice with a pointer. {quote}
Check out the [WorkerSourceTaskTest.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java] class, which uses lots of mocks.
{quote} Let me know if I should try to find time to write a test and/or create a pull-request (not ready for pull, yet) {quote}
Yes, I think this is complicated code, and we need one or more test cases that verify the problem and, more importantly, replicate the problem, so that we know any changes/corrections to the code actually fix it without regressing any other behavior. Glad you found the contributors guide! Very much looking forward to more details!
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16162829#comment-16162829 ] Per Steffensen commented on KAFKA-5716: ---
{quote}
* Do you also use pull-requests for early, not-ready-to-be-pulled code submissions?
* Do you have a "how to contribute" guide somewhere that I should read, in order to be able to "follow procedure" correctly?
{quote}
Never mind, found the answers on https://kafka.apache.org/contributing and https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16162824#comment-16162824 ] Per Steffensen commented on KAFKA-5716: ---
Sorry for the late response. I have been busy (writing source-connectors :-) )

{quote} {code} + * will be written before after the return of from the call to this method. {code} Can you rephrase the above ? {quote}
"No additional offsets will be written or flushed before returning from the call to {{offsetsCommitted}}." Do not know if that's better :-)

bq. Consider putting this as pull request (off trunk).

Ok, maybe I will later. I thought it would probably be a little too early. I guess that when you submit a pull-request, you actually have something you would like pulled. The patch I did is definitely not ready for inclusion in the actual code-base, at least not before it has been discussed thoroughly with you guys. And the final code to be pulled should include a test showing that things work as expected - IMHO.
* Do you also use pull-requests for early, not-ready-to-be-pulled code submissions?
* Do you have a "how to contribute" guide somewhere that I should read, in order to be able to "follow procedure" correctly?

bq. Have you tried replicating this with a test case?

No, unfortunately not. As I stated, I put forward this issue based on code-inspection only.

bq. It might not be straightforward considering it is dependent upon the timing of the {{WorkerSourceTask}} implementation, but is seems like it should be possible to replicate with a mock offset writer and consumer. Not only will this help prove your conjecture, but it also will help verify that we can correctly fix it.

I believe it will be fairly easy to create a test that shows the problem (if I am right about its existence). But yes, you probably have to mock an {{OffsetStorageWriter}} with a slow {{doFlush}}-method or an {{OffsetBackingStore}} with a slow {{set}}-method.
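The "slow flush" idea can be made deterministic with latches instead of sleeps. The following is only a sketch of the testing technique under stated assumptions: `GatedWriter` is a hypothetical hand-written fake, not a mock of the real {{OffsetStorageWriter}} or {{OffsetBackingStore}}, and its `offset` and `flush` methods do not mirror their actual signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class SlowFlushSketch {

    /** Hypothetical fake writer whose flush can be held open by the test. */
    static final class GatedWriter {
        private final List<Long> buffered = new ArrayList<>();
        final CountDownLatch flushStarted = new CountDownLatch(1);
        final CountDownLatch releaseFlush = new CountDownLatch(1);

        synchronized void offset(long value) {
            buffered.add(value);
        }

        /** Snapshots the buffer, then stays "slow" until the test releases it. */
        List<Long> flush() throws InterruptedException {
            List<Long> snapshot;
            synchronized (this) {
                snapshot = new ArrayList<>(buffered);
                buffered.clear();
            }
            flushStarted.countDown();
            releaseFlush.await(5, TimeUnit.SECONDS);
            return snapshot;
        }
    }

    /** Returns the offsets covered by the flush, i.e. what a commit hook could rely on. */
    static List<Long> offsetsCoveredByFlush() {
        GatedWriter writer = new GatedWriter();
        List<Long> covered = new ArrayList<>();
        writer.offset(1L);
        Thread committer = new Thread(() -> {
            try {
                covered.addAll(writer.flush());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        committer.start();
        try {
            writer.flushStarted.await(5, TimeUnit.SECONDS); // flush is now in progress
            writer.offset(2L);                              // recorded too late for this flush
            writer.releaseFlush.countDown();
            committer.join();                               // makes the committer's writes visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return covered;
    }
}
```

Gating the flush with latches lets the test place the late offset exactly inside the flush window, so the outcome does not depend on thread timing.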
If you can point me in the direction of some existing tests that test this kind of thing, I may get the time to help write such a test. I am not sure how you usually write tests (style, mocking, unit vs. integration, etc.), so a pointer would be nice.

bq. Also, please be aware that all changes to the Connect API must be binary compatible with previous releases so that existing {{SourceTask}} implementations need not be changed or even recompiled to run on newer versions of the framework
Yeah, I thought so. At the very least, such a change should go through a long deprecation period, with the incompatible change only made in a new major release. The renaming etc. in my patch was just a suggestion. Let's forget about it for now.

bq. FWIW, I personally think the names of {{commitRecord(...)}}, {{commit()}}, and even a new {{commitOffsets(SourceRecord)}} on the {{SourceTask}} actually do make sense since they reflect the action that the source task should take.
IMHO, FWIW, I do not agree. At least, it depends on the source-system of the concrete {{SourceTask}}. THE NAMING IS NOT VERY IMPORTANT TO ME, so let's just forget about it. But since we are discussing it now, I will elaborate on the "mental model" behind my opinion on naming.

{{SourceTask.commitRecord(R)}} is a notification from Kafka-connect that the outgoing message wrapped in a given {{SourceRecord}} R has been forwarded to and acknowledged by the receiving Kafka output-topic. The source-task typically produced this {{SourceRecord}} R from some data D received/pulled from the source-system. What the source-task should typically do on {{SourceTask.commitRecord(R)}} is ACKNOWLEDGE the reception, handling and forwarding of the input-data D. Such an "acknowledgement" may or may not include something that can be referred to as a "commit". Traditionally, "commit" is used against output-systems, while "acknowledge" is used against input-systems.
For a source-connector, Kafka-connect handles the output-side, and therefore all "committing" against the output-side, while the source-task handles the input-side and has to "acknowledge" against the input-system. I believe the most common implementation of {{SourceTask.commitRecord}} will be "acknowledging" input-data received from the source-system.

E.g. consider a source-connector with source-tasks consuming data from a Kafka-topic. Yes, a source-connector where the source-system is Kafka itself. After all, according to my "mental model", a source-connector is about making a non-Kafka data-source map into and "feel like" a Kafka data-source (streamed, partitioned, etc.). Imagine such a source-connector with a Kafka-topic as "source". The source-connector would probably create a task for each partition in the source-topic. The task would probably do something like:
* Receive a message (IM) from the designated partition of the source-topic via Kafka-
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160407#comment-16160407 ] Randall Hauch commented on KAFKA-5716: --
[~steff1193], thanks for logging the issue and submitting a proposal for the change. Your analysis does seem to be correct, though I need to spend a bit more time verifying it, and I'd love to hear [~hachikuji]'s thoughts.

Have you tried replicating this with a test case? It might not be straightforward considering it is dependent upon the timing of the WorkerSourceTask implementation, but it seems like it should be possible to replicate with a mock offset writer and consumer. Not only will this help prove your conjecture, but it also will help verify that we can correctly fix it.

Also, please be aware that all changes to the Connect API *must* be binary compatible with previous releases so that existing SourceTask implementations need not be changed or even recompiled to run on newer versions of the framework. This has two implications for your proposed patch.

First, we cannot change the signature of public API methods, so we need to avoid changing the name of {{commitRecord(...)}}. Like it or not, we're stuck with that naming convention. FWIW, I personally think the names of {{commitRecord(...)}}, {{commit()}}, and even a new {{commitOffsets(SourceRecord)}} on the SourceTask actually _do_ make sense since they reflect the action that the source task should take.

Second, we have to keep the existing {{commit()}} method, but we can deprecate it and add a new method such as {{commitOffsets(SourceRecord)}} that has the behavior you state and that by default calls the existing {{commit()}}. Doing this would allow existing SourceTask implementations to continue to work, albeit potentially with the incorrect assumptions/behavior as you argue above.
However, any developer that wants to take advantage of the new/correct behavior can change their {{SourceTask}} implementations to use {{commitOffsets(SourceRecord)}} rather than {{commit()}}. Note that we should definitely have thorough JavaDoc for both {{commit()}} and {{commitOffsets(SourceRecord)}} that says what they do by default and how implementations can take advantage of them.

Again, I *think* your analysis is correct, but I need to spend time looking into this and hopefully get others to confirm it as well. In the meantime, it might be worth updating your patch and, as [~yuzhih...@gmail.com] mentioned, creating a pull request with your changes based on the {{trunk}} branch. One advantage of maintaining binary compatibility is that we can more easily backport the changes to older branches, though just because we can doesn't mean we will. If the committers do backport and have any non-trivial merge conflicts, they'll ask for a separate dedicated PR for the older branch(es).

Thanks again for all your work on this!
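The binary-compatible evolution described in the comment above (keep {{commit()}}, deprecate it, add a new hook that calls it by default) can be illustrated with a minimal sketch. Everything here is a hypothetical stand-in: {{ToyRecord}} and {{ToySourceTask}} are not the real {{SourceRecord}} and {{SourceTask}}, and checked exceptions are left out for brevity. Only the method name {{commitOffsets}} is taken from the discussion.

```java
/** Hypothetical stand-in for SourceRecord; only an offset is modelled here. */
final class ToyRecord {
    final long offset;

    ToyRecord(long offset) {
        this.offset = offset;
    }
}

/** Hypothetical stand-in for the SourceTask base class, reduced to the commit hooks. */
abstract class ToySourceTask {
    /** Existing hook, kept (and deprecated) so already-compiled tasks still link and run. */
    @Deprecated
    public void commit() {
        // no-op by default
    }

    /**
     * Proposed hook (name taken from the discussion). The default delegates to commit(),
     * so tasks that only override commit() behave as before. lastFlushed may be null.
     */
    public void commitOffsets(ToyRecord lastFlushed) {
        commit();
    }
}

/** An "old" task that only knows about commit(). */
class LegacyTask extends ToySourceTask {
    int commits = 0;

    @Override
    public void commit() {
        commits++;
    }
}

/** A "new" task that opts in to the richer hook. */
class OffsetAwareTask extends ToySourceTask {
    long lastFlushedOffset = -1;

    @Override
    public void commitOffsets(ToyRecord lastFlushed) {
        if (lastFlushed != null) {          // null means nothing was flushed
            lastFlushedOffset = lastFlushed.offset;
        }
    }
}

class CompatSketch {
    public static void main(String[] args) {
        LegacyTask legacy = new LegacyTask();
        OffsetAwareTask aware = new OffsetAwareTask();
        // The framework side only ever calls the new hook.
        legacy.commitOffsets(new ToyRecord(42));
        aware.commitOffsets(new ToyRecord(42));
        System.out.println("legacy commits: " + legacy.commits);
        System.out.println("aware last flushed offset: " + aware.lastFlushedOffset);
    }
}
```

The design point is that the framework calls only the new method, while the default implementation keeps the old override reachable, so neither kind of task needs to be recompiled.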
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160002#comment-16160002 ] Ted Yu commented on KAFKA-5716: --- {code} + * will be written before after the return of from the call to this method. {code} Can you rephrase the above ? {code} + * @param record Last polled {@link SourceRecord} that had its offsets successfully written and flushed. {code} Please note that record may be null. Consider putting this as pull request (off trunk).
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120577#comment-16120577 ] Per Steffensen commented on KAFKA-5716: ---
{quote} Commit the offsets, up to the offsets that have been returned by \{@link #poll()} *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*. {quote}
Actually, I am pretty sure that is not true either. So I guess that when task.commit() is called, you really do not know exactly for which of the SourceRecords you returned from poll
* a) the derived outgoing ProducerRecord has been sent to Kafka
* b) the derived connector-offsets have been written/flushed

The only thing I can think of that does not introduce significantly more synchronization (we do not want that), but where you as the SourceTask-developer know "enough" for most cases, is to change SourceTask.commit() to take a parameter of type SourceRecord. This record will be the last record given by WorkerSourceTask to its offsetWriter. Upon a call to commit, the SourceTask-developer will know that
* a) Outgoing ProducerRecords derived from any SourceRecord returned from poll up to and including the given SourceRecord have been sent to Kafka. Outgoing ProducerRecords derived from any SourceRecord returned from poll after the given SourceRecord may or may not have been sent to Kafka (conservative guarantee)
* b) Connector-offsets derived from any SourceRecord returned from poll up to and including the given SourceRecord have been written and flushed to offset-storage.
Connector-offsets derived from any SourceRecord returned from poll after the given SourceRecord have NOT been written to offset-storage (strict guarantee)

Sorry, just thinking out loud.
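To make the proposed guarantee concrete, here is a minimal sketch of how a task might use a commit call that names the last record whose offsets were flushed. It is an assumption-laden illustration: {{AckUpToTask}}, {{recordPolled}} and the {{Long}} ids are hypothetical and not part of the Connect API, and a plain id stands in for a SourceRecord. The task keeps polled inputs in poll order and acknowledges everything up to and including the given record, leaving later ones pending.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical task-side bookkeeping; none of these names come from the Connect API. */
class AckUpToTask {
    private final Deque<Long> awaitingAck = new ArrayDeque<>(); // input ids, in poll order
    private final List<Long> acknowledged = new ArrayList<>();  // ids acknowledged to the source

    /** Called for every record the task returns from poll(). */
    synchronized void recordPolled(long inputId) {
        awaitingAck.addLast(inputId);
    }

    /**
     * Sketch of a commit that names the last record whose offsets were flushed.
     * lastFlushedId may be null (nothing flushed yet); unknown ids are ignored.
     */
    synchronized void commit(Long lastFlushedId) {
        if (lastFlushedId == null || !awaitingAck.contains(lastFlushedId)) {
            return;
        }
        while (!awaitingAck.isEmpty()) {
            long id = awaitingAck.pollFirst();
            acknowledged.add(id);          // a real task would acknowledge to the source system here
            if (id == lastFlushedId) {
                break;                     // later records carry no guarantee yet
            }
        }
    }

    synchronized List<Long> acknowledged() {
        return new ArrayList<>(acknowledged);
    }

    synchronized int pending() {
        return awaitingAck.size();
    }
}

class AckUpToSketch {
    public static void main(String[] args) {
        AckUpToTask task = new AckUpToTask();
        task.recordPolled(1L);
        task.recordPolled(2L);
        task.recordPolled(3L);
        task.commit(2L); // offsets flushed up to and including record 2
        System.out.println("acknowledged: " + task.acknowledged() + ", pending: " + task.pending());
    }
}
```

Both methods are synchronized because, as described in the issue, polling and committing happen on different threads.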