Re: Re-key by multiple properties without composite key

2024-01-31 Thread Matthias J. Sax

Thanks for the details. This does make sense.

So it seems you can read all topics as tables (ie, builder.table("topic") 
-- no need to do `builder.stream().toTable()`).


And you can use the built-in FK-table-table join, and aggregate the result:

KTable result =
  folderTable
  .join(personTable, (folderId, folder) -> folder.customerId, ...)
  .groupBy((...) -> (personId, ...))
  .aggregate(...);
result.toStream().to("resultTopic");

Note the fk-extractor `(folderId, folder) -> folder.customerId` that 
tells the join to use `customerId` from the `folderTable` to look up the 
person in `personTable`.
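
Spelled out a bit more, the topology could look roughly like this (topic 
names, serdes, and the `FolderWithPerson` holder class are placeholders 
for illustration; it uses the single-argument FK-extractor overload):

KTable<Long, Folder> folderTable =
  builder.table("folder-topic", Consumed.with(Serdes.Long(), folderSerde));
KTable<Long, Person> personTable =
  builder.table("person-topic", Consumed.with(Serdes.Long(), personSerde));

KTable<Long, AggregatedPerson> result = folderTable
  // FK join: extract customerId from each Folder and look up the matching Person
  .join(personTable,
        folder -> folder.customerId,
        (folder, person) -> new FolderWithPerson(folder, person))
  // re-group the joined rows by the person's id
  .groupBy((folderId, fwp) -> KeyValue.pair(fwp.person.id, fwp),
           Grouped.with(Serdes.Long(), folderWithPersonSerde))
  // KGroupedTable#aggregate() needs both an adder and a subtractor
  .aggregate(
      AggregatedPerson::new,
      (personId, fwp, agg) -> { agg.folderNumbers.add(fwp.folder.folderNumber); return agg; },
      (personId, fwp, agg) -> { agg.folderNumbers.remove(fwp.folder.folderNumber); return agg; },
      Materialized.with(Serdes.Long(), aggregatedPersonSerde));

result.toStream().to("resultTopic", Produced.with(Serdes.Long(), aggregatedPersonSerde));

(This assumes `AggregatedPerson` has a no-arg constructor that initializes 
`folderNumbers`, and that `FolderWithPerson` is a simple pair of the two 
joined values.)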


Think of `folderTable` as the fact table and `personTable` as the dimension table.


KS will take care of everything else under the hood automatically.


-Matthias

On 1/30/24 11:25 AM, Karsten Stöckmann wrote:

Matthias, thanks for getting back on this. I'll try to illustrate my
intent with an example as I'm not yet fully familiar with Kafka
(Streams) and its idioms...

Assume classes Person and Folder:

class Person {
   Long id;
   String firstname;
   String lastname;
   // some content
}

class Folder {
   Long id;
   String folderNumber;
   // some other content
   Long customerId; // FK, points to Person.id
   Long billingAddressId; // FK, also points to Person.id
}

Thus both foreign keys of Folder point to Person entities, yet with
different semantics. They're not composite keys but act independently.

Now assume I want to build an aggregate Person object containing
Folder.folderNumber of all folders associated with a Person entity,
regardless of whether it acts as a customer or billing address. My
(naive) idea was to build re-keyed KTables by Folder.customerId and
Folder.billingAddressId and then joining / aggregating them with the
Person KTable in order to build something like this:

class AggregatedPerson {
   Long id;
   List<String> folderNumbers; // or even List<Folder>
   // ...
}

(The latter is supposed to be written to an output topic in order to
serve as input for Solr or ElasticSearch.)

Does this even make sense?



If you read the topic as a KTable, you cannot repartition because it
violates the contract. A KTable must be partitioned by its primary key,
ie, the ID field, and thus the DSL does not offer you a repartition option.


So re-key means repartition? At the moment the partition count of all input
topics is 1 as per Kafka UI, as I've specified no extra configuration
for them.

Best wishes,
Karsten

On Tue, Jan 30, 2024 at 8:03 PM Matthias J. Sax wrote:



Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).


Is this two independent FKs, or one two-column FK?



Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2?


If you read the topic as a KTable, you cannot repartition because it
violates the contract. A KTable must be partitioned by its primary key,
ie, the ID field, and thus the DSL does not offer you a repartition option.

You could read the topic as a KStream though, and provide a custom
`StreamPartitioner` for a `repartition()` operation. However, this is
also "dangerous" because for a KStream it's also assumed that it's
partitioned by its key, and you might break downstream DSL operators
with such a violation of the "contract".
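
For illustration only, such a repartition could look roughly like this
(`A` is the class from the original mail; the topic name and serdes are
placeholders):

KStream<Long, A> stream =
  builder.stream("topic-a", Consumed.with(Serdes.Long(), aSerde));

KStream<Long, A> partitionedByFk1 = stream.repartition(
  Repartitioned.<Long, A>with(Serdes.Long(), aSerde)
      .withStreamPartitioner((topic, key, value, numPartitions) ->
          // route by fk1 instead of the record key; as said above, this
          // breaks the assumption that a KStream is partitioned by its key
          (int) (Math.abs(value.getFk1()) % numPartitions)));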

Looking into your solution:


.toTable()
 .groupBy(
 (key, value) -> KeyValue.pair(value.fk1(), value),
 Grouped.with(...))


This will set fk1 as the key, which doesn't seem to align with your previous
comment that the key should stay the ID? (Same for fk2.)

Your last step seems to join fk1-fk2 -- is this on purpose? I guess it's
unclear what you actually try to do to begin with? It sounds like it's
overall a self-join of the input topic on fk1 and fk2?


-Matthias

On 1/28/24 2:24 AM, Karsten Stöckmann wrote:

Hi all,

just stumbled upon another Kafka Streams issue that keeps me busy these days.

Assume a (simplified) class A like this:

class A {
  private Long id;
  private String someContent;
  private Long fk1;
  private Long fk2;
  // Getters and setters accordingly
}

Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).

Now assume a Kafka topic built from instances of class A, keyed by its
id (see above).

Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2? Note that the
resulting key should not be changed or turned into some kind of
composite key as it is used in later join operations.

My (naive) solution involves creating two KTables from the input
stream, re-keying them by fk1 and fk2 accordingly and then outer
joining both resulting (re-keyed) KTables.

KStream in = streamsBuilder.stream(topic, Consumed.with(...));

KTable rekeyedByFk1 = in
  .toTable()
  .groupBy(
  (key, value) -> KeyValue.pair(value.fk1(), value),
  Grouped.with(...))
  .aggregate(
  
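For completeness, the two re-keyed tables and the outer join described
above might look roughly like this (assuming `in` is a KStream<Long, A>;
`AggA` is a made-up aggregate holder whose add/remove/merge methods
return the aggregate, and the serdes are placeholders):

KTable<Long, A> table = in.toTable();

KTable<Long, AggA> rekeyedByFk1 = table
  .groupBy((key, value) -> KeyValue.pair(value.fk1(), value),
           Grouped.with(Serdes.Long(), aSerde))
  .aggregate(
      AggA::new,
      (fk1, value, agg) -> agg.add(value),      // adder
      (fk1, value, agg) -> agg.remove(value),   // subtractor
      Materialized.with(Serdes.Long(), aggASerde));

KTable<Long, AggA> rekeyedByFk2 = table
  .groupBy((key, value) -> KeyValue.pair(value.fk2(), value),
           Grouped.with(Serdes.Long(), aSerde))
  .aggregate(
      AggA::new,
      (fk2, value, agg) -> agg.add(value),
      (fk2, value, agg) -> agg.remove(value),
      Materialized.with(Serdes.Long(), aggASerde));

// outer join on the shared person id; either side can be null here,
// so the made-up merge helper would need to handle nulls
KTable<Long, AggA> joinedByBothFks =
  rekeyedByFk1.outerJoin(rekeyedByFk2, (agg1, agg2) -> AggA.merge(agg1, agg2));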

question: mirror maker 2

2024-01-31 Thread M. Lim
Hello all,

I am setting up mm2 to replicate messages, consumer groups, and consumer
offsets from a->b.  I believe I am replicating those 3 items from a->b.  My
mm2 prop file is as follows:

```
# specify any number of cluster aliases
clusters = a,b
b.group.id=mm2-request

# replication settings
tasks.max = 24
replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy
a.max.poll.records = 2
#a.receive.buffer.bytes = 33554432
#a.send.buffer.bytes = 33554432
#a.max.partition.fetch.bytes = 33554432
#a.message.max.bytes = 37755000
a.compression.type = gzip
#a.max.request.size = 26214400
#a.buffer.memory = 524288000
a.batch.size = 524288

b.max.poll.records = 2
#b.receive.buffer.bytes = 33554432
#b.send.buffer.bytes = 33554432
#b.max.partition.fetch.bytes = 33554432
#b.message.max.bytes = 37755000
b.compression.type = gzip
#b.max.request.size = 26214400
#b.buffer.memory = 524288000
b.batch.size = 524288

a.bootstrap.servers = aaa.aws.confluent.cloud:9092
a.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="aaa" password="aaa";
a.sasl.mechanism = PLAIN
a.security.protocol = SASL_SSL
a.ssl.endpoint.identification.algorithm = https

b.bootstrap.servers = bbb.aws.confluent.cloud:9092
b.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="bbb" password="bbb";
b.sasl.mechanism = PLAIN
b.security.protocol = SASL_SSL
b.ssl.endpoint.identification.algorithm = https

# enable and configure individual replication flows
a->b.enabled = true

# regex which defines which topics get replicated. For eg "foo-.*"
# a->b.topics = .*
a->b.topics = order-cmd-request-01

# topic exclusion
topics.blacklist = .*[\\-\\.]internal, .*\\.replica

# group to replicate
groups = .*

# group exclusion
# groups.blacklist = console-consumer-.*

sync.topic.acls.enabled = false
sync.topic.configs.enabled = true
refresh.topics.enabled = false
refresh.topics.interval.seconds = 600

checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3
offset.storage.replication.factor = 3
status.storage.replication.factor = 3
config.storage.replication.factor = 3

refresh.groups.enabled = true
refresh.groups.interval.seconds = 600

a->b.sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 5
emit.checkpoints.interval.seconds = 5
emit.heartbeats.interval.seconds = 5
offset.translate.method = simple

# sync acl
# sync.topic.acls.enabled = false

# enable heartbeat
emit.heartbeats.enabled = true
emit.checkpoints.enabled = true

# Setting replication factor of newly created remote topics
replication.factor = 3

# Internal Topic Settings
#
# The replication factor for mm2 internal topics "heartbeats",
# "B.checkpoints.internal" and "mm2-offset-syncs.B.internal".
# For anything other than development testing, a value greater than 1 is
# recommended to ensure availability, such as 3.
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3

# The replication factor for connect internal topics
# "mm2-configs.B.internal", "mm2-offsets.B.internal" and "mm2-status.B.internal".
# For anything other than development testing, a value greater than 1 is
# recommended to ensure availability, such as 3.
offset.storage.replication.factor = 3
status.storage.replication.factor = 3
config.storage.replication.factor = 3
```
I'm able to see the messages and such on the 'b' cluster. I then terminate
the API that was pointing to the 'a' cluster, repoint it to the 'b' cluster,
and redeploy. Upon starting up, the API throws this error:

```
{"@timestamp":"2024-01-25T20:49:32.758Z", "log.level": "WARN", "message":"Error
registering AppInfo mbean", "ecs.version": "1.2.0","process.thread.name":
"main","log.logger":"org.apache.kafka.common.utils.AppInfoParser","
service.name":"prod-usf-order-integration-debug-api","error.type":
"javax.management.InstanceAlreadyExistsException","error.message":
"kafka.consumer:type=app-info,id=consumer-usfcom4-cg-order-cdc-debug-order_requests-01-usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0"
,"error.stack_trace":"javax.management.InstanceAlreadyExistsException:
kafka.consumer:type=app-info,id=consumer-usfcom4-cg-order-cdc-debug-order_requests-01-usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0\n\tat
java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)\n\tat
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)\n\tat
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)\n\tat
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)\n\tat