Question about MM2's source-cluster.checkpoints.internal topic

2022-05-12 Thread Aki Yoshida
Hi,
I am using MM2 version 3.1.0 for uni-directional mirroring of
source-cluster -> target-cluster.
I am confused by the values found in the
source-cluster.checkpoints.internal topic at the target cluster when
the source's earliest offset is not 0.

In my test setup, I prepared a topic samples.greetings with 10 records
at the source cluster and first consumed those 10 records at the
source cluster with consumer group greetings. After that, I deleted 5
records from this topic to move the earliest offset to 5 and
subsequently produced 3 additional records to this topic.

Next, I verified the state of consumer group greetings at the source
cluster using kafka-consumer-groups.sh, which showed offset 10 and
lag 3, as expected.
GROUP      TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
greetings  samples.greetings  0          10              13              3    -            -     -

Next, I started the MM2 to mirror this topic to the target cluster.

After the mirroring completed, I used
RemoteClusterUtils.translateOffsets to get the translated offset of
this consumer group. It returned 10 instead of 5. Then I also
looked into topic source-cluster.checkpoints.internal at the target
cluster. I was expecting to find records with {upstreamOffset:10,
downstreamOffset:5}, but I found the records with {upstreamOffset:10,
downstreamOffset:10}.
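For reference, this is roughly how I call the API (the bootstrap address and the cluster alias are from my test setup; running it needs the connect-mirror-client dependency and a reachable target cluster):

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class TranslateExample {
    public static void main(String[] args) throws Exception {
        // Properties for connecting to the *target* cluster, where the
        // source-cluster.checkpoints.internal topic lives.
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "target-cluster:9092"); // assumed address

        // "source-cluster" is the alias of the upstream cluster in my MM2 config.
        Map<TopicPartition, OffsetAndMetadata> translated =
            RemoteClusterUtils.translateOffsets(
                props, "source-cluster", "greetings", Duration.ofSeconds(30));

        // For samples.greetings-0 I expected 5 here, but got 10.
        translated.forEach((tp, om) ->
            System.out.println(tp + " -> " + om.offset()));
    }
}
```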

The state of consumer group greetings at the target cluster was shown
by kafka-consumer-groups.sh as

GROUP      TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
greetings  samples.greetings  0          10              8               -2   -            -     -

I expected the checkpoint topic to contain the translated
downstreamOffset of 5 and the consumer offset at the target cluster to
be set to 5 accordingly. That would have allowed the consumer to
simply switch to the target cluster. So I am wondering whether this is
the expected behavior or whether something went wrong.

To be able to simply switch the consumer to the target cluster, I
could reset the current offset at the target cluster to the correct
value: compare the LAG values at the source and target clusters and
subtract their difference from the current offset (assuming the
mirroring is stopped so that the updated offset won't be overwritten).
But this is a bit of a pain.
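As a sketch, that workaround arithmetic looks like this (assuming the lag difference is the only discrepancy and mirroring is stopped):

```java
public class OffsetAdjust {
    /**
     * Computes the offset to reset the target-cluster consumer group to,
     * given the current offset seen at the target and the LAG values
     * reported by kafka-consumer-groups.sh on both clusters.
     */
    static long correctedOffset(long targetCurrentOffset, long sourceLag, long targetLag) {
        // The target's current offset is off by the difference of the two lags.
        return targetCurrentOffset - (sourceLag - targetLag);
    }

    public static void main(String[] args) {
        // Values from the test setup above: target current offset 10,
        // source lag 3, target lag -2 -> the correct offset is 5.
        System.out.println(correctedOffset(10, 3, -2)); // prints 5
    }
}
```

The actual reset could then be done with kafka-consumer-groups.sh --reset-offsets --to-offset on the target cluster while MM2 is stopped.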

Thank you for any help.
Regards, aki


Re: When using MM2, how should the consumer be switched from source to target?

2021-01-13 Thread Aki Yoshida
Hi Ning,
By "when there are some messages left in the original cluster" I mean
the case where the consumer switches from the original cluster to the
new cluster before it has consumed all the messages at the original
cluster.
This commit illustrates this situation.
https://github.com/elakito/kafka/commit/9063db0f8c4a53f5d9764612af898981d499a7b7

regards, aki

El mié, 13 ene 2021 a las 3:17, Ning Zhang () escribió:
>
> Hello Aki,
>
> Can you elaborate on "when there are some messages left in the original 
> cluster" ?
>
> On 2021/01/12 13:01:40, Aki Yoshida  wrote:
> > Hi Ryanne,
> > Thanks for the information regarding the manual translation approach.
> > But for 2.7.0, I have a question. I thought this translation would
> > happen implicitly.
> > I saw test testOneWayReplicationWithAutoOffsetSync of
> > MirrorConnectorsIntegrationTest that is supposed to test this
> > behavior. But I have an impression that the consumers can't be
> > migrated when there are some messages left in the original cluster. I
> > posted this question to users@kafka a while ago but I didn't receive
> > any responses.
> > https://lists.apache.org/thread.html/r0728e9ea89b2713865a33396f108027ac3f0949c2496583532ee963c%40%3Cusers.kafka.apache.org%3E
> > Could you or someone comment on this?
> > Thanks.
> > Regards, aki
> >
> > El sáb, 2 ene 2021 a las 20:31, Ryanne Dolan () 
> > escribió:
> > >
> > > Aki, that's right. Prior to 2.7, you can use the translateOffsets method
> > > from within your client code, or you can write a little command-line tool
> > > to do it. I've done the latter for Cloudera's srm-control tool, as
> > > documented here:
> > > https://docs.cloudera.com/csp/2.0.1/srm-using/topics/srm-migrating-consumer-groups.html
> > >
> > > Ryanne
> > >
> > > On Thu, Dec 17, 2020, 5:23 PM Aki Yoshida  wrote:
> > >
> > > > I'm answering to my own mail and would like to hear if this assumption
> > > > is correct.
> > > > it looks like I need 2.7.0 to have the automatic translation.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
> > > > for earlier versions, there is no way to use the console client but
> > > > use java API RemoteClusterUtils.translateOffsets() to get the offset
> > > > and re-consuming from the new topic.
> > > > Is this correct?
> > > >
> > > > El jue, 17 dic 2020 a las 21:18, Aki Yoshida ()
> > > > escribió:
> > > > >
> > > > > Hi,
> > > > > I have a question regarding how to migrate a consumer when the
> > > > > subscribing topic has been migrated to the target Kafka broker.
> > > > > Suppose a consumer is consuming from topic1 at the source Kafka
> > > > > broker. When MM2 mirrors this topic using options
> > > > > source.cluster.alias="", replication.policy.separator="", topic named
> > > > > topic1 shows up at the target Kafka broker.
> > > > >
> > > > > When the consumer instance simply switches the bootstrap server to
> > > > > start consuming from this topic1 at the target broker, the consumer
> > > > > seems to start subscribing from the latest offset. I thought the
> > > > > consumer offsets were also migrated to the target and the consumer
> > > > > could simply start subscribing from the mirrored topic from the
> > > > > continued offset. Did I miss some configuration parameters or does the
> > > > > consumer need to perform some actions to be able to consume the
> > > > > records seamlessly?
> > > > >
> > > > > I appreciate for your help. Thanks.
> > > > >
> > > > > regards, aki
> > > >
> >


Re: When using MM2, how should the consumer be switched from source to target?

2021-01-12 Thread Aki Yoshida
Hi Ryanne,
Thanks for the information regarding the manual translation approach.
But for 2.7.0, I have a question. I thought this translation would
happen implicitly.
I saw test testOneWayReplicationWithAutoOffsetSync of
MirrorConnectorsIntegrationTest that is supposed to test this
behavior. But my impression is that consumers can't be migrated when
there are some messages left unconsumed in the original cluster. I
posted this question to users@kafka a while ago but didn't receive
any responses.
https://lists.apache.org/thread.html/r0728e9ea89b2713865a33396f108027ac3f0949c2496583532ee963c%40%3Cusers.kafka.apache.org%3E
Could you or someone comment on this?
Thanks.
Regards, aki

El sáb, 2 ene 2021 a las 20:31, Ryanne Dolan () escribió:
>
> Aki, that's right. Prior to 2.7, you can use the translateOffsets method
> from within your client code, or you can write a little command-line tool
> to do it. I've done the latter for Cloudera's srm-control tool, as
> documented here:
> https://docs.cloudera.com/csp/2.0.1/srm-using/topics/srm-migrating-consumer-groups.html
>
> Ryanne
>
> On Thu, Dec 17, 2020, 5:23 PM Aki Yoshida  wrote:
>
> > I'm answering to my own mail and would like to hear if this assumption
> > is correct.
> > it looks like I need 2.7.0 to have the automatic translation.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
> > for earlier versions, there is no way to use the console client but
> > use java API RemoteClusterUtils.translateOffsets() to get the offset
> > and re-consuming from the new topic.
> > Is this correct?
> >
> > El jue, 17 dic 2020 a las 21:18, Aki Yoshida ()
> > escribió:
> > >
> > > Hi,
> > > I have a question regarding how to migrate a consumer when the
> > > subscribing topic has been migrated to the target Kafka broker.
> > > Suppose a consumer is consuming from topic1 at the source Kafka
> > > broker. When MM2 mirrors this topic using options
> > > source.cluster.alias="", replication.policy.separator="", topic named
> > > topic1 shows up at the target Kafka broker.
> > >
> > > When the consumer instance simply switches the bootstrap server to
> > > start consuming from this topic1 at the target broker, the consumer
> > > seems to start subscribing from the latest offset. I thought the
> > > consumer offsets were also migrated to the target and the consumer
> > > could simply start subscribing from the mirrored topic from the
> > > continued offset. Did I miss some configuration parameters or does the
> > > consumer need to perform some actions to be able to consume the
> > > records seamlessly?
> > >
> > > I appreciate for your help. Thanks.
> > >
> > > regards, aki
> >


Re: --override option for bin/connect-distributed.sh

2020-12-22 Thread Aki Yoshida
Hi Tom,
thanks for your suggestion. I'll follow the KIP process.
regards, aki

El mar, 22 dic 2020 a las 10:18, Tom Bentley () escribió:
>
> Hi Aki,
>
> Since this is a change to a public API of the project it would need to be
> done through the KIP process [1]. Since writing the KIP in this case isn't
> much work, I suggest you write it up as a proposal and start a KIP
> discussion thread on the dev@ mailing list, then interested people can
> comment there.
>
> Kind regards,
>
> Tom
>
> [1]:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
>
> On Mon, Dec 21, 2020 at 8:57 PM Aki Yoshida  wrote:
>
> > Hi Kafka team,
> > I think the --override option of Kafka is very practical in starting
> > Kafka for various situations without changing the properties file. I
> > missed this feature in Kafka-Connect and I wanted to have it, so I
> > created a patch in this commit in my forked repo.
> >
> > https://github.com/elakito/kafka/commit/1e54536598d1ce328d0aee10edb728270cc04af1
> >
> > Could someone tell me if this is a good idea or a bad idea? If bad, is
> > there some alternative way to customise the properties? If good, can I
> > create a PR?
> > I would appreciate for your suggestion.
> > Thanks.
> > regards, aki
> >
> >


--override option for bin/connect-distributed.sh

2020-12-21 Thread Aki Yoshida
Hi Kafka team,
I think Kafka's --override option is very practical for starting
Kafka in various situations without changing the properties file. I
missed this feature in Kafka Connect and wanted to have it, so I
created a patch in this commit in my forked repo.
https://github.com/elakito/kafka/commit/1e54536598d1ce328d0aee10edb728270cc04af1
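For comparison, this is how the broker's existing --override flag is used; the Connect invocation below is hypothetical, showing the analogous behavior the patch adds (property names are just examples):

```shell
# Existing broker behavior: override a property without editing the file
bin/kafka-server-start.sh config/server.properties \
  --override log.retention.hours=48

# Proposed analogue for Connect (hypothetical, from my forked commit above)
bin/connect-distributed.sh config/connect-distributed.properties \
  --override group.id=my-connect-cluster
```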

Could someone tell me if this is a good idea or a bad idea? If bad, is
there an alternative way to customise the properties? If good, can I
create a PR?
I would appreciate your suggestions.
Thanks.
regards, aki


MirrorConnectorsIntegrationTest's testOneWayReplicationWithAutoOffsetSync

2020-12-18 Thread Aki Yoshida
Hi,
I am new to MirrorMaker and have been trying to understand its
behavior, in particular its consumer offset synchronization.

I saw KIP-545 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0,
which talks about this feature implemented in 2.7.0. I thought this
feature would allow a consumer to switch from the source cluster to
the mirrored cluster and continue consuming records.
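For what it's worth, my understanding is that the KIP-545 group-offset sync has to be enabled explicitly in the MM2 configuration; a fragment along these lines (the cluster aliases are from my setup, and the interval is just an example):

```properties
source->target.emit.checkpoints.enabled = true
source->target.sync.group.offsets.enabled = true
source->target.sync.group.offsets.interval.seconds = 60
```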

I tried to test this feature with 2.7.0 but was unsuccessful. So, I
looked at testOneWayReplicationWithAutoOffsetSync of
MirrorConnectorsIntegrationTest, which seems to test this offset
synchronization feature.
https://github.com/apache/kafka/blob/trunk/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java#L453

But I see that in this test, the consumer is switched to the new
cluster when its offset is already at "latest", and the assertion
only verifies that the consumer is at the latest offset (i.e., no
records are consumed) at the new cluster when it is switched.
https://github.com/apache/kafka/blob/trunk/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java#L496

This doesn't distinguish between the offset actually being
synchronized and the consumer simply starting at "latest" on the new
cluster, does it?

So I modified this test so that the consumer's offset has some lag
when it is switched, allowing the consumer to consume the remaining
records from the new cluster, as in this change:

https://github.com/elakito/kafka/commit/9063db0f8c4a53f5d9764612af898981d499a7b7

But this test failed. So I am confused. Is my test wrong, or do I
misunderstand what KIP-545 is supposed to provide? Any help in
understanding this matter would be greatly appreciated.

Regards, aki


Re: When using MM2, how should the consumer be switched from source to target?

2020-12-17 Thread Aki Yoshida
I'm answering my own mail and would like to hear whether this
assumption is correct:
it looks like I need 2.7.0 to have the automatic translation.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
For earlier versions, there is no way to do this with the console
client; instead, one has to use the Java API
RemoteClusterUtils.translateOffsets() to get the offsets and resume
consuming from the new topic.
Is this correct?

El jue, 17 dic 2020 a las 21:18, Aki Yoshida () escribió:
>
> Hi,
> I have a question regarding how to migrate a consumer when the
> subscribing topic has been migrated to the target Kafka broker.
> Suppose a consumer is consuming from topic1 at the source Kafka
> broker. When MM2 mirrors this topic using options
> source.cluster.alias="", replication.policy.separator="", topic named
> topic1 shows up at the target Kafka broker.
>
> When the consumer instance simply switches the bootstrap server to
> start consuming from this topic1 at the target broker, the consumer
> seems to start subscribing from the latest offset. I thought the
> consumer offsets were also migrated to the target and the consumer
> could simply start subscribing from the mirrored topic from the
> continued offset. Did I miss some configuration parameters or does the
> consumer need to perform some actions to be able to consume the
> records seamlessly?
>
> I appreciate for your help. Thanks.
>
> regards, aki


When using MM2, how should the consumer be switched from source to target?

2020-12-17 Thread Aki Yoshida
Hi,
I have a question regarding how to migrate a consumer when the
subscribed topic has been mirrored to the target Kafka broker.
Suppose a consumer is consuming from topic1 at the source Kafka
broker. When MM2 mirrors this topic using the options
source.cluster.alias="" and replication.policy.separator="", a topic
named topic1 shows up at the target Kafka broker.
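For context, the relevant fragment of my MM2 properties looks roughly like this (cluster aliases, the topic filter, and the empty-alias/empty-separator trick are from my setup; treat this as a sketch, not a recommendation):

```properties
clusters = source, target
source->target.enabled = true
source->target.topics = topic1
# Keep the mirrored topic name identical to the upstream name:
source.cluster.alias =
replication.policy.separator =
```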

When the consumer instance simply switches the bootstrap server to
start consuming from this topic1 at the target broker, it seems to
start subscribing from the latest offset. I thought the consumer
offsets were also migrated to the target and the consumer could simply
continue consuming the mirrored topic from its previous offset. Did I
miss some configuration parameters, or does the consumer need to
perform some action to be able to consume the records seamlessly?

I appreciate your help. Thanks.

regards, aki


Using FileConfigProvider with restricted path?

2020-12-11 Thread Aki Yoshida
Hi,
I am wondering if there is a possibility to restrict the path used for
the FileConfigProvider's value in the configuration e.g.,
${file:/xxxpath/xxxfile.properties:xxxname}.

I am looking for a way to restrict this path.
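For context, this is how the provider is typically wired into a worker configuration before ${file:...} references can be used (the provider name "file" is just a label I chose); as far as I can tell, nothing here restricts which paths may be referenced, hence the question:

```properties
config.providers = file
config.providers.file.class = org.apache.kafka.common.config.provider.FileConfigProvider
```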

Thanks
regards, aki


Re: swagger/openapi spec for Kafka-Connect REST API?

2020-05-26 Thread Aki Yoshida
I know that there is documentation for humans. But I needed a
machine-readable spec so that I could send requests from somewhere
without a specific client library. So, I created a spec document from
the JAX-RS resources used by Kafka Connect. If there is interest in
the community, I can attach the file to a GitHub issue or place it
somewhere.

El vie., 22 may. 2020 a las 15:06, Jun Wang () escribió:
>
> I am not aware of any swagger api spec for Kafka connect reset api.
> But following 2 resources have enough information for me.
>
> https://kafka.apache.org/documentation/#connect_rest
> https://docs.confluent.io/current/connect/references/restapi.html
>
>
> 
> From: Miguel Silvestre 
> Sent: Friday, May 22, 2020 8:06 AM
> To: users@kafka.apache.org 
> Subject: Re: swagger/openapi spec for Kafka-Connect REST API?
>
> Hi,
>
> I'm also searching for this. If you find it, please let me know.
> Thank you.
> --
> Miguel Silvestre
>
>
> On Mon, May 18, 2020 at 8:59 AM Aki Yoshida  wrote:
>
> > I am looking for the openapi spec for Kafka-Connect API but I couldn't
> > find one. If it is available somewhere, could someone tell me where I
> > can find it?
> >
> > regards, aki
> >


swagger/openapi spec for Kafka-Connect REST API?

2020-05-18 Thread Aki Yoshida
I am looking for the openapi spec for Kafka-Connect API but I couldn't
find one. If it is available somewhere, could someone tell me where I
can find it?

regards, aki


Re: question about property advertised.listeners

2016-12-23 Thread Aki Yoshida
Hi Kaufman,
I wasn't thinking of using the listeners property to change the
listening port itself. That makes it very simple. It was stupid of me.
Thank you.
regards, aki

2016-11-30 17:43 GMT+01:00 Kaufman Ng :
> The advertised.listeners property would not help you to map a different
> port.  If you need to talk to brokers at a different port you should also
> set the listeners property accordingly.  Say if you want the broker to
> listen at port 10092 as an example, the server.properties needs to contain
> these 2 lines:
>
> listeners=PLAINTEXT://:10092
> advertised.listeners=PLAINTEXT://:10092
>
> Since you mentioned containers, are you using the Confluent's docker
> containers?  If yes, the above configs should be taken care of (via an
> environment variable).
>
> Hope this helps.
>
> On Tue, Nov 29, 2016 at 9:56 AM, Aki Yoshida  wrote:
>
>> I have a question regarding how to use advertised.listeners to expose
>> a different port to the outside of the container where the broker is
>> running.
>>
>> When using advertised.listeners to register a different port as the
>> actual broker port, do we need a port forwarding from this configured
>> port to the actual broker port within the container so that the broker
>> itself can also find itself, right?
>>
>> thanks.
>> regards, aki
>>
>
>
>
> --
> Kaufman Ng
> Solutions Architect | Confluent
> +1 646 961 8063 | @kaufmanng
> www.confluent.io


Looking for some info on kafka homepage

2016-12-12 Thread Aki Yoshida
Hi,
I am looking for the following information on Kafka's project home
page but am not able to find it. The home page seems to have been
updated recently (it looks really nice ;-)) and maybe I just don't
know where to navigate.
The information that I would like to find is:

- A list of Kafka-relevant CVEs (I saw the security page at
https://kafka.apache.org/project-security via "Project Info" ->
"Learn about Kafka security", but this page gives only some general
info)

- EOL and planned EOL info about Kafka versions (I don't have any clue
where to find this info)

Could someone point me to the relevant pages, or if no such pages
exist, could we create them?

Thanks

regards, aki


question about property advertised.listeners

2016-11-29 Thread Aki Yoshida
I have a question regarding how to use advertised.listeners to expose
a different port to the outside of the container where the broker is
running.

When using advertised.listeners to register a different port as the
actual broker port, do we need port forwarding from this configured
port to the actual broker port within the container, so that the
broker itself can also reach itself?
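For example, in the container scenario I have in mind, the two properties would differ (the host name and ports below are made up):

```properties
# Inside the container the broker binds to 9092 ...
listeners=PLAINTEXT://0.0.0.0:9092
# ... while clients outside the container connect through a mapped port.
advertised.listeners=PLAINTEXT://broker.example.com:19092
```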

thanks.
regards, aki


ZK path name mentioned in the online Replication Design document seems to be incorrect

2016-05-10 Thread Aki Yoshida
I am trying to understand the replication procedure and saw this document
https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3
describing:

LeaderAndISR path: stores leader and ISR of a partition

/brokers/topics/[topic]/[partition_id]/leaderAndISR --> {leader_epoc:
epoc, leader: broker_id, ISR: {broker1, broker2}}

However, the correct path seems to be
/brokers/topics/[topic]/partitions/[partition_id]/state

returning
{"controller_epoch": epoch, "leader": broker_id, "version": version_id,
"leader_epoch": epoch, "isr": [broker_id, ...]}

I suppose this document was not updated when the path changed?

regards, aki