[jira] [Commented] (KAFKA-9909) Kafka Streams : offset control to Streams API

2020-04-25 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092420#comment-17092420
 ] 

Guozhang Wang commented on KAFKA-9909:
--

Hello [~gopikrishna.chaga...@target.com] If you want to control "when" to 
commit, you can set the Streams commit interval to infinity and call 
`context.commit()` manually whenever you want to.
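
A minimal sketch of that configuration (the application id and bootstrap 
server below are placeholders, not from this thread):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CommitConfig {
    public static Properties buildProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Effectively disable periodic commits; Streams then commits only on
        // close/rebalance, or when context.commit() is requested explicitly
        // inside a Processor.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
        return props;
    }
}
```

Note that even then `context.commit()` is only a request: Streams commits as 
soon as it can afterwards, not synchronously inside `process()`.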

If you want to control "what" to commit (I'm not sure if that is the case for 
you), I think it is a bit risky to do so, because it could break many semantic 
guarantees in stream processing. Do you have any specific scenarios for it?

> Kafka Streams : offset control to Streams API
> -
>
> Key: KAFKA-9909
> URL: https://issues.apache.org/jira/browse/KAFKA-9909
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
> Environment: All
>Reporter: Gopikrishna
>Priority: Minor
>  Labels: Offset, commit
>
> Hello team, really inspired by the way the Streams API runs today. I would 
> like a feature that is more flexible regarding offsets. When we write a 
> Processor API processor, the ProcessorContext object can be used to commit 
> the offset, but this is not effective: Streams controls the offset, and the 
> moment the process method executes or a scheduled window completes, the 
> offset is committed automatically by Streams internally. 
> Like the traditional Kafka consumer, the context object should have complete 
> control over whether or not to commit the offset. This would give the API 
> more control to handle failovers; especially when a message cannot be 
> processed, the context should not commit the offset. I would appreciate it 
> if this could be implemented. 
>  
> h4. enable.auto.commit is false by default, but Streams commits the offset 
> automatically. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9909) Kafka Streams : offset control to Streams API

2020-04-26 Thread Gopikrishna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093034#comment-17093034
 ] 

Gopikrishna commented on KAFKA-9909:


Hello [~guozhang], thank you for your response. I tried setting 
commit.interval.ms to Long.MAX_VALUE and did not commit manually, but I could 
see that Kafka Streams still committed the offset automatically when I skipped 
specific offsets intentionally. Is there any example of how to skip an offset 
by not committing through Kafka Streams when the application could not process 
the message? I appreciate your help.



[jira] [Commented] (KAFKA-9909) Kafka Streams : offset control to Streams API

2020-04-27 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093757#comment-17093757
 ] 

Guozhang Wang commented on KAFKA-9909:
--

Not sure I can follow here: if you set the config to Long.MAX_VALUE and did not 
commit manually, then Streams should NOT commit anything until it is closed or 
a rebalance is triggered.

What did you mean by "skip specific offsets intentionally"? If you could not 
process certain messages because they are, e.g., ill-formatted, or are simply 
poison pills, the general solution is to send them to some poison-pill 
(dead-letter) queue for book-keeping.
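
For the ill-formatted case specifically, Streams already ships a handler that 
can be configured instead of hand-rolling the skip (a config sketch; `props` 
is assumed to be your StreamsConfig properties):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class DeserializationHandlerConfig {
    public static void configure(Properties props) {
        // Records that fail deserialization are logged and skipped instead of
        // crashing the application; a custom handler implementing
        // DeserializationExceptionHandler could also forward the raw record
        // to a dead-letter topic for book-keeping.
        props.put(
            StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            LogAndContinueExceptionHandler.class);
    }
}
```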



[jira] [Commented] (KAFKA-9909) Kafka Streams : offset control to Streams API

2020-05-05 Thread Gopikrishna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099806#comment-17099806
 ] 

Gopikrishna commented on KAFKA-9909:


I can understand the rebalance case, but you mentioned "streams should NOT 
commit anything until it was closed". It is not clear what "it" refers to: did 
you mean the application is closed? If so, and the application closes due to 
an error, will it commit the offset?

I handle ill-formatted messages with a DeserializationExceptionHandler; by 
"skipping offsets" I meant avoiding context.commit() on messages that cannot 
be processed at that time.

With a traditional Kafka consumer I can choose whether to acknowledge an 
offset, but here I have no option not to acknowledge except avoiding 
context.commit().

*Here is a code snippet to explain the scenario I am talking about:* 

public class CustomProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // Throw a runtime exception to leave the process method without
        // explicitly committing the offset.
        try {
            if (value.contains("hello")) {
                System.out.println("Skipping offset : " + context.offset());
                throw new RuntimeException("Hello raising exception!");
            }
            System.out.println("offset : " + context.offset()
                    + " partition : " + context.partition());
            context.commit();
        } catch (Exception e) {
            System.out.println("Log & Continue exception: " + e.getMessage());
        }
    }

    @Override
    public void close() { }
}

Once the process method completes with an exception (without 
context.commit()), the offset is still committed. 



[jira] [Commented] (KAFKA-9909) Kafka Streams : offset control to Streams API

2020-05-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100186#comment-17100186
 ] 

Guozhang Wang commented on KAFKA-9909:
--

I think the reason here is that you catch the exception and continue. Let's 
say you have three records {{a,b,c}} with offsets {{1,2,3}}, where {{b}} is 
ill-formatted:

1. {{a}} is processed normally, and then committed.
2. {{b}} is found to be ill-formatted, and skipped.
3. {{c}} is still processed normally, and then committed. Now we would commit 
up to {{3}}, which includes {{b}}'s offset {{2}}.

Even in the consumer, we do not support things like "offsets 1 and 3 are 
committed, but offset 2 is skipped". I.e., if you do not want to commit offset 
{{2}}, you'd have to either send the record to a queue to book-keep it, or 
stop the app immediately and not continue to process and commit {{c}}. You can 
read this section 
https://docs.confluent.io/current/streams/faq.html#failure-and-exception-handling
 for some more suggestions.
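
The cumulative nature of commits can be illustrated without a broker (a toy 
model, not actual Kafka code): the committed position is a single high-water 
mark, the offset of the next record to consume, so committing after {{c}} 
implicitly covers the skipped offset {{2}}.

```java
import java.util.List;

public class CommitWatermark {
    // Toy model: a commit stores only the next offset to read, so
    // individual offsets cannot be excluded from it.
    static long committed = 0;

    static void commit(long processedOffset) {
        // Kafka commits "offset of the next record to consume" = processed + 1
        committed = processedOffset + 1;
    }

    public static void main(String[] args) {
        List<Long> offsets = List.of(1L, 2L, 3L); // records a, b, c
        for (long offset : offsets) {
            if (offset == 2L) continue; // skip the poison pill, no commit
            commit(offset);             // a and c are committed
        }
        // Restarting resumes at the committed watermark: offset 4.
        // The skipped record at offset 2 will NOT be re-read.
        System.out.println("resume at offset " + committed); // prints "resume at offset 4"
    }
}
```

A restart resumes from that single watermark, which is why per-offset "holes" 
cannot be expressed in a commit.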





[jira] [Commented] (KAFKA-9909) Kafka Streams : offset control to Streams API

2020-05-11 Thread Gopikrishna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104526#comment-17104526
 ] 

Gopikrishna commented on KAFKA-9909:


Thank you [~guozhang], this makes sense. Actually, I was expecting some 
checkpointing to be enabled by Kafka Streams. It would help when failures 
occur, giving the ability to resume from there and ensure everything eligible 
is eventually processed: the streams application would self-heal. But I am 
fine with closing this issue for now.

Let me know your thoughts.



[jira] [Commented] (KAFKA-9909) Kafka Streams : offset control to Streams API

2020-05-11 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104937#comment-17104937
 ] 

Guozhang Wang commented on KAFKA-9909:
--

[~gopikrishna.chaga...@target.com] We do have future plans to enable 
checkpointing (potentially to remote storage as well), but for your use case I 
think it still boils down to whether the error is retriable: if the message 
itself is ill-formatted (as shown in your example code), then simply 
restarting the process and hoping it will self-heal does not make sense. 
Instead, some manual intervention is still needed, something like: after 
checkpointing at offset 1, restart at offset 3 (i.e., skipping offset 2, since 
we know it is a poison pill). By doing that, the record at offset 2 would NOT 
be processed, and users may optionally require the producer to re-send the 
record.
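
That kind of manual skip is possible today with the consumer-group tool while 
the application is stopped (the group id, topic, and partition below are 
placeholders; a Streams application's group id equals its application.id):

```shell
# The Streams application (consumer group) must be fully stopped first.
# Move the committed position past the poison pill at offset 2 so that
# processing resumes at offset 3.
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-streams-app \
  --topic input-topic:0 \
  --reset-offsets --to-offset 3 --execute
```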



[jira] [Commented] (KAFKA-9909) Kafka Streams : offset control to Streams API

2020-05-13 Thread Gopikrishna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106198#comment-17106198
 ] 

Gopikrishna commented on KAFKA-9909:


Appreciate your response, [~guozhang]. There are two cases we are talking 
about:
 # Ill-formatted message: this can be handled very easily and is pretty 
straightforward. My example code was only meant to reproduce the issue, not 
the real intention.
 # The message is received correctly and is fine, but the application could 
not process it due to dependencies on other DBs/microservices. I retry 
multiple times and also process it later by storing the partition and offset 
details; that is the process I follow.

Checkpointing would help the second scenario. The code I shared illustrates 
the first scenario, but the second is similar: since I am unable to process 
the message at that moment, I want a checkpoint so I can revisit it later.

Yes, I agree we can resend the message if it is not processed, but that 
inflates the count and makes it difficult to know the accurate number of 
messages processed. Hope this is clear.
