[ 
https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099806#comment-17099806
 ] 

Gopikrishna commented on KAFKA-9909:
------------------------------------

i can understand on rebalance, but you mentioned "streams should NOT commit 
anything until it was closed". Not clear on what was "it" closed, did you mean 
that application is closed? if you mean that way, if the application closes due 
to an error, will it commit the offset?

I handled DeserializationExceptionHandler if any ill-formatted message is 
received, i meant skipping offsets to avoid context.commit() on messages that 
cannot be processed during that time. 

With traditional kafka consumer, i can acknowledge the offset, but i dont have 
an option not to acknowledge except avoiding context.commit(). 

*here is the code snippet to explain the scenario, i am talking about:* 

public class CustomProcessor implements Processor<String, String> {
 private ProcessorContext context;

@Override
public void init(ProcessorContext context) {
 this.context = context;

}

@Override
public void process(String key, String value) {
/* i am throwing a runtime exception to come out of process method without 
explicitly committing the offset. 

*/
 try {
 if (value.contains("hello")) {
 System.out.println("Skipping offset : "+context.offset());
 throw new RuntimeException("Hello raising exception!");
 }
 System.out.println("offset : "+context.offset()+ " partition : 
"+context.partition());
context.commit();
 }catch (Exception e)
 {
 System.out.println("Log & Continue exception: "+e.getMessage());
 }
}

}

once the process method completes with exception (without context.commit()), 
the offset is still committed. 

> Kafka Streams : offset control to Streams API
> ---------------------------------------------
>
>                 Key: KAFKA-9909
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9909
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.5.0
>         Environment: All
>            Reporter: Gopikrishna
>            Priority: Minor
>              Labels: Offset, commit
>
> Hello team, really inspired the way streams api is running today. I would 
> like to have a feature to be flexible regarding the offset. when we write the 
> processor api, processor context object can be used to commit the offset. 
> this is not effective. but streams are controlling the offset. the moment the 
> process method executed or scheduled window completed, the offset is 
> committed automatically by streams internally. 
> Like traditional kafka consumer, its better the context object should have 
> complete control over the offset whether to commit or not. This will give 
> more control to the api to handle failovers and especially when message 
> cannot be processed, context should not commit the offset. Appreciate this 
> can be implemented. 
>  
> h4. enable.auto.commit is by default false, but streams are committing 
> automatically the offset. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to