Re: Re-key by multiple properties without composite key

2024-01-30 Thread Karsten Stöckmann
Matthias, thanks for getting back on this. I'll try to illustrate my
intent with an example as I'm not yet fully familiar with Kafka
(Streams) and its idioms...

Assume classes Person and Folder:

class Person {
  Long id;
  String firstname;
  String lastname;
  // some content
}

class Folder {
  Long id;
  String folderNumber;
  // some other content
  Long customerId; // FK, points to Person.id
  Long billingAddressId; // FK, also points to Person.id
}

Thus both foreign keys of Folder point to Person entities, yet with
different semantics. They're not composite keys but act independently.

Now assume I want to build an aggregated Person object containing
Folder.folderNumber of all folders associated with a Person entity,
regardless of whether it acts as customer or billing address. My
(naive) idea was to build KTables re-keyed by Folder.customerId and
Folder.billingAddressId and then join / aggregate them with the
Person KTable in order to build something like this:

class AggregatedPerson {
  Long id;
  List<String> folderNumbers; // or even List<Folder>
  // ...
}

(The latter is supposed to be written to an output topic in order to
serve as input for Solr or Elasticsearch.)

Does this even make sense?
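Ignoring the Kafka plumbing for a moment, the aggregation I have in mind would, in plain Java, look roughly like this (class shapes simplified from the snippets above; this is just a sketch of the intended result, not Streams code):

```java
import java.util.*;
import java.util.stream.*;

// Plain-Java sketch of the intended aggregation. Person, Folder and
// AggregatedPerson mirror the (simplified) classes above; only the
// aggregation logic itself is of interest here.
public class AggregationSketch {
    record Person(Long id, String firstname, String lastname) {}
    record Folder(Long id, String folderNumber, Long customerId, Long billingAddressId) {}
    record AggregatedPerson(Long id, List<String> folderNumbers) {}

    static List<AggregatedPerson> aggregate(List<Person> persons, List<Folder> folders) {
        return persons.stream()
                .map(p -> new AggregatedPerson(
                        p.id(),
                        // collect folder numbers where the person appears as
                        // customer OR billing address, de-duplicated
                        folders.stream()
                                .filter(f -> p.id().equals(f.customerId())
                                          || p.id().equals(f.billingAddressId()))
                                .map(Folder::folderNumber)
                                .distinct()
                                .collect(Collectors.toList())))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Person> persons = List.of(new Person(1L, "Ada", "Lovelace"));
        List<Folder> folders = List.of(
                new Folder(10L, "F-10", 1L, 2L),   // person 1 is the customer
                new Folder(11L, "F-11", 3L, 1L),   // person 1 is the billing address
                new Folder(12L, "F-12", 1L, 1L));  // person 1 plays both roles
        System.out.println(aggregate(persons, folders));
    }
}
```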


> If you read the topic as a KTable, you cannot repartition because it
> violates the contract. A KTable must be partitioned by its primary key,
> i.e., the ID field, and thus the DSL does not offer you a repartition option.

So re-keying means repartitioning? At the moment the partition count of all
input topics is 1 (as per Kafka UI), as I've specified no extra configuration
for them.

Best wishes,
Karsten

Am Di., 30. Jan. 2024 um 20:03 Uhr schrieb Matthias J. Sax :
>
> >> Both fk1 and fk2 point to the PK of another entity (not shown for
> >> brevity, of no relevance to the question).
>
> Is this two independent FKs, or one two-column FK?
>
>
> > Ingesting the topic into a Kafka Streams application, how can I re-key
> > the resulting KTable by both fk1 and fk2?
>
> If you read the topic as a KTable, you cannot repartition because it
> violates the contract. A KTable must be partitioned by its primary key,
> i.e., the ID field, and thus the DSL does not offer you a repartition option.
>
> You could read the topic as a KStream though, and provide a custom
> `StreamPartitioner` for a `repartition()` operation. However, this is
> also "dangerous" because a KStream is also assumed to be partitioned
> by its key, and you might break downstream DSL operators with such a
> violation of the "contract".
>
> Looking into your solution:
>
> > .toTable()
> > .groupBy(
> > (key, value) -> KeyValue.pair(value.fk1(), value),
> > Grouped.with(...))
>
> This will set fk1 as the key, which does not seem to align with your
> previous comment that the key should stay the ID? (Same for fk2.)
>
> Your last step seems to join fk1-fk2 -- is this on purpose? I guess it's
> unclear what you are actually trying to do to begin with. It sounds like
> it's overall a self-join of the input topic on fk1 and fk2?
>
>
> -Matthias
>
> On 1/28/24 2:24 AM, Karsten Stöckmann wrote:
> > Hi all,
> >
> > just stumbled upon another Kafka Streams issue that keeps me busy these 
> > days.
> >
> > Assume a (simplified) class A like this:
> >
> > class A {
> >  private Long id;
> >  private String someContent;
> >  private Long fk1;
> >  private Long fk2;
> >  // Getters and setters accordingly
> > }
> >
> > Both fk1 and fk2 point to the PK of another entity (not shown for
> > brevity, of no relevance to the question).
> >
> > Now assume a Kafka topic built from instances of class A, keyed by its
> > id (see above).
> >
> > Ingesting the topic into a Kafka Streams application, how can I re-key
> > the resulting KTable by both fk1 and fk2? Note that the
> > resulting key should not be changed or turned into some kind of
> > composite key as it is used in later join operations.
> >
> > My (naive) solution involves creating two KTables from the input
> > stream, re-keying them by fk1 and fk2 accordingly and then outer
> > joining both resulting (re-keyed) KTables.
> >
> > KStream<Long, A> in = streamsBuilder.stream(topic, Consumed.with(...));
> >
> > KTable<Long, Aggregate> rekeyedByFk1 = in
> >     .toTable()
> >     .groupBy(
> >         (key, value) -> KeyValue.pair(value.fk1(), value),
> >         Grouped.with(...))
> >     .aggregate(
> >         Aggregate::new,
> >         (key, value, aggregate) -> aggregate.add(value),
> >         (key, value, aggregate) -> aggregate.remove(value),
> >         Materialized.with(...));
> >
> > KTable<Long, Aggregate> rekeyedByFk2 = in
> >     .toTable()
> >     .groupBy(
> >         (key, value) -> KeyValue.pair(value.fk2(), value),
> >         Grouped.with(...))
> >     .aggregate(
> >         ... same as above
> >     );
> >
> > KTable<Long, Aggregate> joined = rekeyedByFk1
> >     .outerJoin(
> >         rekeyedByFk2,
> >         ...)
> >     .groupBy(KeyValue::pair, Grouped.with(...))
> >     .aggregate(...);
> >
> > The ValueJoiner would integrate the (already pre-joined) Aggregates so
> > as to avoid duplicates.

Re: What does Kafka Streams groupBy do internally?

2024-01-30 Thread Matthias J. Sax

Did reply on SO.

-Matthias

On 1/24/24 2:18 AM, warrior2...@gmail.com wrote:
Let's say there's a topic in which chunks of different files are all
mixed up, represented by a tuple (FileId, Chunk).


Chunks of the same file can also be a little out of order.

The task is to aggregate all files and store them into some store.

The number of files is unbounded.

In pseudo stream DSL that might look like:

topic('chunks')
    .groupByKey((fileId, chunk) -> fileId)
    .sortBy((fileId, chunk) -> chunk.offset)
    .aggregate((fileId, chunk) -> store.append(fileId, chunk));


I want to understand whether Kafka Streams can solve this efficiently.
Since the number of files is unbounded, how would Kafka manage intermediate
topics for the groupBy operation? How many partitions will it use, etc.? I
can't find these details in the docs. Also, let's say a chunk has a flag
that indicates EOF. How do I indicate that a specific group will no longer
have any new data?



That's a copy of my Stack Overflow question.
—
Michael


Re: Re-key by multiple properties without composite key

2024-01-30 Thread Matthias J. Sax

Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).


Is this two independent FKs, or one two-column FK?



Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2?


If you read the topic as a KTable, you cannot repartition because it
violates the contract. A KTable must be partitioned by its primary key,
i.e., the ID field, and thus the DSL does not offer you a repartition option.


You could read the topic as a KStream though, and provide a custom
`StreamPartitioner` for a `repartition()` operation. However, this is
also "dangerous" because a KStream is also assumed to be partitioned
by its key, and you might break downstream DSL operators with such a
violation of the "contract".


Looking into your solution:


.toTable()
.groupBy(
(key, value) -> KeyValue.pair(value.fk1(), value),
Grouped.with(...))


This will set fk1 as the key, which does not seem to align with your
previous comment that the key should stay the ID? (Same for fk2.)


Your last step seems to join fk1-fk2 -- is this on purpose? I guess it's
unclear what you are actually trying to do to begin with. It sounds like
it's overall a self-join of the input topic on fk1 and fk2?



-Matthias

On 1/28/24 2:24 AM, Karsten Stöckmann wrote:

Hi all,

just stumbled upon another Kafka Streams issue that keeps me busy these days.

Assume a (simplified) class A like this:

class A {
 private Long id;
 private String someContent;
 private Long fk1;
 private Long fk2;
 // Getters and setters accordingly
}

Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).

Now assume a Kafka topic built from instances of class A, keyed by its
id (see above).

Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2? Note that the
resulting key should not be changed or turned into some kind of
composite key as it is used in later join operations.

My (naive) solution involves creating two KTables from the input
stream, re-keying them by fk1 and fk2 accordingly and then outer
joining both resulting (re-keyed) KTables.

KStream<Long, A> in = streamsBuilder.stream(topic, Consumed.with(...));

KTable<Long, Aggregate> rekeyedByFk1 = in
    .toTable()
    .groupBy(
        (key, value) -> KeyValue.pair(value.fk1(), value),
        Grouped.with(...))
    .aggregate(
        Aggregate::new,
        (key, value, aggregate) -> aggregate.add(value),
        (key, value, aggregate) -> aggregate.remove(value),
        Materialized.with(...));

KTable<Long, Aggregate> rekeyedByFk2 = in
    .toTable()
    .groupBy(
        (key, value) -> KeyValue.pair(value.fk2(), value),
        Grouped.with(...))
    .aggregate(
        ... same as above
    );

KTable<Long, Aggregate> joined = rekeyedByFk1
    .outerJoin(
        rekeyedByFk2,
        ...)
    .groupBy(KeyValue::pair, Grouped.with(...))
    .aggregate(...);

The ValueJoiner would integrate the (already pre-joined) Aggregates so as
to avoid duplicates.

Does this seem like a viable solution, or are there better / simpler /
more efficient implementations?

Best wishes,
Karsten


Re: [VOTE] 3.7.0 RC2

2024-01-30 Thread Omnia Ibrahim
Hi Stan and Gaurav,
Just to clarify some points mentioned here before:
KAFKA-14616: I raised this a year ago, so it's not related to the JBOD work.
It is rather a blocker bug for KRaft in general. The PR from Colin should fix
this. I am not sure if it is a blocker for 3.7 per se, as it has been a major
bug since 3.3 and got missed in all other releases.
 
Regarding the JBOD work:
KAFKA-16082: This is not a blocker for 3.7; it's a nice fix, though. The PR
https://github.com/apache/kafka/pull/15136 is quite small and was approved by
Proven and me, but it is waiting for a committer's approval.
KAFKA-16162: This is a blocker for 3.7. Again, it's a small PR,
https://github.com/apache/kafka/pull/15270, approved by Proven and me, and it
is waiting for a committer's approval.
KAFKA-16157: This is a blocker for 3.7. There is one small suggestion on the
PR https://github.com/apache/kafka/pull/15263, but I don't think any of the
current feedback is blocking the PR from getting approved, assuming we get a
committer's approval on it.
KAFKA-16195: Again, it's a blocker, but it has approval from Proven and me,
and we are waiting for a committer's approval on the PR
https://github.com/apache/kafka/pull/15262.

If we can't get a committer approval for KAFKA-16162, KAFKA-16157 and
KAFKA-16195 in time for 3.7, then we can mark JBOD as early access, assuming
we merge at least KAFKA-16195.

Regards, 
Omnia

> On 26 Jan 2024, at 15:39, ka...@gnarula.com wrote:
> 
> Apologies, I mentioned KAFKA-16157 twice in my previous message. I intended
> to mention KAFKA-16195
> with the PR at https://github.com/apache/kafka/pull/15262 as the second JIRA.
> 
> Thanks,
> Gaurav
> 
>> On 26 Jan 2024, at 15:34, ka...@gnarula.com wrote:
>> 
>> Hi Stan,
>> 
>> I wanted to share some updates about the bugs you shared earlier.
>> 
>> - KAFKA-14616: I've reviewed and tested the PR from Colin and have observed
>> the fix works as intended.
>> - KAFKA-16162: I reviewed Proven's PR and found some gaps in the proposed 
>> fix. I've
>> therefore raised https://github.com/apache/kafka/pull/15270 following a 
>> discussion with Luke in JIRA.
>> - KAFKA-16082: I don't think this is marked as a blocker anymore. I'm 
>> awaiting
>> feedback/reviews at https://github.com/apache/kafka/pull/15136
>> 
>> In addition to the above, there are 2 JIRAs I'd like to bring everyone's 
>> attention to:
>> 
>> - KAFKA-16157: This is similar to KAFKA-14616 and is marked as a blocker. 
>> I've raised
>> https://github.com/apache/kafka/pull/15263 and am awaiting reviews on it.
>> - KAFKA-16157: I raised this yesterday and have addressed feedback from 
>> Luke. This should
>> hopefully get merged soon.
>> 
>> Regards,
>> Gaurav
>> 
>> 
>>> On 24 Jan 2024, at 11:51, ka...@gnarula.com wrote:
>>> 
>>> Hi Stanislav,
>>> 
>>> Thanks for bringing these JIRAs/PRs up.
>>> 
>>> I'll be testing the open PRs for KAFKA-14616 and KAFKA-16162 this week and 
>>> I hope to have some feedback
>>> by Friday. I gather the latter JIRA is marked as a WIP by Proven and he's 
>>> away. I'll try to build on his work in the meantime.
>>> 
>>> As for KAFKA-16082, we haven't been able to deduce a data loss scenario. 
>>> There's a PR open
>>> by me for promoting an abandoned future replica with approvals from Omnia 
>>> and Proven,
>>> so I'd appreciate a committer reviewing it.
>>> 
>>> Regards,
>>> Gaurav
>>> 
>>> On 23 Jan 2024, at 20:17, Stanislav Kozlovski 
>>>  wrote:
 
 Hey all, I figured I'd give an update about what known blockers we have
 right now:
 
 - KAFKA-16101: KRaft migration rollback documentation is incorrect -
 https://github.com/apache/kafka/pull/15193; This need not block RC
 creation, but we need the docs updated so that people can test properly
 - KAFKA-14616: Topic recreation with offline broker causes permanent URPs -
 https://github.com/apache/kafka/pull/15230 ; I am of the understanding that
 this is blocking JBOD for 3.7
 - KAFKA-16162: Newly created topics are unavailable after upgrading to 3.7 -
 a strict blocker with an open PR https://github.com/apache/kafka/pull/15232
 - although I understand Proven is out of office
 - KAFKA-16082: JBOD: Possible dataloss when moving leader partition - I am
 hearing mixed opinions on whether this is a blocker (
 https://github.com/apache/kafka/pull/15136)
 
 Given that there are 3 JBOD blocker bugs, and I am not confident they will
 all be merged this week - I am on the edge of voting to revert JBOD from
 this release, or mark it early access.
 
 By all accounts, it seems that if we keep with JBOD the release will have
 to spill into February, which is a month extra from the time-based release
 plan we had of start of January.
 
 Can I ask others for an opinion?
 
 Best,
 Stan
 
 On Thu, Jan 18, 2024 at 1:21 PM Luke Chen  wrote:
 
> Hi all,
> 
> I think I've found another blocker issue: 

v3.5.1 KRaft mode: cluster unavailable for ~30 sec at the exact same time every day -> no controller available

2024-01-30 Thread Denny Fuchs

Hello,

we have Kafka v3.5.1 running in KRaft mode, across two datacenters
(connected via 10 Gb/s dark fiber):

DC 1: 3 nodes controller/broker
DC 2: 2 nodes controller/broker
DC 2: 1 node broker

Every day at exactly the same time, 21:01:00 (CEST), the cluster becomes
unstable and no producer/consumer can access it.


Every node has:

* Own node ID
* RACK ID

 grep -E  '(id|rack)' /etc/kafka/server.properties

broker.rack=0
node.id=1

broker.rack=0 -> DC1, broker.rack=1 -> DC2

We have exactly the same setup on our test system, but it runs without any
issues. The only differences are the missing dark fiber and different
hostnames/certs. The rest is the same, because we use Puppet for config
management.


The logs look like this:

DC 1, Node 1:

Jan 28 21:01:05 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:05,135] INFO [RaftManager id=1] Completed transition to 
Unattached(epoch=1494, voters=[1, 2, 3, 4, 5], electionTimeoutMs=1638) 
from FollowerState(fetchTimeoutMs=2000, epoch=1493, leaderId=5, 
voters=[1, 2, 3, 4, 5], 
highWatermark=Optional[LogOffsetMetadata(offset=20072183, me>
Jan 28 21:01:05 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:05,137] INFO [RaftManager id=1] Vote request 
VoteRequestData(clusterId='Rnpnd4EcRBeWo8vUrWlOIQ', 
topics=[TopicData(topicName='__cluster_metadata', 
partitions=[PartitionData(partitionIndex=0, candidateEpoch=1494, 
candidateId=2, lastOffsetEpoch=1493, lastOffset=20072146)])]) w>
Jan 28 21:01:05 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:05,137] INFO [QuorumController id=1] In the new epoch 1494, the 
leader is (none). (org.apache.kafka.controller.QuorumController)
Jan 28 21:01:05 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:05,257] INFO [RaftManager id=1] Completed transition to 
Unattached(epoch=1495, voters=[1, 2, 3, 4, 5], electionTimeoutMs=1511) 
from Unattached(epoch=1494, voters=[1, 2, 3, 4, 5], 
electionTimeoutMs=1638) (org.apache.kafka.raft.QuorumState)
Jan 28 21:01:05 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:05,258] INFO [RaftManager id=1] Vote request 
VoteRequestData(clusterId='Rnpnd4EcRBeWo8vUrWlOIQ', 
topics=[TopicData(topicName='__cluster_metadata', 
partitions=[PartitionData(partitionIndex=0, candidateEpoch=1495, 
candidateId=2, lastOffsetEpoch=1493, lastOffset=20072146)])]) w>
Jan 28 21:01:05 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:05,258] INFO [QuorumController id=1] In the new epoch 1495, the 
leader is (none). (org.apache.kafka.controller.QuorumController)
Jan 28 21:01:05 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:05,378] INFO [RaftManager id=1] Completed transition to 
Unattached(epoch=1496, voters=[1, 2, 3, 4, 5], electionTimeoutMs=1391) 
from Unattached(epoch=1495, voters=[1, 2, 3, 4, 5], 
electionTimeoutMs=1511) (org.apache.kafka.raft.QuorumState)
Jan 28 21:01:05 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:05,378] INFO [RaftManager id=1] Vote request 
VoteRequestData(clusterId='Rnpnd4EcRBeWo8vUrWlOIQ', 
topics=[TopicData(topicName='__cluster_metadata', 
partitions=[PartitionData(partitionIndex=0, candidateEpoch=1496, 
candidateId=2, lastOffsetEpoch=1493, lastOffset=20072146)])]) w>
Jan 28 21:01:05 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:05,378] INFO [QuorumController id=1] In the new epoch 1496, the 
leader is (none). (org.apache.kafka.controller.QuorumController)
Jan 28 21:01:05 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:05,902] INFO [RaftManager id=1] Completed transition to 
Unattached(epoch=1497, voters=[1, 2, 3, 4, 5], electionTimeoutMs=870) 
from Unattached(epoch=1496, voters=[1, 2, 3, 4, 5], 
electionTimeoutMs=1391) (org.apache.kafka.raft.QuorumState)
Jan 28 21:01:05 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:05,902] INFO [RaftManager id=1] Vote request 
VoteRequestData(clusterId='Rnpnd4EcRBeWo8vUrWlOIQ', 
topics=[TopicData(topicName='__cluster_metadata', 
partitions=[PartitionData(partitionIndex=0, candidateEpoch=1497, 
candidateId=2, lastOffsetEpoch=1493, lastOffset=20072146)])]) w>
Jan 28 21:01:05 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:05,902] INFO [QuorumController id=1] In the new epoch 1497, the 
leader is (none). (org.apache.kafka.controller.QuorumController)
Jan 28 21:01:06 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:06,198] INFO [BrokerToControllerChannelManager id=1 
name=heartbeat] Client requested disconnect from node 5 
(org.apache.kafka.clients.NetworkClient)
Jan 28 21:01:06 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:06,349] INFO [RaftManager id=1] Completed transition to 
Unattached(epoch=1498, voters=[1, 2, 3, 4, 5], electionTimeoutMs=422) 
from Unattached(epoch=1497, voters=[1, 2, 3, 4, 5], 
electionTimeoutMs=870) (org.apache.kafka.raft.QuorumState)
Jan 28 21:01:06 qh-a08-kafka-01 kafka[1936210]: [2024-01-28 
21:01:06,350] INFO [QuorumController id=1] In the new epoch 1498, the 
leader is (none). (org.apache.kafka.controller.QuorumController)
Jan 28 21:01:06 qh-a08-kafka-01 kafka[1936210]: [2024-01-28