[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2021-05-12 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343028#comment-17343028
 ] 

Ning Zhang commented on KAFKA-12635:


Backport is a great point. I guess the review cycle will take 3-4 weeks at 
most, so I will let the committer/reviewer know about the backport option and 
see what is feasible from their point of view.

> Mirrormaker 2 offset sync is incorrect if the target partition is empty
> ---
>
> Key: KAFKA-12635
> URL: https://issues.apache.org/jira/browse/KAFKA-12635
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Frank Yi
>Assignee: Ning Zhang
>Priority: Major
>
> This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = 
> true".
> If a source partition is empty, but the source consumer group's offset for 
> that partition is non-zero, then Mirrormaker sets the target consumer group's 
> offset for that partition to the literal, not translated, offset of the 
> source consumer group. This state can be reached if the source consumer group 
> consumed some records that have since been deleted (e.g. by a retention 
> policy), or if Mirrormaker replication is set to start at "latest". This bug 
> causes the target consumer group's lag for that partition to be negative and 
> breaks offset sync for that partition until the lag is positive.
> The correct behavior when the source partition is empty would be to set the 
> target offset to the translated offset, not the literal offset, which in this 
> case would always be 0. 
> Original email thread on this issue: 
> https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E
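To make the difference concrete, here is a minimal sketch of the two behaviors 
(a simplified model with hypothetical names, not MirrorMaker's actual 
internals), assuming a single offset sync that maps a source offset to the 
offset of the same record on the target:

{code:java}
// Minimal sketch of literal vs. translated offset commits; hypothetical names,
// not MirrorMaker's actual classes.
final class OffsetTranslationSketch {
    // Latest sync for the partition; -1 means nothing was ever replicated,
    // i.e. the target partition is empty.
    static final long SYNC_SOURCE_OFFSET = -1L;
    static final long SYNC_TARGET_OFFSET = -1L;

    // Buggy behavior described above: commit the source group's offset verbatim.
    static long literalOffset(long sourceGroupOffset) {
        return sourceGroupOffset;
    }

    // Correct behavior: translate via the last sync; with an empty target
    // partition there is no sync, so the translated offset is 0.
    static long translatedOffset(long sourceGroupOffset) {
        if (SYNC_SOURCE_OFFSET < 0) {
            return 0L;
        }
        return SYNC_TARGET_OFFSET + (sourceGroupOffset - SYNC_SOURCE_OFFSET);
    }

    public static void main(String[] args) {
        long sourceGroupOffset = 42L;  // source group committed offset 42
        long targetLogEndOffset = 0L;  // target partition is empty
        System.out.println("literal lag = "
                + (targetLogEndOffset - literalOffset(sourceGroupOffset)));    // -42
        System.out.println("translated lag = "
                + (targetLogEndOffset - translatedOffset(sourceGroupOffset))); // 0
    }
}
{code}

Committing the literal offset of 42 against an empty target partition yields a 
lag of -42; the translated offset of 0 yields a lag of 0.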



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2021-05-10 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342308#comment-17342308
 ] 

Ning Zhang commented on KAFKA-12635:


Great, thanks for the feedback. I will proceed to finalize the pull request. 



[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2021-05-07 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17341105#comment-17341105
 ] 

Ning Zhang commented on KAFKA-12635:


Updated and tested the PR.



[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2021-05-07 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17340917#comment-17340917
 ] 

Ning Zhang commented on KAFKA-12635:


[~dragotic] I see, I probably missed your comment: "but before Step 5. We create 
the consumer groups on the target cluster using the kafka-console-consumer.sh 
command with two extra flags: --max-messages 1 --timeout-ms 6000"

I am updating the PR and will test it again.
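For reference, the command described would be along these lines (broker 
address, topic, and group names are placeholders):

{noformat}
bin/kafka-console-consumer.sh --bootstrap-server target-broker:9092 \
  --topic source.my-topic --group my-group \
  --max-messages 1 --timeout-ms 6000
{noformat}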



[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2021-05-06 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17340533#comment-17340533
 ] 

Ning Zhang commented on KAFKA-12635:


Hi [~fyi] [~dragotic], I compiled the latest Kafka trunk with the above pull 
request applied, available at 
[https://github.com/ning2008wisc/kafka-trunk-binary/blob/master/kafka_2.13-3.0.0-SNAPSHOT.tgz].
 It appears to solve the bug in my local testing, and I would like to hear your 
initial feedback. If it looks good, I will polish the pull request and have it 
ready for review. Thanks for your patience 



[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2021-05-06 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17340532#comment-17340532
 ] 

Ning Zhang commented on KAFKA-12635:


Pull request: https://github.com/apache/kafka/pull/10644



[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2021-05-01 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17337896#comment-17337896
 ] 

Ning Zhang commented on KAFKA-12635:


> This state can be reached if the source consumer group consumed some records 
> that were now deleted (like by a retention policy), or if Mirrormaker 
> replication is set to start at "latest". 

[~fyi] I have read your statement above several times and still could not 
figure out the reproduction scenario. Would you mind sharing it step by step, 
from a good state to a bad state? Thanks



[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2021-05-01 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17337894#comment-17337894
 ] 

Ning Zhang commented on KAFKA-12635:


Hi [~dragotic], could you please elaborate, step by step, on how you reproduce 
the issue? 



[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2021-04-14 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321170#comment-17321170
 ] 

Ning Zhang commented on KAFKA-12635:


[~akaltsikis] You could probably try manually creating a consumer group with an 
initial offset of 0; the console consumer command will do that.
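As a sketch of that suggestion (broker, topic, and group names are 
placeholders), something along these lines creates the group, or sets its 
committed offset to 0 explicitly:

{noformat}
# Consume one record (or time out) so the group gets created:
bin/kafka-console-consumer.sh --bootstrap-server broker:9092 \
  --topic my-topic --group my-group --from-beginning \
  --max-messages 1 --timeout-ms 6000

# Or pin the group's committed offset to 0 explicitly:
bin/kafka-consumer-groups.sh --bootstrap-server broker:9092 \
  --group my-group --topic my-topic \
  --reset-offsets --to-offset 0 --execute
{noformat}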



[jira] [Assigned] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2021-04-09 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reassigned KAFKA-12635:
--

Assignee: Ning Zhang



[jira] [Created] (KAFKA-10750) test failure scenarios of MirrorMaker 2

2020-11-19 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-10750:
--

 Summary: test failure scenarios of MirrorMaker 2
 Key: KAFKA-10750
 URL: https://issues.apache.org/jira/browse/KAFKA-10750
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Affects Versions: 2.7.0
Reporter: Ning Zhang
Assignee: Ning Zhang


As a follow-up to https://issues.apache.org/jira/browse/KAFKA-10304, it may be 
necessary to test failure scenarios, e.g. a Kafka broker stopping and then 
starting again.

To keep PR [https://github.com/apache/kafka/pull/9224] smaller, we cut the 
testing code for failure scenarios, and plan to add it back in this ticket.
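As a rough outline (the ClusterHandle helper below is a hypothetical 
placeholder, not one of the actual test utilities in that PR), such a 
failure-scenario test could look like:

{code:java}
// Hypothetical outline of a broker stop/start failure test for MM2; the
// ClusterHandle helper is a placeholder, not a real Kafka test utility.
interface ClusterHandle {
    void stopBroker(int brokerId);
    void startBroker(int brokerId);
    long produce(String topic, String value);          // returns the offset written
    long replicatedEndOffset(String mirroredTopic);    // end offset on the target
}

final class MirrorFailureScenarioSketch {
    static void brokerBounceScenario(ClusterHandle source, ClusterHandle target)
            throws InterruptedException {
        // 1. Produce a record and let it be mirrored under normal conditions.
        source.produce("test-topic", "before-failure");

        // 2. Stop a broker on the source cluster, then bring it back.
        source.stopBroker(0);
        Thread.sleep(5_000);
        source.startBroker(0);

        // 3. Produce again and assert replication resumes without data loss.
        long offset = source.produce("test-topic", "after-failure");
        if (target.replicatedEndOffset("primary.test-topic") < offset + 1) {
            throw new AssertionError("record produced after broker bounce was not mirrored");
        }
    }
}
{code}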





[jira] [Updated] (KAFKA-10737) MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster in README.md

2020-11-17 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10737:
---
Description: 
As MM2 is adopted by more users, new and advanced use cases require Kafka 
clusters to be secure, and one way to make this happen is an SSL-enabled 
cluster.

Currently there is no clear documentation on how to configure MM2 so that it 
can consume from or produce to an SSL-enabled cluster. Some users spent quite 
an amount of time finding the right config (see 
https://issues.apache.org/jira/browse/KAFKA-10704), so it would be great to 
document this clearly.

  was:
As MM2 is adopted by more users, new and advanced use cases requires Kafka 
clusters to be secure, and one way to make this happen is SSL-enabled cluster.

Currently there is no clear doc on how to configure MM2 so that it can consume 
from or produce to a SSL-enable cluster. Some users spent quite amount of time 
and found out the right config (see ). So it would be great to clearly doc on 
this.


> MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster 
> in README.md 
> --
>
> Key: KAFKA-10737
> URL: https://issues.apache.org/jira/browse/KAFKA-10737
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Minor
>


[jira] [Created] (KAFKA-10737) MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster in README.md

2020-11-17 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-10737:
--

 Summary: MirrorMaker 2: clarify how to produce to or consume from 
SSL-enabled cluster in README.md 
 Key: KAFKA-10737
 URL: https://issues.apache.org/jira/browse/KAFKA-10737
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Affects Versions: 2.7.0
Reporter: Ning Zhang
Assignee: Ning Zhang




[jira] [Resolved] (KAFKA-10704) Mirror maker with TLS at target

2020-11-17 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang resolved KAFKA-10704.

Resolution: Resolved

> Mirror maker with TLS at target
> ---
>
> Key: KAFKA-10704
> URL: https://issues.apache.org/jira/browse/KAFKA-10704
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Tushar Bhasme
>Assignee: Ning Zhang
>Priority: Critical
>
> We need to set up MirrorMaker from a single-node Kafka cluster to a three-node 
> Strimzi cluster. There is no SSL setup at the source; however, the target 
> cluster is configured with mTLS.
> With the below config, commands against the source, like listing topics, are 
> working:
> {code:java}
> cat client-ssl.properties
> security.protocol=SSL
> ssl.truststore.location=my.truststore
> ssl.truststore.password=123456
> ssl.keystore.location=my.keystore
> ssl.keystore.password=123456
> ssl.key.password=password{code}
> However, we are not able to get MirrorMaker working with similar configs:
> {code:java}
> source.security.protocol=PLAINTEXT
> target.security.protocol=SSL
> target.ssl.truststore.location=my.truststore
> target.ssl.truststore.password=123456
> target.ssl.keystore.location=my.keystore
> target.ssl.keystore.password=123456
> target.ssl.key.password=password{code}
> Errors while running mirror maker:
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, 
> deadlineMs=1605011994642, tries=1, nextAllowedTryMs=1605011994743) timed out 
> at 1605011994643 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment. Call: fetchMetadata
> [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 
> unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
> [2020-11-10 12:40:24,643] INFO [AdminClient clientId=adminclient-8] Metadata 
> update failed 
> (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, 
> deadlineMs=1605012024643, tries=1, nextAllowedTryMs=-9223372036854775709) 
> timed out at 9223372036854775807 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient 
> thread has exited. Call: fetchMetadata
> [2020-11-10 12:40:24,644] INFO Metrics scheduler closed 
> (org.apache.kafka.common.metrics.Metrics:668)
> [2020-11-10 12:40:24,644] INFO Closing reporter 
> org.apache.kafka.common.metrics.JmxReporter 
> (org.apache.kafka.common.metrics.Metrics:672)
> [2020-11-10 12:40:24,644] INFO Metrics reporters closed 
> (org.apache.kafka.common.metrics.Metrics:678)
> [2020-11-10 12:40:24,645] ERROR Stopping due to error 
> (org.apache.kafka.connect.mirror.MirrorMaker:304)
> org.apache.kafka.connect.errors.ConnectException: Failed to connect to and 
> describe Kafka cluster. Check worker's broker connection and security 
> properties.
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:235)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.lambda$new$1(MirrorMaker.java:136)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:136)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:148)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:291)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, 
> deadlineMs=1605012024641, tries=1, nextAllowedTryMs=1605012024742) timed out 
> at 1605012024642 after 1 attempt(s)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
> ... 7 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=listNodes, deadlineMs=1605012024641, tries=1, 
> nextAllowedTryMs=1605012024742) timed out at 1605012024642 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment. Call: listNodes
> {code}




[jira] [Reopened] (KAFKA-10704) Mirror maker with TLS at target

2020-11-17 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reopened KAFKA-10704:



[jira] [Updated] (KAFKA-10704) Mirror maker with TLS at target

2020-11-17 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10704:
---
Fix Version/s: (was: 2.7.0)


[jira] [Commented] (KAFKA-10704) Mirror maker with TLS at target

2020-11-17 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17234271#comment-17234271
 ] 

Ning Zhang commented on KAFKA-10704:


Indeed, we should clearly document how to produce to an SSL-enabled cluster. I 
will create a PR against 
[https://github.com/apache/kafka/blob/trunk/connect/mirror/README.md] to add a 
section about SSL-enabled clusters.
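As a sketch of what that section could show (paths and passwords are 
placeholders), mirroring from a PLAINTEXT source to an SSL-enabled target uses 
the per-cluster prefix on the standard SSL client properties, as in the config 
from this ticket:

{code:java}
# Hypothetical README example; paths and passwords are placeholders.
source.security.protocol=PLAINTEXT
target.security.protocol=SSL
target.ssl.truststore.location=/path/to/my.truststore
target.ssl.truststore.password=changeit
target.ssl.keystore.location=/path/to/my.keystore
target.ssl.keystore.password=changeit
target.ssl.key.password=changeit
{code}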


[jira] [Assigned] (KAFKA-10704) Mirror maker with TLS at target

2020-11-17 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reassigned KAFKA-10704:
--

Assignee: Ning Zhang


[jira] [Commented] (KAFKA-10728) Mirroring data without decompressing with MirrorMaker 2.0

2020-11-17 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17234255#comment-17234255
 ] 

Ning Zhang commented on KAFKA-10728:


My first impression is: unless the MM2 producer is explicitly set to use 
`uncompressed` via [https://kafka.apache.org/documentation/#compression.type], 
it will use the default compression value.
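In MM2's flat properties form, the override in the Strimzi snippet below would 
look something like this (the `target` alias and the prefix form are 
assumptions on my part):

{code:java}
# Assumed MM2 properties equivalent of the Strimzi YAML below: compress on
# the producer that writes to the target cluster.
target.producer.compression.type=gzip
{code}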

> Mirroring data without decompressing with MirrorMaker 2.0
> -
>
> Key: KAFKA-10728
> URL: https://issues.apache.org/jira/browse/KAFKA-10728
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Eazhilan Nagarajan
>Priority: Major
>
> Hello, 
>  
> I use MirrorMaker 2.0 to copy data across two Kafka clusters and it's all 
> working fine. Recently we enabled compression when producing data into any 
> topic, which had a very positive impact on storage and other resources, but 
> while mirroring, the data seems to be decompressed at the target Kafka 
> cluster. I tried enabling compression using the below config in MM2; the data 
> at the target cluster is compressed now, but the decompress-and-recompress 
> continues to happen and it unnecessarily eats up a lot of resources.
>  
> {noformat}
> - alias: my-passive-cluster
>   authentication:
>     passwordSecret:
>       password: password
>       secretName: passive-cluster-secret
>     type: scram-sha-512
>     username: user-1
>   bootstrapServers: my-passive-cluster.com:443
>   config:
>     config.storage.replication.factor: 3
>     offset.storage.replication.factor: 3
>     status.storage.replication.factor: 3
>     producer.compression.type: gzip
> {noformat}
> I found a couple of Jira issues talking about this, but I don't know if the 
> shallow iterator option is available now: 
> https://issues.apache.org/jira/browse/KAFKA-732, 
> https://issues.apache.org/jira/browse/KAFKA-845
>  
> Kindly let me know if this is currently available or if it'll be available in 
> the future.





[jira] [Commented] (KAFKA-10719) MirrorMaker2 fails to update its runtime configuration

2020-11-15 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17232354#comment-17232354
 ] 

Ning Zhang commented on KAFKA-10719:


The potential config inconsistency in MM2 may be rooted in how Kafka Connect 
saves its config, simply because MM2 is built on top of Kafka Connect. Kafka 
Connect typically puts config in a special Kafka topic. Judging by the 
description section of 
[https://github.com/a0x8o/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java],
 in rare cases there is a chance of seeing inconsistent config across different 
tasks.
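For reference, the workaround mentioned in the issue description below amounts 
to something like this (the broker address and the `main` cluster alias are 
placeholders; I am assuming MM2's internal topic naming of the form 
mm2-configs.<alias>.internal):

{noformat}
# List MM2's internal topics on the destination cluster:
bin/kafka-topics.sh --bootstrap-server backupA:9092 --list | grep mm2

# Delete the config and status topics so they are recreated on restart:
bin/kafka-topics.sh --bootstrap-server backupA:9092 --delete --topic mm2-configs.main.internal
bin/kafka-topics.sh --bootstrap-server backupA:9092 --delete --topic mm2-status.main.internal
{noformat}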

> MirrorMaker2 fails to update its runtime configuration
> --
>
> Key: KAFKA-10719
> URL: https://issues.apache.org/jira/browse/KAFKA-10719
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.0
>Reporter: Peter Sinoros-Szabo
>Priority: Major
>
> I was successfully running the MM2 cluster with the following configuration 
> (simplified a little):
> {code:java}
> clusters = main, backup
> main.bootstrap.servers = kafkaA:9202,kafkaB:9092,kafkaB:9092
> backup.bootstrap.servers = backupA:9092,backupB:9092,backupC:9092
> main->backup.enabled = true
> main->backup.topics = .*
> {code}
> I wanted to change the bootstrap.servers list of the destination cluster to a 
> different list pointing to the *same* cluster, just a different listener with 
> different routing. So I changed it to:
> {code:java}
> backup.bootstrap.servers = backupSecA:1234,backupSecB:1234,backupSecC:1234
> {code}
> I did a rolling restart of the MM2 nodes and saw that some tasks were still 
> using the old bootstrap addresses while some were using the new ones. I don't 
> have the logs, so unfortunately I don't know which ones picked up the good 
> values and which didn't. I even stopped the cluster completely, but it didn't 
> help. Ryanne advised deleting the mm2-config and mm2-status topics, so I 
> deleted those on the destination cluster, and that solved the issue.





[jira] [Comment Edited] (KAFKA-10704) Mirror maker with TLS at target

2020-11-15 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17232338#comment-17232338
 ] 

Ning Zhang edited comment on KAFKA-10704 at 11/15/20, 4:34 PM:
---

This PR [https://github.com/apache/kafka/pull/9224] actually tested out the SSL 
case (unencrypted source cluster, but encrypted target cluster). In the real 
world, there is a use case of mirroring from an unencrypted cluster to 
AWS-hosted Kafka (always encrypted). So I believe the current MirrorMaker 2 can 
support encryption at source or target out of the box.


was (Author: yangguo1220):
This PR [https://github.com/apache/kafka/pull/9224] actually tested out the SSL 
case (unencrypted source cluster, but encrypted target cluster). In the real 
world, there is a use case of mirroring from an unencrypted cluster to 
AWS-hosted Kafka (always encrypted). So I believe the current MirrorMaker 2 can 
support encryption out of the box.


[jira] [Commented] (KAFKA-10704) Mirror maker with TLS at target

2020-11-15 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17232338#comment-17232338
 ] 

Ning Zhang commented on KAFKA-10704:


This PR [https://github.com/apache/kafka/pull/9224] actually tested out the SSL 
case (unencrypted source cluster, but encrypted target cluster). In the real 
world, there is a use case of mirroring from an unencrypted cluster to 
AWS-hosted Kafka (always encrypted). So I believe the current MirrorMaker 2 can 
support encryption out of the box.

> Mirror maker with TLS at target
> ---
>
> Key: KAFKA-10704
> URL: https://issues.apache.org/jira/browse/KAFKA-10704
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Tushar Bhasme
>Priority: Critical
> Fix For: 2.7.0
>
>
> We need to set up MirrorMaker from a single-node Kafka cluster to a 
> three-node Strimzi cluster. There is no SSL setup at the source; however, the 
> target cluster is configured with mTLS.
> With below config, commands from source like listing topics etc are working:
> {code:java}
> cat client-ssl.properties
> security.protocol=SSL
> ssl.truststore.location=my.truststore
> ssl.truststore.password=123456
> ssl.keystore.location=my.keystore
> ssl.keystore.password=123456
> ssl.key.password=password{code}
> However, we are not able to get MirrorMaker working with similar configs:
> {code:java}
> source.security.protocol=PLAINTEXT
> target.security.protocol=SSL
> target.ssl.truststore.location=my.truststore
> target.ssl.truststore.password=123456
> target.ssl.keystore.location=my.keystore
> target.ssl.keystore.password=123456
> target.ssl.key.password=password{code}
> Errors while running mirror maker:
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, 
> deadlineMs=1605011994642, tries=1, nextAllowedTryMs=1605011994743) timed out 
> at 1605011994643 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment. Call: fetchMetadata
> [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 
> unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
> [2020-11-10 12:40:24,643] INFO [AdminClient clientId=adminclient-8] Metadata 
> update failed 
> (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, 
> deadlineMs=1605012024643, tries=1, nextAllowedTryMs=-9223372036854775709) 
> timed out at 9223372036854775807 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient 
> thread has exited. Call: fetchMetadata
> [2020-11-10 12:40:24,644] INFO Metrics scheduler closed 
> (org.apache.kafka.common.metrics.Metrics:668)
> [2020-11-10 12:40:24,644] INFO Closing reporter 
> org.apache.kafka.common.metrics.JmxReporter 
> (org.apache.kafka.common.metrics.Metrics:672)
> [2020-11-10 12:40:24,644] INFO Metrics reporters closed 
> (org.apache.kafka.common.metrics.Metrics:678)
> [2020-11-10 12:40:24,645] ERROR Stopping due to error 
> (org.apache.kafka.connect.mirror.MirrorMaker:304)
> org.apache.kafka.connect.errors.ConnectException: Failed to connect to and 
> describe Kafka cluster. Check worker's broker connection and security 
> properties.
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:235)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.lambda$new$1(MirrorMaker.java:136)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:136)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:148)
> at 
> org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:291)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, 
> deadlineMs=1605012024641, tries=1, nextAllowedTryMs=1605012024742) timed out 
> at 1605012024642 after 1 attempt(s)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> at 
> org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
> ... 7 more
> Caused by: 

[jira] [Resolved] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-11-14 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang resolved KAFKA-10424.

Resolution: Not A Bug

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Assignee: Ning Zhang
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-09-21 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17199546#comment-17199546
 ] 

Ning Zhang commented on KAFKA-10339:


Thanks [~mimaison], once the above PR 8730 is merged, I will rebase from it 
immediately, and then https://issues.apache.org/jira/browse/KAFKA-10304 can be 
prioritized. 

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.
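
For context, the direction explored here is a sink-based MirrorMaker task that 
drives the standard transactional consume-produce loop against the target 
cluster, committing the source consumer's offsets on the target cluster inside 
the producer transaction. Below is a minimal sketch of that pattern only; the 
topic, group and transactional ids are illustrative assumptions, not code from 
this ticket:

{code:java}
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TransactionalMirrorSketch {
    public static void main(String[] args) {
        Properties cons = new Properties();
        cons.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "source-kafka:9092"); // placeholder
        cons.put(ConsumerConfig.GROUP_ID_CONFIG, "mm2-eos-demo");               // placeholder
        cons.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        cons.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        cons.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        cons.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        Properties prod = new Properties();
        prod.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "target-kafka:9092"); // placeholder
        prod.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "mm2-eos-demo-0");     // placeholder
        prod.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        prod.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cons);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(prod)) {
            consumer.subscribe(Collections.singletonList("test"));
            producer.initTransactions();
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty())
                    continue;
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<byte[], byte[]> r : records) {
                    producer.send(new ProducerRecord<>("source." + r.topic(), r.key(), r.value()));
                    offsets.put(new TopicPartition(r.topic(), r.partition()),
                                new OffsetAndMetadata(r.offset() + 1));
                }
                // Offsets are committed on the *target* cluster, atomically with the
                // mirrored records; on restart the task must re-seek the source
                // consumer from these target-side offsets.
                producer.sendOffsetsToTransaction(offsets, "mm2-eos-demo");
                producer.commitTransaction();
            }
        }
    }
}
{code}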



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-09-21 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17199443#comment-17199443
 ] 

Ning Zhang commented on KAFKA-10339:


[~mimaison] When possible, I would very much appreciate it if KAFKA-10483 and 
KAFKA-10304 could be reviewed first, as they will make this task easier to test 
and implement. If other folks are more appropriate reviewers, please nominate 
them and I will reach out. Thanks

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10483) extract common functions from SourceConnector and SourceTask

2020-09-15 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-10483:
--

 Summary: extract common functions from SourceConnector and 
SourceTask
 Key: KAFKA-10483
 URL: https://issues.apache.org/jira/browse/KAFKA-10483
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Ning Zhang
Assignee: Ning Zhang


When implementing the MM2 EOS feature, existing functions from 
`MirrorSourceConnector` and `MirrorSourceTask` can be reused for 
`MirrorSinkConnector` and `MirrorSinkTask`. 

To minimize the long-term maintenance cost and maximize code reusability, it is 
worth extracting the common functions out of the current 
`MirrorSourceConnector` and `MirrorSourceTask`.
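
As a rough illustration of the intended extraction (the class and method names 
below are hypothetical, not the final API), logic that both the source and the 
future sink variants need, such as remote-topic naming and the 
replication-cycle guard, could live in one shared helper:

{code:java}
// Hypothetical shared helper; MirrorSourceConnector/Task and a future
// MirrorSinkConnector/Task could both delegate to it instead of duplicating code.
public final class MirrorCommon {
    private MirrorCommon() {}

    // "topic" on cluster "sourceAlias" is mirrored as "sourceAlias.topic"
    public static String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return sourceClusterAlias + "." + topic;
    }

    // Guard against replicating a topic back to the cluster it came from
    public static boolean isCycle(String sourceClusterAlias, String topic) {
        return topic.startsWith(sourceClusterAlias + ".");
    }
}
{code}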



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2

2020-09-09 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17192664#comment-17192664
 ] 

Ning Zhang edited comment on KAFKA-10304 at 9/9/20, 3:00 PM:
-

Hi [~mimaison] [~ryannedolan], this is mostly a refactoring PR on the MM2 
integration tests. Its purpose is to (1) address the concern in [a 
previous PR|https://github.com/apache/kafka/pull/9029#issuecomment-663094946], 
and (2) prepare for future development (e.g. extracting common functions). I 
think the current PR ([https://github.com/apache/kafka/pull/9224]) is just a 
starting point, and I would greatly appreciate your feedback on what to test 
additionally and how to get closer to real scenarios.


was (Author: yangguo1220):
Hi [~mimaison] [~ryannedolan], this is mostly a refactoring PR on the MM2 
integration tests. Its purpose is to (1) address the concern in the 
previous PR (https://github.com/apache/kafka/pull/9029), and (2) prepare for 
future development (e.g. extracting common functions). I think the current PR 
(https://github.com/apache/kafka/pull/9224) is just a starting point, and I 
would greatly appreciate your feedback on what to test additionally and how to 
get closer to real scenarios.

> Revisit and improve the tests of MirrorMaker 2
> --
>
> Key: KAFKA-10304
> URL: https://issues.apache.org/jira/browse/KAFKA-10304
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Minor
> Fix For: 2.8.0
>
>
> In a different MM2 change, [some 
> concerns|https://github.com/apache/kafka/pull/9029#issuecomment-663094946] on 
> tests were raised. It may be a good time to revisit and refactor the tests, 
> possibly in the following way:
> (1) are 100 messages good enough for integration tests?
>  (2) what about the broker failure in the middle of integration tests?
>  (3) other validations to check (e.g. topic config sync)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2

2020-09-09 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10304:
---
Description: 
In a different MM2 change, [some 
concerns|https://github.com/apache/kafka/pull/9029#issuecomment-663094946] on 
tests were raised. It may be a good time to revisit and refactor the tests, 
possibly in the following way:

(1) are 100 messages good enough for integration tests?
 (2) what about the broker failure in the middle of integration tests?
 (3) other validations to check (e.g. topic config sync)

  was:
In a different MM2 change https://github.com/apache/kafka/pull/9029, some 
concerns on tests were raised. It may be a good time to revisit and refactor 
the tests, possibly in the following way:

(1) are 100 messages good enough for integration tests?
(2) what about the broker failure in the middle of integration tests?
(3) other validations to check (e.g. topic config sync)


> Revisit and improve the tests of MirrorMaker 2
> --
>
> Key: KAFKA-10304
> URL: https://issues.apache.org/jira/browse/KAFKA-10304
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Minor
> Fix For: 2.8.0
>
>
> In a different MM2 change, [some 
> concerns|https://github.com/apache/kafka/pull/9029#issuecomment-663094946] on 
> tests were raised. It may be a good time to revisit and refactor the tests, 
> possibly in the following way:
> (1) are 100 messages good enough for integration tests?
>  (2) what about the broker failure in the middle of integration tests?
>  (3) other validations to check (e.g. topic config sync)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2

2020-09-09 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10304:
---
Comment: was deleted

(was: https://github.com/apache/kafka/pull/9224)

> Revisit and improve the tests of MirrorMaker 2
> --
>
> Key: KAFKA-10304
> URL: https://issues.apache.org/jira/browse/KAFKA-10304
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Minor
> Fix For: 2.8.0
>
>
> In a different MM2 change https://github.com/apache/kafka/pull/9029, some 
> concerns on tests were raised. It may be a good time to revisit and refactor 
> the tests, possibly in the following way:
> (1) are 100 messages good enough for integration tests?
> (2) what about the broker failure in the middle of integration tests?
> (3) other validations to check (e.g. topic config sync)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2

2020-09-09 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17192664#comment-17192664
 ] 

Ning Zhang commented on KAFKA-10304:


Hi [~mimaison] [~ryannedolan], this is mostly a refactoring PR on the MM2 
integration tests. Its purpose is to (1) address the concern in the 
previous PR (https://github.com/apache/kafka/pull/9029), and (2) prepare for 
future development (e.g. extracting common functions). I think the current PR 
(https://github.com/apache/kafka/pull/9224) is just a starting point, and I 
would greatly appreciate your feedback on what to test additionally and how to 
get closer to real scenarios.

> Revisit and improve the tests of MirrorMaker 2
> --
>
> Key: KAFKA-10304
> URL: https://issues.apache.org/jira/browse/KAFKA-10304
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Minor
> Fix For: 2.8.0
>
>
> In a different MM2 change https://github.com/apache/kafka/pull/9029, some 
> concerns on tests were raised. It may be a good time to revisit and refactor 
> the tests, possibly in the following way:
> (1) are 100 messages good enough for integration tests?
> (2) what about the broker failure in the middle of integration tests?
> (3) other validations to check (e.g. topic config sync)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2

2020-09-09 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10304:
---
Description: 
In a different MM2 change https://github.com/apache/kafka/pull/9029, some 
concerns on tests were raised. It may be a good time to revisit and refactor 
the tests, possibly in the following way:

(1) are 100 messages good enough for integration tests?
(2) what about the broker failure in the middle of integration tests?
(3) other validations to check (e.g. topic config sync)

  was:
Due to the quick development of Kafka MM2, the unit and integration tests of 
MirrorMaker 2 were written just to cover each individual feature, and some of 
them are simply copied from existing tests with small tweaks. It may be a good 
time to revisit and improve the tests, possibly in the following ways:

(1) are 100 messages good enough for integration tests?
(2) what about the failure in the middle of integration tests?
(3) Do we want to check other messages (e.g. checkpoint, heartbeat, offset 
sync..) beyond the mirrored message in integration tests?


> Revisit and improve the tests of MirrorMaker 2
> --
>
> Key: KAFKA-10304
> URL: https://issues.apache.org/jira/browse/KAFKA-10304
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Minor
> Fix For: 2.8.0
>
>
> In a different MM2 change https://github.com/apache/kafka/pull/9029, some 
> concerns on tests were raised. It may be a good time to revisit and refactor 
> the tests, possibly in the following way:
> (1) are 100 messages good enough for integration tests?
> (2) what about the broker failure in the middle of integration tests?
> (3) other validations to check (e.g. topic config sync)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2

2020-08-25 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10304:
---
Component/s: mirrormaker

> Revisit and improve the tests of MirrorMaker 2
> --
>
> Key: KAFKA-10304
> URL: https://issues.apache.org/jira/browse/KAFKA-10304
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Minor
>
> Due to the quick development of Kafka MM2, the unit and integration tests of 
> MirrorMaker 2 were written just to cover each individual feature, and some of 
> them are simply copied from existing tests with small tweaks. It may be a 
> good time to revisit and improve the tests, possibly in the following ways:
> (1) are 100 messages good enough for integration tests?
> (2) what about the failure in the middle of integration tests?
> (3) Do we want to check other messages (e.g. checkpoint, heartbeat, offset 
> sync..) beyond the mirrored message in integration tests?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-25 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10339:
---
Component/s: mirrormaker

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-25 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184891#comment-17184891
 ] 

Ning Zhang edited comment on KAFKA-10424 at 8/26/20, 3:13 AM:
--

[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:

{code:java}
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 Configs:
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
{code}

After running MM2:

{code:java}
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 
Configs:*cleanup.policy=compact*
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
{code}

note that `cleanup.policy=compact` is added after MM2 start


was (Author: yangguo1220):
[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:

{code:java}
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 Configs:
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
{code}

After running MM2:

{code:java}
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 
Configs:cleanup.policy=compact
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
{code}

note that `cleanup.policy=compact` is added after MM2 start

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Assignee: Ning Zhang
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-25 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184891#comment-17184891
 ] 

Ning Zhang edited comment on KAFKA-10424 at 8/26/20, 3:13 AM:
--

[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:

{code:java}
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 Configs:
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
{code}

After running MM2:

{code:java}
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 
Configs:cleanup.policy=compact
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
{code}

note that `cleanup.policy=compact` is added after MM2 start


was (Author: yangguo1220):
[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:

{code:java}
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 Configs:
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
{code}

After running MM2:

{code:java}
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 
Configs:*cleanup.policy=compact*
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
{code}

note that `cleanup.policy=compact` is added after MM2 start

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Assignee: Ning Zhang
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-25 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184891#comment-17184891
 ] 

Ning Zhang edited comment on KAFKA-10424 at 8/26/20, 3:12 AM:
--

[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:

{code:java}
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 Configs:
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
{code}

After running MM2:

{code:java}
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 
Configs:cleanup.policy=compact
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
{code}

note that `cleanup.policy=compact` is added after MM2 start


was (Author: yangguo1220):
[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:
```
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 Configs:
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
```
After running MM2:
```
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 
Configs:cleanup.policy=compact
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
```

note that `cleanup.policy=compact` is added after MM2 start

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Assignee: Ning Zhang
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-25 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184891#comment-17184891
 ] 

Ning Zhang commented on KAFKA-10424:


[~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a 
bug:

before running MM2:
```
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 Configs:
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
```
After running MM2:
```
bash-4.4#  /opt/kafka/bin/kafka-topics.sh --zookeeper 
zookeeper-service-backup:2181 --describe --topic primary.test
Topic:primary.test  PartitionCount:3ReplicationFactor:3 
Configs:cleanup.policy=compact
Topic: primary.test Partition: 0Leader: 1   Replicas: 1,2,0 
Isr: 1,2,0
Topic: primary.test Partition: 1Leader: 2   Replicas: 2,0,1 
Isr: 2,0,1
Topic: primary.test Partition: 2Leader: 0   Replicas: 0,1,2 
Isr: 0,1,2
```

note that `cleanup.policy=compact` is added after MM2 start

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Assignee: Ning Zhang
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-24 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reassigned KAFKA-10424:
--

Assignee: Ning Zhang

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Assignee: Ning Zhang
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2020-08-24 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10133:
---
Fix Version/s: 2.7.0

> Cannot compress messages in destination cluster with MM2
> 
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
>Reporter: Steve Jacobs
>Assignee: Ning Zhang
>Priority: Minor
> Fix For: 2.7.0
>
>
> When configuring mirrormaker2 using kafka connect, it is not possible to 
> configure things such that messages are compressed in the destination 
> cluster. Dump Log shows that batching is occurring, but no compression. If 
> this is possible, then this is a documentation bug, because I can find no 
> documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: 
> -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: 
> false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 
> magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2020-08-24 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reopened KAFKA-10133:


There is no bug in the code, but some documentation effort is needed to clarify 
where some frequently used configs should be set.

> Cannot compress messages in destination cluster with MM2
> 
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
>Reporter: Steve Jacobs
>Assignee: Ning Zhang
>Priority: Minor
>
> When configuring mirrormaker2 using kafka connect, it is not possible to 
> configure things such that messages are compressed in the destination 
> cluster. Dump Log shows that batching is occurring, but no compression. If 
> this is possible, then this is a documentation bug, because I can find no 
> documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: 
> -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: 
> false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 
> magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2020-08-24 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reassigned KAFKA-10133:
--

Assignee: Ning Zhang

> Cannot compress messages in destination cluster with MM2
> 
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
>Reporter: Steve Jacobs
>Assignee: Ning Zhang
>Priority: Minor
>
> When configuring mirrormaker2 using kafka connect, it is not possible to 
> configure things such that messages are compressed in the destination 
> cluster. Dump Log shows that batching is occurring, but no compression. If 
> this is possible, then this is a documentation bug, because I can find no 
> documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: 
> -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: 
> false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 
> magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2020-08-24 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183401#comment-17183401
 ] 

Ning Zhang commented on KAFKA-10133:


Great. Then I will probably make a PR to clarify some use cases like this. 
Thanks

> Cannot compress messages in destination cluster with MM2
> 
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
>Reporter: Steve Jacobs
>Priority: Minor
>
> When configuring mirrormaker2 using kafka connect, it is not possible to 
> configure things such that messages are compressed in the destination 
> cluster. Dump Log shows that batching is occurring, but no compression. If 
> this is possible, then this is a documentation bug, because I can find no 
> documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: 
> -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: 
> false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 
> magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-24 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17182977#comment-17182977
 ] 

Ning Zhang commented on KAFKA-10424:


Actually, I did a quick validation on a real deployment, and the first 
impression is: 

"cleanup.policy=compact" is not replicated to the remote topic on the target 
cluster.

So it seems like a bug to me and needs looking into

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2020-08-24 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17182976#comment-17182976
 ] 

Ning Zhang commented on KAFKA-10133:


[~steveatbat] My experiment seems to show a different conclusion.

In order to preserve the compression type from the upstream producer, in the 
MM2 config file I override the producer config, such as:

{code:java}
.producer.compression.type = gzip
{code}

Then run this command on the target cluster to verify:

{code:java}
/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration 
--files /kafka/kafka-logs-kafka2-0/primary.test-0/.log
{code}

Output:

{code:java}
Starting offset: 0
offset: 0 position: 0 CreateTime: 1598249096409 isvalid: true keysize: 4 
valuesize: 6 magic: 2 compresscodec: GZIP producerId: -1 producerEpoch: -1 
sequence: -1 isTransactional: false headerKeys: []
offset: 1 position: 0 CreateTime: 1598249106432 isvalid: true keysize: 4 
valuesize: 6 magic: 2 compresscodec: GZIP producerId: -1 producerEpoch: -1 
sequence: -1 isTransactional: false headerKeys: []
offset: 2 position: 0 CreateTime: 1598249158144 isvalid: true keysize: 4 
valuesize: 6 magic: 2 compresscodec: GZIP producerId: -1 producerEpoch: -1 
sequence: -1 isTransactional: false headerKeys: []
{code}

The output shows that the messages replicated to the target cluster are 
compressed with GZIP.

So I am wondering what exact config you used at the Kafka Connect worker level 
to make compression work



> Cannot compress messages in destination cluster with MM2
> 
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
>Reporter: Steve Jacobs
>Priority: Minor
>
> When configuring mirrormaker2 using kafka connect, it is not possible to 
> configure things such that messages are compressed in the destination 
> cluster. Dump Log shows that batching is occurring, but no compression. If 
> this is possible, then this is a documentation bug, because I can find no 
> documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: 
> -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: 
> false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 
> magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-23 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17182698#comment-17182698
 ] 

Ning Zhang commented on KAFKA-10424:


Then can you check if other topic properties (if any) are copied over from the 
source to the target cluster?

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-22 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17182572#comment-17182572
 ] 

Ning Zhang edited comment on KAFKA-10424 at 8/23/20, 3:28 AM:
--

[~grinfeld] "_schemas" topic is a compacted topic. When it is replicated, make 
sure its name on the target cluster prefix with 
, e.g. "primary._schemas" and having your 
schema registry service on target cluster using the correct topic name.

MirrorSourceConnector should periodically sync the topic config (including 
"cleanup.policy") from source to target 
(https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L122)
 so the "_schemas" topic on target cluster should be compacted topic.

To debug, 
check if  "_schemas" topic on target cluster is *indeed* compacted topic, 
especially at the moment when you see the above exception.
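
One way to run that check programmatically against the target cluster (a 
sketch; the bootstrap address is a placeholder, and the Admin client is assumed 
to be reachable with plain settings):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class CheckCleanupPolicy {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "target-kafka:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "primary._schemas");
            Config config = admin.describeConfigs(Collections.singleton(topic))
                                 .all().get().get(topic);
            // Expect "compact" once MirrorSourceConnector has synced the topic config
            System.out.println("cleanup.policy = " + config.get("cleanup.policy").value());
        }
    }
}
{code}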



was (Author: yangguo1220):
[~grinfeld] "_schemas" topic is a compacted topic. When it is replicated, make 
sure its name on the target cluster prefix with 
, e.g. "primary._schemas"

MirrorSourceConnector should periodically sync the topic config (including 
"cleanup.policy") from source to target 
(https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L122)
 so the "_schemas" topic on target cluster should be compacted topic.

To debug, 
check if  "_schemas" topic on target cluster is *indeed* compacted topic, 
especially at the moment when you see the above exception.


> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-22 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17182572#comment-17182572
 ] 

Ning Zhang commented on KAFKA-10424:


[~grinfeld] "_schemas" topic is a compacted topic. When it is replicated, make 
sure its name on the target cluster prefix with 
, e.g. "primary._schemas"

MirrorSourceConnector should periodically sync the topic config (including 
"cleanup.policy") from source to target 
(https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L122)
 so the "_schemas" topic on target cluster should be compacted topic.

To debug, 
check if  "_schemas" topic on target cluster is *indeed* compacted topic, 
especially at the moment when you see the above exception.


> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Priority: Major
>
> I needed to replicate schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new 
> schema-registry started with warning that replicated topic's cleanup.policy 
> is not "compact"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10370) WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-21 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17181928#comment-17181928
 ] 

Ning Zhang commented on KAFKA-10370:


After looking at the HDFS Sink connector 
https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java,
 it seems that the correct resolution is to add the following method to the 
SinkTask implementation:

{code:java}
@Override
public void open(Collection<TopicPartition> partitions) {
    loadContextOffsets();
}
{code}

* loadContextOffsets() is the method that loads the consumer group offsets from 
the target cluster and supplies them to the task context

Therefore, no change is needed and I am closing this ticket
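
For illustration, a sketch of what loadContextOffsets() could look like (the 
target-cluster Admin properties and the consumer group id are assumed fields 
initialized in start(); this is illustrative, not the actual connector code):

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;

// Inside the SinkTask implementation: fetch the group's committed offsets from
// the target cluster and hand them to the framework, which then seeks the
// consumer in WorkerSinkTask.rewind() before the next poll.
private void loadContextOffsets() {
    try (Admin targetAdmin = Admin.create(targetAdminProps)) { // assumed field
        Map<TopicPartition, OffsetAndMetadata> committed =
                targetAdmin.listConsumerGroupOffsets(consumerGroupId) // assumed field
                           .partitionsToOffsetAndMetadata().get();
        Map<TopicPartition, Long> offsets = new HashMap<>();
        committed.forEach((tp, om) -> offsets.put(tp, om.offset()));
        context.offset(offsets); // rewind request picked up on the next poll
    } catch (Exception e) {
        throw new ConnectException("Failed to load offsets from target cluster", e);
    }
}
{code}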

> WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
> Fix For: 2.7.0
>
>
> In 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
>  when we want the consumer to consume from certain offsets, rather than from 
> the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
>  provided a way to supply the offsets from external (e.g. implementation of 
> SinkTask) to rewind the consumer. 
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first call 
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
>  to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not 
> empty, (2) consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask 
> initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
>  when the [SinkTask 
> starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
>  we can supply the specific offsets by +"context.offset(supplied_offsets);+" 
> in start() method, so that when the consumer does the first poll, it should 
> rewind to the specific offsets in rewind() method. However in practice, we 
> saw the following IllegalStateException when running consumer.seek(tp, 
> offsets);
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} 
> Rewind test-1 to offset 3 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer 
> clientId=connector-consumer-MirrorSinkConnector-0, 
> groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> 

[jira] [Commented] (KAFKA-10370) WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-18 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179669#comment-17179669
 ] 

Ning Zhang commented on KAFKA-10370:


Hey [~ryannedolan], I updated the PR with the following code in 
`onPartitionsAssigned`. From my initial testing, it works well with 
`MirrorSinkTask`. Since the partition assignment is now driven by the consumer 
(as we use `consumer.subscribe()`), the `offsets` passed into 
`context.offsets(offsets)` are all the <TopicPartition, OffsetAndMetadata> 
entries associated with the consumer group, rather than letting 
`MirrorSinkTask` do the consuming task assignment.

I appreciate your thoughts above and definitely expect more feedback!

https://github.com/apache/kafka/pull/9145/files#diff-9d27e74bcdc892150367aed9a4cf499eR617-R698

> WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
> Fix For: 2.7.0
>
>
> In 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
>  when we want the consumer to consume from certain offsets, rather than from 
> the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
>  provided a way to supply the offsets from external (e.g. implementation of 
> SinkTask) to rewind the consumer. 
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first call 
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
>  to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not 
> empty, (2) consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask 
> initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
>  when the [SinkTask 
> starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
>  we can supply the specific offsets by +"context.offset(supplied_offsets);+" 
> in start() method, so that when the consumer does the first poll, it should 
> rewind to the specific offsets in rewind() method. However in practice, we 
> saw the following IllegalStateException when running consumer.seek(tp, 
> offsets);
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} 
> Rewind test-1 to offset 3 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer 
> clientId=connector-consumer-MirrorSinkConnector-0, 
> groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> 

[jira] [Commented] (KAFKA-10370) WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-10 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174955#comment-17174955
 ] 

Ning Zhang commented on KAFKA-10370:


Hi [~rhauch], when you have a chance, I would like to get your initial feedback 
/ advice on this issue and proposed solution. Thanks cc [~ryannedolan]

> WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
> Fix For: 2.7.0
>
>
> In 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
>  when we want the consumer to consume from certain offsets, rather than from 
> the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
>  provided a way to supply the offsets from an external source (e.g. an 
> implementation of SinkTask) to rewind the consumer. 
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first calls 
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
>  to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are 
> not empty, (2) call consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask 
> initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
>  when the [SinkTask 
> starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
>  we can supply specific offsets via +"context.offset(supplied_offsets);"+ 
> in the start() method, so that when the consumer does its first poll, it 
> rewinds to those offsets in the rewind() method. However, in practice we 
> saw the following IllegalStateException when running consumer.seek(tp, 
> offsets):
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} 
> Rewind test-1 to offset 3 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer 
> clientId=connector-consumer-MirrorSinkConnector-0, 
> groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> is being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:188)
> {code}
> As suggested in 
> 

[jira] [Issue Comment Deleted] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-10 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Comment: was deleted

(was: https://github.com/apache/kafka/pull/9145)

> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
> Fix For: 2.7.0
>
>
> In 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
>  when we want the consumer to consume from certain offsets, rather than from 
> the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
>  provided a way to supply the offsets from an external source (e.g. an 
> implementation of SinkTask) to rewind the consumer. 
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first calls 
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
>  to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are 
> not empty, (2) call consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask 
> initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
>  when the [SinkTask 
> starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
>  we can supply specific offsets via +"context.offset(supplied_offsets);"+ 
> in the start() method, so that when the consumer does its first poll, it 
> rewinds to those offsets in the rewind() method. However, in practice we 
> saw the following IllegalStateException when running consumer.seek(tp, 
> offsets):
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} 
> Rewind test-1 to offset 3 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer 
> clientId=connector-consumer-MirrorSinkConnector-0, 
> groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> is being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:188)
> {code}
> As suggested in 
> https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
>  the resolution (initially verified) proposed in the attached 
> 

[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-10 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Reviewer: Randall Hauch

> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
> Fix For: 2.7.0
>
>
> In 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
>  when we want the consumer to consume from certain offsets, rather than from 
> the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
>  provided a way to supply the offsets from an external source (e.g. an 
> implementation of SinkTask) to rewind the consumer. 
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first calls 
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
>  to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are 
> not empty, (2) call consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask 
> initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
>  when the [SinkTask 
> starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
>  we can supply specific offsets via +"context.offset(supplied_offsets);"+ 
> in the start() method, so that when the consumer does its first poll, it 
> rewinds to those offsets in the rewind() method. However, in practice we 
> saw the following IllegalStateException when running consumer.seek(tp, 
> offsets):
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} 
> Rewind test-1 to offset 3 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer 
> clientId=connector-consumer-MirrorSinkConnector-0, 
> groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> is being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:188)
> {code}
> As suggested in 
> https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
>  the resolution (initially verified) proposed in the attached 
> PR is to use *consumer.assign* with 

[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-10 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Fix Version/s: 2.7.0 (was: 2.6.0)

> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
> Fix For: 2.7.0
>
>
> In 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
>  when we want the consumer to consume from certain offsets, rather than from 
> the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
>  provided a way to supply the offsets from an external source (e.g. an 
> implementation of SinkTask) to rewind the consumer. 
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first calls 
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
>  to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are 
> not empty, (2) call consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask 
> initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
>  when the [SinkTask 
> starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
>  we can supply specific offsets via +"context.offset(supplied_offsets);"+ 
> in the start() method, so that when the consumer does its first poll, it 
> rewinds to those offsets in the rewind() method. However, in practice we 
> saw the following IllegalStateException when running consumer.seek(tp, 
> offsets):
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} 
> Rewind test-1 to offset 3 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer 
> clientId=connector-consumer-MirrorSinkConnector-0, 
> groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> is being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:188)
> {code}
> As suggested in 
> https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
>  the resolution (initially verified) proposed in the attached 
> PR is to use 

[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-08 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Description: 
In 
[WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
 when we want the consumer to consume from certain offsets, rather than from 
the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
 provided a way to supply the offsets from an external source (e.g. an 
implementation of SinkTask) to rewind the consumer. 

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls 
[rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
 to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are 
not empty, (2) call consumer.seek(tp, offset) to rewind the consumer.

As a part of [WorkerSinkTask 
initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
 when the [SinkTask 
starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
 we can supply specific offsets via +"context.offset(supplied_offsets);"+ in 
the start() method, so that when the consumer does its first poll, it rewinds 
to those offsets in the rewind() method. However, in practice we saw the 
following IllegalStateException when running consumer.seek(tp, offsets):

{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind 
test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer 
clientId=connector-consumer-MirrorSinkConnector-0, 
groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
(org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
is being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask:188)
{code}

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution (initially verified) proposed in the attached PR 
is to use *consumer.assign* with *consumer.seek*, instead of 
*consumer.subscribe*, to handle the initial position of the consumer when 
specific offsets are provided externally through WorkerSinkTaskContext.
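
For illustration, a minimal sketch of the pattern described above. The class 
name, topic, partition, and offset values are placeholders, and the 
assignAndSeek helper reflects the described approach rather than the actual 
code of the attached PR:

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class MySinkTask extends SinkTask {

    @Override
    public void start(Map<String, String> props) {
        // Placeholder offsets: rewind partition test-1 to offset 3.
        Map<TopicPartition, Long> suppliedOffsets =
                Collections.singletonMap(new TopicPartition("test", 1), 3L);
        // Ask the framework to rewind the consumer on its next poll().
        context.offset(suppliedOffsets);
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Write records to the external system (elided).
    }

    @Override
    public void stop() { }

    @Override
    public String version() { return "0.0.1"; }

    // The proposed resolution, sketched in isolation: with an explicitly
    // assigned consumer, seek() always has a current assignment, so the
    // IllegalStateException above cannot occur.
    static void assignAndSeek(Consumer<byte[], byte[]> consumer,
                              Map<TopicPartition, Long> offsets) {
        consumer.assign(offsets.keySet());
        offsets.forEach(consumer::seek);
    }
}
{code}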

  was:
In 
[WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
 when we want the consumer to consume from certain offsets, rather than from 
the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
 provided a way to supply the offsets from an external source (e.g. an 
implementation of SinkTask) to rewind the consumer. 

In the [poll() 

[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-08 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Description: 
In 
[WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
 when we want the consumer to consume from certain offsets, rather than from 
the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
 provided a way to supply the offsets from an external source (e.g. an 
implementation of SinkTask) to rewind the consumer. 

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls 
[rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
 to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are 
not empty, (2) call consumer.seek(tp, offset) to rewind the consumer.

As a part of [WorkerSinkTask 
initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
 when the [SinkTask 
starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
 we can supply specific offsets via +"context.offset(supplied_offsets);"+ in 
the start() method, so that when the consumer does its first poll, it rewinds 
to those offsets in the rewind() method. However, in practice we saw the 
following IllegalStateException when running consumer.seek(tp, offsets):

{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind 
test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer 
clientId=connector-consumer-MirrorSinkConnector-0, 
groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
(org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
is being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask:188)
{code}

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution that has been initially verified is to use *consumer.assign* 
with *consumer.seek*, instead of *consumer.subscribe*, in this case.

  was:
In WorkerSinkTask.java, when we want the consumer to consume from certain 
offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 provided a way to supply the offsets from an external source (e.g. an 
implementation of SinkTask) to rewind the consumer. 

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext 
and, if the offsets are not empty, (2) call consumer.seek(tp, offset) to 
rewind the consumer.

when SinkTask first initializes 

[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-08 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Description: 
In WorkerSinkTask.java, when we want the consumer to consume from certain 
offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 provided a way to supply the offsets from an external source (e.g. an 
implementation of SinkTask) to rewind the consumer. 

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext 
and, if the offsets are not empty, (2) call consumer.seek(tp, offset) to 
rewind the consumer.

when SinkTask first initializes (+start(Map props)+), we do 
+"context.offset(offsets);"+, then in step (2) above we saw the following 
IllegalStateException:

{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind 
test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer 
clientId=connector-consumer-MirrorSinkConnector-0, 
groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
(org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
is being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask:188)
{code}

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution that has been initially verified is to use *consumer.assign* 
with *consumer.seek*, instead of *consumer.subscribe*, in this case.

  was:
In WorkerSinkTask.java, when we want the consumer to consume from certain 
offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 provided a way to supply the offsets from an external source (e.g. an 
implementation of SinkTask) to rewind the consumer. 

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext 
and, if the offsets are not empty, (2) call consumer.seek(tp, offset) to 
rewind the consumer.

when SinkTask first initializes (+start(Map props)+), we do 
+"context.offset(offsets);"+, then in step (2) above we saw the following 
IllegalStateException:

{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind 
test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer 
clientId=connector-consumer-MirrorSinkConnector-0, 
groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
(org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition 

[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-07 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Description: 
In WorkerSinkTask.java, when we want the consumer to consume from certain 
offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 provided a way to supply the offsets from an external source (e.g. an 
implementation of SinkTask) to rewind the consumer. 

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext 
and, if the offsets are not empty, (2) call consumer.seek(tp, offset) to 
rewind the consumer.

when SinkTask first initializes (+start(Map props)+), we do 
+"context.offset(offsets);"+, then in step (2) above we saw the following 
IllegalStateException:

{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind 
test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer 
clientId=connector-consumer-MirrorSinkConnector-0, 
groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
(org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
is being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask:188)
{code}

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution that has been initially verified is to use *consumer.assign* 
with *consumer.seek*, instead of *consumer.subscribe*, in this case.

  was:
In WorkerSinkTask.java, when we want the consumer to start consuming from 
certain offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 is used to carry the offsets from the external world (e.g. an implementation 
of SinkTask).

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
(2) consumer.seek(tp, offset) to rewind the consumer.

when running (2), we saw the following IllegalStateException:

{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind 
test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer 
clientId=connector-consumer-MirrorSinkConnector-0, 
groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
(org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
at 

[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-07 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Description: 
In WorkerSinkTask.java, when we want the consumer to start consuming from 
certain offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 is used to carry the offsets from the external world (e.g. an implementation 
of SinkTask).

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
(2) consumer.seek(tp, offset) to rewind the consumer.

when running (2), we saw the following IllegalStateException:

{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind 
test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer 
clientId=connector-consumer-MirrorSinkConnector-0, 
groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
(org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
is being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask:188)
{code}

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution that has been initially verified is to use *consumer.assign* 
with *consumer.seek*, instead of *consumer.subscribe*. 

  was:
In WorkerSinkTask.java, when we want the consumer to start consuming from 
certain offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 is used to carry the offsets from the external world (e.g. an implementation 
of SinkTask).

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
(2) consumer.seek(tp, offset) to rewind the consumer.

when running (2), we saw the following IllegalStateException:

{code:java}
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
{code}

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution that has been initially verified is to use *consumer.assign* 
with *consumer.seek*, instead of *consumer.subscribe*. 


> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> 

[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-07 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Description: 
In WorkerSinkTask.java, when we want the consumer to start consuming from 
certain offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 is used to carry the offsets from the external world (e.g. an implementation 
of SinkTask).

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
(2) consumer.seek(tp, offset) to rewind the consumer.

when running (2), we saw the following IllegalStateException:

{code:java}
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
{code}

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution that has been initially verified is to use *consumer.assign* 
with *consumer.seek*, instead of *consumer.subscribe*. 

  was:
In WorkerSinkTask.java, when we want the consumer to start consuming from 
certain offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 is used to carry the offsets from the external world (e.g. an implementation 
of SinkTask).

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
(2) consumer.seek(tp, offset) to rewind the consumer.

when running (2), we saw the following IllegalStateException:

{code:java}
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
{code}

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution is to use *consumer.assign* with *consumer.seek*, instead of 
*consumer.subscribe*


> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
> Fix For: 2.6.0
>
>
> In WorkerSinkTask.java, when we want the consumer to start consuming from 
> certain offsets, rather than from the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
>  is used to carry the offsets from the external world (e.g. an 
> implementation of SinkTask).
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
> (2) consumer.seek(tp, offset) to rewind the consumer.
> when running (2), we saw the following IllegalStateException:
> {code:java}
> java.lang.IllegalStateException: No current assignment for partition mytopic-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
> {code}
> As suggested in 
> https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
>  the resolution 

[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-06 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Description: 
In WorkerSinkTask.java, when we want the consumer to start consuming from 
certain offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 is used to carry the offsets from the external world (e.g. an implementation 
of SinkTask).

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
(2) consumer.seek(tp, offset) to rewind the consumer.

when running (2), we saw the following IllegalStateException:

{code:java}
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
{code}

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution is to use *consumer.assign* with *consumer.seek*, instead of 
*consumer.subscribe*

  was:
In WorkerSinkTask.java, when we want the consumer to start consuming from 
certain offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 is used to carry the offsets from the external world (e.g. an implementation 
of SinkTask).

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
(2) consumer.seek(tp, offset) to rewind the consumer.

when running (2), we saw the following IllegalStateException:
```
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
```

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution is to use *consumer.assign* with *consumer.seek*, instead of 
*consumer.subscribe*


> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
> Fix For: 2.6.0
>
>
> In WorkerSinkTask.java, when we want the consumer to start consuming from 
> certain offsets, rather than from the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
>  is used to carry the offsets from the external world (e.g. an 
> implementation of SinkTask).
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
> (2) consumer.seek(tp, offset) to rewind the consumer.
> when running (2), we saw the following IllegalStateException:
> {code:java}
> java.lang.IllegalStateException: No current assignment for partition mytopic-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
> {code}
> As suggested in 
> https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
>  the resolution is to use *consumer.assign* with *consumer.seek* 

[jira] [Created] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-06 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-10370:
--

 Summary: WorkerSinkTask: IllegalStateException caused by 
consumer.seek(tp, offsets) when (tp, offsets) are supplied by 
WorkerSinkTaskContext
 Key: KAFKA-10370
 URL: https://issues.apache.org/jira/browse/KAFKA-10370
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Affects Versions: 2.5.0
Reporter: Ning Zhang
Assignee: Ning Zhang
 Fix For: 2.6.0


In WorkerSinkTask.java, when we want the consumer to start consuming from 
certain offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 is used to carry the offsets from the external world (e.g. an implementation 
of SinkTask).

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
(2) consumer.seek(tp, offset) to rewind the consumer.

when running (2), we saw the following IllegalStateException:
```
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
```

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution is to use *consumer.assign* with *consumer.seek*, instead of 
*consumer.subscribe*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-05 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171670#comment-17171670
 ] 

Ning Zhang commented on KAFKA-10339:


it is true that SinkTaskContext.offsets() already has this covered. I will test 
this updated idea out locally; the extra step is to create a "fake" consumer 
group on the target cluster.

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-05 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171612#comment-17171612
 ] 

Ning Zhang edited comment on KAFKA-10339 at 8/5/20, 4:47 PM:
-

thanks for the input. I think that sounds like a workable plan. Here are my 
follow-up thoughts.

_"when MirrorSourceTask starts, it loads initial offsets from 
__consumer_offsets on the target cluster."_

As the consumer is configured to pull data from the source cluster, I am 
thinking we probably need to:
(1) add a new API (called "Map loadOffsets()") to 
[SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java]
 
(2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the 
consumer offsets loaded from the target cluster.
(3) in 
[WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295],
 when initialize the consumer, if `task.loadOffsets()` returns non empty, use 
the returned offsets for the consumer in WorkerSinkTask as the starting point.

_"in addition, we call addOffsetsToTransaction in order to commit offsets to 
the "fake" consumer group on the target cluster."_

I fully agree with the "fake" consumer group on the target cluster. I am 
thinking if "addOffsetsToTransaction" has been taken care by 
*producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?
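
For illustration, a minimal sketch of steps (1)-(3) above. None of these hooks exist in Kafka today, and since the generics in "Map loadOffsets()" were stripped by this archive, Map<TopicPartition, Long> is assumed as the return type; all class and method names are hypothetical:
{code:java}
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// (1) Hypothetical hook that would live in SinkTask.java; the default keeps
// today's behavior (no custom starting offsets).
abstract class OffsetLoadingSinkTask {
    public Map<TopicPartition, Long> loadOffsets() {
        return Collections.emptyMap();
    }
}

// (2) Hypothetical override in MirrorSinkTask.java: return the group's
// offsets read from __consumer_offsets on the *target* cluster
// (the actual lookup is elided in this sketch).
class MirrorSinkTaskSketch extends OffsetLoadingSinkTask {
    @Override
    public Map<TopicPartition, Long> loadOffsets() {
        return Collections.singletonMap(new TopicPartition("mytopic", 1), 42L);
    }
}

// (3) Hypothetical use in WorkerSinkTask when initializing the consumer.
class WorkerSinkTaskSketch {
    static void seed(OffsetLoadingSinkTask task, Consumer<byte[], byte[]> consumer) {
        Map<TopicPartition, Long> initial = task.loadOffsets();
        if (!initial.isEmpty()) {
            consumer.assign(initial.keySet());
            initial.forEach(consumer::seek); // start from target-cluster offsets
        }
    }
}
{code}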


was (Author: yangguo1220):
thanks for the input. I think that sounds like a workable plan. Here are my follow-up thoughts.

_"when MirrorSourceTask starts, it loads initial offsets from 
__consumer_offsets on the target cluster."_

As the consumer is configured to pull data from the source cluster, I am thinking we probably need to:
(1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java]
(2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from the target cluster.
(3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use the returned offsets as the starting point.

_"in addition, we call addOffsetsToTransaction in order to commit offsets to 
the "fake" consumer group on the target cluster."_

I fully agree with the "fake" consumer group on the target cluster. I am wondering whether "addOffsetsToTransaction" is already taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-05 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171612#comment-17171612
 ] 

Ning Zhang commented on KAFKA-10339:


thanks for the input. I think that sounds like a workable plan. Here are my follow-up thoughts.

_"when MirrorSourceTask starts, it loads initial offsets from 
__consumer_offsets on the target cluster."_

As the consumer is configured to pull data from the source cluster, I am thinking we probably need to:
(1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java]
(2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from the target cluster.
(3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use the returned offsets as the starting point.

_"in addition, we call addOffsetsToTransaction in order to commit offsets to 
the "fake" consumer group on the target cluster."_

I fully agree with the "fake" consumer group on the target cluster. I am wondering whether "addOffsetsToTransaction" is already taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?
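
For reference on that last question, a minimal sketch of the transactional path (broker address, transactional.id, topic, offsets and group name are all illustrative). sendOffsetsToTransaction writes the given offsets to __consumer_offsets on the cluster the producer points at, under the given group id, atomically with the records sent in the same transaction, which is what would materialize the "fake" consumer group:
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class FakeGroupCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "target-cluster:9092"); // illustrative
        props.put("transactional.id", "mm2-task-0");            // illustrative
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("source.mytopic", 1, null,
                "mirrored-record".getBytes(StandardCharsets.UTF_8)));
            // Commit the "fake" group's offsets on the target cluster,
            // atomically with the mirrored record above:
            Map<TopicPartition, OffsetAndMetadata> offsetsMap =
                Collections.singletonMap(new TopicPartition("source.mytopic", 1),
                    new OffsetAndMetadata(43L));
            producer.sendOffsetsToTransaction(offsetsMap, "my-consumer-group");
            producer.commitTransaction();
        }
    }
}
{code}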

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-05 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171612#comment-17171612
 ] 

Ning Zhang edited comment on KAFKA-10339 at 8/5/20, 4:44 PM:
-

thanks for the input. I think that sounds like a workable plan. Here are my follow-up thoughts.

_"when MirrorSourceTask starts, it loads initial offsets from 
__consumer_offsets on the target cluster."_

As the consumer is configured to pull data from the source cluster, I am thinking we probably need to:
(1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java]
(2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from the target cluster.
(3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use the returned offsets as the starting point.

_"in addition, we call addOffsetsToTransaction in order to commit offsets to 
the "fake" consumer group on the target cluster."_

I fully agree with the "fake" consumer group on the target cluster. I am wondering whether "addOffsetsToTransaction" is already taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?


was (Author: yangguo1220):
thanks for the input. I think that sounds like a workable plan. Here are my follow-up thoughts.

_"when MirrorSourceTask starts, it loads initial offsets from 
__consumer_offsets on the target cluster."_

As the consumer is configured to pull data from the source cluster, I am thinking we probably need to:
(1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java]
(2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from the target cluster.
(3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use the returned offsets as the starting point.

_"in addition, we call addOffsetsToTransaction in order to commit offsets to 
the "fake" consumer group on the target cluster."_

I fully agree with the "fake" consumer group on the target cluster. I am wondering whether "addOffsetsToTransaction" is already taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-04 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171249#comment-17171249
 ] 

Ning Zhang commented on KAFKA-10339:


Technically, a transaction cannot span two clusters, so the approach in the 
above KIP is not valid. I need to look for other alternatives.

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-03 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17170231#comment-17170231
 ] 

Ning Zhang commented on KAFKA-10339:


[~ryannedolan] [~mimaison] Here is the KIP I would like to get your thoughts 
on; thanks so much for any feedback or input:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-653%3A+MirrorMaker2+Exactly-once+Semantics

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-03 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10339:
---
Labels: needs-kip  (was: )

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-03 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-10339:
--

 Summary: MirrorMaker2 Exactly-once Semantics
 Key: KAFKA-10339
 URL: https://issues.apache.org/jira/browse/KAFKA-10339
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Reporter: Ning Zhang
Assignee: Ning Zhang


MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
specifically the Source Connector / Task, which do not provide exactly-once 
semantics (EOS) out-of-the-box, as discussed in 
https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
https://github.com/apache/kafka/pull/5553, 
https://issues.apache.org/jira/browse/KAFKA-6080  and 
https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2

2020-07-23 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-10304:
--

 Summary: Revisit and improve the tests of MirrorMaker 2
 Key: KAFKA-10304
 URL: https://issues.apache.org/jira/browse/KAFKA-10304
 Project: Kafka
  Issue Type: Test
  Components: KafkaConnect
Reporter: Ning Zhang
Assignee: Ning Zhang


Due to the rapid development of Kafka MM 2, the unit and integration tests of 
MirrorMaker 2 were written just to cover each individual feature, and some of 
them are simply copy-and-pasted from existing tests with small tweaks. It may 
be a good time to revisit and improve the tests, possibly along the following 
lines:

(1) Are 100 messages enough for the integration tests?
(2) What about failures in the middle of an integration test?
(3) Do we want to check other messages (e.g. checkpoint, heartbeat, offset 
sync) beyond the mirrored messages in the integration tests?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10000) Atomic commit of source connector records and offsets

2020-07-23 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163802#comment-17163802
 ] 

Ning Zhang commented on KAFKA-10000:


Hi Chris, the purpose of this ticket is very interesting. I wonder what its 
priority is in the overall Kafka Connect backlog, and how far along it is so 
far (needs-kip)? Thanks

> Atomic commit of source connector records and offsets
> -
>
> Key: KAFKA-10000
> URL: https://issues.apache.org/jira/browse/KAFKA-10000
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>  Labels: needs-kip
>
> It'd be nice to be able to configure source connectors such that their 
> offsets are committed if and only if all records up to that point have been 
> ack'd by the producer. This would go a long way towards EOS for source 
> connectors.
>  
> This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is 
> marked as {{WONTFIX}} since it only concerns enabling the idempotent producer 
> for source connectors and is not concerned with source connector offsets.
> This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, 
> which had a lot of discussion around allowing connector-defined transaction 
> boundaries. The suggestion in this ticket is to only use source connector 
> offset commits as the transaction boundaries for connectors; allowing 
> connector-specified transaction boundaries can be addressed separately.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync

2020-06-24 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17144388#comment-17144388
 ] 

Ning Zhang commented on KAFKA-9076:
---

[~rhauch] It is true that this jira is not a blocker for the 2.6.0 release; 
thank you for checking it.

Given that the PR has been open for over 6 months and some users are already 
testing it in different use cases, I would hope the committers ([~mimaison]) 
and other reviewers can take another look at 
https://github.com/apache/kafka/pull/7577 and see if we could iterate faster, 
so that it can formally be part of Kafka in the 2.7.0 release.

> MirrorMaker 2.0 automated consumer offset sync
> --
>
> Key: KAFKA-9076
> URL: https://issues.apache.org/jira/browse/KAFKA-9076
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: mirrormaker, pull-request-available
> Fix For: 2.7.0
>
>
> To calculate the translated consumer offset in the target cluster, currently 
> `Mirror-client` provides a function called "remoteConsumerOffsets()" that is 
> used by "RemoteClusterUtils" for one-time purpose.
> In order to make the consumer and stream applications migrate from source to 
> target cluster transparently and conveniently, e.g. in event of source 
> cluster failure, a background job is proposed to periodically sync the 
> consumer offsets from the source to target cluster, so that when the consumer 
> and stream applications switch to the target cluster, it will resume to 
> consume from where it left off at source cluster.
>  KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
> [https://github.com/apache/kafka/pull/7577]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9219) NullPointerException when polling metrics from Kafka Connect

2020-04-18 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reassigned KAFKA-9219:
-

Assignee: Ning Zhang

> NullPointerException when polling metrics from Kafka Connect
> 
>
> Key: KAFKA-9219
> URL: https://issues.apache.org/jira/browse/KAFKA-9219
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0
>Reporter: Mickael Maison
>Assignee: Ning Zhang
>Priority: Major
> Fix For: 2.4.0, 2.5.0
>
>
> The following stack trace appears:
>  
> {code:java}
> [2019-11-05 23:56:57,909] WARN Error getting JMX attribute 'assigned-tasks' 
> (org.apache.kafka.common.metrics.JmxReporter:202)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.connect.runtime.distributed.WorkerCoordinator$WorkerCoordinatorMetrics$2.measure(WorkerCoordinator.java:316)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.metricValue(KafkaMetric.java:66)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:190)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:200)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
>   at 
> javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1449)
>   at 
> javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
>   at 
> javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
>   at 
> javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
>   at 
> javax.management.remote.rmi.RMIConnectionImpl.getAttributes(RMIConnectionImpl.java:675)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357)
>   at sun.rmi.transport.Transport$1.run(Transport.java:200)
>   at sun.rmi.transport.Transport$1.run(Transport.java:197)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
>   at 
> sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573)
>   at 
> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:835)
>   at 
> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at 
> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> [2019-11-05 23:57:02,821] INFO [Worker clientId=connect-1, 
> groupId=backup-mm2] Herder stopped 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:629)
> [2019-11-05 23:57:02,821] INFO [Worker clientId=connect-2, groupId=cv-mm2] 
> Herder stopping 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:609)
> [2019-11-05 23:57:07,822] INFO [Worker clientId=connect-2, groupId=cv-mm2] 
> Herder stopped 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:629)
> [2019-11-05 23:57:07,822] INFO Kafka MirrorMaker stopped. 
> (org.apache.kafka.connect.mirror.MirrorMaker:191)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync

2020-01-13 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reassigned KAFKA-9076:
-

Assignee: Ning Zhang

> MirrorMaker 2.0 automated consumer offset sync
> --
>
> Key: KAFKA-9076
> URL: https://issues.apache.org/jira/browse/KAFKA-9076
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: mirrormaker, pull-request-available
> Fix For: 2.5.0
>
>
> To calculate the translated consumer offset in the target cluster, currently 
> `Mirror-client` provides a function called "remoteConsumerOffsets()" that is 
> used by "RemoteClusterUtils" for one-time purpose.
> In order to make the consumer and stream applications migrate from source to 
> target cluster transparently and conveniently, e.g. in event of source 
> cluster failure, a background job is proposed to periodically sync the 
> consumer offsets from the source to target cluster, so that when the consumer 
> and stream applications switch to the target cluster, it will resume to 
> consume from where it left off at source cluster.
>  KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
> [https://github.com/apache/kafka/pull/7577]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask

2020-01-13 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reassigned KAFKA-9360:
-

Assignee: Ning Zhang

> emitting checkpoint and heartbeat set to false will not disable the activity 
> in their SourceTask
> 
>
> Key: KAFKA-9360
> URL: https://issues.apache.org/jira/browse/KAFKA-9360
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
> Fix For: 2.5.0
>
> Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png, Screen Shot 
> 2020-01-02 at 3.18.23 PM.png
>
>
> `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be 
> the knobs to control if the heartbeat message or checkpoint message will be 
> sent or not to the topics respectively. In our experiments, setting them to 
> false will not suspend the activity in their SourceTasks, e.g. 
> MirrorHeartbeatTask, MirrorCheckpointTask.
> The observations are, when setting those knobs to false, huge volume of 
> `SourceRecord` are being sent without interval, causing significantly high 
> CPU usage of MirrorMaker 2 instance, GC time and congesting the single 
> partition of the heartbeat topic and checkpoint topic.
> The proposed fix in the following PR is to (1) explicitly check if `interval` 
> is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or 
> `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to 
> negative, put the thread in sleep mode for a while (e.g. 5 seconds) and 
> return null, in order to prevent it from (1) hogging the cpu, (2) sending 
> heartbeat or checkpoint messages to Kafka topics.
> PR link: https://github.com/apache/kafka/pull/7887



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9352) unbalanced assignment of topic-partition to tasks

2020-01-03 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-9352:
--
Issue Type: New Feature  (was: Improvement)

> unbalanced assignment of topic-partition to tasks
> -
>
> Key: KAFKA-9352
> URL: https://issues.apache.org/jira/browse/KAFKA-9352
> Project: Kafka
>  Issue Type: New Feature
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
> Fix For: 2.5.0
>
> Attachments: Screen Shot 2019-12-19 at 12.16.02 PM.png, Screen Shot 
> 2019-12-19 at 8.22.17 AM.png
>
>
> originally, when mirrormaker replicates a group of topics, the assignment 
> between topic-partition and tasks are pretty static. E.g. partitions from the 
> same topic tend to be grouped together as much as possible on the same task. 
> For example, 3 tasks to mirror 3 topics with 8, 2 and 2
> partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, 
> partition 5'
> The original assignment will look like:
> t1 -> [t0p0, t0p1, t0p2, t0p3]
> t2 -> [t0p4, t0p5, t0p6, t0p7]
> t3 -> [t1p0, t1p2, t2p0, t2p1]
> The potential issue of above assignment is: if topic 0 has more traffic than 
> other topics (topic 1, topic 2), t1 and t2 will be loaded more traffic than 
> t3. When the tasks are mapped to the mirrormaker instances (workers) and 
> launched, it will create unbalanced load on the workers. Please see the 
> picture below as an unbalanced example of 2 mirrormaker instances:
> !Screen Shot 2019-12-19 at 12.16.02 PM.png!
> Given each mirrored topic has different traffic and number of partitions, to 
> balance the load
> across all mirrormaker instances (workers), 'roundrobin' helps to evenly 
> assign all
> topic-partition to the tasks, then the tasks are further distributed to 
> workers by calling
> 'ConnectorUtils.groupPartitions()'. For example, 3 tasks to mirror 3 topics 
> with 8, 2 and 2
> partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, 
> partition 5'
> t1 -> [t0p0, t0p3, t0p6, t1p1]
> t2 -> [t0p1, t0p4, t0p7, t2p0]
> t3 -> [t0p2, t0p5, t1p0, t2p1]
> The improvement of this new above assignment over the original assignment is: 
> the partitions of topic 0, topic 1 and topic 2 are all spread over all tasks, 
> which creates a relatively even load on all workers, after the tasks are 
> mapped to the workers and launched.
> Please see the picture below as a balanced example of 4 mirrormaker instances:
> !Screen Shot 2019-12-19 at 8.22.17 AM.png!
> PR link is: https://github.com/apache/kafka/pull/7880
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask

2020-01-02 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007130#comment-17007130
 ] 

Ning Zhang commented on KAFKA-9360:
---

pr link: https://github.com/apache/kafka/pull/7887


> emitting checkpoint and heartbeat set to false will not disable the activity 
> in their SourceTask
> 
>
> Key: KAFKA-9360
> URL: https://issues.apache.org/jira/browse/KAFKA-9360
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
> Fix For: 2.5.0
>
> Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png, Screen Shot 
> 2020-01-02 at 3.18.23 PM.png
>
>
> `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be 
> the knobs to control if the heartbeat message or checkpoint message will be 
> sent or not to the topics respectively. In our experiments, setting them to 
> false will not suspend the activity in their SourceTasks, e.g. 
> MirrorHeartbeatTask, MirrorCheckpointTask.
> The observations are, when setting those knobs to false, huge volume of 
> `SourceRecord` are being sent without interval, causing significantly high 
> CPU usage of MirrorMaker 2 instance, GC time and congesting the single 
> partition of the heartbeat topic and checkpoint topic.
> The proposed fix in the following PR is to (1) explicitly check if `interval` 
> is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or 
> `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to 
> negative, put the thread in sleep mode for a while (e.g. 5 seconds) and 
> return null, in order to prevent it from (1) hogging the cpu, (2) sending 
> heartbeat or checkpoint messages to Kafka topics.
> PR link: https://github.com/apache/kafka/pull/7887



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask

2020-01-02 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-9360:
--
Description: 
`emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the 
knobs that control whether the heartbeat or checkpoint messages are sent to 
their respective topics. In our experiments, setting them to false does not 
suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, 
MirrorCheckpointTask.

The observation is that, when those knobs are set to false, a huge volume of 
`SourceRecord`s is sent without any interval, causing significantly high CPU 
usage and GC time on the MirrorMaker 2 instance and congesting the single 
partition of the heartbeat topic and checkpoint topic.

The proposed fix in the following PR is to (1) explicitly check whether 
`interval` is set to a negative value (e.g. -1) when `emit.heartbeats.enabled` 
or `emit.checkpoints.enabled` is off, and (2) if `interval` is indeed negative, 
put the thread to sleep for a while (e.g. 5 seconds) and return null, in order 
to prevent it from (1) hogging the CPU and (2) sending heartbeat or checkpoint 
messages to Kafka topics.

PR link: https://github.com/apache/kafka/pull/7887
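
A minimal sketch of the proposed guard (simplified and illustrative, not the actual MirrorHeartbeatTask/MirrorCheckpointTask code):
{code:java}
import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public abstract class IntervalGuardedTask extends SourceTask {
    // -1 when emit.heartbeats.enabled / emit.checkpoints.enabled is false.
    private final long intervalMillis = -1L;

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        if (intervalMillis < 0) {
            Thread.sleep(5000); // back off ~5s: no CPU hogging, no records sent
            return null;        // null tells the framework there is nothing to send
        }
        return emitRecords();   // normal heartbeat/checkpoint emission
    }

    protected abstract List<SourceRecord> emitRecords();
}
{code}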



  was:
`emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the 
knobs to control if the heartbeat message or checkpoint message will be sent or 
not to the topics respectively. In our experiments, setting them to false will 
not suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, 
MirrorCheckpointTask.

The observations are, when setting those knobs to false, huge volume of 
`SourceRecord` are being sent without interval, causing significantly high CPU 
usage of MirrorMaker 2 instance, GC time and congesting the single partition of 
the heartbeat topic and checkpoint topic.



The proposed fix in the following PR is to (1) explicitly check if `interval` 
is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or 
`emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to negative, 
put the thread in sleep mode for a while (e.g. 5 seconds) and return null, in 
order to prevent it from (1) hogging the cpu, (2) sending heartbeat or 
checkpoint messages to Kafka topics.




> emitting checkpoint and heartbeat set to false will not disable the activity 
> in their SourceTask
> 
>
> Key: KAFKA-9360
> URL: https://issues.apache.org/jira/browse/KAFKA-9360
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
> Fix For: 2.5.0
>
> Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png, Screen Shot 
> 2020-01-02 at 3.18.23 PM.png
>
>
> `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be 
> the knobs to control if the heartbeat message or checkpoint message will be 
> sent or not to the topics respectively. In our experiments, setting them to 
> false will not suspend the activity in their SourceTasks, e.g. 
> MirrorHeartbeatTask, MirrorCheckpointTask.
> The observations are, when setting those knobs to false, huge volume of 
> `SourceRecord` are being sent without interval, causing significantly high 
> CPU usage of MirrorMaker 2 instance, GC time and congesting the single 
> partition of the heartbeat topic and checkpoint topic.
> The proposed fix in the following PR is to (1) explicitly check if `interval` 
> is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or 
> `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to 
> negative, put the thread in sleep mode for a while (e.g. 5 seconds) and 
> return null, in order to prevent it from (1) hogging the cpu, (2) sending 
> heartbeat or checkpoint messages to Kafka topics.
> PR link: https://github.com/apache/kafka/pull/7887



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask

2020-01-02 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-9360:
--
Description: 
`emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the 
knobs to control if the heartbeat message or checkpoint message will be sent or 
not to the topics respectively. In our experiments, setting them to false will 
not suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, 
MirrorCheckpointTask.

The observations are, when setting those knobs to false, huge volume of 
`SourceRecord` are being sent without interval, causing significantly high CPU 
usage of MirrorMaker 2 instance, GC time and congesting the single partition of 
the heartbeat topic and checkpoint topic.



The proposed fix in the following PR is to (1) explicitly check if `interval` 
is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or 
`emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to negative, 
put the thread in sleep mode for a while (e.g. 5 seconds) and return null, in 
order to prevent it from (1) hogging the cpu, (2) sending heartbeat or 
checkpoint messages to Kafka topics.



  was:
`emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the 
knobs to control if the heartbeat message or checkpoint message will be sent or 
not to the topics respectively. In our experiments, setting them to false will 
not suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, 
MirrorCheckpointTask.

The observations are, when setting those knobs to false, huge volume of 
`SourceRecord` are being sent without interval, causing significantly high CPU 
usage of MirrorMaker 2 instance and congesting the single partition of the 
heartbeat topic and checkpoint topic.



The proposed fix in the following PR is to (1) explicitly check if `interval` 
is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or 
`emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to negative, 
put the thread in sleep mode for a while (e.g. 5 seconds) and return null, in 
order to prevent it from (1) hogging the cpu, (2) sending heartbeat or 
checkpoint messages to Kafka topics.




> emitting checkpoint and heartbeat set to false will not disable the activity 
> in their SourceTask
> 
>
> Key: KAFKA-9360
> URL: https://issues.apache.org/jira/browse/KAFKA-9360
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
> Fix For: 2.5.0
>
> Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png, Screen Shot 
> 2020-01-02 at 3.18.23 PM.png
>
>
> `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be 
> the knobs to control if the heartbeat message or checkpoint message will be 
> sent or not to the topics respectively. In our experiments, setting them to 
> false will not suspend the activity in their SourceTasks, e.g. 
> MirrorHeartbeatTask, MirrorCheckpointTask.
> The observations are, when setting those knobs to false, huge volume of 
> `SourceRecord` are being sent without interval, causing significantly high 
> CPU usage of MirrorMaker 2 instance, GC time and congesting the single 
> partition of the heartbeat topic and checkpoint topic.
> The proposed fix in the following PR is to (1) explicitly check if `interval` 
> is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or 
> `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to 
> negative, put the thread in sleep mode for a while (e.g. 5 seconds) and 
> return null, in order to prevent it from (1) hogging the cpu, (2) sending 
> heartbeat or checkpoint messages to Kafka topics



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask

2020-01-02 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-9360:
--
Attachment: Screen Shot 2020-01-02 at 3.18.23 PM.png

> emitting checkpoint and heartbeat set to false will not disable the activity 
> in their SourceTask
> 
>
> Key: KAFKA-9360
> URL: https://issues.apache.org/jira/browse/KAFKA-9360
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
> Fix For: 2.5.0
>
> Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png, Screen Shot 
> 2020-01-02 at 3.18.23 PM.png
>
>
> `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be 
> the knobs to control if the heartbeat message or checkpoint message will be 
> sent or not to the topics respectively. In our experiments, setting them to 
> false will not suspend the activity in their SourceTasks, e.g. 
> MirrorHeartbeatTask, MirrorCheckpointTask.
> The observations are, when setting those knobs to false, huge volume of 
> `SourceRecord` are being sent without interval, causing significantly high 
> CPU usage of MirrorMaker 2 instance, GC time and congesting the single 
> partition of the heartbeat topic and checkpoint topic.
> The proposed fix in the following PR is to (1) explicitly check if `interval` 
> is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or 
> `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to 
> negative, put the thread in sleep mode for a while (e.g. 5 seconds) and 
> return null, in order to prevent it from (1) hogging the cpu, (2) sending 
> heartbeat or checkpoint messages to Kafka topics



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask

2020-01-02 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-9360:
--
Description: 
`emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the 
knobs to control if the heartbeat message or checkpoint message will be sent or 
not to the topics respectively. In our experiments, setting them to false will 
not suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, 
MirrorCheckpointTask.

The observations are, when setting those knobs to false, huge volume of 
`SourceRecord` are being sent without interval, causing significantly high CPU 
usage of MirrorMaker 2 instance and congesting the single partition of the 
heartbeat topic and checkpoint topic.



The proposed fix in the following PR is to (1) explicitly check if `interval` 
is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or 
`emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to negative, 
put the thread in sleep mode for a while (e.g. 5 seconds) and return null, in 
order to prevent it from (1) hogging the cpu, (2) sending heartbeat or 
checkpoint messages to Kafka topics.



  was:
`emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the 
knobs to control if the heartbeat message or checkpoint message will be sent or 
not to the topics respectively. In our experiments, setting them to false will 
not suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, 
MirrorCheckpointTask.

The observations are, when setting those knobs to false, huge volume of 
`SourceRecord` are being sent without interval, causing significantly high CPU 
usage of MirrorMaker 2 instance and congesting the single partition of the 
heartbeat topic and checkpoint topic.



The proposed fix in the following PR is to (1) explicitly check if `interval` 
is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or 
`emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to negative, 
put the thread in sleep mode (e.g. 5 seconds) and return null, to prevent it 
from running the remaining logic.




> emitting checkpoint and heartbeat set to false will not disable the activity 
> in their SourceTask
> 
>
> Key: KAFKA-9360
> URL: https://issues.apache.org/jira/browse/KAFKA-9360
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
> Fix For: 2.5.0
>
> Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png
>
>
> `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be 
> the knobs to control if the heartbeat message or checkpoint message will be 
> sent or not to the topics respectively. In our experiments, setting them to 
> false will not suspend the activity in their SourceTasks, e.g. 
> MirrorHeartbeatTask, MirrorCheckpointTask.
> The observations are, when setting those knobs to false, huge volume of 
> `SourceRecord` are being sent without interval, causing significantly high 
> CPU usage of MirrorMaker 2 instance and congesting the single partition of 
> the heartbeat topic and checkpoint topic.
> The proposed fix in the following PR is to (1) explicitly check if `interval` 
> is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or 
> `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to 
> negative, put the thread in sleep mode for a while (e.g. 5 seconds) and 
> return null, in order to prevent it from (1) hogging the cpu, (2) sending 
> heartbeat or checkpoint messages to Kafka topics



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask

2020-01-02 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-9360:
-

 Summary: emitting checkpoint and heartbeat set to false will not 
disable the activity in their SourceTask
 Key: KAFKA-9360
 URL: https://issues.apache.org/jira/browse/KAFKA-9360
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Affects Versions: 2.4.0
Reporter: Ning Zhang
 Fix For: 2.5.0
 Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png

`emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the 
knobs that control whether the heartbeat or checkpoint messages are sent to 
their respective topics. In our experiments, setting them to false does not 
suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, 
MirrorCheckpointTask.

The observation is that, when those knobs are set to false, a huge volume of 
`SourceRecord`s is sent without any interval, causing significantly high CPU 
usage of the MirrorMaker 2 instance and congesting the single partition of the 
heartbeat topic and checkpoint topic.

The proposed fix in the following PR is to (1) explicitly check whether 
`interval` is set to a negative value (e.g. -1) when `emit.heartbeats.enabled` 
or `emit.checkpoints.enabled` is off, and (2) if `interval` is indeed negative, 
put the thread to sleep (e.g. 5 seconds) and return null, to prevent it from 
running the remaining logic.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9352) unbalanced assignment of topic-partition to tasks

2019-12-31 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006244#comment-17006244
 ] 

Ning Zhang commented on KAFKA-9352:
---

pr: https://github.com/apache/kafka/pull/7880

> unbalanced assignment of topic-partition to tasks
> -
>
> Key: KAFKA-9352
> URL: https://issues.apache.org/jira/browse/KAFKA-9352
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
> Fix For: 2.5.0
>
> Attachments: Screen Shot 2019-12-19 at 12.16.02 PM.png, Screen Shot 
> 2019-12-19 at 8.22.17 AM.png
>
>
> originally, when mirrormaker replicates a group of topics, the assignment 
> between topic-partition and tasks are pretty static. E.g. partitions from the 
> same topic tend to be grouped together as much as possible on the same task. 
> For example, 3 tasks to mirror 3 topics with 8, 2 and 2
> partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, 
> partition 5'
> The original assignment will look like:
> t1 -> [t0p0, t0p1, t0p2, t0p3]
> t2 -> [t0p4, t0p5, t0p6, t0p7]
> t3 -> [t1p0, t1p2, t2p0, t2p1]
> The potential issue of above assignment is: if topic 0 has more traffic than 
> other topics (topic 1, topic 2), t1 and t2 will be loaded more traffic than 
> t3. When the tasks are mapped to the mirrormaker instances (workers) and 
> launched, it will create unbalanced load on the workers. Please see the 
> picture below as an unbalanced example of 2 mirrormaker instances:
> !Screen Shot 2019-12-19 at 12.16.02 PM.png!
> Given each mirrored topic has different traffic and number of partitions, to 
> balance the load
> across all mirrormaker instances (workers), 'roundrobin' helps to evenly 
> assign all
> topic-partition to the tasks, then the tasks are further distributed to 
> workers by calling
> 'ConnectorUtils.groupPartitions()'. For example, 3 tasks to mirror 3 topics 
> with 8, 2 and 2
> partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, 
> partition 5'
> t1 -> [t0p0, t0p3, t0p6, t1p1]
> t2 -> [t0p1, t0p4, t0p7, t2p0]
> t3 -> [t0p2, t0p5, t1p0, t2p1]
> The improvement of this new above assignment over the original assignment is: 
> the partitions of topic 0, topic 1 and topic 2 are all spread over all tasks, 
> which creates a relatively even load on all workers, after the tasks are 
> mapped to the workers and launched.
> Please see the picture below as a balanced example of 4 mirrormaker instances:
> !Screen Shot 2019-12-19 at 8.22.17 AM.png!
> PR link is: https://github.com/apache/kafka/pull/7880
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9352) unbalanced assignment of topic-partition to tasks

2019-12-31 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-9352:
--
Description: 
Originally, when mirrormaker replicates a group of topics, the assignment 
between topic-partitions and tasks is pretty static, i.e. partitions from the 
same topic tend to be grouped together on the same task as much as possible. 
For example, take 3 tasks mirroring 3 topics with 8, 2 and 2 partitions 
respectively ('t1' denotes 'task 1', 't0p5' denotes 'topic 0, partition 5').

The original assignment will look like:

t1 -> [t0p0, t0p1, t0p2, t0p3]
t2 -> [t0p4, t0p5, t0p6, t0p7]
t3 -> [t1p0, t1p2, t2p0, t2p1]

The potential issue with the above assignment is: if topic 0 has more traffic 
than the other topics (topic 1, topic 2), t1 and t2 will carry more traffic 
than t3. When the tasks are mapped to the mirrormaker instances (workers) and 
launched, this creates unbalanced load on the workers. Please see the picture 
below for an unbalanced example with 2 mirrormaker instances:

!Screen Shot 2019-12-19 at 12.16.02 PM.png!

Given that each mirrored topic has different traffic and a different number of 
partitions, to balance the load across all mirrormaker instances (workers), 
'roundrobin' helps to evenly assign all topic-partitions to the tasks; the 
tasks are then further distributed to workers by calling 
'ConnectorUtils.groupPartitions()'. For the same example of 3 tasks mirroring 
3 topics with 8, 2 and 2 partitions:

t1 -> [t0p0, t0p3, t0p6, t1p1]
t2 -> [t0p1, t0p4, t0p7, t2p0]
t3 -> [t0p2, t0p5, t1p0, t2p1]

The improvement of this new assignment over the original one is: the 
partitions of topic 0, topic 1 and topic 2 are all spread across all tasks, 
which creates a relatively even load on all workers once the tasks are mapped 
to the workers and launched.
Please see the picture below for a balanced example with 4 mirrormaker 
instances:

!Screen Shot 2019-12-19 at 8.22.17 AM.png!

PR link is: https://github.com/apache/kafka/pull/7880
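
The round-robin step can be sketched as follows (illustrative, not the actual MirrorSourceConnector code); with the 8/2/2-partition example above and 3 tasks, it reproduces exactly the balanced assignment shown:
{code:java}
import java.util.ArrayList;
import java.util.List;

public class RoundRobinAssign {
    // Deal topic-partitions to tasks like cards: index i goes to task i % numTasks.
    public static List<List<String>> assign(List<String> partitions, int numTasks) {
        List<List<String>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            tasks.get(i % numTasks).add(partitions.get(i));
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> tps = new ArrayList<>();
        for (int p = 0; p < 8; p++) tps.add("t0p" + p);
        for (int p = 0; p < 2; p++) tps.add("t1p" + p);
        for (int p = 0; p < 2; p++) tps.add("t2p" + p);
        // Prints [[t0p0, t0p3, t0p6, t1p1], [t0p1, t0p4, t0p7, t2p0],
        //         [t0p2, t0p5, t1p0, t2p1]]
        System.out.println(assign(tps, 3));
    }
}
{code}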

 

  was:
Originally, when mirrormaker replicates a group of topics, the assignment 
between topic-partitions and tasks is pretty static, i.e. partitions from the 
same topic tend to be grouped together on the same task as much as possible. 
For example, take 3 tasks mirroring 3 topics with 8, 2 and 2 partitions 
respectively ('t1' denotes 'task 1', 't0p5' denotes 'topic 0, partition 5').

The original assignment will look like:

t1 -> [t0p0, t0p1, t0p2, t0p3]
t2 -> [t0p4, t0p5, t0p6, t0p7]
t3 -> [t1p0, t1p2, t2p0, t2p1]

The potential issue with the above assignment is: if topic 0 has more traffic 
than the other topics (topic 1, topic 2), t1 and t2 will carry more traffic 
than t3. When the tasks are mapped to the mirrormaker instances (workers) and 
launched, this creates unbalanced load on the workers. Please see the picture 
below for an unbalanced example with 2 mirrormaker instances:

!Screen Shot 2019-12-19 at 12.16.02 PM.png!

Given that each mirrored topic has different traffic and a different number of 
partitions, to balance the load across all mirrormaker instances (workers), 
'roundrobin' helps to evenly assign all topic-partitions to the tasks; the 
tasks are then further distributed to workers by calling 
'ConnectorUtils.groupPartitions()'. For the same example of 3 tasks mirroring 
3 topics with 8, 2 and 2 partitions:

t1 -> [t0p0, t0p3, t0p6, t1p1]
t2 -> [t0p1, t0p4, t0p7, t2p0]
t3 -> [t0p2, t0p5, t1p0, t2p1]

The improvement of this new assignment over the original one is: the 
partitions of topic 0, topic 1 and topic 2 are all spread across all tasks, 
which creates a relatively even load on all workers once the tasks are mapped 
to the workers and launched.
Please see the picture below for a balanced example with 4 mirrormaker 
instances:

!Screen Shot 2019-12-19 at 8.22.17 AM.png!

 


> unbalanced assignment of topic-partition to tasks
> -
>
> Key: KAFKA-9352
> URL: https://issues.apache.org/jira/browse/KAFKA-9352
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
> Fix For: 2.5.0
>
> Attachments: Screen Shot 2019-12-19 at 12.16.02 PM.png, Screen Shot 
> 2019-12-19 at 8.22.17 AM.png
>
>
> originally, when mirrormaker replicates a group of topics, the assignment 
> between topic-partition and tasks are pretty static. E.g. partitions from the 
> same topic tend to be grouped together as much as possible on the same task. 
> For example, 3 tasks to mirror 3 topics with 8, 2 and 2
> partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, 
> partition 5'
> The original assignment will look like:
> t1 -> [t0p0, t0p1, t0p2, t0p3]
> t2 -> [t0p4, t0p5, t0p6, t0p7]
> t3 -> [t1p0, t1p2, t2p0, t2p1]

[jira] [Created] (KAFKA-9352) unbalanced assignment of topic-partition to tasks

2019-12-31 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-9352:
-

 Summary: unbalanced assignment of topic-partition to tasks
 Key: KAFKA-9352
 URL: https://issues.apache.org/jira/browse/KAFKA-9352
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Affects Versions: 2.4.0
Reporter: Ning Zhang
 Fix For: 2.5.0
 Attachments: Screen Shot 2019-12-19 at 12.16.02 PM.png, Screen Shot 
2019-12-19 at 8.22.17 AM.png

Originally, when mirrormaker replicates a group of topics, the assignment 
between topic-partitions and tasks is pretty static, i.e. partitions from the 
same topic tend to be grouped together on the same task as much as possible. 
For example, take 3 tasks mirroring 3 topics with 8, 2 and 2 partitions 
respectively ('t1' denotes 'task 1', 't0p5' denotes 'topic 0, partition 5').

The original assignment will look like:

t1 -> [t0p0, t0p1, t0p2, t0p3]
t2 -> [t0p4, t0p5, t0p6, t0p7]
t3 -> [t1p0, t1p2, t2p0, t2p1]

The potential issue with the above assignment is: if topic 0 has more traffic 
than the other topics (topic 1, topic 2), t1 and t2 will carry more traffic 
than t3. When the tasks are mapped to the mirrormaker instances (workers) and 
launched, this creates unbalanced load on the workers. Please see the picture 
below for an unbalanced example with 2 mirrormaker instances:

!Screen Shot 2019-12-19 at 12.16.02 PM.png!

Given that each mirrored topic has different traffic and a different number of 
partitions, to balance the load across all mirrormaker instances (workers), 
'roundrobin' helps to evenly assign all topic-partitions to the tasks; the 
tasks are then further distributed to workers by calling 
'ConnectorUtils.groupPartitions()'. For the same example of 3 tasks mirroring 
3 topics with 8, 2 and 2 partitions:

t1 -> [t0p0, t0p3, t0p6, t1p1]
t2 -> [t0p1, t0p4, t0p7, t2p0]
t3 -> [t0p2, t0p5, t1p0, t2p1]

The improvement of this new assignment over the original one is: the 
partitions of topic 0, topic 1 and topic 2 are all spread across all tasks, 
which creates a relatively even load on all workers once the tasks are mapped 
to the workers and launched.
Please see the picture below for a balanced example with 4 mirrormaker 
instances:

!Screen Shot 2019-12-19 at 8.22.17 AM.png!

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync

2019-10-23 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958430#comment-16958430
 ] 

Ning Zhang commented on KAFKA-9076:
---

[~ryannedolan] KIP is added: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0

> MirrorMaker 2.0 automated consumer offset sync
> --
>
> Key: KAFKA-9076
> URL: https://issues.apache.org/jira/browse/KAFKA-9076
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
>  Labels: mirrormaker, pull-request-available
> Fix For: 2.5.0
>
>
> To calculate the translated consumer offsets in the target cluster, 
> `MirrorClient` currently provides a function called "remoteConsumerOffsets()" 
> that is used by "RemoteClusterUtils" for one-time, on-demand translation.
> In order to let consumer and stream applications migrate from the source to 
> the target cluster transparently and conveniently, e.g. in the event of a 
> source cluster failure, a background job is proposed to periodically sync the 
> consumer offsets from the source to the target cluster, so that when the 
> consumer and stream applications switch to the target cluster, they resume 
> consuming from where they left off on the source cluster.
>  
> https://github.com/apache/kafka/pull/7577



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync

2019-10-23 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-9076:
--
Description: 
To calculate the translated consumer offsets in the target cluster, 
`MirrorClient` currently provides a function called "remoteConsumerOffsets()" 
that is used by "RemoteClusterUtils" for one-time, on-demand translation.

In order to let consumer and stream applications migrate from the source to the 
target cluster transparently and conveniently, e.g. in the event of a source 
cluster failure, a background job is proposed to periodically sync the consumer 
offsets from the source to the target cluster, so that when the consumer and 
stream applications switch to the target cluster, they resume consuming from 
where they left off on the source cluster.

 KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0

[https://github.com/apache/kafka/pull/7577]

  was:
To calculate the translated consumer offsets in the target cluster, 
`MirrorClient` currently provides a function called "remoteConsumerOffsets()" 
that is used by "RemoteClusterUtils" for one-time, on-demand translation.

In order to let consumer and stream applications migrate from the source to the 
target cluster transparently and conveniently, e.g. in the event of a source 
cluster failure, a background job is proposed to periodically sync the consumer 
offsets from the source to the target cluster, so that when the consumer and 
stream applications switch to the target cluster, they resume consuming from 
where they left off on the source cluster.

 

https://github.com/apache/kafka/pull/7577


> MirrorMaker 2.0 automated consumer offset sync
> --
>
> Key: KAFKA-9076
> URL: https://issues.apache.org/jira/browse/KAFKA-9076
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
>  Labels: mirrormaker, pull-request-available
> Fix For: 2.5.0
>
>
> To calculate the translated consumer offsets in the target cluster, 
> `MirrorClient` currently provides a function called "remoteConsumerOffsets()" 
> that is used by "RemoteClusterUtils" for one-time, on-demand translation.
> In order to let consumer and stream applications migrate from the source to 
> the target cluster transparently and conveniently, e.g. in the event of a 
> source cluster failure, a background job is proposed to periodically sync the 
> consumer offsets from the source to the target cluster, so that when the 
> consumer and stream applications switch to the target cluster, they resume 
> consuming from where they left off on the source cluster.
>  KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
> [https://github.com/apache/kafka/pull/7577]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync

2019-10-22 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-9076:
--
Description: 
To calculate the translated consumer offsets in the target cluster, 
`MirrorClient` currently provides a function called "remoteConsumerOffsets()" 
that is used by "RemoteClusterUtils" for one-time, on-demand translation.

In order to let consumer and stream applications migrate from the source to the 
target cluster transparently and conveniently, e.g. in the event of a source 
cluster failure, a background job is proposed to periodically sync the consumer 
offsets from the source to the target cluster, so that when the consumer and 
stream applications switch to the target cluster, they resume consuming from 
where they left off on the source cluster.

 

https://github.com/apache/kafka/pull/7577

  was:
To calculate the translated consumer offsets in the target cluster, 
`MirrorClient` currently provides a function called "remoteConsumerOffsets()" 
that is used by "RemoteClusterUtils" for one-time, on-demand translation.

In order to let consumer and stream applications migrate from the source to the 
target cluster transparently and conveniently, e.g. in the event of a source 
cluster failure, a background job is proposed to periodically sync the consumer 
offsets from the source to the target cluster, so that when the consumer and 
stream applications switch to the target cluster, they resume consuming from 
where they left off on the source cluster.


> MirrorMaker 2.0 automated consumer offset sync
> --
>
> Key: KAFKA-9076
> URL: https://issues.apache.org/jira/browse/KAFKA-9076
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
>  Labels: mirrormaker, pull-request-available
> Fix For: 2.5.0
>
>
> To calculate the translated consumer offsets in the target cluster, 
> `MirrorClient` currently provides a function called "remoteConsumerOffsets()" 
> that is used by "RemoteClusterUtils" for one-time, on-demand translation.
> In order to let consumer and stream applications migrate from the source to 
> the target cluster transparently and conveniently, e.g. in the event of a 
> source cluster failure, a background job is proposed to periodically sync the 
> consumer offsets from the source to the target cluster, so that when the 
> consumer and stream applications switch to the target cluster, they resume 
> consuming from where they left off on the source cluster.
>  
> https://github.com/apache/kafka/pull/7577



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync

2019-10-22 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-9076:
--
Description: 
To calculate the translated consumer offsets in the target cluster, 
`MirrorClient` currently provides a function called "remoteConsumerOffsets()" 
that is used by "RemoteClusterUtils" for one-time, on-demand translation.

In order to let consumer and stream applications migrate from the source to the 
target cluster transparently and conveniently, e.g. in the event of a source 
cluster failure, a background job is proposed to periodically sync the consumer 
offsets from the source to the target cluster, so that when the consumer and 
stream applications switch to the target cluster, they resume consuming from 
where they left off on the source cluster.

  was:
To calculate the translated consumer offsets in the target cluster, 
`MirrorClient` currently provides a function called "remoteConsumerOffsets()" 
that is used by "RemoteClusterUtils" for one-time, on-demand translation.

In order to make the consumer migration from the source to the target cluster 
transparent and convenient, e.g. in the event of a source cluster failure, it 
is better to have a background job that continuously and periodically syncs the 
consumer offsets from the source to the target cluster, so that when the 
consumer switches to the target cluster, it resumes consuming from where it 
left off on the source cluster.


> MirrorMaker 2.0 automated consumer offset sync
> --
>
> Key: KAFKA-9076
> URL: https://issues.apache.org/jira/browse/KAFKA-9076
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
>  Labels: mirrormaker, pull-request-available
> Fix For: 2.5.0
>
>
> To calculate the translated consumer offsets in the target cluster, 
> `MirrorClient` currently provides a function called "remoteConsumerOffsets()" 
> that is used by "RemoteClusterUtils" for one-time, on-demand translation.
> In order to let consumer and stream applications migrate from the source to 
> the target cluster transparently and conveniently, e.g. in the event of a 
> source cluster failure, a background job is proposed to periodically sync the 
> consumer offsets from the source to the target cluster, so that when the 
> consumer and stream applications switch to the target cluster, they resume 
> consuming from where they left off on the source cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync

2019-10-22 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16957169#comment-16957169
 ] 

Ning Zhang edited comment on KAFKA-9076 at 10/22/19 3:28 PM:
-

 

[~ryannedolan] Thanks for your comments. Here is my response: 

{quote}

in order to write offsets for a consumer group, we need to know that the group 
is not already running on the target cluster. Otherwise we'd be stepping on 
that group's current offsets. The group coordinator won't allow this afaik.

{quote}

 

My PR can check whether the group is active in the target cluster; if it is 
active, there is no need to sync the offsets.
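
A rough sketch of that check with the AdminClient (illustrative only, not the 
PR's actual code; the group name and bootstrap servers are example values):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.ConsumerGroupState;

public class GroupActiveCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "target-kafka:9092"); // target cluster
        try (Admin admin = Admin.create(props)) {
            ConsumerGroupDescription desc = admin
                .describeConsumerGroups(Collections.singletonList("my-group"))
                .describedGroups().get("my-group").get();
            // Only sync offsets when nothing is consuming on the target.
            boolean safeToSync = desc.state() == ConsumerGroupState.EMPTY
                || desc.state() == ConsumerGroupState.DEAD;
            System.out.println("safe to sync offsets: " + safeToSync);
        }
    }
}
{code}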

{quote}

we could kick out a target group and force it to seek to the new offsets by 
revoking group membership and forcing a rebalance etc. But we wouldn't want to 
do this periodically.

{quote}

For Kafka Streams, I am not sure there is a way to force the stream application 
to seek to a particular offset, the way `consumer.seek()` does for a plain 
consumer.

{quote}

we could write offsets to a new group ID, eg. us-west group1, just like we do 
with topics, s.t. we avoid the above issues. Then migrating groups would 
involve changing the group ID. That works fine, but consumers would need a way 
to determine which group ID to use. Translating group ID like that is more 
cumbersome than translating offsets, since offsets can be altered using 
existing tools, but there is no way to tell a consumer to change its group ID.

{quote}

When failing over or migrating from one cluster to another, especially 
manually, the broker URL of the consumer / stream application already has to 
change to point to the backup cluster, so changing the consumer group ID as 
well may not be much extra burden.

{quote}

I think there are scenarios where automatically writing offsets as you propose 
might make sense, e.g. in an active/standby scenario where consumers only 
connect to one cluster at a time. But if you are automating that behavior, you 
might as well automate the offset translation via RemoteClusterUtils, IMO.

{quote}

Offset translation is the first step and can certainly be done by 
RemoteClusterUtils. The second step is to write the offsets to the target 
cluster; IMO, RemoteClusterUtils does not cover that second step.
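
For concreteness, a sketch of what that second step could look like with the 
AdminClient. Note that `alterConsumerGroupOffsets()` comes from KIP-396 and 
only shipped later, in Kafka 2.5, so its availability is an assumption here; 
the group name and bootstrap servers are example values:

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class WriteOffsets {
    // Writes translated offsets to the target cluster's group coordinator.
    static void commit(Map<TopicPartition, OffsetAndMetadata> translated)
            throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "target-kafka:9092"); // target cluster
        try (Admin admin = Admin.create(props)) {
            // Overwrites the group's committed offsets on the target; only
            // safe when the group is not active there (see the check above).
            admin.alterConsumerGroupOffsets("my-group", translated).all().get();
        }
    }
}
{code}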

{quote}

My team has built external tooling using RemoteClusterUtils that works with 
existing consumers. It's possible to fully automate failover and failback this 
way. I'm skeptical that automatically writing offsets as you propose would make 
this process simpler.

{quote}

I have a PR up for review that we use internally in our system; it works well 
for active/standby and for migrating stream applications from one cluster to 
another transparently. More consideration of this approach will certainly be 
needed in the future.

The PR is small, and I propose to disable this auto-sync feature by default for 
now. Shall we look at the PR and exchange more thoughts?

Thanks

 

 


was (Author: yangguo1220):
 

[~ryannedolan] Thanks for your comments. Here is my response:

 

{quote}

in order to write offsets for a consumer group, we need to know that the group 
is not already running on the target cluster. Otherwise we'd be stepping on 
that group's current offsets. The group coordinator won't allow this afaik.

{quote}

My PR can check whether the group is active in the target cluster; if it is 
active, there is no need to sync the offsets.

> we could kick out a target group and force it to seek to the new offsets by 
> revoking group membership and forcing a rebalance etc. But we wouldn't want 
> to do this periodically.

For Kafka Streams, I am not sure there is a way to force the stream application 
to seek to a particular offset, the way `consumer.seek()` does for a plain 
consumer.

> we could write offsets to a new group ID, eg. us-west group1, just like we do 
> with topics, s.t. we avoid the above issues. Then migrating groups would 
> involve changing the group ID. That works fine, but consumers would need a 
> way to determine which group ID to use. Translating group ID like that is 
> more cumbersome than translating offsets, since offsets can be altered using 
> existing tools, but there is no way to tell a consumer to change its group ID.

When failing over or migrating from one cluster to another, especially 
manually, the broker URL of the consumer / stream application already has to 
change to point to the backup cluster, so changing the consumer group ID as 
well may not be much extra burden.

> I think there are scenarios where automatically writing offsets as you 
> propose might make sense, e.g. in an active/standby scenario where consumers 
> only connect to one cluster at a time. But if you are automating that 
> behavior, you might as well automate the offset translation via 
> RemoteClusterUtils, IMO.

Offset translation is the first step and can certainly be done by 
RemoteClusterUtils. The second step 

[jira] [Comment Edited] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync

2019-10-22 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16957169#comment-16957169
 ] 

Ning Zhang edited comment on KAFKA-9076 at 10/22/19 3:27 PM:
-

 

[~ryannedolan] Thanks for your comments. Here is my response:

 

{quote}

in order to write offsets for a consumer group, we need to know that the group 
is not already running on the target cluster. Otherwise we'd be stepping on 
that group's current offsets. The group coordinator won't allow this afaik.

{quote}

My PR can check whether the group is active in the target cluster; if it is 
active, there is no need to sync the offsets.

> we could kick out a target group and force it to seek to the new offsets by 
> revoking group membership and forcing a rebalance etc. But we wouldn't want 
> to do this periodically.

For Kafka Streams, I am not sure there is a way to force the stream application 
to seek to a particular offset, the way `consumer.seek()` does for a plain 
consumer.

> we could write offsets to a new group ID, eg. us-west group1, just like we do 
> with topics, s.t. we avoid the above issues. Then migrating groups would 
> involve changing the group ID. That works fine, but consumers would need a 
> way to determine which group ID to use. Translating group ID like that is 
> more cumbersome than translating offsets, since offsets can be altered using 
> existing tools, but there is no way to tell a consumer to change its group ID.

When failing over or migrating from one cluster to another, especially 
manually, the broker URL of the consumer / stream application already has to 
change to point to the backup cluster, so changing the consumer group ID as 
well may not be much extra burden.

> I think there are scenarios where automatically writing offsets as you 
> propose might make sense, e.g. in an active/standby scenario where consumers 
> only connect to one cluster at a time. But if you are automating that 
> behavior, you might as well automate the offset translation via 
> RemoteClusterUtils, IMO.

Offset translation is the first step and can certainly be done by 
RemoteClusterUtils. The second step is to write the offsets to the target 
cluster; IMO, RemoteClusterUtils does not cover that second step.

> My team has built external tooling using RemoteClusterUtils that works with 
> existing consumers. It's possible to fully automate failover and failback 
> this way. I'm skeptical that automatically writing offsets as you propose 
> would make this process simpler.

I have a PR up for review that we use internally in our system; it works well 
for active/standby and for migrating stream applications from one cluster to 
another transparently. More consideration of this approach will certainly be 
needed in the future.

The PR is small, and I propose to disable this auto-sync feature by default for 
now. Shall we look at the PR and exchange more thoughts?

Thanks

 

 


was (Author: yangguo1220):
[~ryannedolan] Thanks for your comments. Here is my response:

> in order to write offsets for a consumer group, we need to know that the 
> group is not already running on the target cluster. Otherwise we'd be 
> stepping on that group's current offsets. The group coordinator won't allow 
> this afaik.

My PR can check whether the group is active in the target cluster; if it is 
active, there is no need to sync the offsets.

> we could kick out a target group and force it to seek to the new offsets by 
> revoking group membership and forcing a rebalance etc. But we wouldn't want 
> to do this periodically.

For Kafka Streams, I am not sure there is a way to force the stream application 
to seek to a particular offset, the way `consumer.seek()` does for a plain 
consumer.

> we could write offsets to a new group ID, eg. us-west group1, just like we do 
> with topics, s.t. we avoid the above issues. Then migrating groups would 
> involve changing the group ID. That works fine, but consumers would need a 
> way to determine which group ID to use. Translating group ID like that is 
> more cumbersome than translating offsets, since offsets can be altered using 
> existing tools, but there is no way to tell a consumer to change its group ID.

When failing over or migrating from one cluster to another, especially 
manually, the broker URL of the consumer / stream application already has to 
change to point to the backup cluster, so changing the consumer group ID as 
well may not be much extra burden.

> I think there are scenarios where automatically writing offsets as you 
> propose might make sense, e.g. in an active/standby scenario where consumers 
> only connect to one cluster at a time. But if you are automating that 
> behavior, you might as well automate the offset translation via 
> RemoteClusterUtils, IMO.

Offset translation is the first step and can certainly be done by 
RemoteClusterUtils. The second step is to write the offsets to the target 
cluster. 

[jira] [Commented] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync

2019-10-22 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16957169#comment-16957169
 ] 

Ning Zhang commented on KAFKA-9076:
---

[~ryannedolan] Thanks for your comments. Here is my response:

> in order to write offsets for a consumer group, we need to know that the 
> group is not already running on the target cluster. Otherwise we'd be 
> stepping on that group's current offsets. The group coordinator won't allow 
> this afaik.

My PR can check whether the group is active in the target cluster; if it is 
active, there is no need to sync the offsets.

> we could kick out a target group and force it to seek to the new offsets by 
> revoking group membership and forcing a rebalance etc. But we wouldn't want 
> to do this periodically.

For Kafka Streams, I am not sure there is a way to force the stream application 
to seek to a particular offset, the way `consumer.seek()` does for a plain 
consumer.

> we could write offsets to a new group ID, eg. us-west group1, just like we do 
> with topics, s.t. we avoid the above issues. Then migrating groups would 
> involve changing the group ID. That works fine, but consumers would need a 
> way to determine which group ID to use. Translating group ID like that is 
> more cumbersome than translating offsets, since offsets can be altered using 
> existing tools, but there is no way to tell a consumer to change its group ID.

When failing over or migrating from one cluster to another, especially 
manually, the broker URL of the consumer / stream application already has to 
change to point to the backup cluster, so changing the consumer group ID as 
well may not be much extra burden.

> I think there are scenarios where automatically writing offsets as you 
> propose might make sense, e.g. in an active/standby scenario where consumers 
> only connect to one cluster at a time. But if you are automating that 
> behavior, you might as well automate the offset translation via 
> RemoteClusterUtils, IMO.

Offset translation is the first step and can certainly be done by 
RemoteClusterUtils. The second step is to write the offsets to the target 
cluster; IMO, RemoteClusterUtils does not cover that second step.

> My team has built external tooling using RemoteClusterUtils that works with 
> existing consumers. It's possible to fully automate failover and failback 
> this way. I'm skeptical that automatically writing offsets as you propose 
> would make this process simpler.

I have a PR up for review that we use internally in our system; it works well 
for active/standby and for migrating stream applications from one cluster to 
another transparently. More consideration of this approach will certainly be 
needed in the future.

The PR is small, and I propose to disable this auto-sync feature by default for 
now. Shall we look at the PR and exchange more thoughts?

Thanks

 

 

> MirrorMaker 2.0 automated consumer offset sync
> --
>
> Key: KAFKA-9076
> URL: https://issues.apache.org/jira/browse/KAFKA-9076
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ning Zhang
>Priority: Major
>  Labels: mirrormaker, pull-request-available
> Fix For: 2.5.0
>
>
> To calculate the translated consumer offsets in the target cluster, 
> `MirrorClient` currently provides a function called "remoteConsumerOffsets()" 
> that is used by "RemoteClusterUtils" for one-time, on-demand translation.
> In order to make the consumer migration from the source to the target cluster 
> transparent and convenient, e.g. in the event of a source cluster failure, it 
> is better to have a background job that continuously and periodically syncs 
> the consumer offsets from the source to the target cluster, so that when the 
> consumer switches to the target cluster, it resumes consuming from where it 
> left off on the source cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync

2019-10-22 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-9076:
-

 Summary: MirrorMaker 2.0 automated consumer offset sync
 Key: KAFKA-9076
 URL: https://issues.apache.org/jira/browse/KAFKA-9076
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Affects Versions: 2.4.0
Reporter: Ning Zhang
 Fix For: 2.5.0


To calculate the translated consumer offsets in the target cluster, 
`MirrorClient` currently provides a function called "remoteConsumerOffsets()" 
that is used by "RemoteClusterUtils" for one-time, on-demand translation.

In order to make the consumer migration from the source to the target cluster 
transparent and convenient, e.g. in the event of a source cluster failure, it 
is better to have a background job that continuously and periodically syncs the 
consumer offsets from the source to the target cluster, so that when the 
consumer switches to the target cluster, it resumes consuming from where it 
left off on the source cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)