[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343028#comment-17343028 ] Ning Zhang commented on KAFKA-12635:

backport is a great point. I guess the review cycle will take 3-4 weeks at most, so I will let the committer/reviewer know about the backport option and see what works from their point of view.

> Mirrormaker 2 offset sync is incorrect if the target partition is empty
> -----------------------------------------------------------------------
>
> Key: KAFKA-12635
> URL: https://issues.apache.org/jira/browse/KAFKA-12635
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.7.0
> Reporter: Frank Yi
> Assignee: Ning Zhang
> Priority: Major
>
> This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = true".
> If a source partition is empty, but the source consumer group's offset for that partition is non-zero, then Mirrormaker sets the target consumer group's offset for that partition to the literal, not translated, offset of the source consumer group. This state can be reached if the source consumer group consumed some records that have since been deleted (e.g. by a retention policy), or if Mirrormaker replication is set to start at "latest". This bug causes the target consumer group's lag for that partition to be negative and breaks offset sync for that partition until the lag is positive.
> The correct behavior when the source partition is empty would be to set the target offset to the translated offset, not the literal offset, which in this case would always be 0.
> Original email thread on this issue: https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E

-- This message was sent by Atlassian Jira (v8.3.4#803005)
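The translated-offset behavior the description asks for can be sketched as follows. This is a minimal illustration of the idea, not MirrorMaker's actual code; the function and parameter names are hypothetical:

```python
def translate_offset(sync_source_offset: int, sync_target_offset: int,
                     group_source_offset: int, target_log_end_offset: int) -> int:
    """Translate a source consumer group offset into the target cluster.

    A delta-based translation shifts the group's source offset by the
    difference recorded in the most recent offset sync.  Clamping the
    result to the target partition's log end offset prevents committing
    an offset past the end of an empty (or shorter) target partition,
    which is what produces the negative lag described in this ticket.
    """
    translated = sync_target_offset + (group_source_offset - sync_source_offset)
    return min(translated, target_log_end_offset)

# Empty target partition: whatever the source group's literal offset is,
# the committed target offset is clamped to 0.
assert translate_offset(0, 0, 1500, 0) == 0
# Non-empty target partition: the usual delta-based translation applies.
assert translate_offset(100, 90, 150, 1000) == 140
```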
[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342308#comment-17342308 ] Ning Zhang commented on KAFKA-12635:

great, thanks for the feedback. I will proceed to finalize the pull request.
[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17341105#comment-17341105 ] Ning Zhang commented on KAFKA-12635:

updated and tested the PR
[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17340917#comment-17340917 ] Ning Zhang commented on KAFKA-12635:

[~dragotic] I see, I probably missed your comment: "but before Step 5. We create the consumer groups on the target cluster using the kafka-console-consumer.sh command with two extra flags: --max-messages 1 --timeout-ms 6000". I am updating the PR and testing it again.
[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17340533#comment-17340533 ] Ning Zhang commented on KAFKA-12635:

hi [~fyi] [~dragotic] I compiled the latest trunk Kafka version with the above pull request at [https://github.com/ning2008wisc/kafka-trunk-binary/blob/master/kafka_2.13-3.0.0-SNAPSHOT.tgz]. It appears to solve the bug in my local testing, and I would like to hear your initial feedback. If it looks good, I will polish the pull request and have it ready for review. Thanks for your patience.
[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17340532#comment-17340532 ] Ning Zhang commented on KAFKA-12635:

pull request: https://github.com/apache/kafka/pull/10644
[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17337896#comment-17337896 ] Ning Zhang commented on KAFKA-12635:

> This state can be reached if the source consumer group consumed some records that were now deleted (like by a retention policy), or if Mirrormaker replication is set to start at "latest".

[~fyi] I have read your statement above several times and could not work out the reproduction scenario. Would you mind sharing the scenario step by step, from a good state to a bad state? Thanks
[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17337894#comment-17337894 ] Ning Zhang commented on KAFKA-12635:

hi [~dragotic] could you please elaborate, step by step, on how you reproduce the issue?
[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321170#comment-17321170 ] Ning Zhang commented on KAFKA-12635:

[~akaltsikis] you could try manually creating a consumer group with an initial offset of 0; the console consumer command will do that.
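For reference, one way to create such a consumer group with the console consumer (a sketch of the approach; the bootstrap server, topic, and group names are placeholders):

```shell
# Consume at most one message, or give up after 6 seconds, so that the
# group is created and commits an initial offset on the cluster.
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic test-topic \
  --group test-group \
  --from-beginning \
  --max-messages 1 \
  --timeout-ms 6000
```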
[jira] [Assigned] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reassigned KAFKA-12635:

Assignee: Ning Zhang
[jira] [Created] (KAFKA-10750) test failure scenarios of MirrorMaker 2
Ning Zhang created KAFKA-10750:

Summary: test failure scenarios of MirrorMaker 2
Key: KAFKA-10750
URL: https://issues.apache.org/jira/browse/KAFKA-10750
Project: Kafka
Issue Type: Improvement
Components: mirrormaker
Affects Versions: 2.7.0
Reporter: Ning Zhang
Assignee: Ning Zhang

As a follow-up to https://issues.apache.org/jira/browse/KAFKA-10304, it may be necessary to test failure scenarios, e.g. a Kafka broker stopping and then restarting. To keep PR [https://github.com/apache/kafka/pull/9224] smaller, we removed the testing code for failure scenarios and plan to add it back in this ticket.
[jira] [Updated] (KAFKA-10737) MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster in README.md
[ https://issues.apache.org/jira/browse/KAFKA-10737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10737:

Description:
As MM2 is adopted by more users, new and advanced use cases require Kafka clusters to be secure, and one way to achieve this is an SSL-enabled cluster. Currently there is no clear documentation on how to configure MM2 so that it can consume from or produce to an SSL-enabled cluster. Some users spent quite a lot of time finding the right config (see https://issues.apache.org/jira/browse/KAFKA-10704), so it would be great to document this clearly.

was:
As MM2 is adopted by more users, new and advanced use cases require Kafka clusters to be secure, and one way to achieve this is an SSL-enabled cluster. Currently there is no clear documentation on how to configure MM2 so that it can consume from or produce to an SSL-enabled cluster. Some users spent quite a lot of time finding the right config (see ), so it would be great to document this clearly.

> MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster in README.md
> -----------------------------------------------------------------------------------------
>
> Key: KAFKA-10737
> URL: https://issues.apache.org/jira/browse/KAFKA-10737
> Project: Kafka
> Issue Type: Improvement
> Components: mirrormaker
> Affects Versions: 2.7.0
> Reporter: Ning Zhang
> Assignee: Ning Zhang
> Priority: Minor
[jira] [Created] (KAFKA-10737) MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster in README.md
Ning Zhang created KAFKA-10737:

Summary: MirrorMaker 2: clarify how to produce to or consume from SSL-enabled cluster in README.md
Key: KAFKA-10737
URL: https://issues.apache.org/jira/browse/KAFKA-10737
Project: Kafka
Issue Type: Improvement
Components: mirrormaker
Affects Versions: 2.7.0
Reporter: Ning Zhang
Assignee: Ning Zhang

As MM2 is adopted by more users, new and advanced use cases require Kafka clusters to be secure, and one way to achieve this is an SSL-enabled cluster. Currently there is no clear documentation on how to configure MM2 so that it can consume from or produce to an SSL-enabled cluster. Some users spent quite a lot of time finding the right config (see ), so it would be great to document this clearly.
[jira] [Resolved] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang resolved KAFKA-10704.

Resolution: Resolved

> Mirror maker with TLS at target
> -------------------------------
>
> Key: KAFKA-10704
> URL: https://issues.apache.org/jira/browse/KAFKA-10704
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.6.0
> Reporter: Tushar Bhasme
> Assignee: Ning Zhang
> Priority: Critical
>
> We need to set up mirror maker from a single-node Kafka cluster to a three-node Strimzi cluster. There is no SSL setup at the source; however, the target cluster is configured with mTLS.
> With the config below, commands from the source like listing topics are working:
> {code:java}
> cat client-ssl.properties
> security.protocol=SSL
> ssl.truststore.location=my.truststore
> ssl.truststore.password=123456
> ssl.keystore.location=my.keystore
> ssl.keystore.password=123456
> ssl.key.password=password{code}
> However, we are not able to get mirror maker working with similar configs:
> {code:java}
> source.security.protocol=PLAINTEXT
> target.security.protocol=SSL
> target.ssl.truststore.location=my.truststore
> target.ssl.truststore.password=123456
> target.ssl.keystore.location=my.keystore
> target.ssl.keystore.password=123456
> target.ssl.key.password=password{code}
> Errors while running mirror maker:
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1605011994642, tries=1, nextAllowedTryMs=1605011994743) timed out at 1605011994643 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
> [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
> [2020-11-10 12:40:24,643] INFO [AdminClient clientId=adminclient-8] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
> org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1605012024643, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: fetchMetadata
> [2020-11-10 12:40:24,644] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
> [2020-11-10 12:40:24,644] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
> [2020-11-10 12:40:24,644] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
> [2020-11-10 12:40:24,645] ERROR Stopping due to error (org.apache.kafka.connect.mirror.MirrorMaker:304)
> org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
>     at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
>     at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
>     at org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:235)
>     at org.apache.kafka.connect.mirror.MirrorMaker.lambda$new$1(MirrorMaker.java:136)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:136)
>     at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:148)
>     at org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:291)
> Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1605012024641, tries=1, nextAllowedTryMs=1605012024742) timed out at 1605012024642 after 1 attempt(s)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>     at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>     at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
>     ... 7 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1605012024641, tries=1, nextAllowedTryMs=1605012024742) timed out at 1605012024642 after 1 attempt(s)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
> {code}
[jira] [Reopened] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reopened KAFKA-10704:

Fix For: 2.7.0
[jira] [Updated] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10704: --- Fix Version/s: (was: 2.7.0) > Mirror maker with TLS at target > --- > > Key: KAFKA-10704 > URL: https://issues.apache.org/jira/browse/KAFKA-10704 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.6.0 >Reporter: Tushar Bhasme >Assignee: Ning Zhang >Priority: Critical > > We need to setup mirror maker from a single node kafka cluster to a three > node Strimzi cluster. There is no SSL setup at source, however the target > cluster is configured with MTLS. > With below config, commands from source like listing topics etc are working: > {code:java} > cat client-ssl.properties > security.protocol=SSL > ssl.truststore.location=my.truststore > ssl.truststore.password=123456 > ssl.keystore.location=my.keystore > ssl.keystore.password=123456 > ssl.key.password=password{code} > However, we are not able to get mirror maker working with the similar configs: > {code:java} > source.security.protocol=PLAINTEXT > target.security.protocol=SSL > target.ssl.truststore.location=my.truststore > target.ssl.truststore.password=123456 > target.ssl.keystore.location=my.keystore > target.ssl.keystore.password=123456 > target.ssl.key.password=password{code} > Errors while running mirror maker: > {code:java} > org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, > deadlineMs=1605011994642, tries=1, nextAllowedTryMs=1605011994743) timed out > at 1605011994643 after 1 attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting > for a node assignment. 
Call: fetchMetadata > [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 > unregistered (org.apache.kafka.common.utils.AppInfoParser:83) > [2020-11-10 12:40:24,643] INFO [AdminClient clientId=adminclient-8] Metadata > update failed > (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235) > org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, > deadlineMs=1605012024643, tries=1, nextAllowedTryMs=-9223372036854775709) > timed out at 9223372036854775807 after 1 attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient > thread has exited. Call: fetchMetadata > [2020-11-10 12:40:24,644] INFO Metrics scheduler closed > (org.apache.kafka.common.metrics.Metrics:668) > [2020-11-10 12:40:24,644] INFO Closing reporter > org.apache.kafka.common.metrics.JmxReporter > (org.apache.kafka.common.metrics.Metrics:672) > [2020-11-10 12:40:24,644] INFO Metrics reporters closed > (org.apache.kafka.common.metrics.Metrics:678) > [2020-11-10 12:40:24,645] ERROR Stopping due to error > (org.apache.kafka.connect.mirror.MirrorMaker:304) > org.apache.kafka.connect.errors.ConnectException: Failed to connect to and > describe Kafka cluster. Check worker's broker connection and security > properties. 
> at > org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70) > at > org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51) > at > org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:235) > at > org.apache.kafka.connect.mirror.MirrorMaker.lambda$new$1(MirrorMaker.java:136) > at java.lang.Iterable.forEach(Iterable.java:75) > at > org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:136) > at > org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:148) > at > org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:291) > Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, > deadlineMs=1605012024641, tries=1, nextAllowedTryMs=1605012024742) timed out > at 1605012024642 after 1 attempt(s) > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64) > ... 7 more > Caused by: org.apache.kafka.common.errors.TimeoutException: > Call(callName=listNodes, deadlineMs=1605012024641, tries=1, > nextAllowedTryMs=1605012024742) timed out at 1605012024642 after 1 attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting > for a node assignment. Call: listNodes > {code} --
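The per-cluster `source.` / `target.` prefixes in the config above are how MM2 scopes client settings (such as SSL properties) to each cluster. A minimal sketch of that prefix resolution, in illustrative Python rather than MM2's actual Java code:

```python
def client_configs(mm2_props, cluster_alias):
    """Strip a cluster-alias prefix to get the client config for that cluster.

    Illustrative only: this mirrors the idea behind MM2's per-cluster
    `source.*` / `target.*` prefixed properties, not the real implementation.
    """
    prefix = cluster_alias + "."
    return {key[len(prefix):]: value
            for key, value in mm2_props.items()
            if key.startswith(prefix)}

props = {
    "source.security.protocol": "PLAINTEXT",
    "target.security.protocol": "SSL",
    "target.ssl.truststore.location": "my.truststore",
}
target = client_configs(props, "target")
```

The point of the sketch: the `target.*` keys must resolve to a complete, valid client config on their own, which is why a missing or wrong `target.ssl.*` property surfaces as the AdminClient timeout in the log above.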
[jira] [Commented] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17234271#comment-17234271 ] Ning Zhang commented on KAFKA-10704: Indeed, we should clearly document how to produce to an SSL-enabled cluster. I will create a PR against [https://github.com/apache/kafka/blob/trunk/connect/mirror/README.md] to add a section about SSL-enabled clusters.
[jira] [Assigned] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reassigned KAFKA-10704: -- Assignee: Ning Zhang
[jira] [Commented] (KAFKA-10728) Mirroring data without decompressing with MirrorMaker 2.0
[ https://issues.apache.org/jira/browse/KAFKA-10728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17234255#comment-17234255 ] Ning Zhang commented on KAFKA-10728: My first impression: unless the MM2 producer is explicitly set to `uncompressed` via [https://kafka.apache.org/documentation/#compression.type], it will use the default compression value. > Mirroring data without decompressing with MirrorMaker 2.0 > - > > Key: KAFKA-10728 > URL: https://issues.apache.org/jira/browse/KAFKA-10728 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Eazhilan Nagarajan >Priority: Major > > Hello, > > I use MirrorMaker 2.0 to copy data across two Kafka clusters and it's all > working fine. Recently we enabled compression while producing data into any > topic, which had a very positive impact on storage and other resources, but > while mirroring, the data seems to be decompressed at the target Kafka > cluster. I tried enabling compression using the below config in MM2; the data > at the target cluster is compressed now, but the decompress and re-compress > cycle continues to happen and eats up a lot of resources unnecessarily. > > {noformat} > - alias: my-passive-cluster > authentication: > passwordSecret: > password: password > secretName: passive-cluster-secret > type: scram-sha-512 > username: user-1 > bootstrapServers: my-passive-cluster.com:443 > config: > config.storage.replication.factor: 3 > offset.storage.replication.factor: 3 > status.storage.replication.factor: 3 > producer.compression.type: gzip{noformat} > I found a couple of Jira issues talking about it, but I don't know if the > shallow iterator option is available now. > https://issues.apache.org/jira/browse/KAFKA-732, > https://issues.apache.org/jira/browse/KAFKA-845 > > Kindly let me know if this is currently available or if it'll be available in the > future. -- This message was sent by Atlassian Jira (v8.3.4#803005)
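For context on why the re-compression happens at all: MM2's consumer decompresses batches on fetch, and MM2's producer re-compresses them according to its own `compression.type`. A toy round-trip sketch of that cost (gzip assumed on both sides; illustrative Python, not MM2 code):

```python
import gzip

def mirror_record(source_batch: bytes, producer_compression: str = "gzip") -> bytes:
    # Consumer side: fetched batches are decompressed into records.
    payload = gzip.decompress(source_batch)
    # Producer side: records are re-batched and re-compressed per the
    # producer's compression.type -- this is the CPU the reporter wants to avoid.
    if producer_compression == "gzip":
        return gzip.compress(payload)
    # "uncompressed": payload is produced as-is (broker may still compress,
    # depending on the topic's compression.type).
    return payload

original = gzip.compress(b"some-record-value")
mirrored = mirror_record(original)
```

A "shallow" mirror (the KAFKA-732/KAFKA-845 idea) would instead forward `source_batch` untouched, skipping both calls.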
[jira] [Commented] (KAFKA-10719) MirrorMaker2 fails to update its runtime configuration
[ https://issues.apache.org/jira/browse/KAFKA-10719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17232354#comment-17232354 ] Ning Zhang commented on KAFKA-10719: The potential config inconsistency in MM2 may be rooted in how Kafka Connect saves its config, since MM2 is built on top of Kafka Connect. Kafka Connect typically stores config in a special Kafka topic. Per the description section of [https://github.com/a0x8o/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java], in rare cases there is a chance of seeing inconsistent config across different tasks. > MirrorMaker2 fails to update its runtime configuration > -- > > Key: KAFKA-10719 > URL: https://issues.apache.org/jira/browse/KAFKA-10719 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.0 >Reporter: Peter Sinoros-Szabo >Priority: Major > > I was successfully running the MM2 cluster with the following configuration > (simplified a little): {code:java} clusters = main, backup > main.bootstrap.servers = kafkaA:9202,kafkaB:9092,kafkaB:9092 > backup.bootstrap.servers = backupA:9092,backupB:9092,backupC:9092 > main->backup.enabled = true main->backup.topics = .*{code} I wanted to change > the bootstrap.servers list of the destination cluster to a different list > that points to the *same* cluster, just a different listener with > different routing. So I changed it to: {code:java} backup.bootstrap.servers = > backupSecA:1234,backupSecB:1234,backupSecC:1234{code} I did a rolling restart > on the MM2 nodes and saw that some tasks were still using the old bootstrap > addresses while some were using the new ones. I don't have the logs, so > unfortunately I don't know which ones picked up the good values and which > didn't. I even stopped the cluster completely, but it didn't help. Ryanne > advised deleting the mm2-config and mm2-status topics, so I deleted those > on the destination cluster, which solved the issue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
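The inconsistency described above can be pictured via the config topic's log semantics: each worker materializes whatever portion of the log it has read, so two workers caught up to different offsets can resolve different configs until they converge. A simplified last-write-wins sketch (illustrative only, not `KafkaConfigBackingStore` itself):

```python
def materialize(records, up_to_offset):
    """Replay (offset, key, value) records up to an offset; last write wins."""
    state = {}
    for offset, key, value in sorted(records):
        if offset > up_to_offset:
            break
        state[key] = value
    return state

# Hypothetical contents of the mm2-config topic after the reconfiguration:
log = [
    (0, "backup.bootstrap.servers", "backupA:9092"),
    (1, "backup.bootstrap.servers", "backupSecA:1234"),
]
stale = materialize(log, up_to_offset=0)   # a worker lagging behind the log
fresh = materialize(log, up_to_offset=1)   # a fully caught-up worker
```

Deleting the mm2-config/mm2-status topics works because it discards the stale log entirely, forcing every worker to rebuild state from the new configuration.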
[jira] [Comment Edited] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17232338#comment-17232338 ] Ning Zhang edited comment on KAFKA-10704 at 11/15/20, 4:34 PM: --- This PR [https://github.com/apache/kafka/pull/9224] actually tested the SSL case (unencrypted source cluster, encrypted target cluster). In the real world, there is a use case of mirroring from an unencrypted cluster to AWS-hosted Kafka (always encrypted), so I believe the current MirrorMaker 2 can support encryption at the source or target out-of-the-box. was (Author: yangguo1220): This PR [https://github.com/apache/kafka/pull/9224] actually tested out SSL case (unencrypted source cluster, but encrypted target cluster). In real world, there is use case of mirroring from unencrypted cluster to AWS hosted Kafka (always encrypted). So I believe current MirrorMaker 2 can support encryption out-of-the-box
[jira] [Commented] (KAFKA-10704) Mirror maker with TLS at target
[ https://issues.apache.org/jira/browse/KAFKA-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17232338#comment-17232338 ] Ning Zhang commented on KAFKA-10704: This PR [https://github.com/apache/kafka/pull/9224] actually tested the SSL case (unencrypted source cluster, encrypted target cluster). In the real world, there is a use case of mirroring from an unencrypted cluster to AWS-hosted Kafka (always encrypted), so I believe the current MirrorMaker 2 can support encryption out-of-the-box.
[jira] [Resolved] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"
[ https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang resolved KAFKA-10424. Resolution: Not A Bug > MirrorMaker 2.0 does not replicates topic's "clean.policy" > -- > > Key: KAFKA-10424 > URL: https://issues.apache.org/jira/browse/KAFKA-10424 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0 >Reporter: Mikhail Grinfeld >Assignee: Ning Zhang >Priority: Major > > I needed to replicate schema-registry "_schemas" topic. > data was replicated successfully and everything looked good, but new > schema-registry started with warning that replicated topic's cleanup.policy > is not "compact" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17199546#comment-17199546 ] Ning Zhang commented on KAFKA-10339: Thanks [~mimaison], once the above PR 8730 is merged, I will rebase from it immediately and https://issues.apache.org/jira/browse/KAFKA-10304 could be prioritized. > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
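For readers unfamiliar with the EOS pattern this ticket is pursuing: the standard consume-transform-produce recipe makes the produced records and the source offset commit atomic within one transaction. A toy in-memory sketch of that control flow (the classes here are hypothetical stand-ins; a real implementation would use the Java clients' transactional APIs such as `initTransactions`/`sendOffsetsToTransaction`):

```python
class TransactionalProducer:
    """Toy stand-in for a Kafka transactional producer (illustrative only)."""
    def __init__(self):
        self.committed = []          # records visible to read_committed readers
        self.committed_offsets = {}  # source offsets committed with the txn
        self._pending = []
        self._pending_offsets = {}

    def begin_transaction(self):
        self._pending = []
        self._pending_offsets = {}

    def send(self, record):
        self._pending.append(record)

    def send_offsets_to_transaction(self, offsets):
        self._pending_offsets = dict(offsets)

    def commit_transaction(self):
        # Records and offsets become visible together, or not at all.
        self.committed.extend(self._pending)
        self.committed_offsets.update(self._pending_offsets)
        self.begin_transaction()

    def abort_transaction(self):
        self.begin_transaction()

def mirror_batch(producer, records, offsets):
    """Produce mirrored records and commit source offsets atomically."""
    producer.begin_transaction()
    try:
        for record in records:
            producer.send(record)
        producer.send_offsets_to_transaction(offsets)
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()
        raise
```

The sink-connector approach mentioned in this ticket exists precisely to get this transactional producer under MM2's control, which the Source Task framework of the time did not offer.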
[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17199443#comment-17199443 ] Ning Zhang commented on KAFKA-10339: [~mimaison] When possible, it would be much appreciated if KAFKA-10483 and KAFKA-10304 could be reviewed first, as they will make this task easier to test and implement. If other folks are more appropriate reviewers, please nominate them and I will reach out. Thanks
[jira] [Created] (KAFKA-10483) extract common functions from SourceConnector and SourceTask
Ning Zhang created KAFKA-10483: -- Summary: extract common functions from SourceConnector and SourceTask Key: KAFKA-10483 URL: https://issues.apache.org/jira/browse/KAFKA-10483 Project: Kafka Issue Type: Improvement Components: mirrormaker Reporter: Ning Zhang Assignee: Ning Zhang When implementing the MM2 EOS feature, existing functions from `MirrorSourceConnector` and `MirrorSourceTask` can be reused for `MirrorSinkConnector` and `MirrorSinkTask`. To minimize long-term maintenance cost and maximize code reusability, it is worth extracting common functions out of the current `MirrorSourceConnector` and `MirrorSourceTask`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
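The shape of the proposed refactor is pulling shared helpers (for example remote-topic naming, which MM2's DefaultReplicationPolicy renders as `alias.topic`) into a common home that both the source and sink variants reuse. A hypothetical Python sketch of the structure, not the actual Java classes:

```python
class MirrorTaskCommon:
    """Hypothetical base holding helpers shared by source and sink tasks."""
    SEPARATOR = "."

    def format_remote_topic(self, source_alias, topic):
        # Matches the alias.topic convention of DefaultReplicationPolicy.
        return f"{source_alias}{self.SEPARATOR}{topic}"

class MirrorSourceTaskSketch(MirrorTaskCommon):
    pass  # source-specific logic would live here

class MirrorSinkTaskSketch(MirrorTaskCommon):
    pass  # sink-specific (EOS) logic would live here
```

With the helper extracted once, a fix to the naming convention propagates to both task types, which is the maintenance-cost argument the ticket makes.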
[jira] [Comment Edited] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2
[ https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17192664#comment-17192664 ] Ning Zhang edited comment on KAFKA-10304 at 9/9/20, 3:00 PM: - Hi [~mimaison] [~ryannedolan] this is mostly a refactoring PR on the MM2 integration tests. Its purpose: (1) address the concern in [a previous PR|https://github.com/apache/kafka/pull/9029#issuecomment-663094946], (2) prepare for future development (e.g. extracting common functions). I think the current PR ([https://github.com/apache/kafka/pull/9224]) is just a starting point, and I would very much appreciate your feedback on what else to test and how to get closer to the real scenario. was (Author: yangguo1220): Hi [~mimaison] [~ryannedolan] this is mostly a refactoring pr on the MM2 integration tests. The purpose of doing that: (1) address the concern in the previous PR (https://github.com/apache/kafka/pull/9029), (2) prepare for the future development (e.g. extract common functions). I think the current PR (https://github.com/apache/kafka/pull/9224) is just a starting point, and I am very appreciated for your feedback on what to test additionally and how to get close to the real scenario. > Revisit and improve the tests of MirrorMaker 2 > -- > > Key: KAFKA-10304 > URL: https://issues.apache.org/jira/browse/KAFKA-10304 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, mirrormaker >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Minor > Fix For: 2.8.0 > > > In a different MM2 change, [some > concerns|https://github.com/apache/kafka/pull/9029#issuecomment-663094946] on > tests were raised. It may be a good time to revisit and refactor the tests, > possibly in the following way: > (1) are 100 messages good enough for integration tests? > (2) what about the broker failure in the middle of integration tests? > (3) other validations to check (e.g. 
topic config sync) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2
[ https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10304: --- Description: In a different MM2 change, [some concerns|https://github.com/apache/kafka/pull/9029#issuecomment-663094946] on tests were raised. It may be a good time to revisit and refactor the tests, possibly in the following way: (1) are 100 messages good enough for integration tests? (2) what about the broker failure in the middle of integration tests? (3) other validations to check (e.g. topic config sync) was: In a different MM2 change https://github.com/apache/kafka/pull/9029, some concerns on tests were raised. It may be a good time to revisit and refactor the tests, possibly in the following way: (1) are 100 messages good enough for integration tests? (2) what about the broker failure in the middle of integration tests? (3) other validations to check (e.g. topic config sync) > Revisit and improve the tests of MirrorMaker 2 > -- > > Key: KAFKA-10304 > URL: https://issues.apache.org/jira/browse/KAFKA-10304 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, mirrormaker >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Minor > Fix For: 2.8.0 > > > In a different MM2 change, [some > concerns|https://github.com/apache/kafka/pull/9029#issuecomment-663094946] on > tests were raised. It may be a good time to revisit and refactor the tests, > possibly in the following way: > (1) are 100 messages good enough for integration tests? > (2) what about the broker failure in the middle of integration tests? > (3) other validations to check (e.g. topic config sync) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2
[ https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10304: --- Comment: was deleted (was: https://github.com/apache/kafka/pull/9224) > Revisit and improve the tests of MirrorMaker 2 > -- > > Key: KAFKA-10304 > URL: https://issues.apache.org/jira/browse/KAFKA-10304 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, mirrormaker >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Minor > Fix For: 2.8.0 > > > In a different MM2 change https://github.com/apache/kafka/pull/9029, some > concerns on tests were raised. It may be a good time to revisit and refactor > the tests, possibly in the following way: > (1) are 100 messages good enough for integration tests? > (2) what about the broker failure in the middle of integration tests? > (3) other validations to check (e.g. topic config sync) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2
[ https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17192664#comment-17192664 ] Ning Zhang commented on KAFKA-10304: Hi [~mimaison] [~ryannedolan] this is mostly a refactoring PR on the MM2 integration tests. The purpose is to (1) address the concern in the previous PR (https://github.com/apache/kafka/pull/9029), and (2) prepare for future development (e.g. extract common functions). I think the current PR (https://github.com/apache/kafka/pull/9224) is just a starting point, and I would greatly appreciate your feedback on what to test additionally and how to get closer to real-world scenarios. > Revisit and improve the tests of MirrorMaker 2 > -- > > Key: KAFKA-10304 > URL: https://issues.apache.org/jira/browse/KAFKA-10304 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, mirrormaker >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Minor > Fix For: 2.8.0 > > > In a different MM2 change https://github.com/apache/kafka/pull/9029, some > concerns on tests were raised. It may be a good time to revisit and refactor > the tests, possibly in the following way: > (1) are 100 messages good enough for integration tests? > (2) what about the broker failure in the middle of integration tests? > (3) other validations to check (e.g. topic config sync) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2
[ https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10304: --- Description: In a different MM2 change https://github.com/apache/kafka/pull/9029, some concerns on tests were raised. It may be a good time to revisit and refactor the tests, possibly in the following way: (1) are 100 messages good enough for integration tests? (2) what about the broker failure in the middle of integration tests? (3) other validations to check (e.g. topic config sync) was: due to the quick development of Kafka MM 2, unit and integration tests of MirrorMaker 2 were made just for covering each individual feature and some of them are simply copy-n-paste from the existing tests with small tweaks. It may be a good time to revisit and improve the tests, possibly in the following way: (1) are 100 messages good enough for integration tests? (2) what about the failure in the middle of integration tests? (3) Do we want to check other messages (e.g. checkpoint, heartbeat, offset sync..) beyond the mirrored message in integration tests? > Revisit and improve the tests of MirrorMaker 2 > -- > > Key: KAFKA-10304 > URL: https://issues.apache.org/jira/browse/KAFKA-10304 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, mirrormaker >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Minor > Fix For: 2.8.0 > > > In a different MM2 change https://github.com/apache/kafka/pull/9029, some > concerns on tests were raised. It may be a good time to revisit and refactor > the tests, possibly in the following way: > (1) are 100 messages good enough for integration tests? > (2) what about the broker failure in the middle of integration tests? > (3) other validations to check (e.g. topic config sync) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2
[ https://issues.apache.org/jira/browse/KAFKA-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10304: --- Component/s: mirrormaker > Revisit and improve the tests of MirrorMaker 2 > -- > > Key: KAFKA-10304 > URL: https://issues.apache.org/jira/browse/KAFKA-10304 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, mirrormaker >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Minor > > due to the quick development of Kafka MM 2, unit and integration tests of > MirrorMaker 2 were made just for covering each individual feature and some of > them are simply copy-n-paste from the existing tests with small tweaks. It > may be a good time to revisit and improve the tests, possibly in the > following way: > (1) are 100 messages good enough for integration tests? > (2) what about the failure in the middle of integration tests? > (3) Do we want to check other messages (e.g. checkpoint, heartbeat, offset > sync..) beyond the mirrored message in integration tests? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10339: --- Component/s: mirrormaker > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10424) MirrorMaker 2.0 does not replicate topic's "cleanup.policy"
[ https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184891#comment-17184891 ] Ning Zhang edited comment on KAFKA-10424 at 8/26/20, 3:13 AM: -- [~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a bug: before running MM2: {code:java} bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs: Topic: primary.test Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 {code} After running MM2: {code:java} bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs:*cleanup.policy=compact* Topic: primary.test Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 {code} note that `cleanup.policy=compact` is added after MM2 start was (Author: yangguo1220): [~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a bug: before running MM2: {code:java} bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs: Topic: primary.test Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 {code} After running MM2: {code:java} bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs:cleanup.policy=compact 
Topic: primary.test Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 {code} note that `cleanup.policy=compact` is added after MM2 start > MirrorMaker 2.0 does not replicates topic's "clean.policy" > -- > > Key: KAFKA-10424 > URL: https://issues.apache.org/jira/browse/KAFKA-10424 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0 >Reporter: Mikhail Grinfeld >Assignee: Ning Zhang >Priority: Major > > I needed to replicate schema-registry "_schemas" topic. > data was replicated successfully and everything looked good, but new > schema-registry started with warning that replicated topic's cleanup.policy > is not "compact" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10424) MirrorMaker 2.0 does not replicate topic's "cleanup.policy"
[ https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184891#comment-17184891 ] Ning Zhang edited comment on KAFKA-10424 at 8/26/20, 3:13 AM: -- [~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a bug: before running MM2: {code:java} bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs: Topic: primary.test Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 {code} After running MM2: {code:java} bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs:cleanup.policy=compact Topic: primary.test Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 {code} note that `cleanup.policy=compact` is added after MM2 start was (Author: yangguo1220): [~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a bug: before running MM2: {code:java} bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs: Topic: primary.test Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 {code} After running MM2: {code:java} bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs:*cleanup.policy=compact* 
Topic: primary.test Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 {code} note that `cleanup.policy=compact` is added after MM2 start > MirrorMaker 2.0 does not replicates topic's "clean.policy" > -- > > Key: KAFKA-10424 > URL: https://issues.apache.org/jira/browse/KAFKA-10424 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0 >Reporter: Mikhail Grinfeld >Assignee: Ning Zhang >Priority: Major > > I needed to replicate schema-registry "_schemas" topic. > data was replicated successfully and everything looked good, but new > schema-registry started with warning that replicated topic's cleanup.policy > is not "compact" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10424) MirrorMaker 2.0 does not replicate topic's "cleanup.policy"
[ https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184891#comment-17184891 ] Ning Zhang edited comment on KAFKA-10424 at 8/26/20, 3:12 AM: -- [~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a bug: before running MM2: {code:java} bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs: Topic: primary.test Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 {code} After running MM2: {code:java} bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs:cleanup.policy=compact Topic: primary.test Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 {code} note that `cleanup.policy=compact` is added after MM2 start was (Author: yangguo1220): [~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a bug: before running MM2: ``` bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs: Topic: primary.test Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 ``` After running MM2: ``` bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs:cleanup.policy=compact Topic: primary.test 
Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 ``` note that `cleanup.policy=compact` is added after MM2 start > MirrorMaker 2.0 does not replicates topic's "clean.policy" > -- > > Key: KAFKA-10424 > URL: https://issues.apache.org/jira/browse/KAFKA-10424 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0 >Reporter: Mikhail Grinfeld >Assignee: Ning Zhang >Priority: Major > > I needed to replicate schema-registry "_schemas" topic. > data was replicated successfully and everything looked good, but new > schema-registry started with warning that replicated topic's cleanup.policy > is not "compact" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10424) MirrorMaker 2.0 does not replicate topic's "cleanup.policy"
[ https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17184891#comment-17184891 ] Ning Zhang commented on KAFKA-10424: [~grinfeld] I deployed the latest kafka 2.6 and it seems to me this is not a bug: before running MM2: ``` bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs: Topic: primary.test Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 ``` After running MM2: ``` bash-4.4# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-service-backup:2181 --describe --topic primary.test Topic:primary.test PartitionCount:3ReplicationFactor:3 Configs:cleanup.policy=compact Topic: primary.test Partition: 0Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: primary.test Partition: 1Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: primary.test Partition: 2Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 ``` note that `cleanup.policy=compact` is added after MM2 start > MirrorMaker 2.0 does not replicates topic's "clean.policy" > -- > > Key: KAFKA-10424 > URL: https://issues.apache.org/jira/browse/KAFKA-10424 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0 >Reporter: Mikhail Grinfeld >Assignee: Ning Zhang >Priority: Major > > I needed to replicate schema-registry "_schemas" topic. > data was replicated successfully and everything looked good, but new > schema-registry started with warning that replicated topic's cleanup.policy > is not "compact" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10424) MirrorMaker 2.0 does not replicate topic's "cleanup.policy"
[ https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reassigned KAFKA-10424: -- Assignee: Ning Zhang > MirrorMaker 2.0 does not replicates topic's "clean.policy" > -- > > Key: KAFKA-10424 > URL: https://issues.apache.org/jira/browse/KAFKA-10424 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0 >Reporter: Mikhail Grinfeld >Assignee: Ning Zhang >Priority: Major > > I needed to replicate schema-registry "_schemas" topic. > data was replicated successfully and everything looked good, but new > schema-registry started with warning that replicated topic's cleanup.policy > is not "compact" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10133) Cannot compress messages in destination cluster with MM2
[ https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10133: --- Fix Version/s: 2.7.0 > Cannot compress messages in destination cluster with MM2 > > > Key: KAFKA-10133 > URL: https://issues.apache.org/jira/browse/KAFKA-10133 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 > Environment: kafka 2.5.0 deployed via the strimzi operator 0.18 >Reporter: Steve Jacobs >Assignee: Ning Zhang >Priority: Minor > Fix For: 2.7.0 > > > When configuring mirrormaker2 using kafka connect, it is not possible to > configure things such that messages are compressed in the destination > cluster. Dump Log shows that batching is occuring, but no compression. If > this is possible, then this is a documentation bug, because I can find no > documentation on how to do this. > baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: > -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: > false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 > magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (KAFKA-10133) Cannot compress messages in destination cluster with MM2
[ https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reopened KAFKA-10133: There is no bug in the code, but some documentation effort is needed to clarify where some frequently used configs should be set. > Cannot compress messages in destination cluster with MM2 > > > Key: KAFKA-10133 > URL: https://issues.apache.org/jira/browse/KAFKA-10133 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 > Environment: kafka 2.5.0 deployed via the strimzi operator 0.18 >Reporter: Steve Jacobs >Assignee: Ning Zhang >Priority: Minor > > When configuring mirrormaker2 using kafka connect, it is not possible to > configure things such that messages are compressed in the destination > cluster. Dump Log shows that batching is occuring, but no compression. If > this is possible, then this is a documentation bug, because I can find no > documentation on how to do this. > baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: > -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: > false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 > magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10133) Cannot compress messages in destination cluster with MM2
[ https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reassigned KAFKA-10133: -- Assignee: Ning Zhang > Cannot compress messages in destination cluster with MM2 > > > Key: KAFKA-10133 > URL: https://issues.apache.org/jira/browse/KAFKA-10133 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 > Environment: kafka 2.5.0 deployed via the strimzi operator 0.18 >Reporter: Steve Jacobs >Assignee: Ning Zhang >Priority: Minor > > When configuring mirrormaker2 using kafka connect, it is not possible to > configure things such that messages are compressed in the destination > cluster. Dump Log shows that batching is occuring, but no compression. If > this is possible, then this is a documentation bug, because I can find no > documentation on how to do this. > baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: > -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: > false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 > magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10133) Cannot compress messages in destination cluster with MM2
[ https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183401#comment-17183401 ] Ning Zhang commented on KAFKA-10133: Great. Then I will probably make a PR to clarify some use cases like this. Thanks. > Cannot compress messages in destination cluster with MM2 > > > Key: KAFKA-10133 > URL: https://issues.apache.org/jira/browse/KAFKA-10133 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 > Environment: kafka 2.5.0 deployed via the strimzi operator 0.18 >Reporter: Steve Jacobs >Priority: Minor > > When configuring mirrormaker2 using kafka connect, it is not possible to > configure things such that messages are compressed in the destination > cluster. Dump Log shows that batching is occuring, but no compression. If > this is possible, then this is a documentation bug, because I can find no > documentation on how to do this. > baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: > -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: > false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 > magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
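The documentation gap discussed in this thread comes down to where per-cluster client overrides live in the MM2 properties file. Below is a minimal config sketch of that pattern; the cluster aliases `primary` and `backup`, the bootstrap addresses, and the topic filter are illustrative assumptions (not taken from this thread), and the exact override syntax may vary across Kafka versions.

```properties
# Hypothetical two-cluster MM2 setup; aliases and hosts are illustrative.
clusters = primary, backup
primary.bootstrap.servers = primary-kafka:9092
backup.bootstrap.servers = backup-kafka:9092

primary->backup.enabled = true
primary->backup.topics = .*

# Prefix the producer override with the *target* cluster alias, so the MM2
# producer writing into "backup" compresses its batches (this mirrors the
# ".producer.compression.type = gzip" override shown later in this thread).
backup.producer.compression.type = gzip
```

Verification could follow the DumpLogSegments approach shown elsewhere in this thread: after replication, the segment dump on the target cluster should report a `compresscodec` other than NONE.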
[jira] [Commented] (KAFKA-10424) MirrorMaker 2.0 does not replicate topic's "cleanup.policy"
[ https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17182977#comment-17182977 ] Ning Zhang commented on KAFKA-10424: Actually, I did a quick validation on a real deployment, and the first impression is: "cleanup.policy=compact" is not replicated to the remote topic on the target cluster. So it seems like a bug to me, and it needs a closer look. > MirrorMaker 2.0 does not replicate topic's "cleanup.policy" > -- > > Key: KAFKA-10424 > URL: https://issues.apache.org/jira/browse/KAFKA-10424 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0 >Reporter: Mikhail Grinfeld >Priority: Major > > I needed to replicate the schema-registry "_schemas" topic. > data was replicated successfully and everything looked good, but the new > schema-registry started with a warning that the replicated topic's cleanup.policy > is not "compact" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10133) Cannot compress messages in destination cluster with MM2
[ https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17182976#comment-17182976 ] Ning Zhang commented on KAFKA-10133: [~steveatbat] My experiment seems to show a different conclusion. In order to preserve the compression type from the upstream producer, in the MM2 config file I override the producer config, such as: {code:java} .producer.compression.type = gzip {code} Then run this command on the target cluster to verify: {code:java} /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files /kafka/kafka-logs-kafka2-0/primary.test-0/.log {code} Output: {code:java} Starting offset: 0 offset: 0 position: 0 CreateTime: 1598249096409 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: GZIP producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] offset: 1 position: 0 CreateTime: 1598249106432 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: GZIP producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] offset: 2 position: 0 CreateTime: 1598249158144 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: GZIP producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] {code} The output shows that the messages replicated to the target cluster are compressed with GZIP. So I am wondering what exact config you used at the Kafka Connect worker level to make compression work. > Cannot compress messages in destination cluster with MM2 > > > Key: KAFKA-10133 > URL: https://issues.apache.org/jira/browse/KAFKA-10133 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 > Environment: kafka 2.5.0 deployed via the strimzi operator 0.18 >Reporter: Steve Jacobs >Priority: Minor > > When configuring mirrormaker2 using kafka connect, it is not possible to > configure things such that messages are compressed in the destination > cluster. 
Dump Log shows that batching is occuring, but no compression. If > this is possible, then this is a documentation bug, because I can find no > documentation on how to do this. > baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: > -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: > false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 > magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10424) MirrorMaker 2.0 does not replicate topic's "cleanup.policy"
[ https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17182698#comment-17182698 ] Ning Zhang commented on KAFKA-10424: Then can you check whether other topic properties (if any) are copied over from the source to the target cluster? > MirrorMaker 2.0 does not replicate topic's "cleanup.policy" > -- > > Key: KAFKA-10424 > URL: https://issues.apache.org/jira/browse/KAFKA-10424 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0 >Reporter: Mikhail Grinfeld >Priority: Major > > I needed to replicate the schema-registry "_schemas" topic. > data was replicated successfully and everything looked good, but the new > schema-registry started with a warning that the replicated topic's cleanup.policy > is not "compact" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10424) MirrorMaker 2.0 does not replicate topic's "cleanup.policy"
[ https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17182572#comment-17182572 ] Ning Zhang edited comment on KAFKA-10424 at 8/23/20, 3:28 AM: -- [~grinfeld] The "_schemas" topic is a compacted topic. When it is replicated, make sure its name on the target cluster is prefixed with the source cluster alias, e.g. "primary._schemas", and have your schema registry service on the target cluster use the correct topic name. MirrorSourceConnector should periodically sync the topic config (including "cleanup.policy") from source to target (https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L122), so the "_schemas" topic on the target cluster should be a compacted topic. To debug, check if the "_schemas" topic on the target cluster is *indeed* a compacted topic, especially at the moment when you see the above exception. was (Author: yangguo1220): [~grinfeld] The "_schemas" topic is a compacted topic. When it is replicated, make sure its name on the target cluster is prefixed with the source cluster alias, e.g. "primary._schemas". MirrorSourceConnector should periodically sync the topic config (including "cleanup.policy") from source to target (https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L122), so the "_schemas" topic on the target cluster should be a compacted topic. To debug, check if the "_schemas" topic on the target cluster is *indeed* a compacted topic, especially at the moment when you see the above exception. > MirrorMaker 2.0 does not replicate topic's "cleanup.policy" > -- > > Key: KAFKA-10424 > URL: https://issues.apache.org/jira/browse/KAFKA-10424 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0 >Reporter: Mikhail Grinfeld >Priority: Major > > I needed to replicate the schema-registry "_schemas" topic. 
> data was replicated successfully and everything looked good, but new > schema-registry started with warning that replicated topic's cleanup.policy > is not "compact" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17181928#comment-17181928 ] Ning Zhang commented on KAFKA-10370: After looking at the HDFS Sink connector https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java, it seems that the correct resolution is to add the following method to the SinkTask implementation:
{code:java}
@Override
public void open(Collection<TopicPartition> partitions) {
    loadContextOffsets();
}
{code}
where loadContextOffsets() is the method that loads the consumer group offsets from the target cluster and puts them into context.offset(). Therefore, no change is needed and I am closing this ticket.
> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Affects Versions: 2.5.0
> Reporter: Ning Zhang
> Assignee: Ning Zhang
> Priority: Major
> Fix For: 2.7.0
>
> In [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] provides a way to supply offsets from an external source (e.g. an implementation of SinkTask) to rewind the consumer.
> In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633] to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are not empty, (2) call consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307], when the [SinkTask starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88], we can supply the specific offsets via +"context.offset(supplied_offsets);"+ in the start() method, so that when the consumer does its first poll, it should rewind to those offsets in the rewind() method.
> However in practice, we saw the following IllegalStateException when running consumer.seek(tp, offsets):
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
> at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
> at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
> at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
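The root cause is ordering: offsets supplied in start() are consumed by rewind() before the group coordinator has assigned any partitions, while open() fires only after assignment. The toy model below is not Connect code; FakeConsumer is a hypothetical stand-in that mimics KafkaConsumer's precondition that seek() requires a current assignment, to show why the seek fails from start() but succeeds from open().

```java
import java.util.HashSet;
import java.util.Set;

public class SeekOrdering {

    // Mimics KafkaConsumer: seek() requires the partition to be assigned.
    static class FakeConsumer {
        final Set<String> assignment = new HashSet<>();

        void assign(String partition) {
            assignment.add(partition);
        }

        void seek(String partition, long offset) {
            if (!assignment.contains(partition)) {
                throw new IllegalStateException("No current assignment for partition " + partition);
            }
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();

        // start(): offsets supplied before any assignment exists -> seek fails.
        try {
            consumer.seek("test-1", 3L);
        } catch (IllegalStateException e) {
            System.out.println("start(): " + e.getMessage());
        }

        // open(): called after the rebalance has assigned test-1 -> seek succeeds.
        consumer.assign("test-1");
        consumer.seek("test-1", 3L);
        System.out.println("open(): seek to offset 3 succeeded");
    }
}
```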
[jira] [Commented] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179669#comment-17179669 ] Ning Zhang commented on KAFKA-10370: Hey [~ryannedolan] I updated the PR with the following code in `onPartitionsAssigned`. From my initial testing, it works well with `MirrorSinkTask`. Since the partition assignment is now driven by the Consumer (as we use `consumer.subscribe()`), the `offsets` passed into `context.offset(offsets)` are all associated with the consumer group, rather than having `MirrorSinkTask` do the consuming task assignment. Thanks for your thoughts above, and more feedback is definitely welcome! https://github.com/apache/kafka/pull/9145/files#diff-9d27e74bcdc892150367aed9a4cf499eR617-R698
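The `onPartitionsAssigned` approach can be sketched in plain Java, with the Kafka APIs replaced by maps so the example stands alone: offsets handed to the task context before assignment are held as pending, then applied inside the partitions-assigned callback and cleared once applied so a later rebalance does not rewind again. The names here (pendingOffsets, appliedSeeks) are illustrative, not the actual PR code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RebalanceSeek {

    // Offsets handed to the task (e.g. via context.offset(...)) before any assignment exists.
    static final Map<String, Long> pendingOffsets = new HashMap<>();

    // Records the seeks actually performed, standing in for consumer.seek().
    static final Map<String, Long> appliedSeeks = new HashMap<>();

    // Analogue of ConsumerRebalanceListener.onPartitionsAssigned: only now is it
    // safe to seek, and only for partitions this task was actually given.
    static void onPartitionsAssigned(List<String> partitions) {
        for (String tp : partitions) {
            Long offset = pendingOffsets.remove(tp); // apply once, then forget
            if (offset != null) {
                appliedSeeks.put(tp, offset);
            }
        }
    }

    public static void main(String[] args) {
        pendingOffsets.put("test-0", 7L);
        pendingOffsets.put("test-1", 3L);

        // The rebalance assigns only test-1 to this task.
        onPartitionsAssigned(List.of("test-1"));

        System.out.println(appliedSeeks);   // seek performed for test-1 only
        System.out.println(pendingOffsets); // test-0 stays pending for another task
    }
}
```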
[jira] [Commented] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174955#comment-17174955 ] Ning Zhang commented on KAFKA-10370: Hi [~rhauch], when you have a chance, I would like to get your initial feedback / advice on this issue and the proposed solution. Thanks. cc [~ryannedolan]
[jira] [Issue Comment Deleted] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Comment: was deleted (was: https://github.com/apache/kafka/pull/9145)
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Reviewer: Randall Hauch
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Fix Version/s: (was: 2.6.0) 2.7.0
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Description: In [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] provided a way to supply the offsets from external (e.g. implementation of SinkTask) to rewind the consumer. In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633] to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not empty, (2) consumer.seek(tp, offset) to rewind the consumer. As a part of [WorkerSinkTask initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307], when the [SinkTask starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88], we can supply the specific offsets by +"context.offset(supplied_offsets);+" in start() method, so that when the consumer does the first poll, it should rewind to the specific offsets in rewind() method. 
However in practice, we saw the following IllegalStateException when running consumer.seek(tp, offsets); {code:java} [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648) [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187) java.lang.IllegalStateException: No current assignment for partition test-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually 
restarted (org.apache.kafka.connect.runtime.WorkerTask:188) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution (initially verified) proposed in the attached PR is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*, to handle the initial position of the consumer when specific offsets are provided externally through WorkerSinkTaskContext was: In [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] provided a way to supply the offsets from external (e.g. implementation of SinkTask) to rewind the consumer. In the [poll()
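The assign-plus-seek resolution described above can be sketched as follows. This is a minimal illustration of the pattern, not the actual WorkerSinkTask change from the PR; the broker address, topic, partition, and offset are hypothetical placeholders, and running it requires a live Kafka broker.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignAndSeekSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("test", 1); // hypothetical partition

            // assign() gives the consumer a concrete partition assignment immediately,
            // so the seek() that follows cannot hit "No current assignment for partition".
            consumer.assign(Arrays.asList(tp));
            consumer.seek(tp, 3L); // offset supplied externally, e.g. via WorkerSinkTaskContext

            // With subscribe(), by contrast, the assignment only exists after the group
            // rebalance completes, so an eager seek() throws IllegalStateException.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
        }
    }
}
```

The trade-off is that assign() bypasses consumer-group rebalancing entirely, so the task owns its partitions statically instead of having them handed out by the group coordinator.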
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Description: In [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] provides a way to supply offsets from an external source (e.g. an implementation of SinkTask) to rewind the consumer. In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633] to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are not empty, (2) call consumer.seek(tp, offset) to rewind the consumer. As part of [WorkerSinkTask initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307], when the [SinkTask starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88], we can supply specific offsets via +"context.offset(supplied_offsets);"+ in the start() method, so that when the consumer does its first poll, it rewinds to those offsets in the rewind() method. 
However in practice, we saw the following IllegalStateException when running consumer.seek(tp, offsets); {code:java} [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648) [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187) java.lang.IllegalStateException: No current assignment for partition test-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually 
restarted (org.apache.kafka.connect.runtime.WorkerTask:188) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*, in this case. was: In WorkerSinkTask.java, when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] provided a way to supply the offsets from external (e.g. implementation of SinkTask) to rewind consumer. In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not empty, (2) consumer.seek(tp, offset) to rewind the consumer. when SinkTask first initializes
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Description: In WorkerSinkTask.java, when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] provides a way to supply offsets from an external source (e.g. an implementation of SinkTask) to rewind the consumer. In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are not empty, (2) call consumer.seek(tp, offset) to rewind the consumer. When SinkTask first initializes (+start(Map props)+), we call +"context.offset(offsets);"+; then in step (2) above, we saw the following IllegalStateException: {code:java} [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648) [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187) java.lang.IllegalStateException: No current assignment for partition test-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) at 
org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek* , instead of *consumer.subscribe*, in this case. was: In WorkerSinkTask.java, when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] provided a way to supply the offsets from external (e.g. implementation of SinkTask) to rewind customer. 
In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not empty, (2) consumer.seek(tp, offset) to rewind the consumer. when SinkTask first initializes (+start(Map props)+), we do +"context.offset(offsets);+" , then in above step (2), we saw the following IllegalStateException: {code:java} [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648) [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187) java.lang.IllegalStateException: No current assignment for partition
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Description: In WorkerSinkTask.java, when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] provides a way to supply offsets from an external source (e.g. an implementation of SinkTask) to rewind the consumer. In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are not empty, (2) call consumer.seek(tp, offset) to rewind the consumer. When SinkTask first initializes (+start(Map props)+), we call +"context.offset(offsets);"+; then in step (2) above, we saw the following IllegalStateException: {code:java} [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648) [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187) java.lang.IllegalStateException: No current assignment for partition test-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) at 
org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek* , instead of *consumer.subscribe*, in this case. was: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from external world (e.g. implementation of SinkTask). In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, (2) consumer.seek(tp, offset) to rewind the consumer. 
when running (2), we saw the following IllegalStateException: {code:java} [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648) [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187) java.lang.IllegalStateException: No current assignment for partition test-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) at
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Description: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the external world (e.g. an implementation of SinkTask). In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, (2) consumer.seek(tp, offset) to rewind the consumer. When running (2), we saw the following IllegalStateException: {code:java} [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648) [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187) java.lang.IllegalStateException: No current assignment for partition test-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek* , instead of *consumer.subscribe*. was: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from external world (e.g. implementation of SinkTask). In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, (2) consumer.seek(tp, offset) to rewind the consumer. 
when running (2), we saw the following IllegalStateException: {code:java} java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek* , instead of *consumer.subscribe*. > WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) > when (tp, offsets) are supplied by WorkerSinkTaskContext >
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Description: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the external world (e.g. an implementation of SinkTask). In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, (2) consumer.seek(tp, offset) to rewind the consumer. When running (2), we saw the following IllegalStateException: {code:java} java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*. was: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from external world (e.g. implementation of SinkTask). 
In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, (2) consumer.seek(tp, offset) to rewind the consumer. when running (2), we saw the following IllegalStateException: {code:java} java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution is to use *consumer.assign* with *consumer.seek* , instead of *consumer.subscribe* > WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) > when (tp, offsets) are supplied by WorkerSinkTaskContext > -- > > Key: KAFKA-10370 > URL: https://issues.apache.org/jira/browse/KAFKA-10370 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 2.5.0 >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Fix For: 2.6.0 > > > In WorkerSinkTask.java, when we want the consumer to start consuming from > certain offsets, rather than from the last committed offset, > [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] > is used to carry the offsets from external world (e.g. implementation of > SinkTask). 
> In the [poll() > method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], > it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, > (2) consumer.seek(tp, offset) to rewind the consumer. > when running (2), we saw the following IllegalStateException: > {code:java} > java.lang.IllegalStateException: No current assignment for partition mytopic-1 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) > at > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) > {code} > As suggested in > https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, > the resolution
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Description: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the external world (e.g. an implementation of SinkTask). In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, (2) consumer.seek(tp, offset) to rewind the consumer. When running (2), we saw the following IllegalStateException: {code:java} java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe* was: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from external world (e.g. implementation of SinkTask). 
In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, (2) consumer.seek(tp, offset) to rewind the consumer. when running (2), we saw the following IllegalStateException: ``` java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) ``` As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution is to use *consumer.assign* with *consumer.seek* , instead of *consumer.subscribe* > WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) > when (tp, offsets) are supplied by WorkerSinkTaskContext > -- > > Key: KAFKA-10370 > URL: https://issues.apache.org/jira/browse/KAFKA-10370 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 2.5.0 >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Fix For: 2.6.0 > > > In WorkerSinkTask.java, when we want the consumer to start consuming from > certain offsets, rather than from the last committed offset, > [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] > is used to carry the offsets from external world (e.g. implementation of > SinkTask). 
> In the [poll() > method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], > it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, > (2) consumer.seek(tp, offset) to rewind the consumer. > when running (2), we saw the following IllegalStateException: > {code:java} > java.lang.IllegalStateException: No current assignment for partition mytopic-1 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) > at > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) > {code} > As suggested in > https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, > the resolution is to use *consumer.assign* with *consumer.seek*
[jira] [Created] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
Ning Zhang created KAFKA-10370: -- Summary: WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext Key: KAFKA-10370 URL: https://issues.apache.org/jira/browse/KAFKA-10370 Project: Kafka Issue Type: New Feature Components: KafkaConnect Affects Versions: 2.5.0 Reporter: Ning Zhang Assignee: Ning Zhang Fix For: 2.6.0 In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the external world (e.g. an implementation of SinkTask). In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, (2) consumer.seek(tp, offset) to rewind the consumer. When running (2), we saw the following IllegalStateException: {code:java} java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe* -- This message was sent by Atlassian Jira (v8.3.4#803005)
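From the connector side, the rewind mechanism this ticket exercises is driven by SinkTaskContext.offset(). A minimal sketch of a SinkTask that supplies starting offsets in start() might look like the following; the class name, topic, partition, offset, and version string are hypothetical, and the offsets would in practice come from an external store rather than being hard-coded.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class RewindingSinkTask extends SinkTask {
    @Override
    public void start(Map<String, String> props) {
        // Hypothetical offsets recovered from an external store; the framework's
        // rewind() later reads these from WorkerSinkTaskContext and calls
        // consumer.seek() with them before delivering records to put().
        Map<TopicPartition, Long> supplied = new HashMap<>();
        supplied.put(new TopicPartition("test", 1), 3L);
        context.offset(supplied); // SinkTaskContext.offset(Map<TopicPartition, Long>)
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // write records downstream
    }

    @Override
    public void stop() { }

    @Override
    public String version() { return "0.0.1"; }
}
```

This is exactly the setup that triggered the reported exception: the seek requested here runs before the subscribe-based consumer has any assignment, which is why the fix moves the worker to assign().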
[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171670#comment-17171670 ] Ning Zhang commented on KAFKA-10339: it is true that SinkTaskContext.offsets() already has this covered. I will test this updated idea out locally; the extra step is to create a "fake" consumer group on the target cluster. > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171612#comment-17171612 ] Ning Zhang edited comment on KAFKA-10339 at 8/5/20, 4:47 PM: - thanks for the input. I think that sounds like a working plan. Here are my follow-up thoughts: _"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_ As the consumer is configured to pull data from the source cluster, I am thinking we probably need to: (1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java] (2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from the target cluster. (3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use the returned offsets for the consumer in WorkerSinkTask as the starting point. _"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_ I fully agree with the "fake" consumer group on the target cluster. I am wondering whether "addOffsetsToTransaction" has already been taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*? was (Author: yangguo1220): thanks for the input. I think that sounds like a working plan. 
Here are my follow-up thoughts: _"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_ As the consumer is configured to pull data from the source cluster, I am thinking we probably need to: (1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java] (2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from the target cluster. (3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use the returned offsets as the starting point. _"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_ I fully agree with the "fake" consumer group on the target cluster. I am wondering whether "addOffsetsToTransaction" has already been taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*? > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171612#comment-17171612 ] Ning Zhang commented on KAFKA-10339: thanks for the input. I think that sounds like a working plan. Here are my follow-up thoughts: _"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_ As the consumer is configured to pull data from the source cluster, I am thinking we probably need to: (1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java] (2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from the target cluster. (3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use the returned offsets as the starting point. _"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_ I fully agree with the "fake" consumer group on the target cluster. I am wondering whether "addOffsetsToTransaction" has already been taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*? 
> MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
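The proposed `loadOffsets()` hook from steps (1)-(3) above can be sketched in plain Java. Note this is a hypothetical API that does not exist in Kafka Connect; the interface, class, and field names below (`OffsetLoadingSinkTask`, `MirrorLikeSinkTask`, `WorkerLikeSinkTask`, the partition-string keys) are illustrative stand-ins for the real `SinkTask`/`WorkerSinkTask` types:

```java
import java.util.*;

// Hypothetical sketch of the proposal: a sink task may supply initial
// offsets (e.g. MirrorSinkTask reading __consumer_offsets on the target
// cluster), and the worker seeds its consumer from them instead of the
// last committed offsets. Names are illustrative, not real Connect APIs.
interface OffsetLoadingSinkTask {
    // Step (1): proposed new API, partition -> starting offset; an empty
    // map means "fall back to the consumer's committed offsets".
    default Map<String, Long> loadOffsets() {
        return Collections.emptyMap();
    }
}

// Step (2): the mirror task overrides loadOffsets().
class MirrorLikeSinkTask implements OffsetLoadingSinkTask {
    @Override
    public Map<String, Long> loadOffsets() {
        // In the real proposal this would be read from the target
        // cluster's __consumer_offsets; hard-coded for illustration.
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("mytopic-0", 100L);
        return offsets;
    }
}

// Step (3): the worker consults the task when initializing the consumer.
class WorkerLikeSinkTask {
    final Map<String, Long> consumerPositions = new HashMap<>();

    void initializeConsumer(OffsetLoadingSinkTask task) {
        Map<String, Long> loaded = task.loadOffsets();
        if (!loaded.isEmpty())
            consumerPositions.putAll(loaded);         // assign + seek in the real worker
        else
            consumerPositions.put("mytopic-0", 0L);   // committed-offset fallback
    }
}
```

In the real worker the "use the returned offsets" branch would translate into consumer.assign() followed by consumer.seek() per partition.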
[jira] [Comment Edited] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171612#comment-17171612 ] Ning Zhang edited comment on KAFKA-10339 at 8/5/20, 4:44 PM: - thanks for the input. I think that sounds like a working plan. Here are my follow-up thoughts: _"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_ As the consumer is configured to pull data from the source cluster, I am thinking we probably need to: (1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java] (2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from the target cluster. (3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use the returned offsets as the starting point. _"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_ I fully agree with the "fake" consumer group on the target cluster. I am wondering whether "addOffsetsToTransaction" has already been taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*? was (Author: yangguo1220): thanks for the input. I think that sounds like a working plan. 
Here are my follow-up thoughts: _"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_ As the consumer is configured to pull data from the source cluster, I am thinking we probably need to: (1) add a new API (called "Map loadOffsets()") to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java] (2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from the target cluster. (3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use the returned offsets as the starting point. _"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_ I fully agree with the "fake" consumer group on the target cluster. I am wondering whether "addOffsetsToTransaction" has already been taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*? > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171249#comment-17171249 ] Ning Zhang commented on KAFKA-10339: Technically, a transaction cannot span two clusters, so the above KIP is not valid as written. I need to look for other alternatives. > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17170231#comment-17170231 ] Ning Zhang commented on KAFKA-10339: [~ryannedolan] [~mimaison] Here is the KIP I would like to get your thoughts on, thanks so much for any feedback or input https://cwiki.apache.org/confluence/display/KAFKA/KIP-653%3A+MirrorMaker2+Exactly-once+Semantics > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10339: --- Labels: needs-kip (was: ) > MirrorMaker2 Exactly-once Semantics > --- > > Key: KAFKA-10339 > URL: https://issues.apache.org/jira/browse/KAFKA-10339 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: needs-kip > > MirrorMaker2 is currently implemented on Kafka Connect Framework, more > specifically the Source Connector / Task, which do not provide exactly-once > semantics (EOS) out-of-the-box, as discussed in > https://github.com/confluentinc/kafka-connect-jdbc/issues/461, > https://github.com/apache/kafka/pull/5553, > https://issues.apache.org/jira/browse/KAFKA-6080 and > https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 > currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
Ning Zhang created KAFKA-10339: -- Summary: MirrorMaker2 Exactly-once Semantics Key: KAFKA-10339 URL: https://issues.apache.org/jira/browse/KAFKA-10339 Project: Kafka Issue Type: New Feature Components: KafkaConnect Reporter: Ning Zhang Assignee: Ning Zhang MirrorMaker2 is currently implemented on Kafka Connect Framework, more specifically the Source Connector / Task, which do not provide exactly-once semantics (EOS) out-of-the-box, as discussed in https://github.com/confluentinc/kafka-connect-jdbc/issues/461, https://github.com/apache/kafka/pull/5553, https://issues.apache.org/jira/browse/KAFKA-6080 and https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 currently does not provide EOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2
Ning Zhang created KAFKA-10304: -- Summary: Revisit and improve the tests of MirrorMaker 2 Key: KAFKA-10304 URL: https://issues.apache.org/jira/browse/KAFKA-10304 Project: Kafka Issue Type: Test Components: KafkaConnect Reporter: Ning Zhang Assignee: Ning Zhang Due to the quick development of Kafka MM 2, the unit and integration tests of MirrorMaker 2 were written just to cover each individual feature, and some of them are simply copied from existing tests with small tweaks. It may be a good time to revisit and improve the tests, possibly along the following lines: (1) are 100 messages enough for integration tests? (2) what about failures in the middle of integration tests? (3) do we want to check other messages (e.g. checkpoint, heartbeat, offset sync) beyond the mirrored messages in integration tests? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10000) Atomic commit of source connector records and offsets
[ https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163802#comment-17163802 ] Ning Zhang commented on KAFKA-10000: Hi Chris, the purpose of this ticket is very interesting. I wonder what its priority is in the overall Kafka Connect backlog, and how progress has been so far (it still needs a KIP)? Thanks > Atomic commit of source connector records and offsets > - > > Key: KAFKA-10000 > URL: https://issues.apache.org/jira/browse/KAFKA-10000 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Labels: needs-kip > > It'd be nice to be able to configure source connectors such that their > offsets are committed if and only if all records up to that point have been > ack'd by the producer. This would go a long way towards EOS for source > connectors. > > This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is > marked as {{WONTFIX}} since it only concerns enabling the idempotent producer > for source connectors and is not concerned with source connector offsets. > This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, > which had a lot of discussion around allowing connector-defined transaction > boundaries. The suggestion in this ticket is to only use source connector > offset commits as the transaction boundaries for connectors; allowing > connector-specified transaction boundaries can be addressed separately. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync
[ https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17144388#comment-17144388 ] Ning Zhang commented on KAFKA-9076: --- [~rhauch] It is true that this jira is not a blocking factor for the 2.6.0 release, and thank you for checking it. Given that the PR has been open for over 6 months and some users are testing it in different use cases, I would hope the committers ([~mimaison]) and other reviewers may take another look at https://github.com/apache/kafka/pull/7577 and see if we could iterate faster, so that it can formally be part of Kafka in the 2.7.0 release. > MirrorMaker 2.0 automated consumer offset sync > -- > > Key: KAFKA-9076 > URL: https://issues.apache.org/jira/browse/KAFKA-9076 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: mirrormaker, pull-request-available > Fix For: 2.7.0 > > > To calculate the translated consumer offset in the target cluster, currently > `Mirror-client` provides a function called "remoteConsumerOffsets()" that is > used by "RemoteClusterUtils" for one-time use. > In order to make the consumer and stream applications migrate from source to > target cluster transparently and conveniently, e.g. in the event of a source > cluster failure, a background job is proposed to periodically sync the > consumer offsets from the source to the target cluster, so that when the consumer > and stream applications switch to the target cluster, they will resume > consuming from where they left off at the source cluster. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0 > [https://github.com/apache/kafka/pull/7577] -- This message was sent by Atlassian Jira (v8.3.4#803005)
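Conceptually, the periodic sync translates each committed upstream offset through the most recent offset-sync checkpoint: if an offset sync records that upstream offset U maps to downstream offset D, then an upstream commit C >= U translates to D + (C - U) on the target cluster. A simplified sketch of that arithmetic (plain Java, not the actual MirrorClient/RemoteClusterUtils code):

```java
// Simplified model of MM2 offset translation (NOT the actual MirrorClient
// code): an offset sync says upstream offset `upstreamSync` corresponds to
// downstream offset `downstreamSync`; later upstream commits are shifted
// by the same delta.
class OffsetTranslation {
    static long translate(long upstreamSync, long downstreamSync, long upstreamCommitted) {
        if (upstreamCommitted < upstreamSync) {
            // The commit predates the last sync point; a real implementation
            // would fall back to an older sync or refuse to translate.
            throw new IllegalArgumentException("commit predates the last offset sync");
        }
        return downstreamSync + (upstreamCommitted - upstreamSync);
    }
}
```

For example, with a sync of upstream 100 -> downstream 40, an upstream commit of 150 translates to downstream offset 90; using the literal upstream offset 150 instead is exactly the kind of bug described in KAFKA-12635.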
[jira] [Assigned] (KAFKA-9219) NullPointerException when polling metrics from Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reassigned KAFKA-9219: - Assignee: Ning Zhang > NullPointerException when polling metrics from Kafka Connect > > > Key: KAFKA-9219 > URL: https://issues.apache.org/jira/browse/KAFKA-9219 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0 >Reporter: Mickael Maison >Assignee: Ning Zhang >Priority: Major > Fix For: 2.4.0, 2.5.0 > > > The following stack trace appears: > > {code:java} > [2019-11-05 23:56:57,909] WARN Error getting JMX attribute 'assigned-tasks' > (org.apache.kafka.common.metrics.JmxReporter:202) > java.lang.NullPointerException > at > org.apache.kafka.connect.runtime.distributed.WorkerCoordinator$WorkerCoordinatorMetrics$2.measure(WorkerCoordinator.java:316) > at > org.apache.kafka.common.metrics.KafkaMetric.metricValue(KafkaMetric.java:66) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:190) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:200) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705) > at > javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1449) > at > javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76) > at > javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309) > at > javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401) > at > javax.management.remote.rmi.RMIConnectionImpl.getAttributes(RMIConnectionImpl.java:675) > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:498) > at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357) > at sun.rmi.transport.Transport$1.run(Transport.java:200) > at sun.rmi.transport.Transport$1.run(Transport.java:197) > at java.security.AccessController.doPrivileged(Native Method) > at sun.rmi.transport.Transport.serviceCall(Transport.java:196) > at > sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:835) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688) > at java.security.AccessController.doPrivileged(Native Method) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [2019-11-05 23:57:02,821] INFO [Worker clientId=connect-1, > groupId=backup-mm2] Herder stopped > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:629) > [2019-11-05 23:57:02,821] INFO [Worker clientId=connect-2, groupId=cv-mm2] > Herder stopping > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:609) > [2019-11-05 23:57:07,822] INFO [Worker clientId=connect-2, groupId=cv-mm2] > Herder stopped > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:629) > [2019-11-05 23:57:07,822] INFO Kafka MirrorMaker stopped. > (org.apache.kafka.connect.mirror.MirrorMaker:191) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync
[ https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reassigned KAFKA-9076: - Assignee: Ning Zhang > MirrorMaker 2.0 automated consumer offset sync > -- > > Key: KAFKA-9076 > URL: https://issues.apache.org/jira/browse/KAFKA-9076 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Labels: mirrormaker, pull-request-available > Fix For: 2.5.0 > > > To calculate the translated consumer offset in the target cluster, currently > `Mirror-client` provides a function called "remoteConsumerOffsets()" that is > used by "RemoteClusterUtils" for one-time use. > In order to make the consumer and stream applications migrate from source to > target cluster transparently and conveniently, e.g. in the event of a source > cluster failure, a background job is proposed to periodically sync the > consumer offsets from the source to the target cluster, so that when the consumer > and stream applications switch to the target cluster, they will resume > consuming from where they left off at the source cluster. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0 > [https://github.com/apache/kafka/pull/7577] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask
[ https://issues.apache.org/jira/browse/KAFKA-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reassigned KAFKA-9360: - Assignee: Ning Zhang > emitting checkpoint and heartbeat set to false will not disable the activity > in their SourceTask > > > Key: KAFKA-9360 > URL: https://issues.apache.org/jira/browse/KAFKA-9360 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Fix For: 2.5.0 > > Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png, Screen Shot > 2020-01-02 at 3.18.23 PM.png > > > `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be > the knobs to control if the heartbeat message or checkpoint message will be > sent or not to the topics respectively. In our experiments, setting them to > false will not suspend the activity in their SourceTasks, e.g. > MirrorHeartbeatTask, MirrorCheckpointTask. > The observations are, when setting those knobs to false, a huge volume of > `SourceRecord`s is being sent without interval, causing significantly high > CPU usage of the MirrorMaker 2 instance, GC time and congesting the single > partition of the heartbeat topic and checkpoint topic. > The proposed fix in the following PR is to (1) explicitly check if `interval` > is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or > `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to > negative, put the thread in sleep mode for a while (e.g. 5 seconds) and > return null, in order to prevent it from (1) hogging the cpu, (2) sending > heartbeat or checkpoint messages to Kafka topics. > PR link: https://github.com/apache/kafka/pull/7887 -- This message was sent by Atlassian Jira (v8.3.4#803005)
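The proposed guard can be sketched in isolation. This is an illustrative sketch with made-up names (`EmitGuard`, `poll`); the real change lives in the MirrorMaker source tasks' poll() methods:

```java
import java.util.Collections;
import java.util.List;

// Illustrative sketch of the proposed fix: when emission is disabled the
// configured interval is negative, so poll() sleeps briefly and returns
// null instead of spinning and emitting records with no pause.
class EmitGuard {
    static List<String> poll(long intervalMillis, long sleepMillis) {
        if (intervalMillis < 0) {             // emit.*.enabled = false
            try {
                Thread.sleep(sleepMillis);    // avoid hogging the CPU
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return null;                      // emit no heartbeat/checkpoint
        }
        // Normal emission path, stubbed with a single placeholder record.
        return Collections.singletonList("heartbeat");
    }
}
```

Returning null from a Connect SourceTask's poll() is the framework's "nothing to emit right now" signal, so the sleep-then-null pattern both throttles the loop and suppresses the records.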
[jira] [Updated] (KAFKA-9352) unbalanced assignment of topic-partition to tasks
[ https://issues.apache.org/jira/browse/KAFKA-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-9352: -- Issue Type: New Feature (was: Improvement) > unbalanced assignment of topic-partition to tasks > - > > Key: KAFKA-9352 > URL: https://issues.apache.org/jira/browse/KAFKA-9352 > Project: Kafka > Issue Type: New Feature > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Priority: Major > Fix For: 2.5.0 > > Attachments: Screen Shot 2019-12-19 at 12.16.02 PM.png, Screen Shot > 2019-12-19 at 8.22.17 AM.png > > > originally, when mirrormaker replicates a group of topics, the assignment > between topic-partitions and tasks is pretty static. E.g. partitions from the > same topic tend to be grouped together as much as possible on the same task. > For example, 3 tasks to mirror 3 topics with 8, 2 and 2 > partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, > partition 5' > The original assignment will look like: > t1 -> [t0p0, t0p1, t0p2, t0p3] > t2 -> [t0p4, t0p5, t0p6, t0p7] > t3 -> [t1p0, t1p2, t2p0, t2p1] > The potential issue of the above assignment is: if topic 0 has more traffic than > the other topics (topic 1, topic 2), t1 and t2 will carry more traffic than > t3. When the tasks are mapped to the mirrormaker instances (workers) and > launched, it will create unbalanced load on the workers. Please see the > picture below as an unbalanced example of 2 mirrormaker instances: > !Screen Shot 2019-12-19 at 12.16.02 PM.png! > Given each mirrored topic has different traffic and number of partitions, to > balance the load > across all mirrormaker instances (workers), 'roundrobin' helps to evenly > assign all > topic-partitions to the tasks, then the tasks are further distributed to > workers by calling > 'ConnectorUtils.groupPartitions()'. For example, 3 tasks to mirror 3 topics > with 8, 2 and 2 > partitions respectively. 
't1' denotes 'task 1', 't0p5' denotes 'topic 0, > partition 5' > t1 -> [t0p0, t0p3, t0p6, t1p1] > t2 -> [t0p1, t0p4, t0p7, t2p0] > t3 -> [t0p2, t0p5, t1p0, t2p1] > The improvement of this new assignment over the original one is: > the partitions of topic 0, topic 1 and topic 2 are all spread over all tasks, > which creates a relatively even load on all workers, after the tasks are > mapped to the workers and launched. > Please see the picture below as a balanced example of 4 mirrormaker instances: > !Screen Shot 2019-12-19 at 8.22.17 AM.png! > PR link is: https://github.com/apache/kafka/pull/7880 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
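The round-robin assignment described above is easy to reproduce in a standalone sketch (illustrative code, not the actual MirrorSourceConnector implementation): enumerate the partitions topic by topic and deal them out to tasks modulo the task count, which yields exactly the t1/t2/t3 layout quoted in the description:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of round-robin topic-partition -> task assignment
// (illustrative; the real logic lives in MirrorSourceConnector and the
// resulting task groups are then handed to workers).
class RoundRobinAssign {
    static List<List<String>> assign(int[] partitionCounts, int numTasks) {
        List<List<String>> tasks = new ArrayList<>();
        for (int t = 0; t < numTasks; t++)
            tasks.add(new ArrayList<>());
        int i = 0;
        // Enumerate t0p0, t0p1, ..., t1p0, ... and deal them out in turn.
        for (int topic = 0; topic < partitionCounts.length; topic++)
            for (int p = 0; p < partitionCounts[topic]; p++)
                tasks.get(i++ % numTasks).add("t" + topic + "p" + p);
        return tasks;
    }

    public static void main(String[] args) {
        // 3 tasks mirroring 3 topics with 8, 2 and 2 partitions respectively
        System.out.println(assign(new int[]{8, 2, 2}, 3));
    }
}
```

Running this with 3 topics of 8, 2 and 2 partitions across 3 tasks reproduces the balanced layout from the description: each task receives 4 partitions, and the hot topic 0 is spread over all three tasks instead of being concentrated on two of them.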
[jira] [Commented] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask
[ https://issues.apache.org/jira/browse/KAFKA-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007130#comment-17007130 ] Ning Zhang commented on KAFKA-9360: --- PR link: https://github.com/apache/kafka/pull/7887 > emitting checkpoint and heartbeat set to false will not disable the activity > in their SourceTask > > > Key: KAFKA-9360 > URL: https://issues.apache.org/jira/browse/KAFKA-9360 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Priority: Major > Fix For: 2.5.0 > > Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png, Screen Shot > 2020-01-02 at 3.18.23 PM.png > > > `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be > the knobs to control if the heartbeat message or checkpoint message will be > sent or not to the topics respectively. In our experiments, setting them to > false will not suspend the activity in their SourceTasks, e.g. > MirrorHeartbeatTask, MirrorCheckpointTask. > The observations are, when setting those knobs to false, a huge volume of > `SourceRecord`s is being sent without interval, causing significantly high > CPU usage of the MirrorMaker 2 instance, GC time and congesting the single > partition of the heartbeat topic and checkpoint topic. > The proposed fix in the following PR is to (1) explicitly check if `interval` > is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or > `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to > negative, put the thread in sleep mode for a while (e.g. 5 seconds) and > return null, in order to prevent it from (1) hogging the cpu, (2) sending > heartbeat or checkpoint messages to Kafka topics. > PR link: https://github.com/apache/kafka/pull/7887 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask
[ https://issues.apache.org/jira/browse/KAFKA-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-9360: -- Description: `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the knobs to control if the heartbeat message or checkpoint message will be sent or not to the topics respectively. In our experiments, setting them to false will not suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, MirrorCheckpointTask. The observations are, when setting those knobs to false, a huge volume of `SourceRecord`s is being sent without interval, causing significantly high CPU usage of the MirrorMaker 2 instance, GC time and congesting the single partition of the heartbeat topic and checkpoint topic. The proposed fix in the following PR is to (1) explicitly check if `interval` is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to negative, put the thread in sleep mode for a while (e.g. 5 seconds) and return null, in order to prevent it from (1) hogging the cpu, (2) sending heartbeat or checkpoint messages to Kafka topics. PR link: https://github.com/apache/kafka/pull/7887 was: `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the knobs to control if the heartbeat message or checkpoint message will be sent or not to the topics respectively. In our experiments, setting them to false will not suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, MirrorCheckpointTask. The observations are, when setting those knobs to false, a huge volume of `SourceRecord`s is being sent without interval, causing significantly high CPU usage of the MirrorMaker 2 instance, GC time and congesting the single partition of the heartbeat topic and checkpoint topic. The proposed fix in the following PR is to (1) explicitly check if `interval` is set to negative (e.g. 
-1), when the `emit.heartbeats.enabled` or `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to negative, put the thread in sleep mode for a while (e.g. 5 seconds) and return null, in orderto prevent it from (1) hogging the cpu, (2) sending heartbeat or checkpoint messages to Kafka topics > emitting checkpoint and heartbeat set to false will not disable the activity > in their SourceTask > > > Key: KAFKA-9360 > URL: https://issues.apache.org/jira/browse/KAFKA-9360 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Priority: Major > Fix For: 2.5.0 > > Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png, Screen Shot > 2020-01-02 at 3.18.23 PM.png > > > `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be > the knobs to control if the heartbeat message or checkpoint message will be > sent or not to the topics respectively. In our experiments, setting them to > false will not suspend the activity in their SourceTasks, e.g. > MirrorHeartbeatTask, MirrorCheckpointTask. > The observations are, when setting those knobs to false, huge volume of > `SourceRecord` are being sent without interval, causing significantly high > CPU usage of MirrorMaker 2 instance, GC time and congesting the single > partition of the heartbeat topic and checkpoint topic. > The proposed fix in the following PR is to (1) explicitly check if `interval` > is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or > `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to > negative, put the thread in sleep mode for a while (e.g. 5 seconds) and > return null, in orderto prevent it from (1) hogging the cpu, (2) sending > heartbeat or checkpoint messages to Kafka topics. > PR link: https://github.com/apache/kafka/pull/7887 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask
[ https://issues.apache.org/jira/browse/KAFKA-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-9360: -- Description: `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the knobs to control if the heartbeat message or checkpoint message will be sent or not to the topics respectively. In our experiments, setting them to false will not suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, MirrorCheckpointTask. The observations are, when setting those knobs to false, huge volume of `SourceRecord` are being sent without interval, causing significantly high CPU usage of MirrorMaker 2 instance, GC time and congesting the single partition of the heartbeat topic and checkpoint topic. The proposed fix in the following PR is to (1) explicitly check if `interval` is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to negative, put the thread in sleep mode for a while (e.g. 5 seconds) and return null, in orderto prevent it from (1) hogging the cpu, (2) sending heartbeat or checkpoint messages to Kafka topics was: `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the knobs to control if the heartbeat message or checkpoint message will be sent or not to the topics respectively. In our experiments, setting them to false will not suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, MirrorCheckpointTask. The observations are, when setting those knobs to false, huge volume of `SourceRecord` are being sent without interval, causing significantly high CPU usage of MirrorMaker 2 instance and congesting the single partition of the heartbeat topic and checkpoint topic. The proposed fix in the following PR is to (1) explicitly check if `interval` is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or `emit.checkpoints.enabled` is off. 
(2) if `interval` is indeed set to negative, put the thread in sleep mode for a while (e.g. 5 seconds) and return null, in orderto prevent it from (1) hogging the cpu, (2) sending heartbeat or checkpoint messages to Kafka topics > emitting checkpoint and heartbeat set to false will not disable the activity > in their SourceTask > > > Key: KAFKA-9360 > URL: https://issues.apache.org/jira/browse/KAFKA-9360 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Priority: Major > Fix For: 2.5.0 > > Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png, Screen Shot > 2020-01-02 at 3.18.23 PM.png > > > `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be > the knobs to control if the heartbeat message or checkpoint message will be > sent or not to the topics respectively. In our experiments, setting them to > false will not suspend the activity in their SourceTasks, e.g. > MirrorHeartbeatTask, MirrorCheckpointTask. > The observations are, when setting those knobs to false, huge volume of > `SourceRecord` are being sent without interval, causing significantly high > CPU usage of MirrorMaker 2 instance, GC time and congesting the single > partition of the heartbeat topic and checkpoint topic. > The proposed fix in the following PR is to (1) explicitly check if `interval` > is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or > `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to > negative, put the thread in sleep mode for a while (e.g. 5 seconds) and > return null, in orderto prevent it from (1) hogging the cpu, (2) sending > heartbeat or checkpoint messages to Kafka topics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask
[ https://issues.apache.org/jira/browse/KAFKA-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-9360: -- Attachment: Screen Shot 2020-01-02 at 3.18.23 PM.png > emitting checkpoint and heartbeat set to false will not disable the activity > in their SourceTask > > > Key: KAFKA-9360 > URL: https://issues.apache.org/jira/browse/KAFKA-9360 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Priority: Major > Fix For: 2.5.0 > > Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png, Screen Shot > 2020-01-02 at 3.18.23 PM.png > > > `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be > the knobs to control if the heartbeat message or checkpoint message will be > sent or not to the topics respectively. In our experiments, setting them to > false will not suspend the activity in their SourceTasks, e.g. > MirrorHeartbeatTask, MirrorCheckpointTask. > The observations are, when setting those knobs to false, huge volume of > `SourceRecord` are being sent without interval, causing significantly high > CPU usage of MirrorMaker 2 instance, GC time and congesting the single > partition of the heartbeat topic and checkpoint topic. > The proposed fix in the following PR is to (1) explicitly check if `interval` > is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or > `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to > negative, put the thread in sleep mode for a while (e.g. 5 seconds) and > return null, in orderto prevent it from (1) hogging the cpu, (2) sending > heartbeat or checkpoint messages to Kafka topics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask
[ https://issues.apache.org/jira/browse/KAFKA-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-9360: -- Description: `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the knobs to control if the heartbeat message or checkpoint message will be sent or not to the topics respectively. In our experiments, setting them to false will not suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, MirrorCheckpointTask. The observations are, when setting those knobs to false, huge volume of `SourceRecord` are being sent without interval, causing significantly high CPU usage of MirrorMaker 2 instance and congesting the single partition of the heartbeat topic and checkpoint topic. The proposed fix in the following PR is to (1) explicitly check if `interval` is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to negative, put the thread in sleep mode for a while (e.g. 5 seconds) and return null, in orderto prevent it from (1) hogging the cpu, (2) sending heartbeat or checkpoint messages to Kafka topics was: `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the knobs to control if the heartbeat message or checkpoint message will be sent or not to the topics respectively. In our experiments, setting them to false will not suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, MirrorCheckpointTask. The observations are, when setting those knobs to false, huge volume of `SourceRecord` are being sent without interval, causing significantly high CPU usage of MirrorMaker 2 instance and congesting the single partition of the heartbeat topic and checkpoint topic. The proposed fix in the following PR is to (1) explicitly check if `interval` is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or `emit.checkpoints.enabled` is off. 
(2) if `interval` is indeed set to negative, put the thread in sleep mode (e.g. 5 seconds) and return null, to prevent it from running the remaining logic. > emitting checkpoint and heartbeat set to false will not disable the activity > in their SourceTask > > > Key: KAFKA-9360 > URL: https://issues.apache.org/jira/browse/KAFKA-9360 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Priority: Major > Fix For: 2.5.0 > > Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png > > > `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be > the knobs to control if the heartbeat message or checkpoint message will be > sent or not to the topics respectively. In our experiments, setting them to > false will not suspend the activity in their SourceTasks, e.g. > MirrorHeartbeatTask, MirrorCheckpointTask. > The observations are, when setting those knobs to false, huge volume of > `SourceRecord` are being sent without interval, causing significantly high > CPU usage of MirrorMaker 2 instance and congesting the single partition of > the heartbeat topic and checkpoint topic. > The proposed fix in the following PR is to (1) explicitly check if `interval` > is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or > `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to > negative, put the thread in sleep mode for a while (e.g. 5 seconds) and > return null, in orderto prevent it from (1) hogging the cpu, (2) sending > heartbeat or checkpoint messages to Kafka topics -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9360) emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask
Ning Zhang created KAFKA-9360: - Summary: emitting checkpoint and heartbeat set to false will not disable the activity in their SourceTask Key: KAFKA-9360 URL: https://issues.apache.org/jira/browse/KAFKA-9360 Project: Kafka Issue Type: Improvement Components: mirrormaker Affects Versions: 2.4.0 Reporter: Ning Zhang Fix For: 2.5.0 Attachments: Screen Shot 2020-01-02 at 2.55.38 PM.png `emit.heartbeats.enabled` and `emit.checkpoints.enabled` are supposed to be the knobs to control if the heartbeat message or checkpoint message will be sent or not to the topics respectively. In our experiments, setting them to false will not suspend the activity in their SourceTasks, e.g. MirrorHeartbeatTask, MirrorCheckpointTask. The observations are, when setting those knobs to false, huge volume of `SourceRecord` are being sent without interval, causing significantly high CPU usage of MirrorMaker 2 instance and congesting the single partition of the heartbeat topic and checkpoint topic. The proposed fix in the following PR is to (1) explicitly check if `interval` is set to negative (e.g. -1), when the `emit.heartbeats.enabled` or `emit.checkpoints.enabled` is off. (2) if `interval` is indeed set to negative, put the thread in sleep mode (e.g. 5 seconds) and return null, to prevent it from running the remaining logic. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9352) unbalanced assignment of topic-partition to tasks
[ https://issues.apache.org/jira/browse/KAFKA-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006244#comment-17006244 ] Ning Zhang commented on KAFKA-9352: --- pr: https://github.com/apache/kafka/pull/7880 > unbalanced assignment of topic-partition to tasks > - > > Key: KAFKA-9352 > URL: https://issues.apache.org/jira/browse/KAFKA-9352 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Priority: Major > Fix For: 2.5.0 > > Attachments: Screen Shot 2019-12-19 at 12.16.02 PM.png, Screen Shot > 2019-12-19 at 8.22.17 AM.png > > > originally, when mirrormaker replicates a group of topics, the assignment > between topic-partition and tasks are pretty static. E.g. partitions from the > same topic tend to be grouped together as much as possible on the same task. > For example, 3 tasks to mirror 3 topics with 8, 2 and 2 > partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, > partition 5' > The original assignment will look like: > t1 -> [t0p0, t0p1, t0p2, t0p3] > t2 -> [t0p4, t0p5, t0p6, t0p7] > t3 -> [t1p0, t1p2, t2p0, t2p1] > The potential issue of above assignment is: if topic 0 has more traffic than > other topics (topic 1, topic 2), t1 and t2 will be loaded more traffic than > t3. When the tasks are mapped to the mirrormaker instances (workers) and > launched, it will create unbalanced load on the workers. Please see the > picture below as an unbalanced example of 2 mirrormaker instances: > !Screen Shot 2019-12-19 at 12.16.02 PM.png! > Given each mirrored topic has different traffic and number of partitions, to > balance the load > across all mirrormaker instances (workers), 'roundrobin' helps to evenly > assign all > topic-partition to the tasks, then the tasks are further distributed to > workers by calling > 'ConnectorUtils.groupPartitions()'. For example, 3 tasks to mirror 3 topics > with 8, 2 and 2 > partitions respectively. 
't1' denotes 'task 1', 't0p5' denotes 'topic 0, > partition 5' > t1 -> [t0p0, t0p3, t0p6, t1p1] > t2 -> [t0p1, t0p4, t0p7, t2p0] > t3 -> [t0p2, t0p5, t1p0, t2p1] > The improvement of this new above assignment over the original assignment is: > the partitions of topic 0, topic 1 and topic 2 are all spread over all tasks, > which creates a relatively even load on all workers, after the tasks are > mapped to the workers and launched. > Please see the picture below as a balanced example of 4 mirrormaker instances: > !Screen Shot 2019-12-19 at 8.22.17 AM.png! > PR link is: https://github.com/apache/kafka/pull/7880 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9352) unbalanced assignment of topic-partition to tasks
[ https://issues.apache.org/jira/browse/KAFKA-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-9352: -- Description: originally, when mirrormaker replicates a group of topics, the assignment between topic-partition and tasks are pretty static. E.g. partitions from the same topic tend to be grouped together as much as possible on the same task. For example, 3 tasks to mirror 3 topics with 8, 2 and 2 partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, partition 5' The original assignment will look like: t1 -> [t0p0, t0p1, t0p2, t0p3] t2 -> [t0p4, t0p5, t0p6, t0p7] t3 -> [t1p0, t1p2, t2p0, t2p1] The potential issue of above assignment is: if topic 0 has more traffic than other topics (topic 1, topic 2), t1 and t2 will be loaded more traffic than t3. When the tasks are mapped to the mirrormaker instances (workers) and launched, it will create unbalanced load on the workers. Please see the picture below as an unbalanced example of 2 mirrormaker instances: !Screen Shot 2019-12-19 at 12.16.02 PM.png! Given each mirrored topic has different traffic and number of partitions, to balance the load across all mirrormaker instances (workers), 'roundrobin' helps to evenly assign all topic-partition to the tasks, then the tasks are further distributed to workers by calling 'ConnectorUtils.groupPartitions()'. For example, 3 tasks to mirror 3 topics with 8, 2 and 2 partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, partition 5' t1 -> [t0p0, t0p3, t0p6, t1p1] t2 -> [t0p1, t0p4, t0p7, t2p0] t3 -> [t0p2, t0p5, t1p0, t2p1] The improvement of this new above assignment over the original assignment is: the partitions of topic 0, topic 1 and topic 2 are all spread over all tasks, which creates a relatively even load on all workers, after the tasks are mapped to the workers and launched. 
Please see the picture below as a balanced example of 4 mirrormaker instances: !Screen Shot 2019-12-19 at 8.22.17 AM.png! PR link is: https://github.com/apache/kafka/pull/7880 was: originally, when mirrormaker replicates a group of topics, the assignment between topic-partition and tasks are pretty static. E.g. partitions from the same topic tend to be grouped together as much as possible on the same task. For example, 3 tasks to mirror 3 topics with 8, 2 and 2 partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, partition 5' The original assignment will look like: t1 -> [t0p0, t0p1, t0p2, t0p3] t2 -> [t0p4, t0p5, t0p6, t0p7] t3 -> [t1p0, t1p2, t2p0, t2p1] The potential issue of above assignment is: if topic 0 has more traffic than other topics (topic 1, topic 2), t1 and t2 will be loaded more traffic than t3. When the tasks are mapped to the mirrormaker instances (workers) and launched, it will create unbalanced load on the workers. Please see the picture below as an unbalanced example of 2 mirrormaker instances: !Screen Shot 2019-12-19 at 12.16.02 PM.png! Given each mirrored topic has different traffic and number of partitions, to balance the load across all mirrormaker instances (workers), 'roundrobin' helps to evenly assign all topic-partition to the tasks, then the tasks are further distributed to workers by calling 'ConnectorUtils.groupPartitions()'. For example, 3 tasks to mirror 3 topics with 8, 2 and 2 partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, partition 5' t1 -> [t0p0, t0p3, t0p6, t1p1] t2 -> [t0p1, t0p4, t0p7, t2p0] t3 -> [t0p2, t0p5, t1p0, t2p1] The improvement of this new above assignment over the original assignment is: the partitions of topic 0, topic 1 and topic 2 are all spread over all tasks, which creates a relatively even load on all workers, after the tasks are mapped to the workers and launched. 
Please see the picture below as a balanced example of 4 mirrormaker instances: !Screen Shot 2019-12-19 at 8.22.17 AM.png! > unbalanced assignment of topic-partition to tasks > - > > Key: KAFKA-9352 > URL: https://issues.apache.org/jira/browse/KAFKA-9352 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Priority: Major > Fix For: 2.5.0 > > Attachments: Screen Shot 2019-12-19 at 12.16.02 PM.png, Screen Shot > 2019-12-19 at 8.22.17 AM.png > > > originally, when mirrormaker replicates a group of topics, the assignment > between topic-partition and tasks are pretty static. E.g. partitions from the > same topic tend to be grouped together as much as possible on the same task. > For example, 3 tasks to mirror 3 topics with 8, 2 and 2 > partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, > partition 5' > The original assignment will look like: > t1 -> [t0p0,
[jira] [Created] (KAFKA-9352) unbalanced assignment of topic-partition to tasks
Ning Zhang created KAFKA-9352: - Summary: unbalanced assignment of topic-partition to tasks Key: KAFKA-9352 URL: https://issues.apache.org/jira/browse/KAFKA-9352 Project: Kafka Issue Type: Improvement Components: mirrormaker Affects Versions: 2.4.0 Reporter: Ning Zhang Fix For: 2.5.0 Attachments: Screen Shot 2019-12-19 at 12.16.02 PM.png, Screen Shot 2019-12-19 at 8.22.17 AM.png originally, when mirrormaker replicates a group of topics, the assignment between topic-partition and tasks are pretty static. E.g. partitions from the same topic tend to be grouped together as much as possible on the same task. For example, 3 tasks to mirror 3 topics with 8, 2 and 2 partitions respectively. 't1' denotes 'task 1', 't0p5' denotes 'topic 0, partition 5' The original assignment will look like: t1 -> [t0p0, t0p1, t0p2, t0p3] t2 -> [t0p4, t0p5, t0p6, t0p7] t3 -> [t1p0, t1p2, t2p0, t2p1] The potential issue of above assignment is: if topic 0 has more traffic than other topics (topic 1, topic 2), t1 and t2 will be loaded more traffic than t3. When the tasks are mapped to the mirrormaker instances (workers) and launched, it will create unbalanced load on the workers. Please see the picture below as an unbalanced example of 2 mirrormaker instances: !Screen Shot 2019-12-19 at 12.16.02 PM.png! Given each mirrored topic has different traffic and number of partitions, to balance the load across all mirrormaker instances (workers), 'roundrobin' helps to evenly assign all topic-partition to the tasks, then the tasks are further distributed to workers by calling 'ConnectorUtils.groupPartitions()'. For example, 3 tasks to mirror 3 topics with 8, 2 and 2 partitions respectively. 
't1' denotes 'task 1', 't0p5' denotes 'topic 0, partition 5' t1 -> [t0p0, t0p3, t0p6, t1p1] t2 -> [t0p1, t0p4, t0p7, t2p0] t3 -> [t0p2, t0p5, t1p0, t2p1] The improvement of this new above assignment over the original assignment is: the partitions of topic 0, topic 1 and topic 2 are all spread over all tasks, which creates a relatively even load on all workers, after the tasks are mapped to the workers and launched. Please see the picture below as a balanced example of 4 mirrormaker instances: !Screen Shot 2019-12-19 at 8.22.17 AM.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync
[ https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16958430#comment-16958430 ] Ning Zhang commented on KAFKA-9076: --- [~ryannedolan] KIP is added: https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0 > MirrorMaker 2.0 automated consumer offset sync > -- > > Key: KAFKA-9076 > URL: https://issues.apache.org/jira/browse/KAFKA-9076 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Priority: Major > Labels: mirrormaker, pull-request-available > Fix For: 2.5.0 > > > To calculate the translated consumer offset in the target cluster, currently > `Mirror-client` provides a function called "remoteConsumerOffsets()" that is > used by "RemoteClusterUtils" for one-time purpose. > In order to make the consumer and stream applications migrate from source to > target cluster transparently and conveniently, e.g. in event of source > cluster failure, a background job is proposed to periodically sync the > consumer offsets from the source to target cluster, so that when the consumer > and stream applications switch to the target cluster, it will resume to > consume from where it left off at source cluster. > > https://github.com/apache/kafka/pull/7577 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync
[ https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-9076: -- Description: To calculate the translated consumer offset in the target cluster, currently `Mirror-client` provides a function called "remoteConsumerOffsets()" that is used by "RemoteClusterUtils" for one-time purpose. In order to make the consumer and stream applications migrate from source to target cluster transparently and conveniently, e.g. in event of source cluster failure, a background job is proposed to periodically sync the consumer offsets from the source to target cluster, so that when the consumer and stream applications switch to the target cluster, it will resume to consume from where it left off at source cluster. KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0 [https://github.com/apache/kafka/pull/7577] was: To calculate the translated consumer offset in the target cluster, currently `Mirror-client` provides a function called "remoteConsumerOffsets()" that is used by "RemoteClusterUtils" for one-time purpose. In order to make the consumer and stream applications migrate from source to target cluster transparently and conveniently, e.g. in event of source cluster failure, a background job is proposed to periodically sync the consumer offsets from the source to target cluster, so that when the consumer and stream applications switch to the target cluster, it will resume to consume from where it left off at source cluster. 
https://github.com/apache/kafka/pull/7577 > MirrorMaker 2.0 automated consumer offset sync > -- > > Key: KAFKA-9076 > URL: https://issues.apache.org/jira/browse/KAFKA-9076 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Priority: Major > Labels: mirrormaker, pull-request-available > Fix For: 2.5.0 > > > To calculate the translated consumer offset in the target cluster, currently > `Mirror-client` provides a function called "remoteConsumerOffsets()" that is > used by "RemoteClusterUtils" for one-time purpose. > In order to make the consumer and stream applications migrate from source to > target cluster transparently and conveniently, e.g. in event of source > cluster failure, a background job is proposed to periodically sync the > consumer offsets from the source to target cluster, so that when the consumer > and stream applications switch to the target cluster, it will resume to > consume from where it left off at source cluster. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0 > [https://github.com/apache/kafka/pull/7577] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync
[ https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-9076: -- Description: To calculate the translated consumer offset in the target cluster, currently `Mirror-client` provides a function called "remoteConsumerOffsets()" that is used by "RemoteClusterUtils" for one-time purpose. In order to make the consumer and stream applications migrate from source to target cluster transparently and conveniently, e.g. in event of source cluster failure, a background job is proposed to periodically sync the consumer offsets from the source to target cluster, so that when the consumer and stream applications switch to the target cluster, it will resume to consume from where it left off at source cluster. https://github.com/apache/kafka/pull/7577 was: To calculate the translated consumer offset in the target cluster, currently `Mirror-client` provides a function called "remoteConsumerOffsets()" that is used by "RemoteClusterUtils" for one-time purpose. In order to make the consumer and stream applications migrate from source to target cluster transparently and conveniently, e.g. in event of source cluster failure, a background job is proposed to periodically sync the consumer offsets from the source to target cluster, so that when the consumer and stream applications switch to the target cluster, it will resume to consume from where it left off at source cluster. 
> MirrorMaker 2.0 automated consumer offset sync > -- > > Key: KAFKA-9076 > URL: https://issues.apache.org/jira/browse/KAFKA-9076 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Priority: Major > Labels: mirrormaker, pull-request-available > Fix For: 2.5.0 > > > To calculate the translated consumer offset in the target cluster, currently > `Mirror-client` provides a function called "remoteConsumerOffsets()" that is > used by "RemoteClusterUtils" for one-time purpose. > In order to make the consumer and stream applications migrate from source to > target cluster transparently and conveniently, e.g. in event of source > cluster failure, a background job is proposed to periodically sync the > consumer offsets from the source to target cluster, so that when the consumer > and stream applications switch to the target cluster, it will resume to > consume from where it left off at source cluster. > > https://github.com/apache/kafka/pull/7577 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync
[ https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-9076: -- Description: To calculate the translated consumer offset in the target cluster, currently `Mirror-client` provides a function called "remoteConsumerOffsets()" that is used by "RemoteClusterUtils" for one-time purpose. In order to make the consumer and stream applications migrate from source to target cluster transparently and conveniently, e.g. in event of source cluster failure, a background job is proposed to periodically sync the consumer offsets from the source to target cluster, so that when the consumer and stream applications switch to the target cluster, it will resume to consume from where it left off at source cluster. was: To calculate the translated consumer offset in the target cluster, currently `Mirror-client` provides a function called "remoteConsumerOffsets()" that is used by "RemoteClusterUtils" for one-time purpose. In order to make the consumer migration from source to target cluster transparent and convenient, e.g. in event of source cluster failure, it is better to have a background job to continuously and periodically sync the consumer offsets from the source to target cluster, so that when the consumer switches to the target cluster, it will resume to consume from where it left off at source cluster. > MirrorMaker 2.0 automated consumer offset sync > -- > > Key: KAFKA-9076 > URL: https://issues.apache.org/jira/browse/KAFKA-9076 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Ning Zhang >Priority: Major > Labels: mirrormaker, pull-request-available > Fix For: 2.5.0 > > > To calculate the translated consumer offset in the target cluster, currently > `Mirror-client` provides a function called "remoteConsumerOffsets()" that is > used by "RemoteClusterUtils" for one-time purpose. 
> In order to make the consumer and stream applications migrate from source to > target cluster transparently and conveniently, e.g. in event of source > cluster failure, a background job is proposed to periodically sync the > consumer offsets from the source to target cluster, so that when the consumer > and stream applications switch to the target cluster, it will resume to > consume from where it left off at source cluster. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync
[ https://issues.apache.org/jira/browse/KAFKA-9076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16957169#comment-16957169 ] Ning Zhang edited comment on KAFKA-9076 at 10/22/19 3:28 PM:
--

[~ryannedolan] Thanks for your comments. Here is my response:

{quote}in order to write offsets for a consumer group, we need to know that the group is not already running on the target cluster. Otherwise we'd be stepping on that group's current offsets. The group coordinator won't allow this afaik.{quote}

My PR could check whether the group is active in the target cluster. If it is active, there is no need to sync the offsets.

{quote}we could kick out a target group and force it to seek to the new offsets by revoking group membership and forcing a rebalance etc. But we wouldn't want to do this periodically.{quote}

For Kafka Streams, I am not sure there is a way to force the stream application to seek from a particular offset, like `consumer.seek()`.

{quote}we could write offsets to a new group ID, e.g. us-west group1, just like we do with topics, s.t. we avoid the above issues. Then migrating groups would involve changing the group ID. That works fine, but consumers would need a way to determine which group ID to use. Translating group IDs like that is more cumbersome than translating offsets, since offsets can be altered using existing tools, but there is no way to tell a consumer to change its group ID.{quote}

When failing over or migrating from one cluster to another, especially manually, if that already requires changing the consumer/stream application's broker URL to point to the backup cluster, it may not be much more cumbersome to change the consumer group ID as well.

{quote}I think there are scenarios where automatically writing offsets as you propose might make sense, e.g. in an active/standby scenario where consumers only connect to one cluster at a time. But if you are automating that behavior, you might as well automate the offset translation via RemoteClusterUtils, IMO.{quote}

Offset translation is the first step and can certainly be done by RemoteClusterUtils. The second step is to write the offsets to the target cluster; IMO, RemoteClusterUtils may not do that second step.

{quote}My team has built external tooling using RemoteClusterUtils that works with existing consumers. It's possible to fully automate failover and failback this way. I'm skeptical that automatically writing offsets as you propose would make this process simpler.{quote}

I have a PR up for review and we use it internally in our system; it works well for active/standby and for migrating stream applications from one cluster to another transparently. More consideration of this approach will certainly be needed in the future. The PR is small, and by default I propose to disable this auto-sync feature for now. Do we want to look at the PR and exchange more thoughts? Thanks.
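The "check whether the group is active" guard discussed above can be sketched as follows (Python for brevity; in practice this information would come from the target cluster's group coordinator via a describe-groups call). The state names match Kafka's consumer group states, but the function itself is hypothetical:

```python
# Consumer group states in which it is safe to overwrite committed offsets:
# the group has no live members, so nothing gets "stepped on".
SAFE_STATES = {"Empty", "Dead"}

def should_sync_offsets(target_group_state):
    """Return True only when the target group has no active members."""
    return target_group_state in SAFE_STATES
```

With this guard, the periodic job simply skips any group that is "Stable", "PreparingRebalance", or "CompletingRebalance" on the target cluster.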
[jira] [Created] (KAFKA-9076) MirrorMaker 2.0 automated consumer offset sync
Ning Zhang created KAFKA-9076:
--

             Summary: MirrorMaker 2.0 automated consumer offset sync
                 Key: KAFKA-9076
                 URL: https://issues.apache.org/jira/browse/KAFKA-9076
             Project: Kafka
          Issue Type: Improvement
          Components: mirrormaker
    Affects Versions: 2.4.0
            Reporter: Ning Zhang
             Fix For: 2.5.0

To calculate the translated consumer offset in the target cluster, `MirrorClient` currently provides a function called "remoteConsumerOffsets()", which is used by "RemoteClusterUtils" for one-time lookups. To make consumer migration from the source to the target cluster transparent and convenient, e.g. in the event of a source cluster failure, it would be better to have a background job that continuously and periodically syncs consumer offsets from the source to the target cluster, so that when a consumer switches to the target cluster, it resumes consuming from where it left off on the source cluster.
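Putting the pieces of the proposal together, the background job amounts to a loop over the source group's partitions: skip the group entirely if it is active on the target cluster, otherwise translate each committed offset and write it there. A hypothetical end-to-end sketch (Python, illustrative names only; the real work would go through Kafka's admin and consumer APIs in Java):

```python
def sync_group_offsets(source_offsets, offset_syncs, target_group_active):
    """Compute the offsets the sync job would commit on the target cluster.

    source_offsets:      {partition: committed offset on the source cluster}
    offset_syncs:        {partition: (upstream_offset, downstream_offset)}
    target_group_active: True if the group has live members on the target.
    """
    if target_group_active:
        return {}  # never clobber a live group's offsets
    translated = {}
    for partition, offset in source_offsets.items():
        upstream, downstream = offset_syncs[partition]
        # Apply the same delta past the sync point on the target side,
        # never translating to a point before the sync itself.
        translated[partition] = downstream + max(0, offset - upstream)
    return translated
```

A consumer that later fails over to the target cluster and joins under the same group ID would then resume from these committed offsets automatically, with no `seek()` required.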