Hi Sathya,
MyProcessor does not have access to MySource, because in MySource you
just build the topology that is then executed by Kafka Streams. So you
cannot send anything to MySource, because MyProcessor does not know
anything about MySource.
If you want to stop consumption upon an exception from your service, you
throw that exception in process(). That would stop the stream thread on
which the processor is executed. Other running stream threads on the
same client and other Streams clients in your Streams application are
not affected by this exception. If you want to shut down the Streams
client on which the failing stream thread runs, you need to pass a
reference to your Streams client (i.e., the KafkaStreams object) to the
uncaught exception handler that you set with
KafkaStreams#setUncaughtExceptionHandler(). In that handler, call
KafkaStreams#close(Duration.ZERO). Make sure you call close() with
Duration.ZERO, since otherwise you might run into a deadlock.
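To make the handler wiring concrete, here is a minimal JDK-only sketch. It is
an illustration under assumptions, not the actual Streams setup: the class
name ShutdownOnFailure, the helper closeOnFailure, and the closer callback are
hypothetical names, and the callback stands in for KafkaStreams#close(Duration).
A plain thread plays the role of the stream thread whose process() throws
instead of swallowing the exception.

```java
import java.time.Duration;
import java.util.function.Consumer;

final class ShutdownOnFailure {

    // Builds an uncaught exception handler that closes the client without waiting.
    // `closer` is a hypothetical stand-in for a call to KafkaStreams#close(Duration).
    static Thread.UncaughtExceptionHandler closeOnFailure(Consumer<Duration> closer) {
        return (thread, throwable) -> {
            System.err.println("Thread " + thread.getName() + " died: " + throwable);
            // Duration.ZERO: request the shutdown but do not block inside the handler.
            closer.accept(Duration.ZERO);
        };
    }

    public static void main(String[] args) throws InterruptedException {
        // Plain thread standing in for a stream thread; the exception is
        // thrown out of the work instead of being caught and ignored.
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("service call failed");
        }, "worker-1");

        // Stand-in closer that only logs the timeout it was called with.
        worker.setUncaughtExceptionHandler(
            closeOnFailure(timeout -> System.out.println("close(" + timeout + ") called")));

        worker.start();
        worker.join();
    }
}
```

With a real client, and assuming the setUncaughtExceptionHandler() variant
that accepts a Thread.UncaughtExceptionHandler, the wiring would look roughly
like streams.setUncaughtExceptionHandler(closeOnFailure(timeout -> streams.close(timeout)));
where streams is your KafkaStreams object.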
We are currently developing a more sophisticated way to react to
exceptions that would also allow you to shut down your whole Streams
application (i.e., close all KafkaStreams objects) upon an exception. See
more details under https://cwiki.apache.org/confluence/x/lkN4CQ
Best,
Bruno
On 09.01.21 10:41, Sathya Murthy wrote:
Hi there,
I'm Sathya.
I have the requirements below in my project; please let me know how to
achieve them.
These are my two Kafka Streams classes:
1. MySource
2. MyProcessor
The MySource class sends a continuous stream of data, which is received in
the process method of the MyProcessor class.
My requirements are:
1) After each message is processed inside the process method, I need to send
a response back to the MySource class (either SUCCESS or FAILED).
2) When processing is unsuccessful, i.e. an exception is thrown while invoking
the service call (newApplication.service(value);),
the process method should stop consuming any further messages to prevent data
loss.
Could you please help me with this?
1) MySource class
Kstreambuilder.build().addSource(READ_FROM_TOPIC,
Serdes.String().deserializer(), Serdes.String().deserializer(), messages)
.addProcessor(TRAN_PROCESSOR, () -> new MyProcessor(), READ_FROM_TOPIC)
2) MyProcessor class
public class MyProcessor implements Processor<String,String>{
public void process (String key,String value){
try{
newApplication.service(value);
} catch (Exception e){
}
}