Re: Kafka mTLS authentication

2021-11-22 Thread Luke Chen
Hi Yingjie,
> However, I have run into a problem. If I need to add, remove or renew a
certificate in Kafka's truststore, Kafka requires a reboot, which would
impact service availability for other teams.

> So I want to know if there is a better way to change Kafka's
certificates without impacting service availability?

Yes, Kafka supports dynamically updating a broker's configuration. Please
check here: https://kafka.apache.org/documentation/#dynamicbrokerconfigs -
there's a section called "Updating SSL Truststore of an Existing
Listener", which should be what you're looking for.

Good luck.

Thank you.
Luke
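For illustration, the same dynamic update can also be driven from code. Below is a minimal, hedged sketch using AdminClient.incrementalAlterConfigs; the listener name "external", broker id "0", bootstrap address, and truststore path are all assumptions for this example, and the exact reload semantics are described in the docs section linked above. It needs a running cluster, so treat it as a sketch rather than a drop-in tool:

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TruststoreReload {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address

        try (AdminClient admin = AdminClient.create(props)) {
            // Dynamic broker configs are applied per broker; repeat per broker id.
            ConfigResource broker =
                new ConfigResource(ConfigResource.Type.BROKER, "0");

            // The truststore config must be prefixed with the listener name.
            // Re-applying it after replacing the file on disk makes the broker
            // pick up the new truststore without a restart.
            AlterConfigOp reload = new AlterConfigOp(
                new ConfigEntry(
                    "listener.name.external.ssl.truststore.location",
                    "/etc/kafka/ssl/truststore.jks"), // assumed path
                AlterConfigOp.OpType.SET);

            Map<ConfigResource, Collection<AlterConfigOp>> request =
                Map.of(broker, List.of(reload));
            admin.incrementalAlterConfigs(request).all().get();
        }
    }
}
```

The same change can also be made with the kafka-configs.sh tool as described on the documentation page above.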

On Tue, Nov 23, 2021 at 1:12 PM yingjie zou  wrote:

> Hi,
>
> Currently, we are going to provide Kafka services to 20+ development teams
> in my company. We'd like to provide this as multi-tenancy - each team has
> its own authentication - and we are trying to use the Kafka mTLS solution.
>
> However, I have run into a problem. If I need to add, remove or renew a
> certificate in Kafka's truststore, Kafka requires a reboot, which would
> impact service availability for other teams.
>
> So I want to know if there is a better way to change Kafka's certificates
> without impacting service availability?
>
> Any help is appreciated.
>
> Thanks.
> Yingjie Zou
>


Re: EOL clarification

2021-11-22 Thread Luke Chen
Hi Piyush,
Sorry for the late reply.
Yes, most likely there won't be any support for the 2.x releases after v3.1
is released, unless a high-severity issue comes up.

Thank you.

On Tue, Nov 16, 2021 at 7:05 PM Piyush Mittal  wrote:

> Hi Luke,
>
> One small clarification with regard to the 2 most recent releases - once
> v3.1 is released, there won't be any support for 2.x.x except for special
> cases like 2.6.3 https://lists.apache.org/thread/rj82rxch4tz19tjxj70v4o9kb7hsnrhb ,
> and teams should plan for a major upgrade from 2.x.x to 3.x.x in the next
> calendar year. Is that correct?
>
> Thanks and Regards
> Piyush Mittal
>
>
> On Tue, Nov 16, 2021 at 1:41 PM Luke Chen  wrote:
>
> > Hi Piyush,
> > > As an example, let's say I am on 2.4.1 right now. If it's receiving
> > security and bug fixes then I don't see any need to upgrade to latest
> > version 3.0 or even 2.8.1
> > No, I don't think 2.4.1 has any chance of a further release.
> > You can check this mail thread, to know why there's a 2.6.3 release:
> > https://lists.apache.org/thread/rj82rxch4tz19tjxj70v4o9kb7hsnrhb
> >
> > Thank you.
> > Luke
> >
> > On Tue, Nov 16, 2021 at 3:42 PM Piyush Mittal  wrote:
> >
> > > Hi Luke,
> > >
> > > >Kafka only produces releases for the 2 most recent branches.
> > > The EOL policy on the wiki
> > > <https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan#TimeBasedReleasePlan-WhatIsOurEOLPolicy>
> > > says three releases. Can we correct the wiki?
> > >
> > > >you'll also find some releases have minor version to 2, ex: 2.6.2 (and
> > > there's going to be 2.6.3 released due to CVE)
> > > If only two versions are supported, then 2.6.3 shouldn't be released?
> > > Maybe I misunderstood something here. What I am looking for is whether a
> > > given version is supported (receives security and bug fixes) or not. As
> > > an example, let's say I am on 2.4.1 right now. If it's receiving security
> > > and bug fixes then I don't see any need to upgrade to the latest version
> > > 3.0 or even 2.8.1.
> > >
> > > Thanks and Regards
> > > Piyush Mittal
> > >
> > >
> > > On Tue, Nov 16, 2021 at 12:38 PM Luke Chen  wrote:
> > >
> > > > Hi Piyush,
> > > > Usually, Kafka only produces releases for the 2 most recent branches.
> > > > Ex: We're going to release V3.1, and some fixes will also be backported
> > > > to the V3.0 branch.
> > > > So, from the download page, you can see most minor versions only go up
> > > > to 1, ex: 2.3.1, 2.5.1...
> > > > But that's not a strict rule, so you'll also find some releases with a
> > > > minor version of 2, ex: 2.6.2 (and there's going to be a 2.6.3 release
> > > > due to a CVE)
> > > >
> > > > If you want to know the minimal supported version, I'd say 2 versions
> > > > (ex: v3.1, v3.0).
> > > >
> > > > Please correct me if I'm wrong.
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Mon, Nov 15, 2021 at 1:44 PM Piyush Mittal  wrote:
> > > >
> > > > > >
> > > > > > Hello Team,
> > > > > >
> > > > > > How can I find whether a given Kafka version is supported/EOL or
> > > > > > not? I checked the archives available here  and didn't
> > > > > > find anything related to EOL. I found an EOL policy here
> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan#TimeBasedReleasePlan-WhatIsOurEOLPolicy>
> > > > > > but it's not very clear. Do the last 3 versions include the major
> > > > > > version?
> > > > > >
> > > > > > It would be great if the team could add EOL metadata to each
> > > > > > version displayed on the download page.
> > > > > >
> > > > > > Thanks and Regards
> > > > > > Piyush Mittal
> > > > > >
> > > > >
> > > >
> > >
> >
>


Kafka mTLS authentication

2021-11-22 Thread yingjie zou
Hi,

Currently, we are going to provide Kafka services to 20+ development teams
in my company. We'd like to provide this as multi-tenancy - each team has
its own authentication - and we are trying to use the Kafka mTLS solution.

However, I have run into a problem. If I need to add, remove or renew a
certificate in Kafka's truststore, Kafka requires a reboot, which would
impact service availability for other teams.

So I want to know if there is a better way to change Kafka's certificates
without impacting service availability?

Any help is appreciated.

Thanks.
Yingjie Zou


Re: delete the topic, and immediately create the topic

2021-11-22 Thread Luke Chen
Hi 志恒,
> When I use AdminClient to delete the topic, and immediately create the
topic that was just deleted, it always indicates that the topic already
exists
In Kafka, topic info (metadata) is cached and is not updated immediately.
That's why you see this situation.
I don't think deleting a topic and immediately recreating the same one is a
good idea.
Maybe you should change your code accordingly.

Thank you.
Luke
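If the delete-then-recreate flow really is required, a common workaround is to retry the create until the deletion has propagated out of the metadata cache. Here is a hedged sketch of such a retry helper in plain Java; `action` is a placeholder that, in the code quoted below, would wrap `client.createTopics(...).all().get()` and treat a `TopicExistsException` cause as the transient failure:

```java
import java.util.function.Supplier;

class RetryUtil {
    // Retries an action that can fail transiently (e.g. "topic already
    // exists" while a deletion is still propagating through the cluster),
    // sleeping between attempts and rethrowing the last failure on give-up.
    public static <T> T retry(Supplier<T> action, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break; // stop retrying if interrupted
                }
            }
        }
        throw last;
    }
}
```

In place of the single create call, one would write something like `RetryUtil.retry(() -> { try { return client.createTopics(Collections.singletonList(newTopic)).all().get(); } catch (Exception e) { throw new RuntimeException(e); } }, 10, 500)`.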

On Mon, Nov 22, 2021 at 11:30 PM 王 志恒  wrote:

> Hi,
> When I use AdminClient to delete the topic, and immediately create the
> topic that was just deleted, it always indicates that the topic already
> exists, but I try to get all the existing topics, the topic does not exist.
> Following is the code I tested.
>
> package org.wzh.three2.kafka.producer;
>
> import org.apache.kafka.clients.admin.*;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> import java.util.Collections;
> import java.util.Properties;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.Future;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.locks.LockSupport;
>
> public class ProducerClient {
>
> public static final String TOPIC = "topic-three2-kafka";
>
> public static void main(String[] args) throws ExecutionException,
> InterruptedException {
> initTopic();
>
> //KafkaProducer<String, String> producer = new KafkaProducer<>(initConfig());
> //for (int i = 0; i < Integer.MAX_VALUE; i++) {
> //    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, String.valueOf(i));
> //    try {
> //        Future<RecordMetadata> future = producer.send(record);
> //        RecordMetadata metadata = future.get();
> //        System.out.println(metadata.topic() + " - " + metadata.partition() + ":" + metadata.offset());
> //    } catch (InterruptedException e) {
> //        e.printStackTrace();
> //    } catch (ExecutionException e) {
> //        e.printStackTrace();
> //    }
> //    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
> //}
> }
>
> private static void listTopics(AdminClient client) throws
> ExecutionException, InterruptedException {
> System.out.println("+--Topics---+");
>
> client.listTopics().names().get().stream().forEach(System.out::println);
> System.out.println("+---+");
> }
>
> // Create the topic
> public static void initTopic() throws ExecutionException,
> InterruptedException {
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "
> 192.168.111.101:9092,192.168.111.102:9092,192.168.111.103:9092");
> AdminClient client = KafkaAdminClient.create(props);
>
> listTopics(client);
>
> NewTopic newTopic = new NewTopic(TOPIC, 10, (short) 3);
> if(client.listTopics().names().get().contains(TOPIC)) {
> System.out.println("Will delete topic: " + TOPIC);
> try {
> DeleteTopicsResult deleteTopicsResult =
> client.deleteTopics(Collections.singletonList(newTopic.name()));
> deleteTopicsResult.all().get();
> deleteTopicsResult.values().forEach((k, v) ->
> System.out.println(k + "\t" + v));
> } catch (InterruptedException e) {
> e.printStackTrace();
> } catch (ExecutionException e) {
> e.printStackTrace();
> }
> }
>
> listTopics(client);
>
> try {
> System.out.println("Will create topic: " + TOPIC);
> CreateTopicsResult result =
> client.createTopics(Collections.singletonList(newTopic));
> result.all().get();
> result.values().forEach((k, v) -> System.out.println(k + "\t"
> + v));
> } catch (InterruptedException e) {
> e.printStackTrace();
> } catch (ExecutionException e) {
> e.printStackTrace();
> }
> client.close();
> }
>
> public static Properties initConfig() {
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "
> 192.168.111.101:9092,192.168.111.102:9092,192.168.111.103:9092");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class.getName());
> props.put(ProducerConfig.CLIENT_ID_CONFIG,
> "producer.client.id.demo");
> return props;
> }
> }
>
> Brs/newbie
> Sent from Mail for Windows
>
>


Re: Consumer failure after rolling Broker upgrade

2021-11-22 Thread Luke Chen
Hi James,
> Bouncing the clients resolved the issue
Could you please describe which version you upgraded to in order to resolve
this issue? That would also help other users encountering the same issue.

Also, the code snippet you listed has existed since 2018; I don't think
there is any problem there.
Maybe there were bugs elsewhere that got fixed indirectly.

Thank you.
Luke

On Tue, Nov 23, 2021 at 10:27 AM James Olsen  wrote:

> We had a 2.5.1 Broker/Client system running for some time with regular
> rolling OS upgrades to the Brokers without any problems.  A while ago we
> upgraded both Broker and Clients to 2.7.1 and now on the first rolling OS
> upgrade to the 2.7.1 Brokers we encountered some Consumer issues.  We have
> a 3 Broker setup with min-ISRs configured to avoid any outage.
>
> So maybe we just got lucky 6 times in a row with the 2.5.1 or maybe there
> is an issue with the 2.7.1.
>
> The observable symptom is a continuous stream of "The coordinator is not
> available" messages when trying to commit offsets.  It starts with the
> usual messages you might expect during a rolling upgrade...
>
> 2021-11-22 04:41:25,269 WARN
> [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> 'pool-7-thread-132' [Consumer clientId=consumer-MyService-group-58,
> groupId=MyService-group] Offset commit failed on partition MyTopic-0 at
> offset 866799313: The coordinator is loading and hence can't process
> requests.
>
> ... then 5 minutes of all OK, then ...
>
> 2021-11-22 04:46:33,258 WARN
> [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> 'pool-7-thread-132' [Consumer clientId=consumer-MyService-group-58,
> groupId=MyService-group] Offset commit failed on partition MyTopic-0 at
> offset 866803953: This is not the correct coordinator.
>
> 2021-11-22 04:46:33,258 INFO
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> 'pool-7-thread-132' [Consumer clientId=consumer-MyService-group-58,
> groupId=MyService-group] Group coordinator b-2.xxx.com:9094 (id: 2147483645
> rack: null) is unavailable or
> invalid due to cause: error response NOT_COORDINATOR.isDisconnected: false.
> Rediscovery will be attempted.
>
> 2021-11-22 04:46:33,258 WARN  [xxx.KafkaConsumerRunner]
> 'pool-7-thread-132' Offset commit with offsets
> {MyTopic-0=OffsetAndMetadata{offset=866803953, leaderEpoch=null,
> metadata=''}} failed:
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset
> commit failed with a retriable exception. You should retry committing the
> latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.NotCoordinatorException: This is
> not the correct coordinator.
>
> ... then the following message for every subsequent attempt to commit
> offsets ...
>
> 2021-11-22 04:46:33,284 WARN  [xxx.KafkaConsumerRunner]
> 'pool-7-thread-132' Offset commit with offsets
> {MyTopic-0=OffsetAndMetadata{offset=866803954, leaderEpoch=82,
> metadata=''}, MyOtherTopic-0=OffsetAndMetadata{offset=12654756,
> leaderEpoch=79, metadata=''}} failed:
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset
> commit failed with a retriable exception. You should retry committing the
> latest consumed offsets.
> Caused by:
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The
> coordinator is not available.
>
> In the above example we are doing manual async-commits but we also had
> offset commit failure for a different consumer group (observed through lag
> monitoring) that uses auto-commit, it just didn't log the ongoing
> failures.  In both cases messages were still being processed, it was just
> the commits not working.  These are our two busiest consumer groups and
> both have static Topic assignments.  Other consumer groups continued OK.
>
> I've spent some time examining the (Java) client code and started to
> wonder whether there is a bug or race condition that means the coordinator
> never gets reassigned after being invalidated and we simply keep hitting
> the following short-circuit:
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
>
> RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
> if (offsets.isEmpty())
> return RequestFuture.voidSuccess();
>
> Node coordinator = checkAndGetCoordinator();
> if (coordinator == null)
> return RequestFuture.coordinatorNotAvailable();
>
> I'm not sure what the exact pathway is to getting the coordinator set but
> I note that
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(Timer)
> and other methods that look like they may be related tend to only log at
> debug when they encounter RetriableException so could explain why I don't
> have more detail to provide.
>
> I'm not familiar enough with the code to be able to trace this through any
> further, but if you've had the patience to keep reading this far then maybe
> you do!
>
> Bouncing the clients resolved the issue, but I'd be interested if any
> experts out there can identify if there is any weakness in the 2.7.1
> version.

Consumer failure after rolling Broker upgrade

2021-11-22 Thread James Olsen
We had a 2.5.1 Broker/Client system running for some time with regular rolling 
OS upgrades to the Brokers without any problems.  A while ago we upgraded both 
Broker and Clients to 2.7.1 and now on the first rolling OS upgrade to the 
2.7.1 Brokers we encountered some Consumer issues.  We have a 3 Broker setup 
with min-ISRs configured to avoid any outage.

So maybe we just got lucky 6 times in a row with the 2.5.1 or maybe there is an 
issue with the 2.7.1.

The observable symptom is a continuous stream of "The coordinator is not 
available" messages when trying to commit offsets.  It starts with the usual 
messages you might expect during a rolling upgrade...

2021-11-22 04:41:25,269 WARN  
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
'pool-7-thread-132' [Consumer clientId=consumer-MyService-group-58, 
groupId=MyService-group] Offset commit failed on partition MyTopic-0 at offset 
866799313: The coordinator is loading and hence can't process requests.

... then 5 minutes of all OK, then ...

2021-11-22 04:46:33,258 WARN  
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
'pool-7-thread-132' [Consumer clientId=consumer-MyService-group-58, 
groupId=MyService-group] Offset commit failed on partition MyTopic-0 at offset 
866803953: This is not the correct coordinator.

2021-11-22 04:46:33,258 INFO  
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 
'pool-7-thread-132' [Consumer clientId=consumer-MyService-group-58, 
groupId=MyService-group] Group coordinator 
b-2.xxx.com:9094 (id: 2147483645 rack: null) is 
unavailable or invalid due to cause: error response 
NOT_COORDINATOR.isDisconnected: false. Rediscovery will be attempted.

2021-11-22 04:46:33,258 WARN  [xxx.KafkaConsumerRunner] 'pool-7-thread-132' 
Offset commit with offsets {MyTopic-0=OffsetAndMetadata{offset=866803953, 
leaderEpoch=null, metadata=''}} failed: 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.NotCoordinatorException: This is not 
the correct coordinator.

... then the following message for every subsequent attempt to commit offsets 
...

2021-11-22 04:46:33,284 WARN  [xxx.KafkaConsumerRunner] 'pool-7-thread-132' 
Offset commit with offsets {MyTopic-0=OffsetAndMetadata{offset=866803954, 
leaderEpoch=82, metadata=''}, MyOtherTopic-0=OffsetAndMetadata{offset=12654756, 
leaderEpoch=79, metadata=''}} failed: 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.

In the above example we are doing manual async-commits but we also had offset 
commit failure for a different consumer group (observed through lag monitoring) 
that uses auto-commit, it just didn't log the ongoing failures.  In both cases 
messages were still being processed, it was just the commits not working.  
These are our two busiest consumer groups and both have static Topic 
assignments.  Other consumer groups continued OK.

I've spent some time examining the (Java) client code and started to wonder 
whether there is a bug or race condition that means the coordinator never gets 
reassigned after being invalidated and we simply keep hitting the following 
short-circuit:

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator

RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (offsets.isEmpty())
return RequestFuture.voidSuccess();

Node coordinator = checkAndGetCoordinator();
if (coordinator == null)
return RequestFuture.coordinatorNotAvailable();

I'm not sure what the exact pathway is to getting the coordinator set but I 
note that 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(Timer)
 and other methods that look like they may be related tend to only log at debug 
when they encounter RetriableException so could explain why I don't have more 
detail to provide.

I'm not familiar enough with the code to be able to trace this through any 
further, but if you've had the patience to keep reading this far then maybe you 
do!

Bouncing the clients resolved the issue, but I'd be interested if any experts 
out there can identify if there is any weakness in the 2.7.1 version.

Regards, James.
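The RetriableCommitFailedException message above advises retrying the commit, and a bounded exponential backoff is the usual shape for such a retry loop. Below is a small, hedged sketch of the schedule in plain Java; the base and cap values are illustrative, not anything Kafka prescribes:

```java
class CommitBackoff {
    // Delay before the given retry attempt (0-based): baseMs, 2*baseMs,
    // 4*baseMs, ... capped at maxMs, e.g. 100, 200, 400, ..., 1000.
    // A commit-retry loop would sleep this long between attempts to give
    // the group coordinator time to become available again.
    public static long backoffMs(int attempt, long baseMs, long maxMs) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long delay = baseMs;
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }
}
```

A caller would stop retrying after a fixed number of attempts and surface the failure, since an endlessly retried commit (as seen in this thread) can mask a coordinator that is never rediscovered.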



Re: Pause/Restart a Kafka streams app

2021-11-22 Thread Matthias J. Sax
You can only close() the Kafka Streams client and create a new one to 
resume (offsets are committed on close() and thus would be picked up on 
restart).


Closing and restarting would result in rebalancing though, so to really 
pause/resume you would need to close() all instances.



There is no API to pause()/resume() similar to what the KafkaConsumer 
offers.



-Matthias
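The close-and-recreate pattern above can be wrapped in a small helper. This is a hedged sketch in plain Java; the factory is an assumption standing in for code that builds and starts a fresh KafkaStreams instance, e.g. `() -> { KafkaStreams s = new KafkaStreams(topology, props); s.start(); return s; }`, and the sketch is not fully race-proof:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// "Pause" closes the current client (offsets are committed on close);
// "resume" builds a fresh one, which picks up from the committed offsets.
class PausableApp<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private final AtomicReference<T> current = new AtomicReference<>();

    public PausableApp(Supplier<T> factory) {
        this.factory = factory;
    }

    public void resume() {
        if (current.get() == null) {
            current.compareAndSet(null, factory.get());
        }
    }

    public void pause() {
        T app = current.getAndSet(null);
        if (app != null) {
            try {
                app.close(); // for KafkaStreams, offsets are committed here
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public boolean isRunning() {
        return current.get() != null;
    }
}
```

As noted above, closing only one instance of a multi-instance app triggers a rebalance, so a real "pause" would call pause() on every instance.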


On 11/22/21 2:10 PM, Miguel González wrote:

Hello there

Is it possible to pause/restart a Kafka streams app? I have only found this
discussion
https://groups.google.com/g/confluent-platform/c/Nyj3eN-3ZlQ/m/lMH-bFx-AAAJ
about using map to call an external service and loop until some condition
completes

regards
- Miguel



Pause/Restart a Kafka streams app

2021-11-22 Thread Miguel González
Hello there

Is it possible to pause/restart a Kafka streams app? I have only found this
discussion
https://groups.google.com/g/confluent-platform/c/Nyj3eN-3ZlQ/m/lMH-bFx-AAAJ
about using map to call an external service and loop until some condition
completes

regards
- Miguel


delete.retention.ms=0 impact on changelog topics

2021-11-22 Thread Gray, John
Hello! We are currently getting hit by 
https://issues.apache.org/jira/browse/KAFKA-8522. We've tried setting the 
delete.retention.ms to be very low, but it still doesn't allow our topic to be 
cleansed of deletes. Setting it to 0 appears to be the only solution.

Our suppress and window changelog topics have min.cleanable.dirty.ratio set to 
0.05 (the topic can get pretty big, which makes restores painfully slow), and a 
segment.ms of 12 hours to allow the topic to stay as small as possible without 
stressing the log cleaner too much.

With this background, my question is: will setting delete.retention.ms to 0 
cause any unforeseen side effects for our restoring windows/suppress stores? 
Would it be possible to consume a valid message and then miss the following 
tombstone with these settings? And then maybe somehow cause our suppress store 
to emit an old should-have-already-been-deleted record? Or is the only downside 
the extra work we are causing in the log cleaner?

Thank you!


delete the topic, and immediately create the topic

2021-11-22 Thread 王 志恒
Hi,
When I use AdminClient to delete the topic, and immediately create the topic 
that was just deleted, it always indicates that the topic already exists, but I 
try to get all the existing topics, the topic does not exist.
Following is the code I tested.

package org.wzh.three2.kafka.producer;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class ProducerClient {

public static final String TOPIC = "topic-three2-kafka";

public static void main(String[] args) throws ExecutionException, 
InterruptedException {
initTopic();

//KafkaProducer<String, String> producer = new KafkaProducer<>(initConfig());
//for (int i = 0; i < Integer.MAX_VALUE; i++) {
//    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, String.valueOf(i));
//    try {
//        Future<RecordMetadata> future = producer.send(record);
//        RecordMetadata metadata = future.get();
//        System.out.println(metadata.topic() + " - " + metadata.partition() + ":" + metadata.offset());
//    } catch (InterruptedException e) {
//        e.printStackTrace();
//    } catch (ExecutionException e) {
//        e.printStackTrace();
//    }
//    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
//}
}

private static void listTopics(AdminClient client) throws 
ExecutionException, InterruptedException {
System.out.println("+--Topics---+");
client.listTopics().names().get().stream().forEach(System.out::println);
System.out.println("+---+");
}

// Create the topic
public static void initTopic() throws ExecutionException, 
InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"192.168.111.101:9092,192.168.111.102:9092,192.168.111.103:9092");
AdminClient client = KafkaAdminClient.create(props);

listTopics(client);

NewTopic newTopic = new NewTopic(TOPIC, 10, (short) 3);
if(client.listTopics().names().get().contains(TOPIC)) {
System.out.println("Will delete topic: " + TOPIC);
try {
DeleteTopicsResult deleteTopicsResult = 
client.deleteTopics(Collections.singletonList(newTopic.name()));
deleteTopicsResult.all().get();
deleteTopicsResult.values().forEach((k, v) -> 
System.out.println(k + "\t" + v));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

listTopics(client);

try {
System.out.println("Will create topic: " + TOPIC);
CreateTopicsResult result = 
client.createTopics(Collections.singletonList(newTopic));
result.all().get();
result.values().forEach((k, v) -> System.out.println(k + "\t" + v));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
client.close();
}

public static Properties initConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"192.168.111.101:9092,192.168.111.102:9092,192.168.111.103:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
return props;
}
}

Brs/newbie
Sent from Mail for Windows



Re: uneven distribution of events across kafka topic partitions for small number of unique keys

2021-11-22 Thread Dave Klein
I’m sorry.  I misread your message.  I thought you were asking about increasing 
the number of partitions on a topic after there were keyed events in it.  

> On Nov 22, 2021, at 3:07 AM, Pushkar Deole  wrote:
> 
> Dave,
> 
> I am not sure I get your point... it is not about having fewer partitions;
> the issue is the hash collision caused by the default partitioner for two
> different strings, which can land two different keys in the same
> partition.
> 
>> On Sun, Nov 21, 2021 at 9:33 PM Dave Klein  wrote:
>> 
>> Another possibility, if you can pause processing, is to create a new topic
>> with the higher number of partitions, then consume from the beginning of
>> the old topic and produce to the new one. Then continue processing as
>> normal and all events will be in the correct partitions.
>> 
>> Regards,
>> Dave
>> 
 On Nov 21, 2021, at 7:38 AM, Pushkar Deole  wrote:
>>> 
>>> Thanks Luke, I am sure this problem would have been faced by many others
>>> before so would like to know if there are any existing custom algorithms
>>> that can be reused,
>>> 
>>> Note that we also have requirement to maintain key level ordering,  so
>> the
>>> custom partitioner should support that as well
>>> 
 On Sun, Nov 21, 2021, 18:29 Luke Chen  wrote:
 
 Hello Pushkar,
 Default distribution algorithm is by "hash(key) % partition_count", so
 there's possibility to have the uneven distribution you saw.
 
 Yes, there's a way to solve your problem: custom partitioner:
 
>> https://kafka.apache.org/documentation/#producerconfigs_partitioner.class
 
 You can check the partitioner javadoc here
 <
 
>> https://kafka.apache.org/30/javadoc/org/apache/kafka/clients/producer/Partitioner.html
> 
 for reference. You can see some examples from built-in partitioners, ex:
 
 
>> clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java.
 Basically, you want to focus on the "partition" method, to define your
>> own
 algorithm to distribute the keys based on the events, ex: key-1 ->
 partition-1, key-2 -> partition-2... etc.
 
 Thank you.
 Luke
 
 
 On Sat, Nov 20, 2021 at 2:55 PM Pushkar Deole 
 wrote:
 
> Hi All,
> 
> We are experiencing some uneven distribution of events across topic
> partitions for a small set of unique keys: following are the details:
> 
> 1. topic with 6 partitions
> 2. 8 unique keys used to produce events onto the topic
> 
> Used 'key' based partitioning while producing events onto the above
>> topic
> Observation: only 3 partitions were utilized for all the events
 pertaining
> to those 8 unique keys.
> 
> Any idea how can the load be even across partitions while using key
>> based
> partitioning strategy? Any help would be greatly appreciated.
> 
> Note: we cannot use round robin since key level ordering matters for us
> 
 
>> 
>> 
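As a concrete sketch of the custom partitioner idea discussed in this thread: the core is a deterministic key-to-partition assignment. In a real producer this logic would live inside a class implementing `org.apache.kafka.clients.producer.Partitioner` and be called from its `partition()` method. Note also that each instance below assigns independently, so with multiple producers you would want a shared or precomputed key-to-partition mapping; this is a plain-Java illustration, not a drop-in partitioner:

```java
import java.util.HashMap;
import java.util.Map;

// Pins each distinct key to a partition round-robin, so a small key set
// (e.g. 8 keys over 6 partitions) spreads as evenly as possible, while a
// given key always maps to the same partition (preserving per-key order).
class EvenKeyAssigner {
    private final Map<String, Integer> assignments = new HashMap<>();
    private final int numPartitions;
    private int next = 0;

    public EvenKeyAssigner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    public synchronized int partition(String key) {
        return assignments.computeIfAbsent(key, k -> next++ % numPartitions);
    }
}
```

Unlike the default hash-based assignment, this cannot put two distinct keys on the same partition until every partition already has one key.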



[ANNOUNCE] Apache Kafka 2.7.2

2021-11-22 Thread Mickael Maison
The Apache Kafka community is pleased to announce the release for
Apache Kafka 2.7.2

This is a bug fix release and it includes fixes and improvements from
26 JIRAs, including a fix for CVE-2021-38153.

All of the changes in this release can be found in the release notes:
https://downloads.apache.org/kafka/2.7.2/RELEASE_NOTES.html


You can download the source and binary release (Scala 2.12 and 2.13) from:
https://kafka.apache.org/downloads#2.7.2

---


Apache Kafka is a distributed streaming platform with four core APIs:


** The Producer API allows an application to publish a stream of records to
one or more Kafka topics.

** The Consumer API allows an application to subscribe to one or more
topics and process the stream of records produced to them.

** The Streams API allows an application to act as a stream processor,
consuming an input stream from one or more topics and producing an
output stream to one or more output topics, effectively transforming the
input streams to output streams.

** The Connector API allows building and running reusable producers or
consumers that connect Kafka topics to existing applications or data
systems. For example, a connector to a relational database might
capture every change to a table.


With these APIs, Kafka can be used for two broad classes of application:

** Building real-time streaming data pipelines that reliably get data
between systems or applications.

** Building real-time streaming applications that transform or react
to the streams of data.


Apache Kafka is in use at large and small companies worldwide, including
Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
Target, The New York Times, Uber, Yelp, and Zalando, among others.

A big thank you for the following 20 contributors to this release!

A. Sophie Blee-Goldman, Alexander Iskuskov, Chris Egerton, David
Arthur, Davor Poldrugo, Dejan Stojadinović, Ismael Juma, Jason
Gustafson, Justine Olshan, Konstantine Karantasis, Lee Dongjin, Luke
Chen, Matthias J. Sax, Michael Carter, Mickael Maison, Rajini Sivaram,
Randall Hauch, Shay Elkin, Stanislav Vodetskyi, Tom Bentley

We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at
https://kafka.apache.org/

Thank you!


Regards,
Mickael Maison


[ANNOUNCE] Apache Kafka 2.6.3

2021-11-22 Thread Mickael Maison
The Apache Kafka community is pleased to announce the release for
Apache Kafka 2.6.3

This is a bug fix release and it includes fixes and improvements from
11 JIRAs, including a fix for CVE-2021-38153.

All of the changes in this release can be found in the release notes:
https://www.apache.org/dist/kafka/2.6.3/RELEASE_NOTES.html


You can download the source and binary release (Scala 2.12 and 2.13) from:
https://kafka.apache.org/downloads#2.6.3

---


Apache Kafka is a distributed streaming platform with four core APIs:


** The Producer API allows an application to publish a stream of records to
one or more Kafka topics.

** The Consumer API allows an application to subscribe to one or more
topics and process the stream of records produced to them.

** The Streams API allows an application to act as a stream processor,
consuming an input stream from one or more topics and producing an
output stream to one or more output topics, effectively transforming the
input streams to output streams.

** The Connector API allows building and running reusable producers or
consumers that connect Kafka topics to existing applications or data
systems. For example, a connector to a relational database might
capture every change to a table.


With these APIs, Kafka can be used for two broad classes of application:

** Building real-time streaming data pipelines that reliably get data
between systems or applications.

** Building real-time streaming applications that transform or react
to the streams of data.


Apache Kafka is in use at large and small companies worldwide, including
Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
Target, The New York Times, Uber, Yelp, and Zalando, among others.

A big thank you to the following 12 contributors to this release!

A. Sophie Blee-Goldman, ableegoldman, Alexander Iskuskov, Chris
Egerton, Ismael Juma, Jason Gustafson, Konstantine Karantasis, Lee
Dongjin, Matthias J. Sax, Mickael Maison, Rajini Sivaram, Randall
Hauch

We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at
https://kafka.apache.org/

Thank you!


Regards,
Mickael Maison


Re: uneven distribution of events across kafka topic partitions for small number of unique keys

2021-11-22 Thread Pushkar Deole
Dave,

I am not sure I get your point... it is not about having fewer partitions.
The issue is a hash collision caused by the default partitioner: two
different strings can hash to the same value modulo the partition count,
which lands two different keys in the same partition.
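For what it's worth, the collision is easy to reproduce outside Kafka. The sketch below reimplements, in Python and purely for illustration, the 32-bit murmur2 hash that the Java client's DefaultPartitioner uses (the real implementation lives in org.apache.kafka.common.utils.Utils); the key names are made up. Running it shows how 8 distinct keys can easily land on fewer than 6 of 6 partitions:

```python
# Illustrative Python reimplementation of the 32-bit murmur2 hash used by
# the Java client's DefaultPartitioner. The real partitioner is Java code;
# this only demonstrates the hash(key) % partition_count distribution.
def murmur2(data: bytes) -> int:
    seed, m, r = 0x9747B28C, 0x5BD1E995, 24
    length = len(data)
    h = (seed ^ length) & 0xFFFFFFFF
    i = 0
    while length - i >= 4:
        # read 4 bytes little-endian, mix into h (all arithmetic mod 2^32)
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & 0xFFFFFFFF
        k ^= k >> r
        k = (k * m) & 0xFFFFFFFF
        h = (h * m) & 0xFFFFFFFF
        h ^= k
        i += 4
    rem = length - i
    if rem == 3:                      # fall-through tail handling, as in Java
        h ^= data[i + 2] << 16
    if rem >= 2:
        h ^= data[i + 1] << 8
    if rem >= 1:
        h ^= data[i]
        h = (h * m) & 0xFFFFFFFF
    h ^= h >> 13
    h = (h * m) & 0xFFFFFFFF
    h ^= h >> 15
    return h

def default_partition(key: str, num_partitions: int) -> int:
    # toPositive(murmur2(keyBytes)) % numPartitions, mirroring DefaultPartitioner
    return (murmur2(key.encode("utf-8")) & 0x7FFFFFFF) % num_partitions

if __name__ == "__main__":
    keys = [f"tenant-{i}" for i in range(8)]   # 8 hypothetical keys
    mapping = {k: default_partition(k, 6) for k in keys}
    for k, p in mapping.items():
        print(f"{k} -> partition {p}")
    used = sorted(set(mapping.values()))
    print(f"partitions used: {used} ({len(used)} of 6)")
```

With only 8 keys there is nothing forcing the 8 hash values to cover all 6 residues mod 6, which is exactly the skew described above.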

On Sun, Nov 21, 2021 at 9:33 PM Dave Klein  wrote:

> Another possibility, if you can pause processing, is to create a new topic
> with the higher number of partitions, then consume from the beginning of
> the old topic and produce to the new one. Then continue processing as
> normal and all events will be in the correct partitions.
>
> Regards,
> Dave
>
> > On Nov 21, 2021, at 7:38 AM, Pushkar Deole  wrote:
> >
> > Thanks Luke, I am sure this problem would have been faced by many others
> > before so would like to know if there are any existing custom algorithms
> > that can be reused,
> >
> > Note that we also have requirement to maintain key level ordering,  so
> the
> > custom partitioner should support that as well
> >
> >> On Sun, Nov 21, 2021, 18:29 Luke Chen  wrote:
> >>
> >> Hello Pushkar,
> >> Default distribution algorithm is by "hash(key) % partition_count", so
> >> there's possibility to have the uneven distribution you saw.
> >>
> >> Yes, there's a way to solve your problem: custom partitioner:
> >>
> https://kafka.apache.org/documentation/#producerconfigs_partitioner.class
> >>
> >> You can check the partitioner javadoc here
> >> <
> >>
> https://kafka.apache.org/30/javadoc/org/apache/kafka/clients/producer/Partitioner.html
> >>>
> >> for reference. You can see some examples from built-in partitioners, ex:
> >>
> >>
> clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java.
> >> Basically, you want to focus on the "partition" method, to define your
> own
> >> algorithm to distribute the keys based on the events, ex: key-1 ->
> >> partition-1, key-2 -> partition-2... etc.
> >>
> >> Thank you.
> >> Luke
> >>
> >>
> >> On Sat, Nov 20, 2021 at 2:55 PM Pushkar Deole 
> >> wrote:
> >>
> >>> Hi All,
> >>>
> >>> We are experiencing some uneven distribution of events across topic
> >>> partitions for a small set of unique keys: following are the details:
> >>>
> >>> 1. topic with 6 partitions
> >>> 2. 8 unique keys used to produce events onto the topic
> >>>
> >>> Used 'key' based partitioning while producing events onto the above
> topic
> >>> Observation: only 3 partitions were utilized for all the events
> >> pertaining
> >>> to those 8 unique keys.
> >>>
> >>> Any idea how can the load be even across partitions while using key
> based
> >>> partitioning strategy? Any help would be greatly appreciated.
> >>>
> >>> Note: we cannot use round robin since key level ordering matters for us
> >>>
> >>
>
>
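The custom-partitioner route suggested in the thread boils down to replacing hash % N with your own key-to-partition assignment. Below is a minimal sketch of that assignment logic in plain Python; it is not the Kafka Partitioner interface itself, and the class and key names are made up. In a real deployment the same idea would go inside a Java Partitioner's partition() method, and the key-to-partition table would have to be kept stable across all producer instances:

```python
class StickyKeyAssigner:
    """Pins each newly seen key to the currently least-loaded partition.
    Per-key ordering is preserved (a key always maps to the same
    partition) while a small key set is spread evenly. Illustrative
    sketch only, not Kafka's Partitioner API."""

    def __init__(self, num_partitions: int):
        self.num_partitions = num_partitions
        self.assignment = {}                  # key -> pinned partition
        self.load = [0] * num_partitions      # number of keys per partition

    def partition(self, key: str) -> int:
        if key not in self.assignment:
            # assign the least-loaded partition, then pin the key to it
            p = min(range(self.num_partitions), key=lambda i: self.load[i])
            self.assignment[key] = p
            self.load[p] += 1
        return self.assignment[key]

if __name__ == "__main__":
    assigner = StickyKeyAssigner(6)
    for k in [f"key-{i}" for i in range(8)]:  # 8 hypothetical keys
        print(k, "->", assigner.partition(k))
    # All 6 partitions get used, no partition carries more than
    # ceil(8/6) = 2 keys, and repeats of a key always hit its partition.
```

The trade-off versus hashing is that the assignment table is state: every producer must see the same table (or derive it deterministically), otherwise the same key could be pinned differently by different producers and per-key ordering would break.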