Re: Stream sinks are not constructed when application starts up before Kafka broker

2022-11-23 Thread John Roesler
Hi Alexander,

I’m sorry to hear that. It certainly sounds like a hard one to debug. 

To clarify, do you mean that when you observe this problem, the sink node is 
not in the topology at all, or that it is in the topology, but does not 
function properly?

Also, are you using Spring to construct the topology, or are you calling the 
Kafka Streams library directly to build the topology?

If the problem is that the sink node is missing completely, it’s hard to 
imagine how the problem could be in Streams. When you are building the topology 
in Streams, there is no connection to Kafka at all.

Then again, I’ve seen enough heisenbugs to know not to trust intuition too 
much. If you can build the topology with the Streams builder directly, you can 
check whether the issue still reproduces.
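
For instance, a minimal offline sketch (topic and store names here are 
placeholders, not from the original application) shows that topology 
construction, including the sink node, happens entirely without a broker:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class TopologyCheck {
    public static void main(String[] args) {
        // Building a topology never contacts the broker, so the sink node
        // should appear in describe() output regardless of Kafka's state.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .count(Materialized.as("counts"))
               .toStream()
               .to("output", Produced.with(Serdes.String(), Serdes.Long()));
        Topology topology = builder.build();
        // The description lists every node, including the SINK node;
        // this printout can be compared against the broken deployment.
        System.out.println(topology.describe());
    }
}
```

Comparing this output between a healthy and a broken startup should tell you 
whether the sink is missing from the topology or just not functioning.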

I hope this helps!
-John

On Tue, Nov 22, 2022, at 14:07, Alexander Kau wrote:
> My team is building a set of services that use Kafka Connect and Debezium
> to forward data changes from our Postgres database to Kafka, and then use
> Kafka Streams (via Spring Cloud Stream) to process this data and output an
> aggregate of the source data.
>
> We have been trying to track down an issue where the stream processors are
> not correctly configured when the application starts up before Kafka is up.
> Specifically, all of the processor nodes are correctly created except for
> the KTABLE-SINK-0# node. The result of this is that the services
> consume messages but do not forward their changes to their output topics.
> Therefore, data processing stops while the consumer offset continues to be
> incremented, so we lose messages and have to roll back the offsets and
> reprocess a large amount of data.
>
> This happens in both our Kubernetes environment and our Chef-managed
> environment. In the Chef environment, simply restarting the server is
> enough to trigger this issue, since Kafka takes longer to start up than the
> application. In Kubernetes, I can reliably trigger the issue by removing
> the application's dependency on Kafka, stopping Kafka, restarting the
> application, and then starting Kafka.
>
> I have tested with Kafka 3.0.0 and 3.3.1, and the behavior does not change.
> We are using the latest Spring dependencies (Spring cloud 2021.0.5, Spring
> Cloud Stream 3.2.6).
>
> This may be a "heisenbug": a bug that only occurs when it is not being
> observed. I spent most of yesterday debugging the Kafka setup code during
> application startup with and without the Kafka broker running, and was
> unable to reproduce the bug while doing so, but was able to consistently
> reproduce it as soon as I stopped debugging the startup process. I suspect
> that this may mean that this is a race condition in the parts of the
> application that only run after connecting to the Kafka broker, but I'm not
> familiar enough with the Kafka source to go much further on this.
>
> Although I understand that this is a bit of an edge case (the Kafka broker
> should generally not go down), the results here involve missing/invalid
> data, and it is not possible to confirm whether the application is in this
> case except by either confirming that "this consumer consumed a message but
> didn't forward it to its output topic" or by hooking up a debugger and
> inspecting the ProcessorContext, so we can't reasonably implement a health
> check to verify whether the application is in the bad state and restart it.
> (Even if we could check for this state, there's no guarantee that it didn't
> lose some messages while it was in the bad state.)
>
> I've done a fair amount of searching for this sort of issue, but have been
> unable to find any other people who I can confirm to have the same issue. I
> am not certain whether this is an issue in Kafka itself or in Spring Cloud
> Stream.
>
> Any guidance or suggestions would be appreciated.


Kafka 4.x release date

2022-11-23 Thread Moravcik . Juraj
Good morning,

We are using Apache Kafka in our product (Fare Collection System Ticketing):

https://www.scheidt-bachmann.de/en/fare-collection-systems/home

One of our tasks is to upgrade logging from version 1 to version 2:

https://issues.apache.org/jira/browse/KAFKA-9366

According to that ticket, this will be implemented in the Apache Kafka 
4.x release.

Would you be so kind as to provide us with a proposed release date for the 4.x release?

Thanks and Best regards,

Juraj Moravcik - Software Developer

Scheidt & Bachmann Slovensko s.r.o.
Fare Collection Systems - Integration Back Office
Dúbravská cesta 4 - 841 04 Bratislava - Slovak Republic
Phone: +421 41 5060-943
moravcik.ju...@scheidt-bachmann.sk
www.scheidt-bachmann.sk




Re: same keys appearing in state stores on different pods when using branches in kafka streams

2022-11-23 Thread John Roesler
Hi Pushkar,

Thanks for the question. I think that what’s happening is that, even though 
both branches use the same grouping logic, Streams can’t detect that they are 
the same. It just sees two group-bys and therefore introduces two repartitions, 
with a separate downstream task for each.

You might want to print out the topology description and visualize it with 
https://zz85.github.io/kafka-streams-viz/ . That will show you whether the 
stores wind up in the same task or not.

The visualization will also show you the names of the input topics for those 
two repartitions, which you can use in conjunction with the metadata methods on 
your KafkaStreams instance to query for the location of the keys in both stores.

I suspect that with some tweaks you can re-write the topology to just have one 
downstream task, if that’s what you prefer.
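
For what it’s worth, here is a rough, self-contained sketch of that idea (the 
topic names, the string-valued status, and count() standing in for your 
aggregators are all illustrative, not from your code). Forcing one explicit 
repartition before branching should leave both stores in a single sub-topology, 
so the same key lands on the same task, and hence the same pod, for both stores:

```java
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public class SingleTaskBranches {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> rekeyed = builder
            .stream("input", Consumed.with(Serdes.String(), Serdes.String()))
            // One explicit repartition; the key does not change afterwards,
            // so the groupByKey() calls below add no further repartitions.
            .repartition(Repartitioned.with(Serdes.String(), Serdes.String()));

        Map<String, KStream<String, String>> branches = rekeyed
            .split(Named.as("status-"))
            .branch((k, v) -> "CLOSED".equalsIgnoreCase(v), Branched.as("closed"))
            .defaultBranch(Branched.as("open"));

        branches.get("status-closed")
            .groupByKey()
            .count(Materialized.as("records"))
            .toStream()
            .to("records-out", Produced.with(Serdes.String(), Serdes.Long()));

        branches.get("status-open")
            .groupByKey()
            .count(Materialized.as("totals"))
            .toStream()
            .to("totals-out", Produced.with(Serdes.String(), Serdes.Long()));

        // Both stores should now appear in the same sub-topology, which you
        // can verify in the printed description.
        System.out.println(builder.build().describe());
    }
}
```

In the printed description, both the 'records' and 'totals' stores should show 
up under the same sub-topology, fed by the single repartition topic.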

By the way, I think you could propose to add an optimization to make the 
groupBy behave the way you expected. If that’s interesting to you, let us know 
and we can give you some pointers!

I hope this helps,
John

On Wed, Nov 23, 2022, at 05:36, Pushkar Deole wrote:
> Hi All,
>
> I have a stream application that creates 2 branches.  Each branch includes
> a state store where the status field of the kafka message determines the
> branch, and therefore the state store used:
>
> Status OPEN = State store name totals
>
> Status CLOSED = State store name records
>
>
>
> I’m seeing that the streams application is running on a pod; however I’m
> getting the exception:
>
> org.apache.kafka.streams.errors.InvalidStateStoreException: The state
> store, records, may have migrated to another instance.
>
>
>
> If I physically access the pod and check the RocksDB folders, I do not see
> the state store folder. If I check the keys in the totals state store on
> this pod, I can find the key in the records state store on another pod. I
> had assumed that because the keys of the events are the same, the same
> partition would be used for the two branches, and therefore the same keys in
> these two state stores would be created on the same Kubernetes pod. This is
> not an issue for the Kafka stream, but that assumption was used in the way
> the state stores are read. I assumed that if I found a key in the 'totals'
> state store, the same key would be found on the same pod in the 'records'
> state store.
>
>
>
> The questions I have are:
>
> 1) Is it expected that the state stores can hold the partition data on
> different pods, and is this unique to streams using branch?
>
> 2) Is there a way to know if the state store is on the pod to avoid
> handling this as an exception?
>
>
>
> Here is the topology of the stream in question:
>
> KStream[] branches = stream
>     .peek(receivingEventLogger)
>     .selectKey(keyMapper)
>     .mapValues(totalsValueMapper)
>     .filter(nullKeyValueEventFilter)
>     .branch(
>         (k, v) -> RecordStatus.CLOSED.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())
>             || RecordStatus.LB_RDELETE.name().equalsIgnoreCase(v.getCurrent().getRecordStatus()),
>         (k, v) -> true);
>
> // CLOSED and LB_RDELETE branch writes to the records state store
> branches[0]
>     .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
>     .aggregate(totalsInitializer, totalsAggregator, materializedRecords)
>     .toStream()
>     .map(totalsInternalKeyValueMapper)
>     .filter(nullKeyStringValueEventFilter)
>     .to(loopbackTopic.name());
>
> // DEFAULT branch writes to the totals state store
> branches[1]
>     .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
>     .aggregate(totalsInitializer, totalsAggregator, materializedTotals)
>     .toStream()
>     .flatMap(totalsKeyValueMapper)
>     .filter(nullKeyStringValueEventFilter)
>     .peek(sendingEventLogger)
>     .to(toTopic.name());


Active <> Active MirrorMaker2 setup via dedicated Kafka Connect cluster

2022-11-23 Thread Sriram Ganesh
Hi,

I am trying to set up active <> active MM2 via a Kafka Connect distributed
cluster. It seems this is not possible because of limitations such as the
*bootstrap.servers* property.
Also, as per this KIP
https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP382:MirrorMaker2.0-RunningMirrorMakerinaConnectcluster,
it is not possible to have a sink connector, which means replication can only
run in one direction.

I have already tried other approaches, and wanted to try this using a Kafka
Connect distributed cluster setup.

Please let me know if I am doing anything wrong here, and shed some light on
how to set up active <> active MM2 via a Kafka Connect cluster if it is
possible. I really appreciate any help you can provide.
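
For reference, the dedicated MM2 driver (connect-mirror-maker.sh) does support 
active <> active out of the box; a minimal sketch of its properties file, with 
placeholder cluster aliases and addresses:

```properties
# Sketch of a dedicated MirrorMaker 2 config for connect-mirror-maker.sh;
# cluster aliases and bootstrap addresses are placeholders.
clusters = east, west
east.bootstrap.servers = east-kafka:9092
west.bootstrap.servers = west-kafka:9092

# Enable replication in both directions (active <> active)
east->west.enabled = true
west->east.enabled = true
east->west.topics = .*
west->east.topics = .*
```

With the default ReplicationPolicy, replicated topics are prefixed with the 
source cluster alias (e.g. a topic from west appears on east as west.mytopic), 
which is what prevents replication loops in an active <> active setup.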


-- 
*Sriram G*
*Tech*

