[GitHub] [kafka] cmccabe merged pull request #11313: MINOR: GroupMetadataManager#shutdown should remove metrics

2021-09-10 Thread GitBox


cmccabe merged pull request #11313:
URL: https://github.com/apache/kafka/pull/11313


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals

2021-09-10 Thread GitBox


cmccabe commented on a change in pull request #11312:
URL: https://github.com/apache/kafka/pull/11312#discussion_r706509007



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1385,10 +1385,22 @@ object KafkaConfig {
 }
 if (maybeSensitive) Password.HIDDEN else value
   }
+
+  def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = {
+val output = new util.HashMap[Any, Any](input)
+val brokerId = output.get(KafkaConfig.BrokerIdProp)
+val nodeId = output.get(KafkaConfig.NodeIdProp)
+if (brokerId == null && nodeId != null) {
+  output.put(KafkaConfig.BrokerIdProp, nodeId)
+} else if (brokerId != null && nodeId == null) {
+  output.put(KafkaConfig.NodeIdProp, brokerId)
+}
+output
+  }
 }
 
class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig])
-  extends AbstractConfig(KafkaConfig.configDef, props, doLog) with Logging {
+  extends AbstractConfig(KafkaConfig.configDef, KafkaConfig.populateSynonyms(props), doLog) with Logging {

Review comment:
   Yes, good find. This is a case where Scala's behavior is kind of 
annoying. I wish there were a way to opt out of the auto-initialization.
   
   Anyway, I made the primary constructor private, and put a call to 
`KafkaConfig#populateSynonyms` in all the (publicly visible) secondary 
constructors. This should fix it...
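
   For illustration, here is a self-contained sketch of that pattern (hypothetical 
class names, not the actual KafkaConfig change): the primary constructor is 
private, so every public construction path runs the synonym mapping before the 
superclass reads the map during initialization.
   
   ```scala
   import java.util
   import scala.jdk.CollectionConverters._
   
   // The base class reads the map while it is being constructed, similar to
   // AbstractConfig parsing originals, so synonyms must be filled in first.
   abstract class BaseConfig(props: util.Map[Any, Any]) {
     val brokerId: Any = props.get("broker.id")
   }
   
   object DemoConfig {
     def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = {
       val output = new util.HashMap[Any, Any](input)
       val brokerId = output.get("broker.id")
       val nodeId = output.get("node.id")
       if (brokerId == null && nodeId != null) output.put("broker.id", nodeId)
       else if (brokerId != null && nodeId == null) output.put("node.id", brokerId)
       output
     }
   }
   
   // Primary constructor is private; the public secondary constructor normalizes
   // the map first, so no caller can bypass populateSynonyms.
   class DemoConfig private (doLog: Boolean, val props: util.Map[Any, Any])
     extends BaseConfig(props) {
     if (doLog) println(s"DemoConfig created with $props")
   
     def this(props: util.Map[_, _], doLog: Boolean) =
       this(doLog, DemoConfig.populateSynonyms(props))
   }
   
   object Demo extends App {
     val cfg = new DemoConfig(Map[Any, Any]("node.id" -> "1").asJava, doLog = true)
     println(cfg.brokerId) // prints "1" even though only node.id was supplied
   }
   ```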








[GitHub] [kafka] cmccabe commented on a change in pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals

2021-09-10 Thread GitBox


cmccabe commented on a change in pull request #11312:
URL: https://github.com/apache/kafka/pull/11312#discussion_r706507377



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1385,10 +1385,22 @@ object KafkaConfig {
 }
 if (maybeSensitive) Password.HIDDEN else value
   }
+
+  def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = {

Review comment:
   Added.








[GitHub] [kafka] cmccabe commented on a change in pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals

2021-09-10 Thread GitBox


cmccabe commented on a change in pull request #11312:
URL: https://github.com/apache/kafka/pull/11312#discussion_r706506964



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1385,10 +1385,22 @@ object KafkaConfig {
 }
 if (maybeSensitive) Password.HIDDEN else value
   }
+
+  def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = {
+val output = new util.HashMap[Any, Any](input)

Review comment:
   AbstractConfig already copies the map which is passed to it.








[jira] [Commented] (KAFKA-7408) Truncate to LSO on unclean leader election

2021-09-10 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413424#comment-17413424
 ] 

Jose Armando Garcia Sancio commented on KAFKA-7408:
---

[~hachikuji] can you clarify what you mean by "What we can do is let the newly 
elected leader truncate to the LSO and then rewrite all the markers that 
followed it using its own leader epoch (to avoid divergence from followers)"?
 # Does that mean that the leader will rewrite all {{WriteTxnMarker}} from the 
{{LSO}} to the {{LEO}}? If so does it rewrite them using the original {{ABORT}} 
or {{COMMIT}} marker?
 # Does it mean that the unclean leader will {{ABORT}} all pending transactions?

> Truncate to LSO on unclean leader election
> --
>
> Key: KAFKA-7408
> URL: https://issues.apache.org/jira/browse/KAFKA-7408
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> If an unclean leader is elected, we may lose committed transaction data. That 
> alone is expected, but what is worse is that a transaction which was 
> previously completed (either committed or aborted) may lose its marker and 
> become dangling. The transaction coordinator will not know about the unclean 
> leader election, so will not know to resend the transaction markers. 
> Consumers with read_committed isolation will be stuck because the LSO cannot 
> advance.
> To keep this scenario from occurring, it would be better to have the unclean 
> leader truncate to the LSO so that there are no dangling transactions. 
> Truncating to the LSO is not alone sufficient because the markers which 
> allowed the LSO advancement may be at higher offsets. What we can do is let 
> the newly elected leader truncate to the LSO and then rewrite all the markers 
> that followed it using its own leader epoch (to avoid divergence from 
> followers).
> The interesting cases when an unclean leader election occurs are when a 
> transaction is ongoing. 
> 1. If a producer is in the middle of a transaction commit, then the 
> coordinator may still attempt to write transaction markers. This will either 
> succeed or fail depending on the producer epoch in the unclean leader. If the 
> epoch matches, then the WriteTxnMarker call will succeed, which will simply 
> be ignored by the consumer. If the epoch doesn't match, the WriteTxnMarker 
> call will fail and the transaction coordinator can potentially remove the 
> partition from the transaction.
> 2. If a producer is still writing the transaction, then what happens depends 
> on the producer state in the unclean leader. If no producer state has been 
> lost, then the transaction can continue without impact. Otherwise, the 
> producer will likely fail with an OUT_OF_ORDER_SEQUENCE error, which will 
> cause the transaction to be aborted by the coordinator. That takes us back to 
> the first case.
> By truncating the LSO, we ensure that transactions are either preserved in 
> whole or they are removed from the log in whole. For an unclean leader 
> election, that's probably as good as we can do. But we are ensured that 
> consumers will not be blocked by dangling transactions. The only remaining 
> situation where a dangling transaction might be left is if one of the 
> transaction state partitions has an unclean leader election.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe opened a new pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct

2021-09-10 Thread GitBox


cmccabe opened a new pull request #11320:
URL: https://github.com/apache/kafka/pull/11320


   The ReplicaManager, LogManager, and KafkaApis class all have many
   constructor parameters. It is often difficult to add or remove a
   parameter, since there are so many locations that need to be updated. In
   order to address this problem, we should use named parameters when
   constructing these objects from Scala code. This will make it easy to
   add new optional parameters without modifying many test cases.  It will
   also make it easier to read git diffs and PRs, since the parameters will
have names next to them. Since Java does not support named parameters,
   this PR adds several Builder classes which can be used to achieve the
   same effect.
   
   ReplicaManager also had a secondary constructor, which this PR removes.
   The function of the secondary constructor was just to provide some
   default parameters for the main constructor. However, it is simpler to just
   use default parameters.
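
   As a rough illustration of the builder idea (hypothetical class and field 
names, not the actual builders added in this PR):
   
   ```scala
   // Sketch only: named/default parameters serve Scala callers, while a small
   // Builder gives Java callers, which lack named parameters, the same effect
   // of setting only the fields they care about.
   class LogManagerish(val flushIntervalMs: Long = 60000L,
                       val retentionMs: Long = 7L * 24 * 60 * 60 * 1000,
                       val cleanerEnabled: Boolean = true)
   
   object LogManagerish {
     class Builder {
       private var flushIntervalMs: Long = 60000L
       private var retentionMs: Long = 7L * 24 * 60 * 60 * 1000
       private var cleanerEnabled: Boolean = true
   
       def setFlushIntervalMs(v: Long): Builder = { flushIntervalMs = v; this }
       def setRetentionMs(v: Long): Builder = { retentionMs = v; this }
       def setCleanerEnabled(v: Boolean): Builder = { cleanerEnabled = v; this }
   
       def build(): LogManagerish =
         new LogManagerish(flushIntervalMs = flushIntervalMs,
                           retentionMs = retentionMs,
                           cleanerEnabled = cleanerEnabled)
     }
   }
   
   // Scala tests rely on named parameters:  new LogManagerish(retentionMs = 1000L)
   // Java tests get the same effect:        new LogManagerish.Builder().setRetentionMs(1000L).build()
   ```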






[GitHub] [kafka] hachikuji commented on a change in pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals

2021-09-10 Thread GitBox


hachikuji commented on a change in pull request #11312:
URL: https://github.com/apache/kafka/pull/11312#discussion_r706502119



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1385,10 +1385,22 @@ object KafkaConfig {
 }
 if (maybeSensitive) Password.HIDDEN else value
   }
+
+  def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = {
+val output = new util.HashMap[Any, Any](input)
+val brokerId = output.get(KafkaConfig.BrokerIdProp)
+val nodeId = output.get(KafkaConfig.NodeIdProp)
+if (brokerId == null && nodeId != null) {
+  output.put(KafkaConfig.BrokerIdProp, nodeId)
+} else if (brokerId != null && nodeId == null) {
+  output.put(KafkaConfig.NodeIdProp, brokerId)
+}
+output
+  }
 }
 
class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig])
-  extends AbstractConfig(KafkaConfig.configDef, props, doLog) with Logging {
+  extends AbstractConfig(KafkaConfig.configDef, KafkaConfig.populateSynonyms(props), doLog) with Logging {

Review comment:
   It seems a little bit odd that we populate synonyms only in the 
reference that we're passing to `AbstractConfig`. The field `KafkaConfig.props` 
could still be accessed directly (maybe it should be private?). Would it make 
sense to move this to a factory method? For example:
   
   ```scala
   object KafkaConfig {
     def apply(props: java.util.Map[_, _], doLog: Boolean,
               dynamicConfigOverride: Option[DynamicBrokerConfig]): KafkaConfig = {
       new KafkaConfig(populateSynonyms(props), doLog, dynamicConfigOverride)
     }
   }
   ```








[jira] [Updated] (KAFKA-8116) Add Kafka Streams archetype for Java11

2021-09-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8116:
---
Fix Version/s: 4.0.0

> Add Kafka Streams archetype for Java11
> --
>
> Key: KAFKA-8116
> URL: https://issues.apache.org/jira/browse/KAFKA-8116
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
> Fix For: 4.0.0
>
>
> In https://issues.apache.org/jira/browse/KAFKA-5727 we added an archetype for 
> Kafka Streams. However, this archetype only works for Java8 but not for 
> Java11. Thus, we should add a new archetype project for Java11.
> This ticket requires a KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]





[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2021-09-10 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413408#comment-17413408
 ] 

Matthias J. Sax commented on KAFKA-13289:
-

I guess that the repartition topic could be the "issue", because there is no 
guarantee about order there. Setting a new key upstream means that you get 
interleaved writes into the repartition topic from different upstream producers 
(as records from two different input topic partitions can map to the same join 
key).

In a live run, when throughput is lower, the issue might be mitigated. But if you 
reprocess old data, more disorder might occur in the repartition topic and thus 
you would need a larger grace period downstream.
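
A minimal sketch of the mitigation mentioned above, based on the reporter's 2.8-era reproduction (the window and grace sizes are placeholders, not a recommendation):

{code}
import java.time.Duration
import org.apache.kafka.streams.kstream.JoinWindows

// Same 5-second join window, but with a much larger grace period so that
// out-of-order records arriving via the repartition topic are not dropped
// as belonging to an expired segment.
val joinWindow = JoinWindows.of(Duration.ofSeconds(5)).grace(Duration.ofHours(1))
{code}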

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Major
>
> When pushing bulk data through a kafka-steams app, I see it log the following 
> message many times...
> {noformat}
> WARN 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-rool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
>  in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<String, String> leftStream = builder.stream(leftTopic);
> final KStream<String, String> rightStream = builder.stream(rightTopic);
> final KStream<String, String> rekeyedLeftStream = leftStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream<String, String> rekeyedRightStream = rightStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
>         rekeyedRightStream,
>         (left, right) -> left + "/" + right,
>         joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...whereas, given the input data, I expect to see every row end with the two 
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null 
> values for many values since that's the expected semantics of kafka-streams 
> left join, at least until 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the 
> problem is solved, but since the input I provide is not out of order I did 
> not expect to need to do that, and I'm wary of the resource requirements 
> doing so in practice on an application with a lot of volume.
> My suspicion is that something is happening such that when one partition is 
> processed it causes the stream time to be pushed forward to the newest 
> message in that partition, meaning when the next partition is then examined 
> it is found to contain many records 

[jira] [Commented] (KAFKA-13290) My timeWindows last aggregated message never emit until a new message coming

2021-09-10 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413405#comment-17413405
 ] 

Matthias J. Sax commented on KAFKA-13290:
-

Your observation is by-design behavior. You are using `suppress` with the 
"until-window-close" strategy. A window closes only if stream-time advances 
beyond window-end plus grace-period. Note that stream-time is data driven, i.e., 
if no new data arrives, stream-time cannot advance (it's independent of 
wall-clock time). Closing this as "not a problem". There is already a ticket that 
requests a "wall-clock time" emit strategy for suppress(), 
https://issues.apache.org/jira/browse/KAFKA-7748, in case you want to follow up 
there. – Of course, feel free to follow up on this ticket if you have further 
questions.
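
A sketch of the usual workaround when input can go idle (topic name, serializers, and interval are assumptions; the dummy record must be filtered out or otherwise ignored downstream):

{code}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Periodically produce a "tick" record so that stream-time keeps advancing
// past window-end + grace, letting suppress() emit the last window even when
// the real producers are idle.
// Note: with a single fixed key, only one input partition receives ticks;
// tasks reading other partitions will not advance their stream-time.
object TickProducer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  while (true) {
    producer.send(new ProducerRecord[String, String]("data-topic", "tick", "tick"))
    Thread.sleep(60000L) // roughly once per window duration
  }
}
{code}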

> My timeWindows last aggregated message never emit until a new message coming 
> -
>
> Key: KAFKA-13290
> URL: https://issues.apache.org/jira/browse/KAFKA-13290
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.2
> Environment: Development
>Reporter: Steve Zhou
>Priority: Major
>
> I have a Kafka stream event processing code which aggregates 1 minutes data. 
> It works as expected if data comes continuously, 
> If we stop producer, then i found the last aggregated message does not emit 
> until new message coming.
>  
> Following is my sample code, @Bean
>  public KStream kStream(StreamsBuilder 
> streamBuilder) {
>   KStream aggregatedData = streamBuilder
>                .stream(dataTopic, dataConsumed)
>                .groupByKey(Grouped.with(
>                              stringSerde,
>                              aggregateValueSerde))
>                
> .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
>               .aggregate(this::initialize, this::aggregateFields,
>                                materializedAsWindowStore(windowedStoreName, 
> stringSerde,
>                                AggregateMetricsFieldsSerde))
>             
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
>             .withName(windowedSuppressNodeName))
>              .toStream().map((key, aggregateMetrics) ->
> {            return KeyValue.pair(key.key(), aggregateMetrics);    }
> );
>  aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
>  return aggregatedFlowData;
>  }
>  





[jira] [Resolved] (KAFKA-13290) My timeWindows last aggregated message never emit until a new message coming

2021-09-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-13290.
-
Resolution: Not A Problem

> My timeWindows last aggregated message never emit until a new message coming 
> -
>
> Key: KAFKA-13290
> URL: https://issues.apache.org/jira/browse/KAFKA-13290
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.2
> Environment: Development
>Reporter: Steve Zhou
>Priority: Major
>
> I have a Kafka stream event processing code which aggregates 1 minutes data. 
> It works as expected if data comes continuously, 
> If we stop producer, then i found the last aggregated message does not emit 
> until new message coming.
>  
> Following is my sample code, @Bean
>  public KStream kStream(StreamsBuilder 
> streamBuilder) {
>   KStream aggregatedData = streamBuilder
>                .stream(dataTopic, dataConsumed)
>                .groupByKey(Grouped.with(
>                              stringSerde,
>                              aggregateValueSerde))
>                
> .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
>               .aggregate(this::initialize, this::aggregateFields,
>                                materializedAsWindowStore(windowedStoreName, 
> stringSerde,
>                                AggregateMetricsFieldsSerde))
>             
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
>             .withName(windowedSuppressNodeName))
>              .toStream().map((key, aggregateMetrics) ->
> {            return KeyValue.pair(key.key(), aggregateMetrics);    }
> );
>  aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
>  return aggregatedFlowData;
>  }
>  





[jira] [Resolved] (KAFKA-13288) Transaction find-hanging command with --broker-id excludes internal topics

2021-09-10 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13288.
-
Resolution: Fixed

> Transaction find-hanging command with --broker-id excludes internal topics
> --
>
> Key: KAFKA-13288
> URL: https://issues.apache.org/jira/browse/KAFKA-13288
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> We use the vanilla `Admin.listTopics()` in this command if `--broker-id` is 
> specified. By default, this excludes internal topics.





[GitHub] [kafka] hachikuji merged pull request #11319: KAFKA-13288; Include internal topics when searching hanging transactions

2021-09-10 Thread GitBox


hachikuji merged pull request #11319:
URL: https://github.com/apache/kafka/pull/11319


   






[jira] [Comment Edited] (KAFKA-10338) Support PEM format for SSL certificates and private key

2021-09-10 Thread Rajini Sivaram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413395#comment-17413395
 ] 

Rajini Sivaram edited comment on KAFKA-10338 at 9/10/21, 9:27 PM:
--

[~teabot] We currently don't have a way of reconfiguring PEM configs for 
clients unless they are stored externally in a file and the file is reloaded. 
It may be possible to add a custom `ssl.engine.factory.class` that does 
reconfiguration for clients. For brokers, we can use standard dynamic broker 
configs for PEM.


was (Author: rsivaram):
[~teabot] We currently don't have a way of updating PEM configs for clients 
unless they are stored externally in a file and the file is reloaded. It may be 
possible to add a custom `ssl.engine.factory.class` that does reconfiguration 
for clients. For brokers, we can use standard dynamic broker configs for PEM.

> Support PEM format for SSL certificates and private key
> ---
>
> Key: KAFKA-10338
> URL: https://issues.apache.org/jira/browse/KAFKA-10338
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.7.0
>
>
> We currently support only file-based JKS/PKCS12 format for SSL key stores and 
> trust stores. It will be good to add support for PEM as configuration values 
> that fits better with config externalization.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key





[jira] [Commented] (KAFKA-10338) Support PEM format for SSL certificates and private key

2021-09-10 Thread Rajini Sivaram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413395#comment-17413395
 ] 

Rajini Sivaram commented on KAFKA-10338:


[~teabot] We currently don't have a way of updating PEM configs for clients 
unless they are stored externally in a file and the file is reloaded. It may be 
possible to add a custom `ssl.engine.factory.class` that does reconfiguration 
for clients. For brokers, we can use standard dynamic broker configs for PEM.

> Support PEM format for SSL certificates and private key
> ---
>
> Key: KAFKA-10338
> URL: https://issues.apache.org/jira/browse/KAFKA-10338
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.7.0
>
>
> We currently support only file-based JKS/PKCS12 format for SSL key stores and 
> trust stores. It will be good to add support for PEM as configuration values 
> that fits better with config externalization.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key





[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-09-10 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r706478220



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   val cleanableLogs = dirtyLogs.filter { ltc =>
     (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
   }
+
   if(cleanableLogs.isEmpty) {
-None
+val logsWithTombstonesExpired = dirtyLogs.filter {
+  case ltc =>
+// in this case, we are probably in a low throughput situation
+// therefore, we should take advantage of this fact and remove tombstones if we can
+// under the condition that the log's latest delete horizon is less than the current time tracked
+ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
   @junrao @hachikuji @lbradstreet I've removed the logic for tracking the 
latestDeleteHorizon and the deleteHorizon-triggered cleaning in 
grabFilthiestCompactedLog since this part of the PR is not a part of KIP-534








[GitHub] [kafka] mumrah commented on a change in pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals

2021-09-10 Thread GitBox


mumrah commented on a change in pull request #11312:
URL: https://github.com/apache/kafka/pull/11312#discussion_r706419226



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1385,10 +1385,22 @@ object KafkaConfig {
 }
 if (maybeSensitive) Password.HIDDEN else value
   }
+
+  def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = {
+val output = new util.HashMap[Any, Any](input)

Review comment:
   Is it safe to make a new map here? Are there any callers of 
KafkaConfig() which might expect to modify the props map after the object has 
been constructed?

##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1385,10 +1385,22 @@ object KafkaConfig {
 }
 if (maybeSensitive) Password.HIDDEN else value
   }
+
+  def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = {

Review comment:
   A comment or docstring would be useful here explaining the motivation 
for this method








[jira] [Commented] (KAFKA-13272) KStream offset stuck after brokers outage

2021-09-10 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413335#comment-17413335
 ] 

Guozhang Wang commented on KAFKA-13272:
---

I looked into the source code and I have a suspicion it is a broker side bug. 
Here's my theory:

* In the transaction coordinator, when we want to send markers to the data 
partition hosts, there's a condition that if the leader of that data partition 
is unknown, then we would skip sending this marker since the data partition is 
likely deleted 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala#L372).
* When one coordinator is down, other brokers would take over as the 
coordinator and usually have the up-to-date metadata, hence would still proceed 
to send the markers.
* However, when the whole cluster is down ungracefully, upon restarting, the 
newly elected coordinators are less likely to have the up-to-date metadata, 
while reading the txn logs and trying to complete the transactions. If it hits 
the above condition, it would skip sending the markers and note that they would 
never be retried.
* So from the newly elected coordinator's point of view, the txn has been 
completed, while it actually has not been.


If this is indeed the bug, it would be a bit tricky to fix. Maybe one way is to 
only start loading the txn markers after metadata is refreshed upon starting? 
With KRaft, this should not happen since the brokers can 
easily tell whether their metadata is stale or not.
[~hachikuji] WDYT?

> KStream offset stuck after brokers outage
> -
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
>Reporter: F Méthot
>Priority: Major
>
> Our KStream app offset stay stuck on 1 partition after outage possibly when 
> exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
>  3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
>  command-expiry-store-changelog topic is 10 partitions (replication 2, 
> min-insync 2)
>  events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
>  
> {code:java}
> Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [commands])
>  --> KSTREAM-TRANSFORM-01
>  Processor: KSTREAM-TRANSFORM-01 (stores: [])
>  --> KSTREAM-TRANSFORM-02
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-TRANSFORM-01
>  Sink: KSTREAM-SINK-03 (topic: events)
>  <-- KSTREAM-TRANSFORM-02
> {code}
> h3.  
> h3. Attempt 1 at reproducing this issue
>  
> Our stream app runs with processing.guarantee *exactly_once* 
> After a Kafka test outage where all 3 brokers pod were deleted at the same 
> time,
> Brokers restarted and initialized successfully.
> When restarting the topology above, one of the tasks would never initialize 
> fully, the restore phase would keep outputting this messages every few 
> minutes:
>  
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> Restoration in progress for 1 partitions. 
> {commands-processor-expiry-store-changelog-8: position=11775908, 
> end=11775911, totalRestored=2002076} 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> Task for partition 8 would never initialize, no more data would be read from 
> the source commands topic for that partition.
>  
> In an attempt to recover, we restarted the stream app with stream 
> processing.guarantee back to at_least_once, than it proceed with reading the 
> changelog and restoring partition 8 fully.
> But we noticed afterward, for the next hour until we rebuilt the system, that 
> partition 8 from command-expiry-store-changelog would not be 
> cleaned/compacted by the log cleaner/compacter compared to other partitions. 
> (could be unrelated, because we have seen that before)
> So we resorted to delete/recreate our command-expiry-store-changelog topic 
> and events topic and regenerate it from the commands, reading from beginning.
> Things went back to normal
> h3. Attempt 2 at reproducing this issue
> kstream runs with *exactly-once*
> We force-deleted all 3 pod running kafka.
>  After that, one of the partition can’t be restored. (like reported in 
> previous attempt)
>  For that partition, we noticed these logs on the broker
> {code:java}
> [2021-08-27 17:45:32,799] INFO 

[GitHub] [kafka] cmccabe commented on a change in pull request #11313: MINOR: GroupMetadataManager#shutdown should remove metrics

2021-09-10 Thread GitBox


cmccabe commented on a change in pull request #11313:
URL: https://github.com/apache/kafka/pull/11313#discussion_r706387699



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -977,6 +977,9 @@ class GroupMetadataManager(brokerId: Int,
 shuttingDown.set(true)
 if (scheduler.isStarted)
   scheduler.shutdown()
+metrics.removeSensor(GroupMetadataManager.LoadTimeSensor)
+metrics.removeSensor("OffsetCommits")

Review comment:
   good idea. will do








[GitHub] [kafka] hachikuji opened a new pull request #11319: KAFKA-13288; Include internal topics when searching hanging transactions

2021-09-10 Thread GitBox


hachikuji opened a new pull request #11319:
URL: https://github.com/apache/kafka/pull/11319


   This patch ensures that internal topics are included when searching for 
hanging transactions with the `--broker-id` argument in `kafka-transactions.sh`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-09-10 Thread GitBox


guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-917118246


   Hi @vamossagar12 Thanks again for your patience! I know it has been dragging 
on a bit long, and I had some discussions with @cadonna and @vamossagar12 
regarding how we can move forward to eventually merge your work into Apache 
Kafka. As a result of that, I created this ticket trying to summarize what we 
want to do (which would incorporate this JIRA ticket):
   
   https://issues.apache.org/jira/browse/KAFKA-13286
   
   At the moment, our main concerns are that:
   
   1) Direct byte buffers, which rely on native memory, are not fully 
managed by the JVM (GC), and hence we need to be very careful about their 
side-effects regarding memory pressure.
   2) The current implementation without the API changes on serdes has a 
relatively small improvement on throughput, whereas in order to use the 
byte-buffer without API changes, we also would need to "guess" the allocated 
size before serialization.
   
   So we think it's probably better to consider tackling the larger scope of 
https://issues.apache.org/jira/browse/KAFKA-13286 as a whole, which would mean 
we would hold off merging this PR until we have the serde / byte-buffer supplier 
work in place. Does that sound reasonable to you? Would love to hear your 
thoughts.






[jira] [Updated] (KAFKA-13286) Revisit Streams State Store and Serde Implementation

2021-09-10 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13286:
--
Description: 
Kafka Streams state stores are built in hierarchical layers as metered -> cached 
-> logged -> [convert] -> raw stores (rocksDB, in-memory), and they leverage the 
built-in Serde libraries for serialization / deserialization. There are several 
inefficiencies in the current design:

* The API only supports serde using byte arrays. This means we generate a lot 
of garbage and spend unnecessary time copying bytes, especially when working 
with windowed state stores that rely on composite keys. In many places in the 
code we have to extract parts of the composite key to deserialize either the 
timestamp or the message key from the state store key (e.g. the methods in 
WindowStoreUtils).
* The serde operation can happen on multiple layers of the state store 
hierarchy, which means we need extra byte array copies as we move along 
doing serdes. For example, we do serde in the metered layer, but then again in 
the cached layer with cache functions, and also in logged stores for generating 
the key/value bytes to send to Kafka.

To improve on this, we can consider adding support for serde into/from 
ByteBuffers, which would allow us to reuse the underlying byte arrays and just pass 
around slices of the underlying buffers to avoid the unnecessary copying. 

1) More specifically, e.g. the serialize interface could be refactored to:

{code}
ByteBuffer serialize(String topic, T data, ByteBuffer buffer);
{code}

Where the serialized bytes would be appended to the ByteBuffer. When a series 
of serialize functions are called alongside the state store hierarchy, we 
then just need to make sure that what should be appended first to the 
ByteBuffer is serialized first. E.g. if the serialized bytes format of a 
WindowSchema is 

Then we would need to call serialize as in:

{code}
serialize(key, serialize(leftRightBoolean, serialize(timestamp, buffer))); 
{code}
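
As an illustration only (assumed shapes, not part of the proposal itself), a timestamp serializer against such a refactored interface could append its eight bytes into the caller-supplied buffer and return it, so the next serializer in the chain appends after it:

{code}
import java.nio.ByteBuffer

// Hypothetical refactored serializer interface from the sketch above.
trait BufferSerializer[T] {
  def serialize(topic: String, data: T, buffer: ByteBuffer): ByteBuffer
}

// Appends the 8-byte timestamp into the caller-supplied buffer, so the next
// serializer in the chain (leftRightBoolean, then key) appends after it.
class TimestampBufferSerializer extends BufferSerializer[Long] {
  override def serialize(topic: String, data: Long, buffer: ByteBuffer): ByteBuffer =
    buffer.putLong(data)
}
{code}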

2) In addition, we can consider having a pool of ByteBuffers representing a set 
of byte arrays that can be re-used. This can be captured as an intelligent 
{{ByteBufferSupplier}}, which provides:

{code}
ByteBuffer ByteBufferSupplier#allocate(long size)
{code}

Its implementation can choose to either create new byte arrays, or re-use 
existing ones in the pool; the gotcha though is that we may usually not know 
the serialized byte length for raw keys (think: in practice the keys would be 
in json/avro etc), and hence would not know how to pass in {{size}} for 
serialization, and hence may need to be conservative, or use trial and error etc.

Of course callers then would be responsible for returning the used ByteBuffer 
back to the Supplier via

{code}
ByteBufferSupplier#deallocate(ByteBuffer buffer)
{code}
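
A minimal per-thread pool sketch along those lines (assumed shapes, not an actual Streams API; {{size}} is narrowed to Int here since ByteBuffer.allocate takes an Int):

{code}
import java.nio.ByteBuffer
import scala.collection.mutable

// Sketch of the supplier idea: each stream thread owns one instance, so no
// synchronization is needed; buffers are reused whenever a large-enough one
// is already in the pool.
class PooledByteBufferSupplier {
  private val pool = mutable.ListBuffer.empty[ByteBuffer]

  def allocate(size: Int): ByteBuffer =
    pool.zipWithIndex.find(_._1.capacity >= size) match {
      case Some((buffer, idx)) =>
        pool.remove(idx)
        buffer.clear()
        buffer
      case None =>
        ByteBuffer.allocate(size)
    }

  def deallocate(buffer: ByteBuffer): Unit =
    pool += buffer
}
{code}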

Some quick notes here regarding concurrency and sharing of the byte-buffer 
pools:

* For pull query handling threads, if we do not do any deserialization then we 
would not need to access the ByteBufferSuppliers, hence there's no concurrent 
access.
* For multiple streaming threads, my intention is to have each thread getting 
its own isolated byte-buffer pools to avoid any concurrency.

3) With RocksDB's direct byte-buffer (KAFKA-9168) we can optionally also use 
the allocated ByteBuffers via RocksDB's direct API so that puts/gets on them 
do not go through JNI, which is more efficient. The Supplier 
then would need to be careful to deallocate these direct byte-buffers since 
they would not be GC'ed by the JVM.

There's a catch with direct ByteBuffers that we'd need to be careful 
about: though direct byte buffers are also managed by the JVM (and hence GC) like 
heap byte buffers, they are not managed in the same way as the latter [1][2], and 
GC seems to be more conservative about them. It was suggested that sometimes users need to 
manually deallocate them if GC does not work promptly. I think the most 
effective way is to try our best to re-use direct byte-buffers allocated 
from native memory, which means we probably want to allocate a conservatively 
large size (if we do not know the serialized length), so that they can be 
reused.

[1] 
https://www.fusion-reactor.com/blog/evangelism/understanding-java-buffer-pool-memory-space/#:~:text=A%20direct%20buffer%20is%20a,allocateDirect()%20factory%20method
[2] 
https://stackoverflow.com/questions/3496508/deallocating-direct-buffer-native-memory-in-java-for-jogl/26777380#26777380

  was:
Kafka Streams state store is built in hierarchical layers as metered -> cached 
-> logged -> [convert] -> raw stores (rocksDB, in-memory), and it leveraged on 
the builtin Serde libraries for serialize / deserialize. There are several 
inefficiencies in the current design:

* The API only supports serde using byte arrays. This means we generate a lot 
of garbage and spend unnecessary time copying bytes, especially 

[jira] [Updated] (KAFKA-13286) Revisit Streams State Store and Serde Implementation

2021-09-10 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13286:
--
Description: 
Kafka Streams state stores are built in hierarchical layers as metered -> cached 
-> logged -> [convert] -> raw stores (rocksDB, in-memory), and they leverage the 
built-in Serde libraries for serialization / deserialization. There are several 
inefficiencies in the current design:

* The API only supports serde using byte arrays. This means we generate a lot 
of garbage and spend unnecessary time copying bytes, especially when working 
with windowed state stores that rely on composite keys. In many places in the 
code we have to extract parts of the composite key to deserialize either the 
timestamp or the message key from the state store key (e.g. the methods in 
WindowStoreUtils).
* The serde operation can happen on multiple layers of the state store 
hierarchy, which means we need extra byte array copies as we move along 
doing serdes. For example, we do serde in the metered layer, but then again in 
the cached layer with cache functions, and also in logged stores for generating 
the key/value bytes to send to Kafka.

To improve on this, we can consider adding support for serde into/from 
ByteBuffers, which would allow us to reuse the underlying byte arrays and just pass 
around slices of the underlying buffers to avoid the unnecessary copying. 

1) More specifically, e.g. the serialize interface could be refactored to:

{code}
ByteBuffer serialize(String topic, T data, ByteBuffer buffer);
{code}

Where the serialized bytes would be appended to the ByteBuffer. When a series 
of serialize functions are called alongside the state store hierarchy, we 
then just need to make sure that what should be appended first to the 
ByteBuffer is serialized first. E.g. if the serialized bytes format of a 
WindowSchema is 

Then we would need to call serialize as in:

{code}
serialize(key, serialize(leftRightBoolean, serialize(timestamp, buffer))); 
{code}

2) In addition, we can consider having a pool of ByteBuffers representing a set 
of byte arrays that can be re-used. This can be captured as an intelligent 
{{ByteBufferSupplier}}, which provides:

{code}
ByteBuffer ByteBufferSupplier#allocate(long size)
{code}

Its implementation can choose to either create new byte arrays, or re-use 
existing ones in the pool; the gotcha though is that we may usually not know 
the serialized byte length for raw keys (think: in practice the keys would be 
in json/avro etc), and hence would not know how to pass in {{size}} for 
serialization, and hence may need to be conservative, or use trial and error etc.

Of course callers then would be responsible for returning the used ByteBuffer 
back to the Supplier via

{code}
ByteBufferSupplier#deallocate(ByteBuffer buffer)
{code}

Some quick notes here regarding concurrency and sharing of the byte-buffer 
pools:

* For pull query handling threads, if we do not do any deserialization then we 
would not need to access the ByteBufferSuppliers, hence there's no concurrent 
access.
* For multiple streaming threads, my intention is to have each thread getting 
its own isolated byte-buffer pools to avoid any concurrency.

3) With RocksDB's direct byte-buffer (KAFKA-9168) we can optionally also use 
the allocated ByteBuffers via RocksDB's direct API so that puts/gets on them 
do not go through JNI, which is more efficient. The Supplier 
then would need to be careful to deallocate these direct byte-buffers since 
they would not be GC'ed by the JVM.


  was:
Kafka Streams state store is built in hierarchical layers as metered -> cached 
-> logged -> [convert] -> raw stores (rocksDB, in-memory), and it leveraged on 
the builtin Serde libraries for serialize / deserialize. There are several 
inefficiencies in the current design:

* The API only supports serde using byte arrays. This means we generate a lot 
of garbage and spend unnecessary time copying bytes, especially when working 
with windowed state stores that rely on composite keys. In many places in the 
code we have extract parts of the composite key to deserialize the either the 
timestamp or the message key from the state store key (e.g. the methods in 
WindowStoreUtils).
* The serde operation could happen on multiple layers of the state store 
hierarchies, which means we need to extra byte array copies as we move along 
doing serdes. For example, we do serde in the metered layer, but then again in 
cached layer with cache functions, and also in logged stores for generated the 
key/value in bytes to send to Kafka.

To improve on this, we can consider having support for serde into/from 
ByteBuffers would allow us to reuse the underlying bytearrays and just pass 
around slices of the underlying Buffers to avoid the unnecessary copying. 

1) More specifically, e.g. the serialize interface could be refactored to:


[jira] [Commented] (KAFKA-13288) Transaction find-hanging command with --broker-id excludes internal topics

2021-09-10 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413278#comment-17413278
 ] 

Jason Gustafson commented on KAFKA-13288:
-

Note that it is still possible to use it with the --topic option instead. For 
example: 
{code}
kafka-transactions.sh --bootstrap-server localhost:9092 find-hanging --topic __consumer_offsets
{code}

> Transaction find-hanging command with --broker-id excludes internal topics
> --
>
> Key: KAFKA-13288
> URL: https://issues.apache.org/jira/browse/KAFKA-13288
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> We use the vanilla `Admin.listTopics()` in this command if `--broker-id` is 
> specified. By default, this excludes internal topics.





[GitHub] [kafka] teabot commented on pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)

2021-09-10 Thread GitBox


teabot commented on pull request #9345:
URL: https://github.com/apache/kafka/pull/9345#issuecomment-916994728


   How is PEM certificate renewal possible on the producer/consumer client? Is 
this documented anywhere?






[jira] [Comment Edited] (KAFKA-10338) Support PEM format for SSL certificates and private key

2021-09-10 Thread Elliot West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413243#comment-17413243
 ] 

Elliot West edited comment on KAFKA-10338 at 9/10/21, 3:29 PM:
---

How is PEM certificate renewal possible on the producer/consumer client? Is 
this documented anywhere?


was (Author: teabot):
How is PEM certificate renewal possible on the producer/consumer client?

> Support PEM format for SSL certificates and private key
> ---
>
> Key: KAFKA-10338
> URL: https://issues.apache.org/jira/browse/KAFKA-10338
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.7.0
>
>
> We currently support only file-based JKS/PKCS12 format for SSL key stores and 
> trust stores. It will be good to add support for PEM as configuration values 
> that fits better with config externalization.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key





[jira] [Commented] (KAFKA-10338) Support PEM format for SSL certificates and private key

2021-09-10 Thread Elliot West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413243#comment-17413243
 ] 

Elliot West commented on KAFKA-10338:
-

How is PEM certificate renewal possible on the producer/consumer client?

> Support PEM format for SSL certificates and private key
> ---
>
> Key: KAFKA-10338
> URL: https://issues.apache.org/jira/browse/KAFKA-10338
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.7.0
>
>
> We currently support only file-based JKS/PKCS12 format for SSL key stores and 
> trust stores. It will be good to add support for PEM as configuration values 
> that fits better with config externalization.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key





[jira] [Comment Edited] (KAFKA-13272) KStream offset stuck after brokers outage

2021-09-10 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413197#comment-17413197
 ] 

F Méthot edited comment on KAFKA-13272 at 9/10/21, 3:17 PM:


Hi [~guozhang] Thanks for your feedback.

We were puzzled by this error that kept coming back on partition 9 on our 
consumer 
{code:java}
The following partitions still have unstable offsets which are not cleared on 
the broker side: [commands-9], this could be either transactional offsets 
waiting for completion, or normal offsets waiting for replication after 
appending to local log 
{code}
Even after we had deleted and recreated that source topic, got rid of 
exactly_once, and even deleted the __transaction topic. 

The pattern was:
 * We had that issue above.
 * Clear the group in kafka, restart kstream app, reprocess from beginning and 
it would run fine.
 * restart kafka, the issue would come back.

So my suspicions shifted to the __consumer-offset topic,

*When we initially ran with exactly_once and got the issue on that partition-9, 
is it possible that the __consumer-offset kept a faulty transaction-marker as 
well, and that this faulty marker came back to haunt our consumer, even after 
we disabled exactly_once?*

I saw also the consumer offset topic with data from months ago, which should 
have been compacted, very similar to 
https://issues.apache.org/jira/browse/KAFKA-13174

 

*Also, we were running exactly_once with 2 replicas; [older] documentation 
stipulates that 3 replicas are needed for exactly_once. Is that still the case?*


was (Author: fmethot):
Hi [~guozhang] Thanks for your feedback.

We were puzzled by this error that kept coming back on partition 9 on our 
consumer 
{code:java}
The following partitions still have unstable offsets which are not cleared on 
the broker side: [commands-9], this could be either transactional offsets 
waiting for completion, or normal offsets waiting for replication after 
appending to local log 
{code}
Even after we had deleted and recreated that source topic, got rid of 
exactly_once, and even deleted the __transaction topic. 

The pattern was:
 * We had that issue above.
 * Clear the group in kafka, restart kstream app, reprocess from beginning and 
it would run fine.
 * restart kafka, the issue would come back.

So my suspicions shifted to the __consumer-offset topic,

When we initially ran with exactly_once and got the issue on that partition-9, 
is it possible that the __consumer-offset kept a faulty transaction-marker as 
well, and that this faulty marker came back to haunt our consumer, even after 
we disabled exactly_once?

I saw also the consumer offset topic with data from months ago, which should 
have been compacted, very similar to 
https://issues.apache.org/jira/browse/KAFKA-13174

> KStream offset stuck after brokers outage
> -
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
>Reporter: F Méthot
>Priority: Major
>
> Our KStream app offset stay stuck on 1 partition after outage possibly when 
> exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
>  3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
>  command-expiry-store-changelog topic is 10 partitions (replication 2, 
> min-insync 2)
>  events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
>  
> {code:java}
> Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [commands])
>  --> KSTREAM-TRANSFORM-01
>  Processor: KSTREAM-TRANSFORM-01 (stores: [])
>  --> KSTREAM-TRANSFORM-02
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-TRANSFORM-01
>  Sink: KSTREAM-SINK-03 (topic: events)
>  <-- KSTREAM-TRANSFORM-02
> {code}
> h3.  
> h3. Attempt 1 at reproducing this issue
>  
> Our stream app runs with processing.guarantee *exactly_once* 
> After a Kafka test outage where all 3 brokers pod were deleted at the same 
> time,
> Brokers restarted and initialized successfully.
> When restarting the topology above, one of the tasks would never initialize 
> fully, the restore phase would keep outputting this messages every few 
> minutes:
>  
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> Restoration in progress for 1 partitions. 
> {commands-processor-expiry-store-changelog-8: position=11775908, 
> end=11775911, totalRestored=2002076} 
> 

[jira] [Updated] (KAFKA-13290) My timeWindows last aggregated message never emit until a new message coming

2021-09-10 Thread Steve Zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Zhou updated KAFKA-13290:
---
Description: 
I have a Kafka Streams event processing application which aggregates data into 1-minute windows.

It works as expected when data arrives continuously.

If we stop the producer, the last aggregated message is not emitted until a new message arrives.

 

Following is my sample code:

@Bean
public KStream kStream(StreamsBuilder streamBuilder) {
    KStream aggregatedData = streamBuilder
        .stream(dataTopic, dataConsumed)
        .groupByKey(Grouped.with(stringSerde, aggregateValueSerde))
        .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
        .aggregate(this::initialize, this::aggregateFields,
            materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
            .withName(windowedSuppressNodeName))
        .toStream()
        .map((key, aggregateMetrics) -> KeyValue.pair(key.key(), aggregateMetrics));
    aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
    return aggregatedData;
}
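
A minimal, self-contained sketch of the behaviour at play here (this is not the reporter's code; topic names, serdes, and the 1-minute window are assumptions): `Suppressed.untilWindowCloses` only releases a window's result once stream time has passed the window end plus the grace period, and stream time only advances when new records arrive. With no new input, the last window's aggregate stays buffered, which matches the effect described above.

{code:java}
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

public class SuppressDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.Long()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
               .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMillis(10L)))
               .count()
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream()
               .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
               .to("out", Produced.with(Serdes.String(), Serdes.Long()));

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, Long> in =
                driver.createInputTopic("in", new StringSerializer(), new LongSerializer());
            // A record at t=0 opens the [0, 60s) window; nothing is emitted yet.
            in.pipeInput("k", 1L, 0L);
            // Only this later record advances stream time past window end + grace and
            // lets suppress() forward the first window's result; without any new input
            // the result stays buffered indefinitely.
            in.pipeInput("k", 1L, Duration.ofMinutes(2).toMillis());
        }
    }
}
{code}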

 

  was:
I have a Kafka Streams application which aggregates data into 1-minute windows.

It works as expected when data arrives continuously.

If we stop the producer, the last aggregated message is not emitted until a new message arrives, even if the new message has a different key.

 

Following is my sample code:

@Bean
public KStream kStream(StreamsBuilder streamBuilder) {
    KStream aggregatedData = streamBuilder
        .stream(dataTopic, dataConsumed)
        .groupByKey(Grouped.with(stringSerde, aggregateValueSerde))
        .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
        .aggregate(this::initialize, this::aggregateFields,
            materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
            .withName(windowedSuppressNodeName))
        .toStream()
        .map((key, aggregateMetrics) -> KeyValue.pair(key.key(), aggregateMetrics));
    aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
    return aggregatedData;
}

 


> My timeWindows last aggregated message never emit until a new message coming 
> -
>
> Key: KAFKA-13290
> URL: https://issues.apache.org/jira/browse/KAFKA-13290
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.2
> Environment: Development
>Reporter: Steve Zhou
>Priority: Major
>
> I have a Kafka Streams event processing application which aggregates data into
> 1-minute windows.
> It works as expected when data arrives continuously.
> If we stop the producer, the last aggregated message is not emitted until a new
> message arrives.
>  
> Following is my sample code:
> @Bean
> public KStream kStream(StreamsBuilder streamBuilder) {
>     KStream aggregatedData = streamBuilder
>         .stream(dataTopic, dataConsumed)
>         .groupByKey(Grouped.with(stringSerde, aggregateValueSerde))
>         .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
>         .aggregate(this::initialize, this::aggregateFields,
>             materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde))
>         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
>             .withName(windowedSuppressNodeName))
>         .toStream()
>         .map((key, aggregateMetrics) -> KeyValue.pair(key.key(), aggregateMetrics));
>     aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
>     return aggregatedData;
> }
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13290) My timeWindows last aggregated message never emit until a new message coming

2021-09-10 Thread Steve Zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Zhou updated KAFKA-13290:
---
Description: 
I have a Kafka Streams application which aggregates data into 1-minute windows.

It works as expected when data arrives continuously.

If we stop the producer, the last aggregated message is not emitted until a new message arrives, even if the new message has a different key.

 

Following is my sample code:

@Bean
public KStream kStream(StreamsBuilder streamBuilder) {
    KStream aggregatedData = streamBuilder
        .stream(dataTopic, dataConsumed)
        .groupByKey(Grouped.with(stringSerde, aggregateValueSerde))
        .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
        .aggregate(this::initialize, this::aggregateFields,
            materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
            .withName(windowedSuppressNodeName))
        .toStream()
        .map((key, aggregateMetrics) -> KeyValue.pair(key.key(), aggregateMetrics));
    aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
    return aggregatedData;
}

 

  was:
I have a Kafka Streams application which aggregates data into 1-minute windows.

It works as expected when data arrives continuously.

If we stop the producer, the last aggregated message is not emitted until a new message arrives, even if the new message has a different key.

 

Following is my sample code:

@Bean
public KStream kStream(StreamsBuilder streamBuilder) {
    KStream aggregatedData = streamBuilder
        .stream(dataTopic, dataConsumed)
        .groupByKey(Grouped.with(stringSerde, aggregateValueSerde))
        .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
        .aggregate(this::initialize, this::aggregateFields,
            materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
            .withName(windowedFlowSuppressNodeName))
        .toStream()
        .map((key, aggregateMetrics) -> KeyValue.pair(key.key(), aggregateMetrics));
    aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
    return aggregatedData;
}

 


> My timeWindows last aggregated message never emit until a new message coming 
> -
>
> Key: KAFKA-13290
> URL: https://issues.apache.org/jira/browse/KAFKA-13290
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.2
> Environment: Development
>Reporter: Steve Zhou
>Priority: Major
>
> I have a Kafka Streams application which aggregates data into 1-minute windows.
> It works as expected when data arrives continuously.
> If we stop the producer, the last aggregated message is not emitted until a new
> message arrives, even if the new message has a different key.
>  
> Following is my sample code:
> @Bean
> public KStream kStream(StreamsBuilder streamBuilder) {
>     KStream aggregatedData = streamBuilder
>         .stream(dataTopic, dataConsumed)
>         .groupByKey(Grouped.with(stringSerde, aggregateValueSerde))
>         .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
>         .aggregate(this::initialize, this::aggregateFields,
>             materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde))
>         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
>             .withName(windowedSuppressNodeName))
>         .toStream()
>         .map((key, aggregateMetrics) -> KeyValue.pair(key.key(), aggregateMetrics));
>     aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
>     return aggregatedData;
> }
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13290) My timeWindows last aggregated message never emit until a new message coming

2021-09-10 Thread Steve Zhou (Jira)
Steve Zhou created KAFKA-13290:
--

 Summary: My timeWindows last aggregated message never emit until a 
new message coming 
 Key: KAFKA-13290
 URL: https://issues.apache.org/jira/browse/KAFKA-13290
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.2
 Environment: Development
Reporter: Steve Zhou


I have a Kafka Streams application which aggregates data into 1-minute windows.

It works as expected when data arrives continuously.

If we stop the producer, the last aggregated message is not emitted until a new message arrives, even if the new message has a different key.

 

Following is my sample code:

@Bean
public KStream kStream(StreamsBuilder streamBuilder) {
    KStream aggregatedData = streamBuilder
        .stream(dataTopic, dataConsumed)
        .groupByKey(Grouped.with(stringSerde, aggregateValueSerde))
        .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
        .aggregate(this::initialize, this::aggregateFields,
            materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
            .withName(windowedFlowSuppressNodeName))
        .toStream()
        .map((key, aggregateMetrics) -> KeyValue.pair(key.key(), aggregateMetrics));
    aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
    return aggregatedData;
}

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13272) KStream offset stuck after brokers outage

2021-09-10 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413197#comment-17413197
 ] 

F Méthot edited comment on KAFKA-13272 at 9/10/21, 1:57 PM:


Hi [~guozhang] Thanks for your feedback.

We were puzzled by this error that kept coming back on partition 9 on our 
consumer 
{code:java}
The following partitions still have unstable offsets which are not cleared on 
the broker side: [commands-9], this could be either transactional offsets 
waiting for completion, or normal offsets waiting for replication after 
appending to local log 
{code}
Even after we had deleted and recreated that source topic, got rid of 
Exactly_once, even deleted the __transaction topic. 

The pattern was:
 * We had that issue above.
 * Clear the group in kafka, restart kstream app, reprocess from beginning and 
it would run fine.
 * restart kafka, the issue would come back.

So my suspicions shifted to the __consumer-offset topic,

When we initially ran with exactly_once and got the issue on that partition-9, 
is it possible that the __consumer-offset kept a faulty transaction-marker as 
well, and that this faulty marker came back to haunt our consumer, even after 
we disabled exactly_once?

I also saw the consumer offset topic with data from months ago, which should 
have been compacted, very similar to 
https://issues.apache.org/jira/browse/KAFKA-13174
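
For readers following along, a hedged sketch of the "clear the group in kafka" step mentioned above, done programmatically with the AdminClient. "commands-processor" appears to be the application.id/group used in this ticket; the bootstrap address is a placeholder, the application must be stopped first, and the kafka-consumer-groups.sh or Streams application reset tools can achieve the same thing.

{code:java}
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ClearGroup {
    public static void main(String[] args) throws Exception {
        // Group name taken from the ticket's client ids; bootstrap address is a placeholder.
        try (Admin admin = Admin.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"))) {
            // Deleting the group discards its committed offsets, so the restarted
            // application reprocesses the input topic from the beginning.
            admin.deleteConsumerGroups(List.of("commands-processor")).all().get();
        }
    }
}
{code}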


was (Author: fmethot):
Hi [~guozhang] Thanks for your feedback.

We were puzzled by this error that kept coming back on partition 9 on our 
consumer 
{code:java}
The following partitions still have unstable offsets which are not cleared on 
the broker side: [commands-9], this could be either transactional offsets 
waiting for completion, or normal offsets waiting for replication after 
appending to local log 
{code}
Even after we had deleted and recreated that source topic, got rid of 
Exactly_once, even deleted the __transaction topic. 

The pattern was:
 * We had that issue above.
 * Clear the group in kafka, restart kstream app, reprocess from beginning and 
it would run fine.
 * restart kafka, the issue would come back.

So my suspicions shifted to the __consumer-offset topic,

When we initially ran with exactly_once and got the issue on that partition-9, 
is it possible that the __consumer-offset kept a faulty transaction-marker as 
well, and that this faulty marker came back to haunt our consumer, even after 
we disabled exactly_once?

I also saw the consumer offset topic with data from months ago, which should 
have been compacted, very similar to

> KStream offset stuck after brokers outage
> -
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
>Reporter: F Méthot
>Priority: Major
>
> Our KStream app offset stays stuck on 1 partition after an outage, possibly when 
> exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
>  3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
>  command-expiry-store-changelog topic is 10 partitions (replication 2, 
> min-insync 2)
>  events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
>  
> {code:java}
> Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [commands])
>  --> KSTREAM-TRANSFORM-01
>  Processor: KSTREAM-TRANSFORM-01 (stores: [])
>  --> KSTREAM-TRANSFORM-02
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-TRANSFORM-01
>  Sink: KSTREAM-SINK-03 (topic: events)
>  <-- KSTREAM-TRANSFORM-02
> {code}
> h3.  
> h3. Attempt 1 at reproducing this issue
>  
> Our stream app runs with processing.guarantee *exactly_once* 
> After a Kafka test outage where all 3 brokers pod were deleted at the same 
> time,
> Brokers restarted and initialized successfully.
> When restarting the topology above, one of the tasks would never initialize 
> fully; the restore phase would keep outputting these messages every few 
> minutes:
>  
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> Restoration in progress for 1 partitions. 
> {commands-processor-expiry-store-changelog-8: position=11775908, 
> end=11775911, totalRestored=2002076} 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> Task for partition 8 would never initialize, no more data would be read from 
> the source commands topic for that 

[jira] [Comment Edited] (KAFKA-13272) KStream offset stuck after brokers outage

2021-09-10 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413197#comment-17413197
 ] 

F Méthot edited comment on KAFKA-13272 at 9/10/21, 1:57 PM:


Hi [~guozhang] Thanks for your feedback.

We were puzzled by this error that kept coming back on partition 9 on our 
consumer 
{code:java}
The following partitions still have unstable offsets which are not cleared on 
the broker side: [commands-9], this could be either transactional offsets 
waiting for completion, or normal offsets waiting for replication after 
appending to local log 
{code}
Even after we had deleted and recreated that source topic, got rid of 
Exactly_once, even deleted the __transaction topic. 

The pattern was:
 * We had that issue above.
 * Clear the group in kafka, restart kstream app, reprocess from beginning and 
it would run fine.
 * restart kafka, the issue would come back.

So my suspicions shifted to the __consumer-offset topic,

When we initially ran with exactly_once and got the issue on that partition-9, 
is it possible that the __consumer-offset kept a faulty transaction-marker as 
well, and that this faulty marker came back to haunt our consumer, even after 
we disabled exactly_once?

I also saw the consumer offset topic with data from months ago, which should 
have been compacted, very similar to


was (Author: fmethot):
Hi [~guozhang] Thanks for your feedback.

We were puzzled by this error that kept coming back on partition 9 on our 
consumer 
{code:java}
The following partitions still have unstable offsets which are not cleared on 
the broker side: [commands-9], this could be either transactional offsets 
waiting for completion, or normal offsets waiting for replication after 
appending to local log 
{code}
Even after we had deleted and recreated that source topic, got rid of 
Exactly_once, even deleted the __transaction topic. 

The pattern was:
 * We had that issue above.
 * Clear the group in kafka, restart kstream app, reprocess from beginning and 
it would run fine.
 * restart kafka, the issue would come back.

So my suspicions shifted to the __consumer-offset topic,

When we initially ran with exactly_once and got the issue on that partition-9, 
is it possible that the __consumer-offset kept a faulty transaction-marker as 
well, and that this faulty marker came back to haunt our consumer, even after 
we disabled exactly_once?

> KStream offset stuck after brokers outage
> -
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
>Reporter: F Méthot
>Priority: Major
>
> Our KStream app offset stays stuck on 1 partition after an outage, possibly when 
> exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
>  3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
>  command-expiry-store-changelog topic is 10 partitions (replication 2, 
> min-insync 2)
>  events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
>  
> {code:java}
> Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [commands])
>  --> KSTREAM-TRANSFORM-01
>  Processor: KSTREAM-TRANSFORM-01 (stores: [])
>  --> KSTREAM-TRANSFORM-02
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-TRANSFORM-01
>  Sink: KSTREAM-SINK-03 (topic: events)
>  <-- KSTREAM-TRANSFORM-02
> {code}
> h3.  
> h3. Attempt 1 at reproducing this issue
>  
> Our stream app runs with processing.guarantee *exactly_once* 
> After a Kafka test outage where all 3 brokers pod were deleted at the same 
> time,
> Brokers restarted and initialized successfully.
> When restarting the topology above, one of the tasks would never initialize 
> fully; the restore phase would keep outputting these messages every few 
> minutes:
>  
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> Restoration in progress for 1 partitions. 
> {commands-processor-expiry-store-changelog-8: position=11775908, 
> end=11775911, totalRestored=2002076} 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> Task for partition 8 would never initialize, no more data would be read from 
> the source commands topic for that partition.
>  
> In an attempt to recover, we restarted the stream app with stream 
> processing.guarantee back to at_least_once; then it proceeded with reading the 
> 

[jira] [Comment Edited] (KAFKA-13272) KStream offset stuck after brokers outage

2021-09-10 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413197#comment-17413197
 ] 

F Méthot edited comment on KAFKA-13272 at 9/10/21, 1:56 PM:


Hi [~guozhang] Thanks for your feedback.

We were puzzled by this error that kept coming back on partition 9 on our 
consumer 
{code:java}
The following partitions still have unstable offsets which are not cleared on 
the broker side: [commands-9], this could be either transactional offsets 
waiting for completion, or normal offsets waiting for replication after 
appending to local log 
{code}
Even after we had deleted and recreated that source topic, got rid of 
Exactly_once, even deleted the __transaction topic. 

The pattern was:
 * We had that issue above.
 * Clear the group in kafka, restart kstream app, reprocess from beginning and 
it would run fine.
 * restart kafka, the issue would come back.

So my suspicions shifted to the __consumer-offset topic,

When we initially ran with exactly_once and got the issue on that partition-9, 
is it possible that the __consumer-offset kept a faulty transaction-marker as 
well, and that this faulty marker came back to haunt our consumer, even after 
we disabled exactly_once?


was (Author: fmethot):
Hi [~guozhang] Thanks for your feedback.

We were puzzled by this error that kept coming back on partition 9 on our 
consumer 
{code:java}
The following partitions still have unstable offsets which are not cleared on 
the broker side: [commands-9], this could be either transactional offsets 
waiting for completion, or normal offsets waiting for replication after 
appending to local log 
{code}
Even after we had deleted and recreated that source topic, got rid of 
Exactly_once, even deleted the __transaction topic. 

The pattern was:
 * We had that issue above.
 * Clear the group in kafka, restart service, reprocess from beginning and it 
would run fine.
 * restart kafka, the issue would come back.

So my suspicions shifted to the __consumer-offset topic,

When we initially ran with exactly_once and got the issue on that partition-9, 
is it possible that the __consumer-offset kept a faulty transaction-marker as 
well, and that this faulty marker came back to haunt our consumer, even after 
we disabled exactly_once?

> KStream offset stuck after brokers outage
> -
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
>Reporter: F Méthot
>Priority: Major
>
> Our KStream app offset stays stuck on 1 partition after an outage, possibly when 
> exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
>  3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
>  command-expiry-store-changelog topic is 10 partitions (replication 2, 
> min-insync 2)
>  events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
>  
> {code:java}
> Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [commands])
>  --> KSTREAM-TRANSFORM-01
>  Processor: KSTREAM-TRANSFORM-01 (stores: [])
>  --> KSTREAM-TRANSFORM-02
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-TRANSFORM-01
>  Sink: KSTREAM-SINK-03 (topic: events)
>  <-- KSTREAM-TRANSFORM-02
> {code}
> h3.  
> h3. Attempt 1 at reproducing this issue
>  
> Our stream app runs with processing.guarantee *exactly_once* 
> After a Kafka test outage where all 3 brokers pod were deleted at the same 
> time,
> Brokers restarted and initialized successfully.
> When restarting the topology above, one of the tasks would never initialize 
> fully; the restore phase would keep outputting these messages every few 
> minutes:
>  
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> Restoration in progress for 1 partitions. 
> {commands-processor-expiry-store-changelog-8: position=11775908, 
> end=11775911, totalRestored=2002076} 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> Task for partition 8 would never initialize, no more data would be read from 
> the source commands topic for that partition.
>  
> In an attempt to recover, we restarted the stream app with stream 
> processing.guarantee back to at_least_once; then it proceeded with reading the 
> changelog and restoring partition 8 fully.
> But we noticed afterward, for the next hour until we rebuilt the system, 

[jira] [Comment Edited] (KAFKA-13272) KStream offset stuck after brokers outage

2021-09-10 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413197#comment-17413197
 ] 

F Méthot edited comment on KAFKA-13272 at 9/10/21, 1:55 PM:


Hi [~guozhang] Thanks for your feedback.

We were puzzled by this error that kept coming back on partition 9 on our 
consumer 
{code:java}
The following partitions still have unstable offsets which are not cleared on 
the broker side: [commands-9], this could be either transactional offsets 
waiting for completion, or normal offsets waiting for replication after 
appending to local log 
{code}
Even after we had deleted and recreated that source topic, got rid of 
Exactly_once, even deleted the __transaction topic. 

The pattern was:
 * We had that issue above.
 * Clear the group in kafka, restart service, reprocess from beginning and it 
would run fine.
 * restart kafka, the issue would come back.

So my suspicions shifted to the __consumer-offset topic,

When we initially ran with exactly_once and got the issue on that partition-9, 
is it possible that the __consumer-offset kept a faulty transaction-marker as 
well, and that this faulty marker came back to haunt our consumer, even after 
we disabled exactly_once?


was (Author: fmethot):
Hi [~guozhang] Thanks for your feedback.

We were puzzled by this error that kept coming back on partition 9 on our 
consumer

 
{code:java}
The following partitions still have unstable offsets which are not cleared on 
the broker side: [commands-9], this could be either transactional offsets 
waiting for completion, or normal offsets waiting for replication after 
appending to local log 
{code}

Even after we had deleted and recreated that source topic, got rid of 
Exactly_once, even deleted the __transaction topic.

 

 

The pattern was:
 * We had that issue above.
 * Clear the group in kafka, restart service, reprocess from beginning and it 
would run fine.
 * restart kafka, the issue would come back.

So my suspicions shifted to the __consumer-offset topic,

When we initially ran with exactly_once and got the issue on that partition-9, 
is it possible that the __consumer-offset kept a faulty transaction-marker as 
well, and that this faulty marker came back to haunt our consumer, even after 
we disabled exactly_once?

> KStream offset stuck after brokers outage
> -
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
>Reporter: F Méthot
>Priority: Major
>
> Our KStream app offset stays stuck on 1 partition after an outage, possibly when 
> exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
>  3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
>  command-expiry-store-changelog topic is 10 partitions (replication 2, 
> min-insync 2)
>  events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
>  
> {code:java}
> Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [commands])
>  --> KSTREAM-TRANSFORM-01
>  Processor: KSTREAM-TRANSFORM-01 (stores: [])
>  --> KSTREAM-TRANSFORM-02
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-TRANSFORM-01
>  Sink: KSTREAM-SINK-03 (topic: events)
>  <-- KSTREAM-TRANSFORM-02
> {code}
> h3.  
> h3. Attempt 1 at reproducing this issue
>  
> Our stream app runs with processing.guarantee *exactly_once* 
> After a Kafka test outage where all 3 brokers pod were deleted at the same 
> time,
> Brokers restarted and initialized successfully.
> When restarting the topology above, one of the tasks would never initialize 
> fully; the restore phase would keep outputting these messages every few 
> minutes:
>  
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> Restoration in progress for 1 partitions. 
> {commands-processor-expiry-store-changelog-8: position=11775908, 
> end=11775911, totalRestored=2002076} 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> Task for partition 8 would never initialize, no more data would be read from 
> the source commands topic for that partition.
>  
> In an attempt to recover, we restarted the stream app with stream 
> processing.guarantee back to at_least_once; then it proceeded with reading the 
> changelog and restoring partition 8 fully.
> But we noticed afterward, for the next hour until we rebuilt the 

[jira] [Commented] (KAFKA-13272) KStream offset stuck after brokers outage

2021-09-10 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413197#comment-17413197
 ] 

F Méthot commented on KAFKA-13272:
--

Hi [~guozhang] Thanks for your feedback.

We were puzzled by this error that kept coming back on partition 9 on our 
consumer

 
{code:java}
The following partitions still have unstable offsets which are not cleared on 
the broker side: [commands-9], this could be either transactional offsets 
waiting for completion, or normal offsets waiting for replication after 
appending to local log 
{code}

Even after we had deleted and recreated that source topic, got rid of 
Exactly_once, even deleted the __transaction topic.

 

 

The pattern was:
 * We had that issue above.
 * Clear the group in kafka, restart service, reprocess from beginning and it 
would run fine.
 * restart kafka, the issue would come back.

So my suspicions shifted to the __consumer-offset topic,

When we initially ran with exactly_once and got the issue on that partition-9, 
is it possible that the __consumer-offset kept a faulty transaction-marker as 
well, and that this faulty marker came back to haunt our consumer, even after 
we disabled exactly_once?

> KStream offset stuck after brokers outage
> -
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
>Reporter: F Méthot
>Priority: Major
>
> Our KStream app offset stays stuck on 1 partition after an outage, possibly when 
> exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
>  3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
>  command-expiry-store-changelog topic is 10 partitions (replication 2, 
> min-insync 2)
>  events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
>  
> {code:java}
> Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [commands])
>  --> KSTREAM-TRANSFORM-01
>  Processor: KSTREAM-TRANSFORM-01 (stores: [])
>  --> KSTREAM-TRANSFORM-02
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-TRANSFORM-01
>  Sink: KSTREAM-SINK-03 (topic: events)
>  <-- KSTREAM-TRANSFORM-02
> {code}
> h3.  
> h3. Attempt 1 at reproducing this issue
>  
> Our stream app runs with processing.guarantee *exactly_once* 
> After a Kafka test outage where all 3 brokers pod were deleted at the same 
> time,
> Brokers restarted and initialized successfully.
> When restarting the topology above, one of the tasks would never initialize 
> fully; the restore phase would keep outputting these messages every few 
> minutes:
>  
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> Restoration in progress for 1 partitions. 
> {commands-processor-expiry-store-changelog-8: position=11775908, 
> end=11775911, totalRestored=2002076} 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> Task for partition 8 would never initialize, no more data would be read from 
> the source commands topic for that partition.
>  
> In an attempt to recover, we restarted the stream app with stream 
> processing.guarantee back to at_least_once; then it proceeded with reading the 
> changelog and restoring partition 8 fully.
> But we noticed afterward, for the next hour until we rebuilt the system, that 
> partition 8 from command-expiry-store-changelog would not be 
> cleaned/compacted by the log cleaner/compacter compared to other partitions. 
> (could be unrelated, because we have seen that before)
> So we resorted to delete/recreate our command-expiry-store-changelog topic 
> and events topic and regenerate it from the commands, reading from beginning.
> Things went back to normal
> h3. Attempt 2 at reproducing this issue
> kstream runs with *exactly-once*
> We force-deleted all 3 pods running kafka.
>  After that, one of the partitions couldn’t be restored (like reported in the 
> previous attempt).
>  For that partition, we noticed these logs on the broker
> {code:java}
> [2021-08-27 17:45:32,799] INFO [Transaction Marker Channel Manager 1002]: 
> Couldn’t find leader endpoint for partitions Set(__consumer_offsets-11, 
> command-expiry-store-changelog-9) while trying to send transaction markers 
> for commands-processor-0_9, these partitions are likely deleted already and 
> hence can be skipped 
> (kafka.coordinator.transaction.TransactionMarkerChannelManager){code}
> Then
>  - we stop 

[GitHub] [kafka] chia7712 commented on a change in pull request #10976: KAFKA-13036 Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest

2021-09-10 Thread GitBox


chia7712 commented on a change in pull request #10976:
URL: https://github.com/apache/kafka/pull/10976#discussion_r706140746



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
##
@@ -378,261 +347,189 @@ public void shouldRecordStatisticsBasedMetrics() {
 reset(statisticsToAdd1);
 reset(statisticsToAdd2);
 
-
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(1L);
-
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(2L);
+
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(1L);
+
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(2L);
 bytesWrittenToDatabaseSensor.record(1 + 2, 0L);
-replay(bytesWrittenToDatabaseSensor);
 
-
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(2L);
-
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(3L);
+
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(2L);
+
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(3L);
 bytesReadFromDatabaseSensor.record(2 + 3, 0L);
-replay(bytesReadFromDatabaseSensor);
 
-
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(3L);
-
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(4L);
+
when(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(3L);
+
when(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(4L);
 memtableBytesFlushedSensor.record(3 + 4, 0L);
-replay(memtableBytesFlushedSensor);
 
-
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(1L);
-
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(2L);
-
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(3L);
-
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(4L);
+
when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(1L);
+
when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(2L);
+
when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(3L);
+
when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(4L);
 memtableHitRatioSensor.record((double) 4 / (4 + 6), 0L);
-replay(memtableHitRatioSensor);
 
-
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(4L);
-
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(5L);
+
when(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(4L);
+
when(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(5L);
 writeStallDurationSensor.record(4 + 5, 0L);
-replay(writeStallDurationSensor);
 
-
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(5L);
-
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(4L);
-
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(3L);
-
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(2L);
+
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(5L);
+
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(4L);
+
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(3L);
+
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(2L);
 blockCacheDataHitRatioSensor.record((double) 8 / (8 + 6), 0L);
-replay(blockCacheDataHitRatioSensor);
 
-
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(4L);
-
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(2L);
-
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(2L);
-
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(4L);
+
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(4L);
+
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(2L);
+
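
For readers unfamiliar with the migration this diff applies, here is a small self-contained sketch of the EasyMock-to-Mockito translation pattern; the `Tickers` interface is an invented stand-in, not the RocksDB `Statistics` class. EasyMock's expect/replay/verify lifecycle becomes plain `when(...).thenReturn(...)` stubbing with separate `verify(...)` calls and no replay step.

{code:java}
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class MockMigrationSketch {
    // Invented stand-in for the mocked collaborator.
    interface Tickers {
        long getAndResetTickerCount(String ticker);
    }

    public static void main(String[] args) {
        // EasyMock (before):
        //   Tickers stats = createMock(Tickers.class);
        //   expect(stats.getAndResetTickerCount("BYTES_WRITTEN")).andReturn(1L);
        //   replay(stats);
        //   ...exercise the code under test...
        //   verify(stats);
        //
        // Mockito (after): stub up front, no replay, verify interactions afterwards.
        Tickers stats = mock(Tickers.class);
        when(stats.getAndResetTickerCount("BYTES_WRITTEN")).thenReturn(1L);

        long recorded = stats.getAndResetTickerCount("BYTES_WRITTEN"); // stands in for the code under test
        verify(stats).getAndResetTickerCount("BYTES_WRITTEN");
        System.out.println(recorded); // prints 1
    }
}
{code}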

[GitHub] [kafka] showuon commented on pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore

2021-09-10 Thread GitBox


showuon commented on pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#issuecomment-916861911


   @patrickstuedi , yes, the fix PR 
(https://github.com/apache/kafka/pull/11292) is under review (it should be 
close to done). Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] patrickstuedi commented on pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore

2021-09-10 Thread GitBox


patrickstuedi commented on pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#issuecomment-916855214


   @showuon Any new updates on this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-09-10 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413135#comment-17413135
 ] 

Bruno Cadonna commented on KAFKA-13128:
---

And also here:
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11250/9/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread/]

{code:java}
java.lang.AssertionError: 
Expected: is not null
 but: was null
 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
 at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$15(StoreQueryIntegrationTest.java:494)
 at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:545)
 at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:488)
{code}

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0, 2.8.1
>Reporter: A. Sophie Blee-Goldman
>Assignee: Walker Carlson
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.1.0
>
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on pull request #11317: KAFKA-13287: Upgrade RocksDB to 6.22.1.1

2021-09-10 Thread GitBox


cadonna commented on pull request #11317:
URL: https://github.com/apache/kafka/pull/11317#issuecomment-916843098


   I even found the PR:
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11250/9/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11317: KAFKA-13287: Upgrade RocksDB to 6.22.1.1

2021-09-10 Thread GitBox


cadonna commented on pull request #11317:
URL: https://github.com/apache/kafka/pull/11317#issuecomment-916842108


   @ableegoldman I think I have already seen this specific failure in another 
PR. But I agree with you that we need to keep an eye on this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides

2021-09-10 Thread GitBox


ableegoldman commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r706092654



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -359,15 +362,24 @@ public final InternalTopologyBuilder 
setApplicationId(final String applicationId
 return this;
 }
 
-public synchronized final InternalTopologyBuilder setStreamsConfig(final 
StreamsConfig config) {
-Objects.requireNonNull(config, "config can't be null");
-this.config = config;
+public synchronized final void setTopologyProperties(final Properties 
props) {
+this.topologyProperties = props;
+}
 
-return this;
+public synchronized final void setStreamsConfig(final StreamsConfig 
config) {

Review comment:
   @guozhangwang Played around with this a bit and was able to clean things 
up nicely, plus address some other awkwardness that was bugging me. Worked out 
great (just need to do some cleanup of the tests now...)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13014) KAFKA-Stream stucked when the offset is no more existing

2021-09-10 Thread Ahmed Toumi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413090#comment-17413090
 ] 

Ahmed Toumi commented on KAFKA-13014:
-

Thank you [~mjsax]. I think the person I contacted at Confluent gave me the 
wrong information; I didn't find any bug fix for this issue in 2.7.1.

I will check whether 2.7.2 fixes it, because it is exactly the same issue as 
https://issues.apache.org/jira/browse/KAFKA-12951 but with a state store and not a 
GlobalKTable.

> KAFKA-Stream stucked when the offset is no more existing
> 
>
> Key: KAFKA-13014
> URL: https://issues.apache.org/jira/browse/KAFKA-13014
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, offset manager, streams
>Affects Versions: 2.7.0
> Environment: PROD
>Reporter: Ahmed Toumi
>Priority: Major
> Attachments: image-2021-06-30-11-10-31-028.png
>
>
> We have kafka-stream with multiple instances and threads.
> This kafka-stream consumes from a lot of topics.
> One of the topic partitions wasn't accessible for a day and the retention of 
> the topic is 4 Hours.
> After fixing the problem, the kafka-stream is trying to consume from an 
> offset that does not exist anymore:
>  * Kafka-consumer-group describe:
> !image-2021-06-30-11-10-31-028.png!
> We can see that the current offset that the KS is waiting for is *59754934* 
> but the new first offset of this partition is *264896001*.
> The problem is that the Kafka-stream does not throw any exception;
> that's the only log I'm seeing 
>  
> {code:java}
> 08:44:53.924 
> [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] 
> INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer,
>  groupId=talaria-data-mixed-prod] Updating assignment with08:44:53.924 
> [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] 
> INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer,
>  groupId=talaria-data-mixed-prod] Updating assignment with Assigned 
> partitions:                       [adm__article_ean_repartition_v3-10, 
> adm__article_itm_repartition_v3-10, adm__article_sign_repartition_v3-10, 
> adm__article_stock_repartition_v3-10] Current owned partitions:               
>    [adm__article_ean_repartition_v3-10, adm__article_itm_repartition_v3-10, 
> adm__article_sign_repartition_v3-10, adm__article_stock_repartition_v3-10] 
> Added partitions (assigned - owned):       [] Revoked partitions (owned - 
> assigned):     [] 08:44:53.924 
> [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] 
> INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer,
>  groupId=talaria-data-mixed-prod] Notifying assignor about the new 
> Assignment(partitions=[adm__article_stock_repartition_v3-10, 
> adm__article_sign_repartition_v3-10, adm__article_itm_repartition_v3-10, 
> adm__article_ean_repartition_v3-10], userDataSize=398)08:44:53.924 
> [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] 
> INFO  o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread 
> [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer]
>  No followup rebalance was requested, resetting the rebalance 
> schedule.08:44:53.924 
> [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] 
> INFO  o.a.k.s.p.internals.TaskManager - stream-thread 
> [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] 
> Handle new assignment with: New active tasks: [0_10] New standby tasks: 
> [0_17, 0_21] Existing active tasks: [0_10] Existing standby tasks: [0_17, 
> 0_21]08:44:53.924 
> [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] 
> INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer,
>  groupId=talaria-data-mixed-prod] Adding newly assigned partitions: 
> {code}
>  
> PI: version broker kafka : 5.3.4-ccs
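
A hedged sketch of how the condition described above (a committed offset older than the partition's earliest available offset) can be checked from the outside with the AdminClient. The group and partition names are taken from the describe output and logs above; the bootstrap address is a placeholder.

{code:java}
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class StuckOffsetCheck {
    public static void main(String[] args) throws Exception {
        TopicPartition tp = new TopicPartition("adm__article_ean_repartition_v3", 10);
        try (Admin admin = Admin.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"))) {
            OffsetAndMetadata committed = admin
                .listConsumerGroupOffsets("talaria-data-mixed-prod")
                .partitionsToOffsetAndMetadata().get().get(tp);
            ListOffsetsResult.ListOffsetsResultInfo earliest = admin
                .listOffsets(Map.of(tp, OffsetSpec.earliest())).all().get().get(tp);
            // If the committed offset is below the earliest available offset, the data
            // it points at has been deleted by retention and the group is stuck as described.
            if (committed != null && committed.offset() < earliest.offset()) {
                System.out.println("Committed offset " + committed.offset()
                    + " is before earliest offset " + earliest.offset() + " for " + tp);
            }
        }
    }
}
{code}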



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wycccccc commented on pull request #10976: KAFKA-13036 Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest

2021-09-10 Thread GitBox


wycc commented on pull request #10976:
URL: https://github.com/apache/kafka/pull/10976#issuecomment-916776585


   @ijuma I think I survived this round of reviews.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wycccccc commented on pull request #10976: KAFKA-13036 Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest

2021-09-10 Thread GitBox


wycc commented on pull request #10976:
URL: https://github.com/apache/kafka/pull/10976#issuecomment-916775979


   @chia7712 I think I have solved most of the problems, except merging 
`setUpMetricsStubMock`.
   To avoid the "static mock already registered" problem caused by repeatedly 
calling the `setUpMetricsStubMock` method, I put `mockStatic` in `@Before`. So I 
think the current handling of `tearDown()` is acceptable.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides

2021-09-10 Thread GitBox


ableegoldman commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r706014148



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -359,15 +362,24 @@ public final InternalTopologyBuilder 
setApplicationId(final String applicationId
 return this;
 }
 
-public synchronized final InternalTopologyBuilder setStreamsConfig(final 
StreamsConfig config) {
-Objects.requireNonNull(config, "config can't be null");
-this.config = config;
+public synchronized final void setTopologyProperties(final Properties 
props) {
+this.topologyProperties = props;
+}
 
-return this;
+public synchronized final void setStreamsConfig(final StreamsConfig 
config) {

Review comment:
   That all makes sense to me -- initially I was trying to make this just 
work with the existing public API, but ultimately figured that even the concept 
of "topology-level overrides" would require a KIP. If we're only dealing with 
named topologies then we're free to do things the right way from the start -- 
thanks for the suggestions :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-09-10 Thread GitBox


ableegoldman commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r706010750



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##
@@ -292,13 +408,46 @@ public V fetch(final K key,
 time);
 }
 
+private long getActualWindowStartTime(final long timeFrom) {
+return Math.max(timeFrom, ((PersistentWindowStore) 
wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+}
+
+private KeyValueIterator, V> filterExpiredRecords(final 
boolean forward) {
+final KeyValueIterator, byte[]> 
allWindowedKeyValueIterator = forward ? wrapped().all() : 
wrapped().backwardAll();
+
+final long observedStreamTime = ((PersistentWindowStore) wrapped()).getObservedStreamTime();
+if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == 
ConsumerRecord.NO_TIMESTAMP)
+return new 
MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, 
streamsMetrics, serdes, time);
+
+final long windowStartBoundary = observedStreamTime - retentionPeriod 
+ 1;
+final List, byte[]>> 
windowedKeyValuesInBoundary = new ArrayList<>();
+
+while (allWindowedKeyValueIterator.hasNext()) {
+final KeyValue, byte[]> next = 
allWindowedKeyValueIterator.next();
+if 
(next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary)))
 {
+continue;
+}
+windowedKeyValuesInBoundary.add(next);
+}
+return new MeteredWindowedKeyValueIterator<>(new 
WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, 
streamsMetrics, serdes, time);
+}

Review comment:
   >  it's failing for test cases like 
shouldNotThrowConcurrentModificationException . This seems to be because the 
put() call while iterating
   
   You don't mean that the test itself is failing, just that the filter isn't 
being applied to the `put` records, right?
   > ...the put() call while iterating is appending to the wrapped instance of 
iterator and hence it's not visible
   
   Can you also expand a bit on what you mean by this? Not sure I have the 
whole picture here since I haven't been following this too closely, thanks in 
advance for catching me up   
   
   > Looking at this, do you think it would be a good idea to move this logic 
in the actual RocksDB implementations? Or do you think there's a better way to 
do it here in MeteredStore class itself?
   
   Personally I think the benefits of keeping this in the metering layer 
outweigh any downsides, especially if it comes down to just some weird edge 
case(s) that don't play nicely with the filtering. But I'm also interested in 
hearing what @showuon makes of this




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j

2021-09-10 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-12774:

Fix Version/s: (was: 2.8.1)

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through 
> log4j
> 
>
> Key: KAFKA-12774
> URL: https://issues.apache.org/jira/browse/KAFKA-12774
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Jørgen
>Priority: Minor
> Fix For: 3.1.0, 3.0.1
>
>
> When exceptions are handled in the uncaught-exception handler introduced in 
> KS2.8, the logging of the stacktrace doesn't seem to go through the logging 
> framework configured by the application (log4j2 in our case), but gets 
> printed to the console "line-by-line".
> All other exceptions logged by kafka-streams go through log4j2 and get 
> formatted properly according to the log4j2 appender (json in our case). 
> Haven't tested this on other frameworks like logback.
> Application setup:
>  * Spring-boot 2.4.5
>  * Log4j 2.13.3
>  * Slf4j 1.7.30
> Log4j2 appender config:
> {code:java}
> 
> 
>  stacktraceAsString="true" properties="true">
>  value="$${date:-MM-dd'T'HH:mm:ss.SSSZ}"/>
> 
> 
>  {code}
> Uncaught exception handler config:
> {code:java}
> kafkaStreams.setUncaughtExceptionHandler { exception ->
> logger.warn("Uncaught exception handled - replacing thread", exception) 
> // logged properly
> 
> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
> } {code}
> Stacktrace that gets printed line-by-line:
> {code:java}
> Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic xxx-repartition for task 3_2 due 
> to:org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.Exception handler choose to FAIL the processing, no more 
> records would be sent.  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226)
>at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)  
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
>   at java.base/java.lang.Thread.run(Unknown Source)Caused by: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. {code}
>  
> It's a little bit hard to reproduce as I haven't found any way to trigger 
> uncaught-exception-handler through junit-tests.
> Link to discussion on slack: 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700
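
A possible workaround, sketched purely for illustration (the class and method
names are made up and this is not from the original report): install a JVM-wide
default uncaught exception handler that routes the stack trace of a dying
thread through SLF4J, so it reaches the configured log4j2 appender instead of
being printed line by line to stderr.

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LoggingUncaughtExceptionHandler {

    private static final Logger LOG =
        LoggerFactory.getLogger(LoggingUncaughtExceptionHandler.class);

    // Route uncaught exceptions through SLF4J instead of the JVM default
    // handler, which prints the stack trace directly to stderr.
    public static void install() {
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) ->
            LOG.error("Uncaught exception in thread {}", thread.getName(), throwable));
    }
}
{code}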



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12622) Automate LICENSE file validation

2021-09-10 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-12622:

Fix Version/s: (was: 2.8.1)
   3.1.0

> Automate LICENSE file validation
> 
>
> Key: KAFKA-12622
> URL: https://issues.apache.org/jira/browse/KAFKA-12622
> Project: Kafka
>  Issue Type: Task
>Reporter: John Roesler
>Priority: Major
> Fix For: 3.1.0, 3.0.1
>
>
> In https://issues.apache.org/jira/browse/KAFKA-12602, we manually constructed 
> a correct license file for 2.8.0. This file will certainly become wrong again 
> in later releases, so we need to write some kind of script to automate a 
> check.
> It crossed my mind to automate the generation of the file, but it seems to be 
> an intractable problem, considering that each dependency may change licenses, 
> may package license files, link to them from their poms, link to them from 
> their repos, etc. I've also found multiple URLs listed with various 
> delimiters, broken links that I have to chase down, etc.
> Therefore, it seems like the solution to aim for is simply: list all the jars 
> that we package, and print out a report of each jar that's extra or missing 
> vs. the ones in our `LICENSE-binary` file.
> The check should be part of the release script at least, if not part of the 
> regular build (so we keep it up to date as dependencies change).
>  
> Here's how I do this manually right now:
> {code:java}
> // build the binary artifacts
> $ ./gradlewAll releaseTarGz
> // unpack the binary artifact 
> $ tar xf core/build/distributions/kafka_2.13-X.Y.Z.tgz
> $ cd kafka_2.13-X.Y.Z
> // list the packaged jars 
> // (you can ignore the jars for our own modules, like kafka, kafka-clients, 
> etc.)
> $ ls libs/
> // cross check the jars with the packaged LICENSE
> // make sure all dependencies are listed with the right versions
> $ cat LICENSE
> // also double check all the mentioned license files are present
> $ ls licenses {code}
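
To make the proposed check concrete, a minimal sketch in Java (the paths, the
class name, and the match-by-substring heuristic are assumptions, not part of
the ticket; requires Java 11+ for Files.readString):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public final class LicenseCheck {

    // List every packaged jar under libs/ and report any artifact that is
    // not mentioned in LICENSE-binary.
    public static void main(final String[] args) throws IOException {
        final Path libs = Paths.get(args.length > 0 ? args[0] : "libs");
        final String license = Files.readString(Paths.get(args.length > 1 ? args[1] : "LICENSE-binary"));
        try (Stream<Path> jars = Files.list(libs)) {
            jars.map(path -> path.getFileName().toString())
                .filter(name -> name.endsWith(".jar"))
                .map(name -> name.substring(0, name.length() - ".jar".length()))
                .filter(artifact -> !license.contains(artifact))
                .forEach(missing -> System.out.println("Not found in LICENSE-binary: " + missing));
        }
    }
}
{code}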



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology

2021-09-10 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-13164:

Fix Version/s: (was: 2.8.1)

> State store is attached to wrong node in the Kafka Streams topology
> ---
>
> Key: KAFKA-13164
> URL: https://issues.apache.org/jira/browse/KAFKA-13164
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
> Environment: local development (MacOS Big Sur 11.4)
>Reporter: Ralph Matthias Debusmann
>Assignee: Hao Li
>Priority: Major
> Fix For: 3.0.1
>
> Attachments: 1.jpg, 3.jpg
>
>
> Hi,
> mjsax and I noticed a bug where a state store is attached to the wrong node 
> in the Kafka Streams topology.
> The issue arose when I tried to read a topic into a KTable, then continued 
> with a mapValues(), and then joined this KTable with a KStream, like so:
>  
> var kTable = this.streamsBuilder.table().mapValues( function>);
>  
> and then later:
>  
> var joinedKStream = kstream.leftJoin(kTable, );
>  
> The join didn't work, and neither did it work when I added Materialized.as() 
> to mapValues(), like so:
> var kTable = this.streamsBuilder.table().mapValues( function>, *Materialized.as()*);
>  
>  Interestingly, I could get the join to work, when I first read the topic 
> into a *KStream*, then continued with the mapValues(), then turned the 
> KStream into a KTable, and then joined the KTable with the other KStream, 
> like so:
>  
> var kTable = this.streamsBuilder.stream().mapValues( function>).toTable();
>  
> (the join worked the same as above)
>  
> When mjsax and I had a look at the topology, we could see that in the 
> former, non-working code, the state store (required for the join) is attached 
> to the pre-final "KTABLE-SOURCE" node, and not the final "KTABLE-MAPVALUES" node 
> (see attachment "1.jpg"). In the working code, the state store is (correctly) 
> attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg").
>  
> Best regards,
> xdgrulez
>  
>  
>  
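
For readers following along, a self-contained sketch of the two shapes being
compared (topic name, types, and the mapper are illustrative; this is not the
reporter's code):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public final class Kafka13164Sketch {

    // Pattern reported as NOT working: table() -> mapValues(), then join the result.
    static KTable<String, String> tableThenMap(final StreamsBuilder builder) {
        return builder.<String, String>table("input-topic")
                .mapValues(value -> value.toUpperCase());
    }

    // Pattern reported as working: stream() -> mapValues() -> toTable(), then join the result.
    static KTable<String, String> streamThenMap(final StreamsBuilder builder) {
        return builder.<String, String>stream("input-topic")
                .mapValues(value -> value.toUpperCase())
                .toTable();
    }
}
{code}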



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12644) Add Missing Class-Level Javadoc to Descendants of org.apache.kafka.common.errors.ApiException

2021-09-10 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-12644:

Fix Version/s: (was: 2.8.1)

> Add Missing Class-Level Javadoc to Descendants of 
> org.apache.kafka.common.errors.ApiException
> -
>
> Key: KAFKA-12644
> URL: https://issues.apache.org/jira/browse/KAFKA-12644
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, documentation
>Affects Versions: 3.0.0, 2.8.1
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Major
>  Labels: documentation
> Fix For: 3.1.0, 3.0.1
>
>
> I noticed that class-level Javadocs are missing from some classes in the 
> org.apache.kafka.common.errors package. This issue is for tracking the work 
> of adding the missing class-level javadocs for those Exception classes.
> https://kafka.apache.org/27/javadoc/org/apache/kafka/common/errors/package-summary.html
> https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/errors
> Basic class-level documentation could be derived by mapping the error 
> conditions documented in the protocol
> https://kafka.apache.org/protocol#protocol_constants
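
To make the scope concrete, this is the kind of short class-level Javadoc being
asked for, shown on a hypothetical exception class (the class name and wording
are illustrative, not an actual Kafka class):

{code:java}
package org.apache.kafka.common.errors;

/**
 * Illustrative only: a minimal class-level Javadoc of the kind this ticket
 * requests, typically derived from the error codes documented in the protocol
 * guide. Indicates that the request referred to a resource that does not exist.
 */
public class ExampleResourceNotFoundException extends ApiException {

    public ExampleResourceNotFoundException(final String message) {
        super(message);
    }
}
{code}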



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13256) Possible NPE in ConfigDef when rendering (enriched) RST or HTML when documentation is not set/NULL

2021-09-10 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413010#comment-17413010
 ] 

Mickael Maison commented on KAFKA-13256:


While this does not affect Apache Kafka itself, ConfigDef is a public API and 
is used by Connect connectors and third-party code like Debezium. In a perfect 
world, you could argue the documentation should always be set, but as it was a 
small fix, it made sense to merge it.

> Possible NPE in ConfigDef when rendering (enriched) RST or HTML when 
> documentation is not set/NULL
> --
>
> Key: KAFKA-13256
> URL: https://issues.apache.org/jira/browse/KAFKA-13256
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0, 2.8.0
>Reporter: René Kerner
>Assignee: René Kerner
>Priority: Major
> Fix For: 3.1.0
>
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> While working on Debezium I discovered the following issue:
> When Kafka's ConfigDef renders the HTML or RST documentation representation 
> of the config definition, it requires the `ConfigKey.documentation` member 
> variable to be a java.lang.String instance that's set to an actual value 
> different from NULL, otherwise an NPE happens:
> {code:java}
>  b.append(key.documentation.replaceAll("\n", ""));
> {code}
> {code:java}
>  for (String docLine : key.documentation.split("\n")) {
> {code}
>  
> When `documentation` is not set/NULL, I suggest either setting a valid String 
> like "No documentation available" or skipping that config key.
>  
> I could provide a PR to fix this soon.
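
For illustration, a sketch of the suggested null guard (the fallback wording is
an assumption; the field name follows the snippets quoted above):

{code:java}
public final class ConfigDocs {

    // Fall back to a placeholder when a ConfigKey carries no documentation,
    // instead of dereferencing null during HTML/RST rendering.
    static String safeDocumentation(final String documentation) {
        return documentation == null ? "No documentation available." : documentation;
    }
}
{code}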



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2021-09-10 Thread Matthew Sheppard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Sheppard updated KAFKA-13289:
-
Description: 
When pushing bulk data through a kafka-streams app, I see it log the following 
message many times...

{noformat}
WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment.
{noformat}

...and data which I expect to have been joined through a leftJoin step appears 
to be lost.

I've seen this in practice either when my application has been shut down for a 
while and then is brought back up, or when I've used something like the 
[app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
 in an attempt to have the application reprocess past data.

I was able to reproduce this behaviour in isolation by generating 1000 messages 
to two topics spaced an hour apart (with the original timestamps in order), 
then having kafka streams select a key for them and try to leftJoin the two 
rekeyed streams.

Self contained source code for that reproduction is available at 
https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java

The actual kafka-streams topology in there looks like this.


{code:java}
final StreamsBuilder builder = new StreamsBuilder();
final KStream leftStream = 
builder.stream(leftTopic);
final KStream rightStream = 
builder.stream(rightTopic);

final KStream rekeyedLeftStream = leftStream
.selectKey((k, v) -> v.substring(0, v.indexOf(":")));

final KStream rekeyedRightStream = rightStream
.selectKey((k, v) -> v.substring(0, v.indexOf(":")));

JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));

final KStream joined = rekeyedLeftStream.leftJoin(
rekeyedRightStream,
(left, right) -> left + "/" + right,
joinWindow
);
{code}


...and the eventual output I produce looks like this...

{code}
...
523 [523,left/null]
524 [524,left/null, 524,left/524,right]
525 [525,left/525,right]
526 [526,left/null]
527 [527,left/null]
528 [528,left/528,right]
529 [529,left/null]
530 [530,left/null]
531 [531,left/null, 531,left/531,right]
532 [532,left/null]
533 [533,left/null]
534 [534,left/null, 534,left/534,right]
535 [535,left/null]
536 [536,left/null]
537 [537,left/null, 537,left/537,right]
538 [538,left/null]
539 [539,left/null]
540 [540,left/null]
541 [541,left/null]
542 [542,left/null]
543 [543,left/null]
...
{code}

...whereas, given the input data, I expect to see every row end with the two 
values joined, rather than the right value being null.

Note that I understand it's expected that we initially get the left/null values 
for many values since that's the expected semantics of kafka-streams left join, 
at least until 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious

I've noticed that if I set a very large grace value on the join window the 
problem is solved, but since the input I provide is not out of order I did not 
expect to need to do that, and I'm wary of the resource requirements of doing so 
in practice on an application with a lot of volume.

My suspicion is that something is happening such that when one partition is 
processed it causes the stream time to be pushed forward to the newest message 
in that partition, meaning when the next partition is then examined it is found 
to contain many records which are 'too old' compared to the stream time. 

I ran across this discussion thread which seems to cover the same issue 
http://mail-archives.apache.org/mod_mbox/kafka-users/202002.mbox/%3cCAB0tB9p_vijMS18jWXBqp7TQozL__ANoo3=h57q6z3y4hzt...@mail.gmail.com%3e
 and had a request from [~cadonna] for a reproduction case, so I'm hoping my 
example above might make the issue easier to tackle!

  was:
When pushing bulk data through a kafka-streams app, I see it log the following 
message many times...

{noformat}
WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment.
{noformat}

...and data which I expect to have been joined through a leftJoin step appears 
to be lost.

I've seen this in practice either when my application has been shut down for a 
while and then is brought back up, or when I've used something like the 
[app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
 in an attempt to have the application reprocess past data.

I was able to reproduce this behaviour in isolation by generating 1000 messages 
to two topics spaced an hour apart (with the original timestamps in order), 
then having kafka streams 

[GitHub] [kafka] chia7712 commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

2021-09-10 Thread GitBox


chia7712 commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r705913852



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -977,6 +977,11 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
 return streamThread;
 }
 
+    // Ensure Mockito stub construct with capture argument for KafkaStreamsTest.
+    public static Metrics createThisMetrics(final MetricConfig metricConfig, final List<MetricsReporter> reporters, final Time time, final MetricsContext metricsContext) {

Review comment:
   Why do we need the word "this"?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -246,10 +246,15 @@ State setState(final State newState) {
 
 public boolean isRunning() {
 synchronized (stateLock) {
-return state.isAlive();
+return isStateAlive();
 }
 }
 
+// Ensure Mockito can stub method for KafkaStreamTest.
+public boolean isStateAlive() {

Review comment:
   Personally, I think it is an anti-pattern. There are two public methods 
returning the same "value", but one of them holds a lock. It is hard to use them 
correctly. At any rate, this is unrelated to this PR.

##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -903,20 +897,15 @@ public void 
shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
 streams.start();
 }
 
-PowerMock.verify(Executors.class);

Review comment:
   please add `verify` to make sure `rocksDBMetricsRecordingTriggerThread` 
is used to create the thread in this test.

##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -335,41 +332,37 @@ private void prepareStreamThread(final StreamThread 
thread,
 "",
 Collections.emptySet(),
 Collections.emptySet()
-)
-).anyTimes();
-
EasyMock.expect(thread.waitOnThreadState(EasyMock.isA(StreamThread.State.class),
 anyLong())).andStubReturn(true);
-EasyMock.expect(thread.isAlive()).andReturn(true).times(0, 1);
-thread.resizeCache(EasyMock.anyLong());
-EasyMock.expectLastCall().anyTimes();
-thread.requestLeaveGroupDuringShutdown();
-EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(thread.getName()).andStubReturn("processId-StreamThread-" + 
threadId);
-thread.shutdown();
-EasyMock.expectLastCall().andAnswer(() -> {
+));
+when(thread.waitOnThreadState(isA(StreamThread.State.class), 
anyLong())).thenReturn(true);
+when(thread.isStateAlive()).thenReturn(true);
+verify(thread, atMostOnce()).isStateAlive();

Review comment:
   It seems to me this check is a no-op since the thread is not ready to be 
used. Hence, the execution count is always 0.

##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -335,41 +332,37 @@ private void prepareStreamThread(final StreamThread 
thread,
 "",
 Collections.emptySet(),
 Collections.emptySet()
-)
-).anyTimes();
-
EasyMock.expect(thread.waitOnThreadState(EasyMock.isA(StreamThread.State.class),
 anyLong())).andStubReturn(true);
-EasyMock.expect(thread.isAlive()).andReturn(true).times(0, 1);
-thread.resizeCache(EasyMock.anyLong());
-EasyMock.expectLastCall().anyTimes();
-thread.requestLeaveGroupDuringShutdown();
-EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(thread.getName()).andStubReturn("processId-StreamThread-" + 
threadId);
-thread.shutdown();
-EasyMock.expectLastCall().andAnswer(() -> {
+));
+when(thread.waitOnThreadState(isA(StreamThread.State.class), 
anyLong())).thenReturn(true);
+when(thread.isStateAlive()).thenReturn(true);
+verify(thread, atMostOnce()).isStateAlive();
+when(thread.getName()).thenReturn("processId-StreamThread-" + 
threadId);
+
+doAnswer(invocation -> {
 supplier.consumer.close();
 supplier.restoreConsumer.close();
 for (final MockProducer producer : 
supplier.producers) {
 producer.close();
 }
 state.set(StreamThread.State.DEAD);
 
-threadStatelistenerCapture.getValue().onChange(thread, 
StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
-threadStatelistenerCapture.getValue().onChange(thread, 
StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
+threadStateListenerCapture.getValue().onChange(thread, 
StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
+threadStateListenerCapture.getValue().onChange(thread, 

[GitHub] [kafka] dajac merged pull request #11318: MINOR: Bump version in upgrade guide to 2.8.1

2021-09-10 Thread GitBox


dajac merged pull request #11318:
URL: https://github.com/apache/kafka/pull/11318


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11318: MINOR: Bump version in upgrade guide to 2.8.1

2021-09-10 Thread GitBox


dajac commented on pull request #11318:
URL: https://github.com/apache/kafka/pull/11318#issuecomment-916669933


   @omkreddy Yes, we should also fix trunk. I will do a separate one for it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2021-09-10 Thread Matthew Sheppard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Sheppard updated KAFKA-13289:
-
Summary: Bulk processing correctly ordered input data through a join with 
kafka-streams results in `Skipping record for expired segment`  (was: Bulk 
processing data through a join with kafka-streams results in `Skipping record 
for expired segment`)

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Major
>
> When pushing bulk data through a kafka-streams app, I see it log the following 
> message many times...
> {noformat}
> WARN 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
>  in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream leftStream = 
> builder.stream(leftTopic);
> final KStream rightStream = 
> builder.stream(rightTopic);
> final KStream rekeyedLeftStream = leftStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream rekeyedRightStream = rightStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream joined = rekeyedLeftStream.leftJoin(
> rekeyedRightStream,
> (left, right) -> left + "/" + right,
> joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...whereas, given the input data, I expect to see every row end with the two 
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null 
> values for many values since that's the expected semantics of kafka-streams 
> left join, at least until 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the 
> problem is solved, but since the input I provide is not out of order I did 
> not expect to need to do that, and I'm wary of the resource requirements of 
> doing so in practice on an application with a lot of volume.
> My suspicion is that something is happening such that when one partition is 
> processed it causes the stream time to be pushed forward to the newest 
> message in that partition, meaning when the next partition is then examined 
> it is found to contain many records which are 'too old' compared to the 
> stream time. 
> I ran across 
> https://kafkacommunity.blogspot.com/2020/02/re-skipping-record-for-expired-segment_88.html
>  from a year and a half ago which seems to describe the same problem, but I'm 
> hoping the self-contained reproduction might make the issue easier to 

[jira] [Updated] (KAFKA-13289) Bulk processing data through a join with kafka-streams results in `Skipping record for expired segment`

2021-09-10 Thread Matthew Sheppard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Sheppard updated KAFKA-13289:
-
Description: 
When pushing bulk data through a kafka-streams app, I see it log the following 
message many times...

{noformat}
WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment.
{noformat}

...and data which I expect to have been joined through a leftJoin step appears 
to be lost.

I've seen this in practice either when my application has been shut down for a 
while and then is brought back up, or when I've used something like the 
[app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
 in an attempt to have the application reprocess past data.

I was able to reproduce this behaviour in isolation by generating 1000 messages 
to two topics spaced an hour apart (with the original timestamps in order), 
then having kafka streams select a key for them and try to leftJoin the two 
rekeyed streams.

Self contained source code for that reproduction is available at 
https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java

The actual kafka-streams topology in there looks like this.


{code:java}
final StreamsBuilder builder = new StreamsBuilder();
final KStream leftStream = 
builder.stream(leftTopic);
final KStream rightStream = 
builder.stream(rightTopic);

final KStream rekeyedLeftStream = leftStream
.selectKey((k, v) -> v.substring(0, v.indexOf(":")));

final KStream rekeyedRightStream = rightStream
.selectKey((k, v) -> v.substring(0, v.indexOf(":")));

JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));

final KStream joined = rekeyedLeftStream.leftJoin(
rekeyedRightStream,
(left, right) -> left + "/" + right,
joinWindow
);
{code}


...and the eventual output I produce looks like this...

{code}
...
523 [523,left/null]
524 [524,left/null, 524,left/524,right]
525 [525,left/525,right]
526 [526,left/null]
527 [527,left/null]
528 [528,left/528,right]
529 [529,left/null]
530 [530,left/null]
531 [531,left/null, 531,left/531,right]
532 [532,left/null]
533 [533,left/null]
534 [534,left/null, 534,left/534,right]
535 [535,left/null]
536 [536,left/null]
537 [537,left/null, 537,left/537,right]
538 [538,left/null]
539 [539,left/null]
540 [540,left/null]
541 [541,left/null]
542 [542,left/null]
543 [543,left/null]
...
{code}

...whereas, given the input data, I expect to see every row end with the two 
values joined, rather than the right value being null.

Note that I understand it's expected that we initially get the left/null values 
for many values since that's the expected semantics of kafka-streams left join, 
at least until 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious

I've noticed that if I set a very large grace value on the join window the 
problem is solved, but since the input I provide is not out of order I did not 
expect to need to do that, and I'm wary of the resource requirements of doing so 
in practice on an application with a lot of volume.

My suspicion is that something is happening such that when one partition is 
processed it causes the stream time to be pushed forward to the newest message 
in that partition, meaning when the next partition is then examined it is found 
to contain many records which are 'too old' compared to the stream time. 

I ran across 
https://kafkacommunity.blogspot.com/2020/02/re-skipping-record-for-expired-segment_88.html
 from a year and a half ago which seems to describe the same problem, but I'm 
hoping the self-contained reproduction might make the issue easier to tackle!

  was:
When pushing bulk data through a kafka-streams app, I see it log the following 
message many times...

`WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment.`

...and data which I expect to have been joined through a leftJoin step appears 
to be lost.

I've seen this in practice either when my application has been shut down for a 
while and then is brought back up, or when I've used something like the 
[app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
 in an attempt to have the application reprocess past data.

I was able to reproduce this behaviour in isolation by generating 1000 messages 
to two topics spaced an hour apart (with the original timestamps in order), 
then having kafka streams select a key for them and try to leftJoin the two 
rekeyed streams.

Self contained source code for that 

[jira] [Updated] (KAFKA-13289) Bulk processing data through a join with kafka-streams results in `Skipping record for expired segment`

2021-09-10 Thread Matthew Sheppard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Sheppard updated KAFKA-13289:
-
Description: 
When pushing bulk data through a kafka-streams app, I see it log the following 
message many times...

`WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment.`

...and data which I expect to have been joined through a leftJoin step appears 
to be lost.

I've seen this in practice either when my application has been shut down for a 
while and then is brought back up, or when I've used something like the 
[app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
 in an attempt to have the application reprocess past data.

I was able to reproduce this behaviour in isolation by generating 1000 messages 
to two topics spaced an hour apart (with the original timestamps in order), 
then having kafka streams select a key for them and try to leftJoin the two 
rekeyed streams.

Self contained source code for that reproduction is available at 
https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java

The actual kafka-streams topology in there looks like this.


{code:java}
final StreamsBuilder builder = new StreamsBuilder();
final KStream leftStream = 
builder.stream(leftTopic);
final KStream rightStream = 
builder.stream(rightTopic);

final KStream rekeyedLeftStream = leftStream
.selectKey((k, v) -> v.substring(0, v.indexOf(":")));

final KStream rekeyedRightStream = rightStream
.selectKey((k, v) -> v.substring(0, v.indexOf(":")));

JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));

final KStream joined = rekeyedLeftStream.leftJoin(
rekeyedRightStream,
(left, right) -> left + "/" + right,
joinWindow
);
{code}


...and the eventual output I produce looks like this...

{code}
...
523 [523,left/null]
524 [524,left/null, 524,left/524,right]
525 [525,left/525,right]
526 [526,left/null]
527 [527,left/null]
528 [528,left/528,right]
529 [529,left/null]
530 [530,left/null]
531 [531,left/null, 531,left/531,right]
532 [532,left/null]
533 [533,left/null]
534 [534,left/null, 534,left/534,right]
535 [535,left/null]
536 [536,left/null]
537 [537,left/null, 537,left/537,right]
538 [538,left/null]
539 [539,left/null]
540 [540,left/null]
541 [541,left/null]
542 [542,left/null]
543 [543,left/null]
...
{code}

...whereas, given the input data, I expect to see every row end with the two 
values joined, rather than the right value being null.

Note that I understand it's expected that we initially get the left/null values 
for many values since that's the expected semantics of kafka-streams left join, 
at least until 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious

I've noticed that if I set a very large grace value on the join window the 
problem is solved, but since the input I provide is not out of order I did not 
expect to need to do that, and I'm wary of the resource requirements of doing so 
in practice on an application with a lot of volume.

My suspicion is that something is happening such that when one partition is 
processed it causes the stream time to be pushed forward to the newest message 
in that partition, meaning when the next partition is then examined it is found 
to contain many records which are 'too old' compared to the stream time. 

I ran across 
https://kafkacommunity.blogspot.com/2020/02/re-skipping-record-for-expired-segment_88.html
 from a year and a half ago which seems to describe the same problem, but I'm 
hoping the self-contained reproduction might make the issue easier to tackle!

  was:
When pushing bulk data through a kafka-streams app, I see it log the following 
message many times...

`WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment.`

...and data which I expect to have been joined through a leftJoin step appears 
to be lost.

I've seen this in practice either when my application has been shut down for a 
while and then is brought back up, or when I've used something like the 
[app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
 in an attempt to have the application reprocess past data.

I was able to reproduce this behaviour in isolation by generating 1000 messages 
to two topics spaced an hour apart (with the original timestamps in order), 
then having kafka streams select a key for them and try to leftJoin the two 
rekeyed streams.

Self contained source code for that reproduction is 

[jira] [Created] (KAFKA-13289) Bulk processing data through a join with kafka-streams results in `Skipping record for expired segment`

2021-09-10 Thread Matthew Sheppard (Jira)
Matthew Sheppard created KAFKA-13289:


 Summary: Bulk processing data through a join with kafka-streams 
results in `Skipping record for expired segment`
 Key: KAFKA-13289
 URL: https://issues.apache.org/jira/browse/KAFKA-13289
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.8.0
Reporter: Matthew Sheppard


When pushing bulk data through a kafka-streams app, I see it log the following 
message many times...

`WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment.`

...and data which I expect to have been joined through a leftJoin step appears 
to be lost.

I've seen this in practice either when my application has been shut down for a 
while and then is brought back up, or when I've used something like the 
[app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
 in an attempt to have the application reprocess past data.

I was able to reproduce this behaviour in isolation by generating 1000 messages 
to two topics spaced an hour apart (with the original timestamps in order), 
then having kafka streams select a key for them and try to leftJoin the two 
rekeyed streams.

Self contained source code for that reproduction is available at 
https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java

The actual kafka-streams topology in there looks like this.

```
final StreamsBuilder builder = new StreamsBuilder();
final KStream leftStream = 
builder.stream(leftTopic);
final KStream rightStream = 
builder.stream(rightTopic);

final KStream rekeyedLeftStream = leftStream
.selectKey((k, v) -> v.substring(0, v.indexOf(":")));

final KStream rekeyedRightStream = rightStream
.selectKey((k, v) -> v.substring(0, v.indexOf(":")));

JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));

final KStream joined = rekeyedLeftStream.leftJoin(
rekeyedRightStream,
(left, right) -> left + "/" + right,
joinWindow
);
```

...and the eventual output I produce looks like this...

```
...
523 [523,left/null]
524 [524,left/null, 524,left/524,right]
525 [525,left/525,right]
526 [526,left/null]
527 [527,left/null]
528 [528,left/528,right]
529 [529,left/null]
530 [530,left/null]
531 [531,left/null, 531,left/531,right]
532 [532,left/null]
533 [533,left/null]
534 [534,left/null, 534,left/534,right]
535 [535,left/null]
536 [536,left/null]
537 [537,left/null, 537,left/537,right]
538 [538,left/null]
539 [539,left/null]
540 [540,left/null]
541 [541,left/null]
542 [542,left/null]
543 [543,left/null]
...
```

...whereas, given the input data, I expect to see every row end with the two 
values joined, rather than the right value being null.

Note that I understand it's expected that we initially get the left/null values 
for many values since that's the expected semantics of kafka-streams left join, 
at least until 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious

I've noticed that if I set a very large grace value on the join window the 
problem is solved, but since the input I provide is not out of order I did not 
expect to need to do that, and I'm wary of the resource requirements of doing so 
in practice on an application with a lot of volume.

My suspicion is that something is happening such that when one partition is 
processed it causes the stream time to be pushed forward to the newest message 
in that partition, meaning when the next partition is then examined it is found 
to contain many records which are 'too old' compared to the stream time. 

I ran across 
https://kafkacommunity.blogspot.com/2020/02/re-skipping-record-for-expired-segment_88.html
 from a year and a half ago which seems to describe the same problem, but I'm 
hoping the self-contained reproduction might make the issue easier to tackle!
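
For completeness, a sketch of the grace-period workaround mentioned above (the
one-year value is purely illustrative, not a recommendation):

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;

public final class GraceWorkaroundSketch {

    // Same 5-second join window as above, but with an explicit, very large grace
    // period so records far behind the observed stream time are still joined.
    static JoinWindows windowWithLargeGrace() {
        return JoinWindows.of(Duration.ofSeconds(5))
                .grace(Duration.ofDays(365));
    }
}
{code}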



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] omkreddy commented on a change in pull request #11318: MINOR: Bump version in upgrade guide to 2.8.1

2021-09-10 Thread GitBox


omkreddy commented on a change in pull request #11318:
URL: https://github.com/apache/kafka/pull/11318#discussion_r705926622



##
File path: docs/upgrade.html
##
@@ -39,7 +39,7 @@ Notable changes in 2
 
 
 
-Upgrading to 2.7.0 from any 
version 0.8.x through 2.6.x
+Upgrading to 2.8.1 from any 
version 0.8.x through 2.7.x

Review comment:
   Looks like we missed adding an `Upgrading to 2.8.0` section earlier. I 
think we should add a new section for the `2.8.0` release and retain the existing 
"Upgrading to 2.7.0" section.
   
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on a change in pull request #11318: MINOR: Bump version in upgrade guide to 2.8.1

2021-09-10 Thread GitBox


omkreddy commented on a change in pull request #11318:
URL: https://github.com/apache/kafka/pull/11318#discussion_r705926622



##
File path: docs/upgrade.html
##
@@ -39,7 +39,7 @@ Notable changes in 2
 
 
 
-Upgrading to 2.7.0 from any 
version 0.8.x through 2.6.x
+Upgrading to 2.8.1 from any 
version 0.8.x through 2.7.x

Review comment:
   Looks like we missed adding an `Upgrading to 2.8.0` section earlier. I 
think we should add a new section for the "2.8.0" release and retain the existing 
"Upgrading to 2.7.0" section.
   
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #11318: MINOR: Bump version in upgrade guide to 2.8.1

2021-09-10 Thread GitBox


dajac opened a new pull request #11318:
URL: https://github.com/apache/kafka/pull/11318


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org