[jira] [Commented] (KAFKA-8650) Streams does not work as expected with auto.offset.reset=none

2019-07-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16885707#comment-16885707
 ] 

Matthias J. Sax commented on KAFKA-8650:


Splitting out initialization is not easy. The problem is that one might start 
multiple instances at once, but only one instance should do the initialization. 
Furthermore, processing can only start after the initialization is completed. 
At the moment, we rely on the consumer rebalance protocol to ensure this 
property: the group leader does the initialization. If we separate the two, 
there are two problems if you start multiple instances at the same time: 
(1) Which instance should do the initialization? (2) How do the other instances 
know that the initialization is finished and that they can start processing?
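
To make the two questions concrete, here is a minimal in-process sketch in plain Java. It is only an analogy: threads stand in for application instances, and the class name, the compare-and-set "election", and the latch are assumptions made for illustration, not how Kafka Streams or the rebalance protocol is implemented. It shows what any separate initialization step would have to provide: exactly one initializer, and a completion signal that every instance waits on before processing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class InitCoordinationSketch {
    // Hypothetical shared state; in a real deployment this coordination
    // would have to live outside the instances (today: the group leader).
    private final AtomicBoolean initializerElected = new AtomicBoolean(false);
    private final CountDownLatch initialized = new CountDownLatch(1);
    final AtomicInteger initRuns = new AtomicInteger();
    final AtomicInteger processingStarts = new AtomicInteger();

    void runInstance() throws InterruptedException {
        // (1) Exactly one caller wins the compare-and-set and initializes.
        if (initializerElected.compareAndSet(false, true)) {
            initRuns.incrementAndGet(); // stand-in for creating internal topics
            initialized.countDown();    // (2) announce that initialization is done
        }
        // Every instance, including the initializer, waits for the signal.
        initialized.await();
        processingStarts.incrementAndGet(); // stand-in for "start processing"
    }

    // Starts the given number of simulated instances at the same time.
    static InitCoordinationSketch simulate(int instances) throws InterruptedException {
        InitCoordinationSketch sketch = new InitCoordinationSketch();
        ExecutorService pool = Executors.newFixedThreadPool(instances);
        for (int i = 0; i < instances; i++) {
            pool.submit(() -> { sketch.runInstance(); return null; });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return sketch;
    }

    public static void main(String[] args) throws InterruptedException {
        InitCoordinationSketch sketch = simulate(4);
        System.out.println("init runs: " + sketch.initRuns.get()
                + ", processing starts: " + sketch.processingStarts.get());
    }
}
```

Across separate processes or machines there is no shared `AtomicBoolean` or latch, which is exactly the gap that the rebalance protocol fills at the moment.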

Maybe it would be possible to enhance the reset tool, though: it might be 
possible to hand in the `StreamsBuilder` so that the reset tool could set up 
the topics accordingly (many open questions arise, though; it's just a rough 
idea).

> Streams does not work as expected with auto.offset.reset=none
> -------------------------------------------------------------
>
>                 Key: KAFKA-8650
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8650
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Raman Gupta
>            Priority: Major
>              Labels: needs-kip
>
> The auto.offset.reset policy of none is useful as a safety measure, 
> especially when 
> * exactly-once processing is desired, or
> * at-least-once is desired, but it is expensive to reprocess from the 
> beginning.
>
> In this case, using "none" forces the ops team to explicitly set the offset 
> before the stream can re-start processing, in the (hopefully rare) situations 
> in which the stream consumer offset has been lost for some reason, or in the 
> case of a new stream that should not start processing from the beginning or 
> the end, but somewhere in the middle (this scenario might occur during topic 
> migrations).
>
> Kafka Streams really only supports auto.offset.reset of earliest or latest 
> (see the `Topology.AutoOffsetReset` enum). It is also possible to use the 
> auto.offset.reset configuration value, but this works suboptimally because if 
> the streams application reset tool is used (even with a specific offset 
> specified), the offset is set for the input topic, but it is not, and cannot 
> be, set for the internal topics, which won't exist yet.
>
> The internal topics are created by Kafka Streams at startup time, but because 
> the auto.offset.reset policy of "none" is passed to the consumer of those 
> internal topics, the Kafka Streams application fails to start with a 
> "NoOffsetForPartitionException".
>
> Proposals / options:
> 1) Allow auto.offset.reset=none to be specified in Consumed.with() so that it 
> affects the input topics, but not the internal topics.
> 2) Allow streams to be configured with auto.offset.reset=none, but explicitly 
> set the offset to 0 for newly created internal topics.
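
The failure described in the issue, and the effect of proposal (1), can be illustrated with a small self-contained model in plain Java. This is a simplification and not the Kafka consumer's code: the class, the `NoOffsetException` type (a stand-in for the real `NoOffsetForPartitionException`), the topic names, and the single-partition assumption are all hypothetical. It only models the rule "use the committed offset if there is one, otherwise apply the reset policy".

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetResetSketch {
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    // Hypothetical stand-in for the consumer's NoOffsetForPartitionException.
    static class NoOffsetException extends RuntimeException {
        NoOffsetException(String topic) {
            super("no committed offset for " + topic);
        }
    }

    // One partition per topic, to keep the model small.
    private final Map<String, Long> committed = new HashMap<>();
    private final Map<String, Long> endOffsets = new HashMap<>();

    void createTopic(String topic, long endOffset) { endOffsets.put(topic, endOffset); }

    void commit(String topic, long offset) { committed.put(topic, offset); }

    // Committed offset wins; otherwise the reset policy decides.
    long startPosition(String topic, ResetPolicy policy) {
        Long offset = committed.get(topic);
        if (offset != null) {
            return offset;
        }
        switch (policy) {
            case EARLIEST: return 0L;
            case LATEST:   return endOffsets.getOrDefault(topic, 0L);
            default:       throw new NoOffsetException(topic);
        }
    }

    // Simulates a first start: the input offset was set beforehand (as with
    // the reset tool), the internal topic is created only at startup.
    static long[] start(ResetPolicy inputPolicy, ResetPolicy internalPolicy) {
        OffsetResetSketch model = new OffsetResetSketch();
        model.createTopic("input", 1000L);
        model.commit("input", 500L);
        model.createTopic("app-repartition", 0L); // no offset can exist yet
        return new long[] {
            model.startPosition("input", inputPolicy),
            model.startPosition("app-repartition", internalPolicy)
        };
    }

    public static void main(String[] args) {
        try {
            start(ResetPolicy.NONE, ResetPolicy.NONE); // current behavior
        } catch (NoOffsetException e) {
            System.out.println("startup failed: " + e.getMessage());
        }
        long[] positions = start(ResetPolicy.NONE, ResetPolicy.EARLIEST); // proposal (1)
        System.out.println("input=" + positions[0] + ", internal=" + positions[1]);
    }
}
```

With "none" applied to both topics, startup fails on the internal topic even though the input offset was set. With "none" limited to the input topic, the model starts at the chosen input offset and at the beginning of the new internal topic.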



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8650) Streams does not work as expected with auto.offset.reset=none

2019-07-15 Thread Raman Gupta (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16885640#comment-16885640
 ] 

Raman Gupta commented on KAFKA-8650:


There is also a related API change / improvement that I believe should be in 
the KIP if one is created.

Currently, a new (or reset) stream initializes itself when "start" is called, 
i.e., "start" is overloaded to mean "initialize and start".

If "initialize" were possible without "start", then it would be easy to call 
"initialize" to create the stream's topology, then use the application reset 
tool to set the desired offsets, then use "start" to actually start the stream 
at the desired place. This would allow the user to carry out this process 
without relying on the system to throw an exception at stream startup time, 
after initialization, when no offset has been set.

This can also be done in advance of allowing auto.offset.reset = none for 
streams, which, when done, would just become a safety on the stream.start 
trigger.



[jira] [Commented] (KAFKA-8650) Streams does not work as expected with auto.offset.reset=none

2019-07-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16885435#comment-16885435
 ] 

Matthias J. Sax commented on KAFKA-8650:


This is the first report / request of this kind. Hence, it does not seem to be 
a problem for many users at the moment, though that is hard to judge. Also, 
there is a workaround (even if it is not easy to apply).



[jira] [Commented] (KAFKA-8650) Streams does not work as expected with auto.offset.reset=none

2019-07-14 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16884826#comment-16884826
 ] 

Richard Yu commented on KAFKA-8650:
---

[~mjsax] Do you think this ticket is something that the Kafka community would 
need within the next release? It would be good to get a gauge on this issue's 
priority.



[jira] [Commented] (KAFKA-8650) Streams does not work as expected with auto.offset.reset=none

2019-07-10 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882307#comment-16882307
 ] 

Matthias J. Sax commented on KAFKA-8650:


Thanks for creating this ticket. Sounds reasonable to me. I marked it as 
"needs-kip" because if we add `none` as a new option, it's technically an API 
change. But maybe it's trivial enough to just skip the KIP process for it.

For option (2), it's actually an interesting question and will be hard to get 
right. You want to set offsets to zero if and only if those topics are newly 
created. Hence, topic creation and committing the corresponding offsets for the 
application.id would need to be atomic. (We would need to change the create 
topic request, which also requires a KIP; as an alternative, it might be 
possible to first commit offsets and create those topics afterwards.) For EOS, 
if you lose the repartition offsets (e.g., because your application was offline 
for a long time), it could also be an error condition on which you want to 
stop. Hence, using reset policy earliest/latest might actually violate EOS, too.

On the other hand, it might be sufficient to just use `earliest` for 
repartition topics, because if you lose offsets, you would lose them not just 
for the repartition topics but also for the input topics. For this case, I 
would assume that a clean reset of the application to new start offsets for the 
input topics would require wiping out the repartition topics anyway?

Another idea would be to not change anything in KafkaStreams, but to enhance 
the reset tool to actually commit offset zero for repartition topics after they 
were deleted (we need to double-check whether it's possible to commit offsets 
for a non-existing topic).
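
The atomicity requirement for option (2) can be sketched with a small in-memory model in plain Java. Everything here is hypothetical: the class, the method names, and the `synchronized` keyword standing in for an atomic "create topic and commit offset" operation that the broker does not offer today. The sketch only shows the intended rule, namely that offset zero is committed if and only if the topic is newly created, so that lost offsets on an existing topic remain detectable.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CreateAndCommitSketch {
    // In-memory stand-ins for broker state, for a single application.id
    // and one partition per topic.
    private final Set<String> topics = new HashSet<>();
    private final Map<String, Long> committed = new HashMap<>();

    // Hypothetical atomic step: create the topic and, only if it is new,
    // commit offset 0. Returns true if the topic was newly created.
    synchronized boolean createAndCommitZeroIfNew(String topic) {
        if (!topics.add(topic)) {
            return false; // topic already existed: leave its offsets alone
        }
        committed.put(topic, 0L);
        return true;
    }

    synchronized void commit(String topic, long offset) { committed.put(topic, offset); }

    // Models offsets being lost, e.g. after a long offline period.
    synchronized void loseOffsets(String topic) { committed.remove(topic); }

    // Returns null when no offset is committed for the topic.
    synchronized Long committedOffset(String topic) { return committed.get(topic); }

    public static void main(String[] args) {
        CreateAndCommitSketch broker = new CreateAndCommitSketch();
        String topic = "app-repartition"; // hypothetical internal topic name

        // First start: topic is new, so offset 0 is committed with it.
        System.out.println("newly created: " + broker.createAndCommitZeroIfNew(topic));
        System.out.println("offset: " + broker.committedOffset(topic));

        // Later: progress is committed, then the offsets are lost.
        broker.commit(topic, 42L);
        broker.loseOffsets(topic);

        // Restart: topic exists, so nothing is committed. With "none", the
        // missing offset would stop the application instead of rewinding to 0.
        System.out.println("newly created: " + broker.createAndCommitZeroIfNew(topic));
        System.out.println("offset: " + broker.committedOffset(topic));
    }
}
```

If creation and commit were two separate calls, a crash or a second instance between them could leave a newly created topic without an offset, or commit zero for a topic that another instance had already written to.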
