Re: Changing Partitions of kafka

2021-11-21 Thread Luke Chen
Hi Rajat,
> So, to route old keys, I will have to route them first before I start
accepting the new data. Right?
The old keys will automatically be routed to different partitions based on
the partitioner used (in partitioner.class config).

> A separate partitioner has to be executed after the increase in
partitions, which will read all records in the Kafka topic, run the
partitioning algorithm, and then push each record to its new partition;
then we can start accepting new messages. Is my understanding correct?
Basically that is correct, except that the partitioner won't read records
from Kafka topics; it runs inside the producer. When the producer sends
records to the brokers, the partitioner decides which topic partition each
record should go to, and the producer then finds the leader broker for that
partition.
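
(To show where this plugs in, a minimal producer config sketch — the class
name is just a placeholder for whatever custom partitioner you write, and the
broker address is illustrative:)

partitioner.class=com.example.LegacyAwarePartitioner
bootstrap.servers=broker1:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer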

So, take an example.
Before increasing the partition count, the keys are "key-0", "key-1", ...
"key-99".
Suppose that with the default partitioner, each key maps to its own partition, ex:
key-0 -> p-0 (partition 0)
key-1 -> p-1
...
key-99 -> p-99
(this is an assumption; in reality, some keys might be sent to the same
partition)

So, after a while, you need to increase the partition count for some
reason. Again, the records already sent to the brokers can't be
changed/updated. So, to keep each existing key on the same partition after
the partition count increases, you can write your own partitioner that maps
key-0 -> p-0, key-1 -> p-1, ... key-99 -> p-99. For keys that did not exist
before, you use your own algorithm to distribute them, ex: [hash(key) %
10 (newly added partitions)].
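
Here is a rough sketch of what such a partitioner could look like. Everything
in it is illustrative (the class name, the 100/10 split, the isOldKey rule),
and it reads "the newly added partitions" as the range after the old 100, so
the two formulas never collide:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class LegacyAwarePartitioner implements Partitioner {

    // Partition count before the resize; assumes the topic now has more than 100.
    private static final int OLD_PARTITION_COUNT = 100;

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int total = cluster.partitionCountForTopic(topic);
        int hash = Utils.toPositive(Utils.murmur2(keyBytes));
        if (isOldKey(key)) {
            // Same formula as before the resize, so old keys keep their old partitions.
            return hash % OLD_PARTITION_COUNT;
        }
        // New keys go only to the newly added partitions (offset past the old range),
        // so they never land on a partition that an old key maps to.
        return OLD_PARTITION_COUNT + (hash % (total - OLD_PARTITION_COUNT));
    }

    private boolean isOldKey(Object key) {
        // Application-specific rule; here we pretend old keys look like "key-0" .. "key-99".
        return key != null && key.toString().matches("key-\\d{1,2}");
    }

    @Override
    public void close() {}
}

You would then point the producer at it via the partitioner.class config shown
earlier. Whether sending new keys only to the added partitions is acceptable
depends on your traffic; the point is just that old and new keys use different
formulas.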

That said, if the set of record keys is the same before and after the
partition count increase, then there might not be any good way to both keep
each key on the same partition and also leverage all the partitions
(including the newly added ones).

Thank you.
Luke

On Mon, Nov 22, 2021 at 12:24 AM rajat kumar wrote:

> Hi Luke
>
> Thanks for responding. So, to route old keys, I will have to route them
> first before I start accepting the new data. Right?
> A separate partitioner has to be executed after the increase in partitions,
> which will read all records in the Kafka topic, run the partitioning
> algorithm, and then push each record to its new partition; then we can start
> accepting new messages. Is my understanding correct?
>
> On Sun, Nov 21, 2021 at 6:19 PM Luke Chen  wrote:
>
> > Hello Rajat,
> >
> > I'm not sure what you mean by "reshuffle messages", because once the
> > messages are stored on the brokers, they can't be modified anymore.
> > But if you want the previously added messages to keep routing to the same
> > partitions after increasing partitions, you can write a custom partitioner:
> >
> https://kafka.apache.org/documentation/#producerconfigs_partitioner.class
> >
> > So, for example, if you added 10 partitions for some new keys (ex:
> > key-101 ~ key-110), you can write the partitioner to route the old keys to
> > [hash(key) % 100 (old partition count)], and the new keys to [hash(key) %
> > 10 (newly added partitions)].
> >
> > Thank you.
> > Luke
> >
> > On Sun, Nov 21, 2021 at 7:46 PM rajat kumar wrote:
> >
> > > Hello Users,
> > >
> > > I am pretty new to Kafka; we will have key-based messages coming into
> > > Kafka.
> > > We will have a 5-node cluster, and I am going with 100 partitions for
> > > the topic for now.
> > > Let's say there is a need to increase the number of partitions. How do I
> > > reshuffle messages, since previously added messages would end up in the
> > > wrong partition as per the hash partitioning algorithm?
> > >
> > > Thanks
> > > Rajat
> > >
> >
>


Re: Kafka and Java 17

2021-11-21 Thread Josep Prat
Hi Mark,
Kafka 3.1.0, which has already reached feature freeze and will be released
soon, is the version that introduces support for Java 17.

I hope this helps.

Best,
———
Josep Prat

Aiven Deutschland GmbH

Immanuelkirchstraße 26, 10405 Berlin

Amtsgericht Charlottenburg, HRB 209739 B

Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen

m: +491715557497

w: aiven.io

e: josep.p...@aiven.io

On Sun, Nov 21, 2021, 16:32 Mark Anderson  wrote:

> What is the status of support for Java 17 in Kafka for both brokers and
> clients?
>
> The docs for Kafka 3.0.0 state that Java 8 and Java 11 are supported.
>
> Thanks,
> Mark
>


subscribe

2021-11-21 Thread rajat kumar



Re: Changing Partitions of kafka

2021-11-21 Thread rajat kumar
Hi Luke

Thanks for responding. So, to route old keys, I will have to route them
first before I start accepting the new data. Right?
A separate partitioner has to be executed after the increase in partitions,
which will read all records in the Kafka topic, run the partitioning
algorithm, and then push each record to its new partition; then we can start
accepting new messages. Is my understanding correct?

On Sun, Nov 21, 2021 at 6:19 PM Luke Chen  wrote:

> Hello Rajat,
>
> I'm not sure what you mean by "reshuffle messages", because once the
> messages are stored on the brokers, they can't be modified anymore.
> But if you want the previously added messages to keep routing to the same
> partitions after increasing partitions, you can write a custom partitioner:
> https://kafka.apache.org/documentation/#producerconfigs_partitioner.class
>
> So, for example, if you added 10 partitions for some new keys (ex: key-101
> ~ key-110), you can write the partitioner to route the old keys to [hash(key)
> % 100 (old partition count)], and the new keys to [hash(key) % 10 (newly
> added partitions)].
>
> Thank you.
> Luke
>
> On Sun, Nov 21, 2021 at 7:46 PM rajat kumar wrote:
>
> > Hello Users,
> >
> > I am pretty new to Kafka; we will have key-based messages coming into
> > Kafka.
> > We will have a 5-node cluster, and I am going with 100 partitions for
> > the topic for now.
> > Let's say there is a need to increase the number of partitions. How do I
> > reshuffle messages, since previously added messages would end up in the
> > wrong partition as per the hash partitioning algorithm?
> >
> > Thanks
> > Rajat
> >
>


Re: uneven distribution of events across kafka topic partitions for small number of unique keys

2021-11-21 Thread Dave Klein
Another possibility, if you can pause processing, is to create a new topic with
a higher number of partitions, then consume from the beginning of the old
topic and produce to the new one. Then continue processing as normal, and all
events will be in the correct partitions.
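
A minimal sketch of that copy step (topic names "orders" and "orders-v2" are
made up, byte-array serde copies records as-is, and the "three empty polls"
stop condition is a crude end-of-topic check for a one-off migration while
processing is paused — not production-grade):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TopicCopier {
    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-copier");
        cProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        Properties pProps = new Properties();
        pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(pProps)) {
            // Read the old topic from the beginning.
            consumer.subscribe(Collections.singletonList("orders"));
            int emptyPolls = 0;
            while (emptyPolls < 3) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(2));
                if (records.isEmpty()) {
                    emptyPolls++; // crude end-of-topic detection
                    continue;
                }
                emptyPolls = 0;
                for (ConsumerRecord<byte[], byte[]> r : records) {
                    // The producer's partitioner re-hashes each key over the new
                    // topic's larger partition count, so every event ends up on
                    // the partition that matches the new layout.
                    producer.send(new ProducerRecord<>("orders-v2", r.key(), r.value()));
                }
            }
            producer.flush();
        }
    }
}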

Regards,
Dave

> On Nov 21, 2021, at 7:38 AM, Pushkar Deole  wrote:
> 
> Thanks, Luke. I am sure this problem has been faced by many others
> before, so I would like to know if there are any existing custom algorithms
> that can be reused.
> 
> Note that we also have a requirement to maintain key-level ordering, so the
> custom partitioner should support that as well.
> 
>> On Sun, Nov 21, 2021, 18:29 Luke Chen  wrote:
>> 
>> Hello Pushkar,
>> The default distribution algorithm is "hash(key) % partition_count", so
>> the uneven distribution you saw is possible.
>> 
>> Yes, there's a way to solve your problem: custom partitioner:
>> https://kafka.apache.org/documentation/#producerconfigs_partitioner.class
>> 
>> You can check the partitioner javadoc here
>> <
>> https://kafka.apache.org/30/javadoc/org/apache/kafka/clients/producer/Partitioner.html
>>> 
>> for reference. You can see some examples from built-in partitioners, ex:
>> 
>> clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java.
>> Basically, you want to focus on the "partition" method, to define your own
>> algorithm to distribute the keys based on the events, ex: key-1 ->
>> partition-1, key-2 -> partition-2... etc.
>> 
>> Thank you.
>> Luke
>> 
>> 
>> On Sat, Nov 20, 2021 at 2:55 PM Pushkar Deole wrote:
>> 
>>> Hi All,
>>> 
>>> We are experiencing some uneven distribution of events across topic
>>> partitions for a small set of unique keys: following are the details:
>>> 
>>> 1. topic with 6 partitions
>>> 2. 8 unique keys used to produce events onto the topic
>>> 
>>> Used 'key' based partitioning while producing events onto the above topic
>>> Observation: only 3 partitions were utilized for all the events
>> pertaining
>>> to those 8 unique keys.
>>> 
>>> Any idea how the load can be made even across partitions while using a
>>> key-based partitioning strategy? Any help would be greatly appreciated.
>>> 
>>> Note: we cannot use round robin since key level ordering matters for us
>>> 
>> 



Kafka and Java 17

2021-11-21 Thread Mark Anderson
What is the status of support for Java 17 in Kafka for both brokers and
clients?

The docs for Kafka 3.0.0 state that Java 8 and Java 11 are supported.

Thanks,
Mark


SSl handshake failed

2021-11-21 Thread Giuseppe Ricci Sysman
Hi,



I configured my Kafka installation (on a remote Ubuntu server with only 1
broker) with SSL (I generated all the certificates needed), but when I try to
verify that it works with the command:



openssl s_client -debug -connect localhost:9093 -tls1



I have this reply:



CONNECTED(0003)

write to 0x55e48b840750 [0x55e48b8512d0] (7 bytes => 7 (0x7))

 - 15 03 01 00 02 02 50  ..P

139631163029312:error:141E70BF:SSL routines:tls_construct_client_hello:no protocols available:../ssl/statem/statem_clnt.c:1112:

---

no peer certificate available

---

No client certificate CA names sent

---

SSL handshake has read 0 bytes and written 7 bytes

Verification: OK

---

New, (NONE), Cipher is (NONE)

Secure Renegotiation IS NOT supported

Compression: NONE

Expansion: NONE

No ALPN negotiated

Early data was not sent

Verify return code: 0 (ok)

---

read from 0x55e48b840750 [0x55e48b834fe0] (8192 bytes => 0 (0x0))



and I suppose it is not ok.

If I try to create a topic with:



./bin/kafka-topics.sh --create --bootstrap-server localhost:9094
--command-config /home/kafka/kafka2_13_3/config/ssl-user-config.properties
--replication-factor 1 --partitions 1 --topic demo-topic



I receive the error:



[2021-11-21 13:49:55,854] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9094) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)

[2021-11-21 13:49:55,855] WARN [AdminClient clientId=adminclient-1] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager)

org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed

Caused by: javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?

        at java.base/sun.security.ssl.SSLEngineInputRecord.bytesInCompletePacket(SSLEngineInputRecord.java:146)
        at java.base/sun.security.ssl.SSLEngineInputRecord.bytesInCompletePacket(SSLEngineInputRecord.java:64)
        at java.base/sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:557)
        at java.base/sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:454)
        at java.base/sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:433)
        at java.base/javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:637)
        at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:509)
        at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:368)
        at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:291)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551)
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1389)
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1320)
        at java.base/java.lang.Thread.run(Thread.java:829)

Error while executing topic command : SSL handshake failed



What went wrong?



This my server.properties (SSL configs):



listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094



ssl.keystore.location=/home/kafka/ssl/kafka.server.keystore.jks

ssl.keystore.password=secret

ssl.key.password=secret

ssl.truststore.location=/home/kafka/ssl/kafka.server.truststore.jks

ssl.truststore.password= secret



advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094



zookeeper.connect=localhost:2181



#security.inter.broker.protocol=SSL

#ssl.client.auth=required

#sasl.enabled.mechanisms=PLAIN



### SECURITY using SCRAM-SHA-512 and SSL ###



ssl.endpoint.identification.algorithm=https://localhost

ssl.client.auth=none

sasl.enabled.mechanisms=PLAIN





client.properties:

listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094

#listeners=PLAINTEXT://:9092,SSL://:9093,SASL_SSL://:9094





ssl.keystore.location=/home/kafka/ssl/kafka.server.keystore.jks

ssl.keystore.password=secret

ssl.key.password=giuseppe

ssl.truststore.location=/home/kafka/ssl/kafka.server.truststore.jks

ssl.truststore.password=secret



advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094

#advertised.listeners=PLAINTEXT://:9092,SSL://:9093,SASL_SSL://:9094



zookeeper.connect=localhost:2181



#security.inter.broker.protocol=SSL

#ssl.client.auth=required

#sasl.enabled.mechanisms=PLAIN



### SECURITY using SCRAM-SHA-512 and SSL ###





#security.inter.broker.protocol=SASL_SSL

ssl.endp

Re: uneven distribution of events across kafka topic partitions for small number of unique keys

2021-11-21 Thread Pushkar Deole
Thanks, Luke. I am sure this problem has been faced by many others
before, so I would like to know if there are any existing custom algorithms
that can be reused.

Note that we also have a requirement to maintain key-level ordering, so the
custom partitioner should support that as well.

On Sun, Nov 21, 2021, 18:29 Luke Chen  wrote:

> Hello Pushkar,
> The default distribution algorithm is "hash(key) % partition_count", so
> the uneven distribution you saw is possible.
>
> Yes, there's a way to solve your problem: custom partitioner:
> https://kafka.apache.org/documentation/#producerconfigs_partitioner.class
>
> You can check the partitioner javadoc here
> <
> https://kafka.apache.org/30/javadoc/org/apache/kafka/clients/producer/Partitioner.html
> >
> for reference. You can see some examples from built-in partitioners, ex:
>
> clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java.
> Basically, you want to focus on the "partition" method, to define your own
> algorithm to distribute the keys based on the events, ex: key-1 ->
> partition-1, key-2 -> partition-2... etc.
>
> Thank you.
> Luke
>
>
> On Sat, Nov 20, 2021 at 2:55 PM Pushkar Deole wrote:
>
> > Hi All,
> >
> > We are experiencing some uneven distribution of events across topic
> > partitions for a small set of unique keys: following are the details:
> >
> > 1. topic with 6 partitions
> > 2. 8 unique keys used to produce events onto the topic
> >
> > Used 'key' based partitioning while producing events onto the above topic
> > Observation: only 3 partitions were utilized for all the events
> pertaining
> > to those 8 unique keys.
> >
> > Any idea how the load can be made even across partitions while using a
> > key-based partitioning strategy? Any help would be greatly appreciated.
> >
> > Note: we cannot use round robin since key level ordering matters for us
> >
>


Re: uneven distribution of events across kafka topic partitions for small number of unique keys

2021-11-21 Thread Luke Chen
Hello Pushkar,
The default distribution algorithm is "hash(key) % partition_count", so
the uneven distribution you saw is possible.

Yes, there's a way to solve your problem: a custom partitioner:
https://kafka.apache.org/documentation/#producerconfigs_partitioner.class

You can check the Partitioner javadoc here
<https://kafka.apache.org/30/javadoc/org/apache/kafka/clients/producer/Partitioner.html>
for reference. You can also see examples in the built-in partitioners, ex:
clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java.
Basically, you want to focus on the "partition" method to define your own
algorithm for distributing the keys based on the events, ex: key-1 ->
partition-1, key-2 -> partition-2, etc.
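
To make that concrete, here is a rough sketch (not a built-in class; the key
names and the 8-keys-over-6-partitions numbers are only illustrative — in
practice you would plug in your real keys and partition count): pin each known
key to a fixed partition, which keeps per-key ordering while spreading the
keys more evenly than hashing happens to.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class ExplicitKeyPartitioner implements Partitioner {

    private final Map<String, Integer> assignment = new HashMap<>();

    @Override
    public void configure(Map<String, ?> configs) {
        // Hypothetical static assignment: 8 known keys spread round-robin over 6 partitions.
        String[] knownKeys = {"key-1", "key-2", "key-3", "key-4",
                              "key-5", "key-6", "key-7", "key-8"};
        for (int i = 0; i < knownKeys.length; i++) {
            assignment.put(knownKeys[i], i % 6);
        }
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        Integer fixed = assignment.get(String.valueOf(key));
        if (fixed != null) {
            // A fixed partition per key preserves per-key ordering.
            return fixed;
        }
        // Fallback for unexpected keys: plain modulo over the topic's partition count.
        int numPartitions = cluster.partitionCountForTopic(topic);
        return (String.valueOf(key).hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {}
}

The producer would pick it up via the partitioner.class config linked above.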

Thank you.
Luke


On Sat, Nov 20, 2021 at 2:55 PM Pushkar Deole  wrote:

> Hi All,
>
> We are experiencing some uneven distribution of events across topic
> partitions for a small set of unique keys: following are the details:
>
> 1. topic with 6 partitions
> 2. 8 unique keys used to produce events onto the topic
>
> Used 'key' based partitioning while producing events onto the above topic
> Observation: only 3 partitions were utilized for all the events pertaining
> to those 8 unique keys.
>
> Any idea how the load can be made even across partitions while using a
> key-based partitioning strategy? Any help would be greatly appreciated.
>
> Note: we cannot use round robin since key level ordering matters for us
>


Re: Changing Partitions of kafka

2021-11-21 Thread Luke Chen
Hello Rajat,

I'm not sure what you mean by "reshuffle messages", because once the
messages are stored on the brokers, they can't be modified anymore.
But if you want the previously added messages to keep routing to the same
partitions after increasing partitions, you can write a custom partitioner:
https://kafka.apache.org/documentation/#producerconfigs_partitioner.class

So, for example, if you added 10 partitions for some new keys (ex: key-101 ~
key-110), you can write the partitioner to route the old keys to [hash(key)
% 100 (old partition count)], and the new keys to [hash(key) % 10 (newly
added partitions)].

Thank you.
Luke

On Sun, Nov 21, 2021 at 7:46 PM rajat kumar wrote:

> Hello Users,
>
> I am pretty new to Kafka; we will have key-based messages coming into
> Kafka.
> We will have a 5-node cluster, and I am going with 100 partitions for the
> topic for now.
> Let's say there is a need to increase the number of partitions. How do I
> reshuffle messages, since previously added messages would end up in the
> wrong partition as per the hash partitioning algorithm?
>
> Thanks
> Rajat
>


Changing Partitions of kafka

2021-11-21 Thread rajat kumar
Hello Users,

I am pretty new to Kafka; we will have key-based messages coming into
Kafka.
We will have a 5-node cluster, and I am going with 100 partitions for the
topic for now.
Let's say there is a need to increase the number of partitions. How do I
reshuffle messages, since previously added messages would end up in the
wrong partition as per the hash partitioning algorithm?

Thanks
Rajat