Question about MM2's source-cluster.checkpoints.internal topic
Hi,
I am using MM2 version 3.1.0 for uni-directional mirroring of source-cluster -> target-cluster. I am confused by the values found in the source-cluster.checkpoints.internal topic at the target cluster when the source's earliest offset is not 0.

In my test setup, I prepared a topic samples.greetings with 10 records at the source cluster and first consumed those 10 records at the source cluster with consumer group greetings. After that, I deleted 5 records from this topic to move the earliest offset to 5 and subsequently produced 3 additional records to it.

Next, I verified consumer group greetings's state at the source cluster using kafka-consumer-groups.sh, which showed offset 10 and lag 3, as expected:

GROUP      TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
greetings  samples.greetings  0          10              13              3    -            -     -

Next, I started MM2 to mirror this topic to the target cluster. After the mirroring completed, I used RemoteClusterUtils.translateOffsets to get the translated offset of this consumer group. This returned 10 instead of 5. I also looked into the topic source-cluster.checkpoints.internal at the target cluster. I was expecting to find records with {upstreamOffset:10, downstreamOffset:5}, but I found records with {upstreamOffset:10, downstreamOffset:10}. And consumer group greetings's state at the target cluster was shown by kafka-consumer-groups.sh as:

GROUP      TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
greetings  samples.greetings  0          10              8               -2   -            -     -

I thought the checkpoint topic would contain the translated downstreamOffset of 5 and the consumer offset at the target cluster would be translated to 5. That would have allowed the consumer to simply switch to the target cluster. So I am wondering whether this is the expected behavior or something went wrong.

To be able to simply switch the consumer to the target cluster, I could reset the current offset at the target cluster to the correct value by comparing the LAG values at the source and target clusters and subtracting the difference from the current offset (assuming the mirroring is stopped so that the updated offset won't be overwritten). But this is a bit of a pain.

Thank you for any help.
Regards, aki
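For reference, a minimal sketch of how the translation above can be queried (the bootstrap address target:9092 is a placeholder; the cluster alias and group name are the ones from the test setup above):

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.mirror.RemoteClusterUtils;

    public class TranslateGreetingsOffsets {
        public static void main(String[] args) throws Exception {
            // Client config pointing at the target cluster, where the
            // source-cluster.checkpoints.internal topic lives.
            Map<String, Object> targetConfig = new HashMap<>();
            targetConfig.put("bootstrap.servers", "target:9092"); // placeholder address

            Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(
                    targetConfig, "source-cluster", "greetings", Duration.ofSeconds(30));

            // In the setup described above this prints offset 10 for the partition,
            // where a downstream offset of 5 was expected.
            translated.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
        }
    }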
Re: When using MM2, how should the consumer be switched from source to target?
Hi Ning,
By "when there are some messages left in the original cluster" I mean the case where the consumer switches from the original cluster to the new cluster before it has consumed all the messages at the original cluster. This commit illustrates the situation:
https://github.com/elakito/kafka/commit/9063db0f8c4a53f5d9764612af898981d499a7b7
regards, aki

On Wed, Jan 13, 2021 at 3:17, Ning Zhang () wrote:
>
> Hello Aki,
>
> Can you elaborate on "when there are some messages left in the original cluster" ?
>
> On 2021/01/12 13:01:40, Aki Yoshida wrote:
> > Hi Ryanne,
> > Thanks for the information regarding the manual translation approach. But for 2.7.0, I have a question. I thought this translation would happen implicitly. I saw test testOneWayReplicationWithAutoOffsetSync of MirrorConnectorsIntegrationTest that is supposed to test this behavior. But I have an impression that the consumers can't be migrated when there are some messages left in the original cluster. I posted this question to users@kafka a while ago but I didn't receive any responses.
> > https://lists.apache.org/thread.html/r0728e9ea89b2713865a33396f108027ac3f0949c2496583532ee963c%40%3Cusers.kafka.apache.org%3E
> > Could you or someone comment on this? Thanks.
> > Regards, aki
> >
> > On Sat, Jan 2, 2021 at 20:31, Ryanne Dolan () wrote:
> > >
> > > Aki, that's right. Prior to 2.7, you can use the translateOffsets method from within your client code, or you can write a little command-line tool to do it. I've done the latter for Cloudera's srm-control tool, as documented here:
> > > https://docs.cloudera.com/csp/2.0.1/srm-using/topics/srm-migrating-consumer-groups.html
> > >
> > > Ryanne
> > >
> > > On Thu, Dec 17, 2020, 5:23 PM Aki Yoshida wrote:
> > >
> > > > I'm answering to my own mail and would like to hear if this assumption is correct. it looks like I need 2.7.0 to have the automatic translation.
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
> > > > for earlier versions, there is no way to use the console client but use java API RemoteClusterUtils.translateOffsets() to get the offset and re-consuming from the new topic. Is this correct?
> > > >
> > > > On Thu, Dec 17, 2020 at 21:18, Aki Yoshida () wrote:
> > > > >
> > > > > Hi,
> > > > > I have a question regarding how to migrate a consumer when the subscribing topic has been migrated to the target Kafka broker. Suppose a consumer is consuming from topic1 at the source Kafka broker. When MM2 mirrors this topic using options source.cluster.alias="", replication.policy.separator="", topic named topic1 shows up at the target Kafka broker.
> > > > >
> > > > > When the consumer instance simply switches the bootstrap server to start consuming from this topic1 at the target broker, the consumer seems to start subscribing from the latest offset. I thought the consumer offsets were also migrated to the target and the consumer could simply start subscribing from the mirrored topic from the continued offset. Did I miss some configuration parameters or does the consumer need to perform some actions to be able to consume the records seamlessly?
> > > > >
> > > > > I appreciate for your help. Thanks.
> > > > >
> > > > > regards, aki
Re: When using MM2, how should the consumer be switched from source to target?
Hi Ryanne,
Thanks for the information regarding the manual translation approach. But for 2.7.0 I have a question: I thought this translation would happen implicitly. I saw the test testOneWayReplicationWithAutoOffsetSync of MirrorConnectorsIntegrationTest, which is supposed to test this behavior, but I have the impression that consumers can't be migrated when there are some messages left in the original cluster. I posted this question to users@kafka a while ago but didn't receive any responses:
https://lists.apache.org/thread.html/r0728e9ea89b2713865a33396f108027ac3f0949c2496583532ee963c%40%3Cusers.kafka.apache.org%3E
Could you or someone comment on this? Thanks.
Regards, aki

On Sat, Jan 2, 2021 at 20:31, Ryanne Dolan () wrote:
>
> Aki, that's right. Prior to 2.7, you can use the translateOffsets method from within your client code, or you can write a little command-line tool to do it. I've done the latter for Cloudera's srm-control tool, as documented here:
> https://docs.cloudera.com/csp/2.0.1/srm-using/topics/srm-migrating-consumer-groups.html
>
> Ryanne
>
> On Thu, Dec 17, 2020, 5:23 PM Aki Yoshida wrote:
>
> > I'm answering to my own mail and would like to hear if this assumption is correct. it looks like I need 2.7.0 to have the automatic translation.
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
> > for earlier versions, there is no way to use the console client but use java API RemoteClusterUtils.translateOffsets() to get the offset and re-consuming from the new topic. Is this correct?
> >
> > On Thu, Dec 17, 2020 at 21:18, Aki Yoshida () wrote:
> > >
> > > Hi,
> > > I have a question regarding how to migrate a consumer when the subscribing topic has been migrated to the target Kafka broker. Suppose a consumer is consuming from topic1 at the source Kafka broker. When MM2 mirrors this topic using options source.cluster.alias="", replication.policy.separator="", topic named topic1 shows up at the target Kafka broker.
> > >
> > > When the consumer instance simply switches the bootstrap server to start consuming from this topic1 at the target broker, the consumer seems to start subscribing from the latest offset. I thought the consumer offsets were also migrated to the target and the consumer could simply start subscribing from the mirrored topic from the continued offset. Did I miss some configuration parameters or does the consumer need to perform some actions to be able to consume the records seamlessly?
> > >
> > > I appreciate for your help. Thanks.
> > >
> > > regards, aki
Re: --override option for bin/connect-distributed.sh
Hi Tom,
Thanks for your suggestion. I'll follow the KIP process.
regards, aki

On Tue, Dec 22, 2020 at 10:18, Tom Bentley () wrote:
>
> Hi Aki,
>
> Since this is a change to a public API of the project it would need to be done through the KIP process [1]. Since writing the KIP in this case isn't much work, I suggest you write it up as a proposal and start a KIP discussion thread on the dev@ mailing list, then interested people can comment there.
>
> Kind regards,
>
> Tom
>
> [1]: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
>
> On Mon, Dec 21, 2020 at 8:57 PM Aki Yoshida wrote:
>
> > Hi Kafka team,
> > I think the --override option of Kafka is very practical in starting Kafka for various situations without changing the properties file. I missed this feature in Kafka-Connect and I wanted to have it, so I created a patch in this commit in my forked repo.
> > https://github.com/elakito/kafka/commit/1e54536598d1ce328d0aee10edb728270cc04af1
> > Could someone tell me if this is a good idea or a bad idea? If bad, is there some alternative way to customise the properties? If good, can I create a PR?
> > I would appreciate for your suggestion.
> > Thanks.
> > regards, aki
--override option for bin/connect-distributed.sh
Hi Kafka team,
I think Kafka's --override option is very practical for starting Kafka in various situations without changing the properties file. I missed this feature in Kafka Connect and wanted to have it, so I created a patch in this commit in my forked repo:
https://github.com/elakito/kafka/commit/1e54536598d1ce328d0aee10edb728270cc04af1
Could someone tell me whether this is a good idea or a bad idea? If bad, is there some alternative way to customise the properties? If good, can I create a PR?
I would appreciate your suggestions. Thanks.
regards, aki
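For illustration, the broker start script already accepts this kind of flag, and the patch linked above is meant to allow an analogous invocation for Connect (the overridden property values below are only placeholders, and the Connect form is the proposal, not an existing option):

    # already supported by the broker start script
    bin/kafka-server-start.sh config/server.properties --override broker.id=1

    # proposed equivalent for Connect, as sketched in the linked commit
    bin/connect-distributed.sh config/connect-distributed.properties --override group.id=connect-cluster-2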
MirrorConnectorsIntegrationTest's testOneWayReplicationWithAutoOffsetSync
Hi,
I am new to MirrorMaker and have been trying to understand its behavior, in particular its consumer offset synchronization. I saw KIP-545 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0), which describes this feature as implemented in 2.7.0. I thought it would allow a consumer to switch from the source cluster to the mirrored cluster and continue consuming records. I tried to test this with 2.7.0 but was unsuccessful.

So I looked at testOneWayReplicationWithAutoOffsetSync of MirrorConnectorsIntegrationTest, which seems to test this offset synchronization feature:
https://github.com/apache/kafka/blob/trunk/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java#L453

But I see that in this test the consumer is switched to the new cluster when its offset is already at "latest", and the assertion only verifies that the consumer is at the latest offset (i.e., no records are consumed) at the new cluster when it is switched:
https://github.com/apache/kafka/blob/trunk/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java#L496

This doesn't distinguish between whether the offset was synchronized or the consumer simply started at the new cluster with "latest", no? So I modified the test to give the consumer's offset some lag when it is switched, so that the consumer can consume the remaining records from the new cluster, as in this change:
https://github.com/elakito/kafka/commit/9063db0f8c4a53f5d9764612af898981d499a7b7

But this test failed. So I am confused: is my test wrong, or do I misunderstand what KIP-545 is supposed to provide? Any help understanding this would be greatly appreciated.

Regards, aki
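A minimal standalone sketch of the scenario the modified test is after (not the actual integration-test code; cluster addresses, topic name and group name are placeholders): the group commits an offset with some lag on the source cluster, and once MM2 has synced the group offsets, a consumer with the same group.id pointed at the target cluster should resume from the translated offset rather than from "latest".

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class LaggingSwitchSketch {
        static KafkaConsumer<byte[], byte[]> newConsumer(String bootstrap) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrap);
            props.put("group.id", "consumer-group-lagging"); // placeholder group name
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("auto.offset.reset", "earliest");
            props.put("enable.auto.commit", "false");
            return new KafkaConsumer<>(props);
        }

        public static void main(String[] args) {
            // 1. Consume only part of the topic on the source cluster and commit,
            //    deliberately leaving some lag behind.
            try (KafkaConsumer<byte[], byte[]> source = newConsumer("source:9092")) {
                source.subscribe(List.of("test-topic-1"));
                source.poll(Duration.ofSeconds(5));
                source.commitSync();
            }

            // 2. After MM2 has emitted checkpoints and synced the group offsets,
            //    the same group on the target cluster should continue from the
            //    translated offset and pick up only the remaining records.
            try (KafkaConsumer<byte[], byte[]> target = newConsumer("target:9092")) {
                target.subscribe(List.of("test-topic-1"));
                ConsumerRecords<byte[], byte[]> remaining = target.poll(Duration.ofSeconds(5));
                System.out.println("records consumed after the switch: " + remaining.count());
            }
        }
    }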
Re: When using MM2, how should the consumer be switched from source to target?
I'm answering my own mail and would like to hear whether this assumption is correct. It looks like I need 2.7.0 to have the automatic translation:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
For earlier versions, there is no way to do this with the console client; instead, one has to use the Java API RemoteClusterUtils.translateOffsets() to get the offsets and re-consume from the new topic. Is this correct?

On Thu, Dec 17, 2020 at 21:18, Aki Yoshida () wrote:
>
> Hi,
> I have a question regarding how to migrate a consumer when the subscribing topic has been migrated to the target Kafka broker. Suppose a consumer is consuming from topic1 at the source Kafka broker. When MM2 mirrors this topic using options source.cluster.alias="", replication.policy.separator="", topic named topic1 shows up at the target Kafka broker.
>
> When the consumer instance simply switches the bootstrap server to start consuming from this topic1 at the target broker, the consumer seems to start subscribing from the latest offset. I thought the consumer offsets were also migrated to the target and the consumer could simply start subscribing from the mirrored topic from the continued offset. Did I miss some configuration parameters or does the consumer need to perform some actions to be able to consume the records seamlessly?
>
> I appreciate for your help. Thanks.
>
> regards, aki
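For pre-2.7 versions, a minimal sketch of what that manual translation could look like (bootstrap address, cluster alias "source" and group name are placeholders; it translates the offsets via the checkpoint topic and then commits them for the group on the target cluster, assuming the group has no active members there):

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.mirror.RemoteClusterUtils;

    public class MigrateGroupOffsets {
        public static void main(String[] args) throws Exception {
            Map<String, Object> targetProps = new HashMap<>();
            targetProps.put("bootstrap.servers", "target:9092"); // placeholder address

            // Translate the group's committed offsets from the source cluster
            // into the target cluster's coordinates via the checkpoint topic.
            Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(
                    targetProps, "source", "my-group", Duration.ofMinutes(1));

            // Commit the translated offsets for the group on the target cluster,
            // so the consumer can simply switch its bootstrap servers and resume.
            try (Admin admin = Admin.create(targetProps)) {
                admin.alterConsumerGroupOffsets("my-group", translated).all().get();
            }
        }
    }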
When using MM2, how should the consumer be switched from source to target?
Hi,
I have a question regarding how to migrate a consumer when the topic it subscribes to has been mirrored to the target Kafka broker. Suppose a consumer is consuming from topic1 at the source Kafka broker. When MM2 mirrors this topic using the options source.cluster.alias="", replication.policy.separator="", a topic named topic1 shows up at the target Kafka broker.

When the consumer instance simply switches the bootstrap server to start consuming from this topic1 at the target broker, it seems to start subscribing from the latest offset. I thought the consumer offsets were also migrated to the target and the consumer could simply continue consuming the mirrored topic from its previous offset. Did I miss some configuration parameters, or does the consumer need to perform some actions to be able to consume the records seamlessly?

I appreciate your help. Thanks.

regards, aki
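For reference, the automatic group offset sync that this thread ends up pointing to (KIP-545) is enabled with a flow-level property in the MM2 configuration; a minimal sketch with placeholder cluster names and addresses (the sync.group.offsets.* properties only exist from 2.7.0 on):

    # mm2.properties (sketch)
    clusters = source, target
    source.bootstrap.servers = source:9092
    target.bootstrap.servers = target:9092

    source->target.enabled = true
    source->target.topics = topic1

    # 2.7.0+ only: periodically apply translated consumer group offsets on the target cluster
    source->target.sync.group.offsets.enabled = true
    source->target.sync.group.offsets.interval.seconds = 60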
Using FileConfigProvider with restricted path?
Hi,
I am wondering whether it is possible to restrict the path used in a FileConfigProvider reference in the configuration, e.g. ${file:/xxxpath/xxxfile.properties:xxxname}. Is there a way to constrain which paths can be referenced there?
Thanks
regards, aki
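For context, a sketch of the worker setup that the ${file:...} syntax assumes (the provider name "file" is just a label chosen in the config; the referenced path and key are placeholders, as above):

    # connect worker properties (sketch)
    config.providers=file
    config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

    # a connector config can then reference values like:
    #   "some.secret": "${file:/xxxpath/xxxfile.properties:xxxname}"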
Re: swagger/openapi spec for Kafka-Connect REST API?
I know that there is documentation for humans. But I needed a machine-readable spec so that I could send requests from somewhere without a specific client library. So I created a spec document from the JAX-RS resources used by Kafka Connect. If there is interest in the community, I can attach the file to a GitHub issue or put it somewhere.

On Fri, May 22, 2020 at 15:06, Jun Wang () wrote:
>
> I am not aware of any swagger api spec for Kafka connect reset api. But following 2 resources have enough information for me.
>
> https://kafka.apache.org/documentation/#connect_rest
> https://docs.confluent.io/current/connect/references/restapi.html
>
> From: Miguel Silvestre
> Sent: Friday, May 22, 2020 8:06 AM
> To: users@kafka.apache.org
> Subject: Re: swagger/openapi spec for Kafka-Connect REST API?
>
> Hi,
>
> I'm also searching for this. If you find it, please let me know.
> Thank you.
> --
> Miguel Silvestre
>
> On Mon, May 18, 2020 at 8:59 AM Aki Yoshida wrote:
>
> > I am looking for the openapi spec for Kafka-Connect API but I couldn't find one. If it is available somewhere, could someone tell me where I can find it?
> >
> > regards, aki
swagger/openapi spec for Kafka-Connect REST API?
I am looking for an OpenAPI spec for the Kafka-Connect REST API but couldn't find one. If it is available somewhere, could someone tell me where to find it?
regards, aki
Re: question about property advertised.listeners
Hi Kaufman,
I wasn't thinking of using listeners to switch the running port itself. That makes it very simple. It was stupid of me. Thank you.
regards, aki

2016-11-30 17:43 GMT+01:00 Kaufman Ng:
> The advertised.listeners property would not help you to map a different port. If you need to talk to brokers at a different port you should also set the listeners property accordingly. Say if you want the broker to listen at port 10092 as an example, the server.properties needs to contain these 2 lines:
>
> listeners=PLAINTEXT://:10092
> advertised.listeners=PLAINTEXT://:10092
>
> Since you mentioned containers, are you using the Confluent's docker containers? If yes, the above configs should be taken care of (via an environment variable).
>
> Hope this helps.
>
> On Tue, Nov 29, 2016 at 9:56 AM, Aki Yoshida wrote:
>
>> I have a question regarding how to use advertised.listeners to expose a different port to the outside of the container where the broker is running.
>>
>> When using advertised.listeners to register a different port as the actual broker port, do we need a port forwarding from this configured port to the actual broker port within the container so that the broker itself can also find itself, right?
>>
>> thanks.
>> regards, aki
>
> --
> Kaufman Ng
> Solutions Architect | Confluent
> +1 646 961 8063 | @kaufmanng
> www.confluent.io
Looking for some info on kafka homepage
Hi,
I am looking for the following information on Kafka's project home page but am not able to find it. The home page seems to have been updated recently (looks really nice ;-) and maybe I just don't know where to navigate. The information I would like to find is:

- A list of Kafka-relevant CVEs (I saw the security page at https://kafka.apache.org/project-security via "Project Info" -> "Learn about Kafka security", but this page gives only some general info)
- EOL and planned EOL info for Kafka versions (I don't have any clue where to find this)

Could someone point me to the pages, or if no such pages are available, could we create them?
Thanks
regards, aki
question about property advertised.listeners
I have a question regarding how to use advertised.listeners to expose a different port to the outside of the container where the broker is running.

When using advertised.listeners to register a port that differs from the actual broker port, do we need port forwarding from this advertised port to the actual broker port within the container, so that the broker itself can also reach itself?

thanks.
regards, aki
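For illustration, a sketch of the kind of setup being asked about (host name and ports are placeholders): the broker binds its internal port while advertising the externally mapped address.

    # server.properties (sketch)
    # the port the broker actually binds inside the container
    listeners=PLAINTEXT://0.0.0.0:9092
    # the externally reachable address; the container maps/forwards 29092 -> 9092
    advertised.listeners=PLAINTEXT://broker.example.com:29092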
ZK path name mentioned in the online Replication Design document seems to be incorrect
I am trying to understand the replication procedure and saw this document
https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3
describing:

LeaderAndISR path: stores leader and ISR of a partition
/brokers/topics/[topic]/[partition_id]/leaderAndISR --> {leader_epoc: epoc, leader: broker_id, ISR: {broker1, broker2}}

However, the correct path seems to be
/brokers/topics/[topic]/partitions/[partition_id]/state
returning
{controller_epoch: epoc, leader: broker_id, version: version_id, leader_epoch: epoc, isr: [broker_id, ...]}

I suppose this document was not updated when the path changed?

regards, aki
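For anyone wanting to check this against a live cluster, one way is to read the node directly (assuming ZooKeeper on localhost:2181 and a topic named test; both are placeholders):

    bin/zookeeper-shell.sh localhost:2181 get /brokers/topics/test/partitions/0/state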