Hi Sathya,

MyProcessor does not have access to MySource, because in MySource you just build the topology that is then executed by Kafka Streams. So you cannot send anything to MySource, because MyProcessor does not know anything about MySource.

If you want to stop consumption upon an exception from your service, you throw that exception in process(). That would stop the stream thread on which the processor is executed. Other running stream threads on the same client and other Streams clients in your Streams application are not affected by this exception. If you want to shutdown the Streams client on which the stream thread that throws the exception runs you need pass a reference of your Streams client (i.e., a reference to the KafkaStreams object) to the uncaught exception handler that you can set with KafkaStreams#setUncaughtExceptionHandler() and in the uncaught exception handler you need to call KafkaStreams#close(Duration.ZERO). Make sure you call close() with Duration.ZERO since otherwise you might run into a deadlock.

We are currently developing a more sophisticated way to react on exceptions that would also allow you to shutdown your whole Streams application (i.e. close all KafkaStreams objects) upon an exception. See more details under https://cwiki.apache.org/confluence/x/lkN4CQ

Best,
Bruno



On 09.01.21 10:41, Sathya Murthy wrote:
Hi  there
i m sathya,
i have below requirements in my project , please let me know how to
achieve this requirement.


These are my two kafka stream classes

1. MySource

2. MyProcessor

and Mysource class sends continues stream of data and retrieved in process
method of Myprocessor class.

My requirements are

1) When my each message is processed inside process method, I need to send
response back to MySource class.(either SUCCESS/FAILED)

2) When it unsuccessful like any exception thrown while invoking service
call (newApplication.service(value);)

The process method should stop consume any messages further to prevent data
loss.

could you please help me on this.

1) MySource class

Kstreambuilder .build ().addSource (READ_FROM_TOPIC,
Serdes.String.deserialzer (), Serdes.String.deserialzer (), messages)

.addProcessor (TRAN_PROCESSOR,()->new MyProcessor(),READ_FROM_TOPIC)

2) MyProcessor class

Public class MyProcessor implements Processor<String,String>{

Public void process (String key,String value){

Try{

newApplication.service(value);

} catch (exception e){

}

}

Reply via email to