[GitHub] [kafka] cmccabe merged pull request #11313: MINOR: GroupMetadataManager#shutdown should remove metrics
cmccabe merged pull request #11313: URL: https://github.com/apache/kafka/pull/11313
[GitHub] [kafka] cmccabe commented on a change in pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals
cmccabe commented on a change in pull request #11312: URL: https://github.com/apache/kafka/pull/11312#discussion_r706509007 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1385,10 +1385,22 @@ object KafkaConfig { } if (maybeSensitive) Password.HIDDEN else value } + + def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = { +val output = new util.HashMap[Any, Any](input) +val brokerId = output.get(KafkaConfig.BrokerIdProp) +val nodeId = output.get(KafkaConfig.NodeIdProp) +if (brokerId == null && nodeId != null) { + output.put(KafkaConfig.BrokerIdProp, nodeId) +} else if (brokerId != null && nodeId == null) { + output.put(KafkaConfig.NodeIdProp, brokerId) +} +output + } } class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig]) - extends AbstractConfig(KafkaConfig.configDef, props, doLog) with Logging { + extends AbstractConfig(KafkaConfig.configDef, KafkaConfig.populateSynonyms(props), doLog) with Logging { Review comment: Yes, good find. This is a case where Scala's behavior is kind of annoying. I wish there were a way to opt out of the auto-initialization. Anyway, I made the primary constructor private, and put a call to `KafkaConfig#populateSynonyms` in all the (publicly visible) secondary constructors. This should fix it...
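For illustration, a minimal sketch of the arrangement cmccabe describes: a private primary constructor, so that every public construction path runs `populateSynonyms` exactly once before initialization. The signatures below are simplified stand-ins, not the real `KafkaConfig` constructors:

```scala
object KafkaConfig {
  // Copy the map and make broker.id / node.id mirror each other
  // whenever only one of the two synonyms is set.
  def populateSynonyms(input: java.util.Map[_, _]): java.util.Map[Any, Any] = {
    val output = new java.util.HashMap[Any, Any](input)
    val brokerId = output.get("broker.id")
    val nodeId = output.get("node.id")
    if (brokerId == null && nodeId != null) output.put("broker.id", nodeId)
    else if (brokerId != null && nodeId == null) output.put("node.id", brokerId)
    output
  }
}

// The primary constructor is private, so Scala's auto-initialization
// cannot bypass synonym population: all publicly visible (secondary)
// constructors funnel through populateSynonyms first.
class KafkaConfig private (val props: java.util.Map[_, _], doLog: Boolean) {
  def this(props: java.util.Map[_, _]) =
    this(KafkaConfig.populateSynonyms(props), true)
}
```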
[GitHub] [kafka] cmccabe commented on a change in pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals
cmccabe commented on a change in pull request #11312: URL: https://github.com/apache/kafka/pull/11312#discussion_r706507377 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1385,10 +1385,22 @@ object KafkaConfig { } if (maybeSensitive) Password.HIDDEN else value } + + def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = { Review comment: Added.
[GitHub] [kafka] cmccabe commented on a change in pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals
cmccabe commented on a change in pull request #11312: URL: https://github.com/apache/kafka/pull/11312#discussion_r706506964 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1385,10 +1385,22 @@ object KafkaConfig { } if (maybeSensitive) Password.HIDDEN else value } + + def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = { +val output = new util.HashMap[Any, Any](input) Review comment: AbstractConfig already copies the map which is passed to it.
[jira] [Commented] (KAFKA-7408) Truncate to LSO on unclean leader election
[ https://issues.apache.org/jira/browse/KAFKA-7408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413424#comment-17413424 ] Jose Armando Garcia Sancio commented on KAFKA-7408: --- [~hachikuji] can you clarify what you mean by "What we can do is let the newly elected leader truncate to the LSO and then rewrite all the markers that followed it using its own leader epoch (to avoid divergence from followers)"? # Does that mean that the leader will rewrite all {{WriteTxnMarker}} from the {{LSO}} to the {{LEO}}? If so, does it rewrite them using the original {{ABORT}} or {{COMMIT}} marker? # Does it mean that the unclean leader will {{ABORT}} all pending transactions? > Truncate to LSO on unclean leader election > -- > > Key: KAFKA-7408 > URL: https://issues.apache.org/jira/browse/KAFKA-7408 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > If an unclean leader is elected, we may lose committed transaction data. That > alone is expected, but what is worse is that a transaction which was > previously completed (either committed or aborted) may lose its marker and > become dangling. The transaction coordinator will not know about the unclean > leader election, so will not know to resend the transaction markers. > Consumers with read_committed isolation will be stuck because the LSO cannot > advance. > To keep this scenario from occurring, it would be better to have the unclean > leader truncate to the LSO so that there are no dangling transactions. > Truncating to the LSO is not alone sufficient because the markers which > allowed the LSO advancement may be at higher offsets. What we can do is let > the newly elected leader truncate to the LSO and then rewrite all the markers > that followed it using its own leader epoch (to avoid divergence from > followers). > The interesting cases are when an unclean leader election occurs while a > transaction is ongoing. > 1. If a producer is in the middle of a transaction commit, then the > coordinator may still attempt to write transaction markers. This will either > succeed or fail depending on the producer epoch in the unclean leader. If the > epoch matches, then the WriteTxnMarker call will succeed, which will simply > be ignored by the consumer. If the epoch doesn't match, the WriteTxnMarker > call will fail and the transaction coordinator can potentially remove the > partition from the transaction. > 2. If a producer is still writing the transaction, then what happens depends > on the producer state in the unclean leader. If no producer state has been > lost, then the transaction can continue without impact. Otherwise, the > producer will likely fail with an OUT_OF_ORDER_SEQUENCE error, which will > cause the transaction to be aborted by the coordinator. That takes us back to > the first case. > By truncating to the LSO, we ensure that transactions are either preserved in > whole or they are removed from the log in whole. For an unclean leader > election, that's probably as good as we can do. But we are ensured that > consumers will not be blocked by dangling transactions. The only remaining > situation where a dangling transaction might be left is if one of the > transaction state partitions has an unclean leader election.
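To make the proposal in the quoted description concrete, here is a toy model of one reading of it (keeping each marker's original COMMIT/ABORT type and re-stamping only the epoch). This is hedged: it is not broker code, real truncation works on record batches, and re-appended markers would receive new offsets rather than keeping old ones as this simplification does:

```scala
// Toy model of "truncate to the LSO, then rewrite the markers above it
// with the new leader epoch". Non-marker entries above the LSO are
// discarded in whole; markers are re-appended with the new epoch so
// followers do not diverge from the unclean leader.
case class Marker(producerId: Long, controlType: String, leaderEpoch: Int)
case class Entry(offset: Long, marker: Option[Marker])

def truncateToLsoAndRewrite(log: Vector[Entry], lso: Long, newEpoch: Int): Vector[Entry] = {
  val (below, above) = log.partition(_.offset < lso)
  val rewrittenMarkers = above.collect {
    case Entry(off, Some(m)) => Entry(off, Some(m.copy(leaderEpoch = newEpoch)))
  }
  below ++ rewrittenMarkers
}
```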
[GitHub] [kafka] cmccabe opened a new pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct
cmccabe opened a new pull request #11320: URL: https://github.com/apache/kafka/pull/11320 The ReplicaManager, LogManager, and KafkaApis classes all have many constructor parameters. It is often difficult to add or remove a parameter, since there are so many locations that need to be updated. In order to address this problem, we should use named parameters when constructing these objects from Scala code. This will make it easy to add new optional parameters without modifying many test cases. It will also make it easier to read git diffs and PRs, since the parameters will have names next to them. Since Java does not support named parameters, this PR adds several Builder classes which can be used to achieve the same effect. ReplicaManager also had a secondary constructor, which this PR removes. The function of the secondary constructor was just to provide some default parameters for the main constructor. However, it is simpler to just use default parameters.
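A hedged sketch of the two idioms the PR describes: named/default parameters for Scala callers, and a Builder for Java callers. The parameter set here is an illustrative stand-in, not the actual `ReplicaManager` signature:

```scala
// Stand-in config type so the sketch is self-contained.
case class BrokerConfig(brokerId: Int)

// Optional parameters get defaults, so Scala call sites only name what
// they override, e.g. new ReplicaManager(config = BrokerConfig(1)).
// Adding a new optional parameter does not break existing call sites.
class ReplicaManager(
  val config: BrokerConfig,
  val threadNamePrefix: Option[String] = None,
  val enableDelayedProduce: Boolean = true // hypothetical optional knob
)

// Java has no named parameters, so a Builder gives the same effect:
// set only the fields you care about, then build().
class ReplicaManagerBuilder {
  private var config: BrokerConfig = _
  private var threadNamePrefix: Option[String] = None

  def setConfig(c: BrokerConfig): ReplicaManagerBuilder = { config = c; this }
  def setThreadNamePrefix(p: String): ReplicaManagerBuilder = { threadNamePrefix = Some(p); this }
  def build(): ReplicaManager = new ReplicaManager(config, threadNamePrefix = threadNamePrefix)
}
```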
[GitHub] [kafka] hachikuji commented on a change in pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals
hachikuji commented on a change in pull request #11312: URL: https://github.com/apache/kafka/pull/11312#discussion_r706502119 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1385,10 +1385,22 @@ object KafkaConfig { } if (maybeSensitive) Password.HIDDEN else value } + + def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = { +val output = new util.HashMap[Any, Any](input) +val brokerId = output.get(KafkaConfig.BrokerIdProp) +val nodeId = output.get(KafkaConfig.NodeIdProp) +if (brokerId == null && nodeId != null) { + output.put(KafkaConfig.BrokerIdProp, nodeId) +} else if (brokerId != null && nodeId == null) { + output.put(KafkaConfig.NodeIdProp, brokerId) +} +output + } } class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig]) - extends AbstractConfig(KafkaConfig.configDef, props, doLog) with Logging { + extends AbstractConfig(KafkaConfig.configDef, KafkaConfig.populateSynonyms(props), doLog) with Logging { Review comment: It seems a little bit odd that we populate synonyms only in the reference that we're passing to `AbstractConfig`. The field `KafkaConfig.props` could still be accessed directly (maybe it should be private?). Would it make sense to move this to a factory method? For example: ```scala object KafkaConfig { def apply(props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig]): KafkaConfig = { new KafkaConfig(populateSynonyms(props), doLog, dynamicConfigOverride) } } ```
[jira] [Updated] (KAFKA-8116) Add Kafka Streams archetype for Java11
[ https://issues.apache.org/jira/browse/KAFKA-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8116: --- Fix Version/s: 4.0.0 > Add Kafka Streams archetype for Java11 > -- > > Key: KAFKA-8116 > URL: https://issues.apache.org/jira/browse/KAFKA-8116 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: needs-kip > Fix For: 4.0.0 > > > In https://issues.apache.org/jira/browse/KAFKA-5727 we added an archetype for > Kafka Streams. However, this archetype only works for Java8 but not for > Java11. Thus, we should add a new archetype project for Java11. > This ticket requires a KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]
[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413408#comment-17413408 ] Matthias J. Sax commented on KAFKA-13289: - I guess that the repartition topic could be the "issue", because there is no guarantee about order there. Setting a new key upstream means that you get interleaved writes into the repartition topic from different upstream producers (as records from two different input topic partitions can map to the same join key). In a live run, when throughput is lower, the issue might be mitigated. But if you reprocess old data, more out-of-order data might occur in the repartition topic, and thus you would need a larger grace period downstream. > Bulk processing correctly ordered input data through a join with > kafka-streams results in `Skipping record for expired segment` > --- > > Key: KAFKA-13289 > URL: https://issues.apache.org/jira/browse/KAFKA-13289 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Matthew Sheppard >Priority: Major > > When pushing bulk data through a kafka-streams app, I see it log the following > message many times... > {noformat} > WARN > org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - > Skipping record for expired segment. > {noformat} > ...and data which I expect to have been joined through a leftJoin step > appears to be lost. > I've seen this in practice either when my application has been shut down for > a while and then is brought back up, or when I've used something like the > [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) > in an attempt to have the application reprocess past data. > I was able to reproduce this behaviour in isolation by generating 1000 > messages to two topics spaced an hour apart (with the original timestamps in > order), then having kafka streams select a key for them and try to leftJoin > the two rekeyed streams. > Self contained source code for that reproduction is available at > https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java > The actual kafka-streams topology in there looks like this. > {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > final KStream leftStream = > builder.stream(leftTopic); > final KStream rightStream = > builder.stream(rightTopic); > final KStream rekeyedLeftStream = leftStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > final KStream rekeyedRightStream = rightStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)); > final KStream joined = rekeyedLeftStream.leftJoin( > rekeyedRightStream, > (left, right) -> left + "/" + right, > joinWindow > ); > {code} > ...and the eventual output I produce looks like this... > {code} > ... > 523 [523,left/null] > 524 [524,left/null, 524,left/524,right] > 525 [525,left/525,right] > 526 [526,left/null] > 527 [527,left/null] > 528 [528,left/528,right] > 529 [529,left/null] > 530 [530,left/null] > 531 [531,left/null, 531,left/531,right] > 532 [532,left/null] > 533 [533,left/null] > 534 [534,left/null, 534,left/534,right] > 535 [535,left/null] > 536 [536,left/null] > 537 [537,left/null, 537,left/537,right] > 538 [538,left/null] > 539 [539,left/null] > 540 [540,left/null] > 541 [541,left/null] > 542 [542,left/null] > 543 [543,left/null] > ...
> {code} > ...whereas, given the input data, I expect to see every row end with the two > values joined, rather than the right value being null. > Note that I understand it's expected that we initially get the left/null > values for many values since that's the expected semantics of kafka-streams > left join, at least until > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious > I've noticed that if I set a very large grace value on the join window the > problem is solved, but since the input I provide is not out of order I did > not expect to need to do that, and I'm wary of the resource requirements > doing so in practice on an application with a lot of volume. > My suspicion is that something is happening such that when one partition is > processed it causes the stream time to be pushed forward to the newest > message in that partition, meaning when the next partition is then examined > it is found to contain many records
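A sketch of the grace-period workaround discussed in the comment above: widening the grace period on the join window so records that arrive out of order via the repartition topic are not dropped as expired. The durations are illustrative, and the trade-off is extra state retained in the join stores:

```scala
import java.time.Duration
import org.apache.kafka.streams.kstream.JoinWindows

// Records still join only if their timestamps are within 5 seconds of
// each other; the grace period additionally tolerates records arriving
// up to an hour late relative to stream time (e.g. interleaved
// repartition-topic writes during bulk reprocessing of old data).
val joinWindow = JoinWindows
  .of(Duration.ofSeconds(5))
  .grace(Duration.ofHours(1))
```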
[jira] [Commented] (KAFKA-13290) My timeWindows last aggregated message never emit until a new message coming
[ https://issues.apache.org/jira/browse/KAFKA-13290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413405#comment-17413405 ] Matthias J. Sax commented on KAFKA-13290: - Your observation is behavior by design. You are using `suppress` with the "until-window-close" strategy. A window closes only if stream-time advances beyond window-end plus grace-period. Note that stream-time is data-driven, i.e., if no new data arrives, stream-time cannot advance (it's independent of wall-clock time). Closing this as "not a problem". There is already a ticket that requests a "wall-clock time" emit strategy for suppress() (https://issues.apache.org/jira/browse/KAFKA-7748) in case you want to follow up there. – Of course, feel free to follow up on this ticket if you have further questions. > My timeWindows last aggregated message never emit until a new message coming > - > > Key: KAFKA-13290 > URL: https://issues.apache.org/jira/browse/KAFKA-13290 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.2 > Environment: Development >Reporter: Steve Zhou >Priority: Major > > I have Kafka Streams event processing code which aggregates 1 minute of data. > It works as expected if data comes continuously. > If we stop the producer, then I found the last aggregated message does not emit > until a new message comes. > > Following is my sample code, @Bean > public KStream kStream(StreamsBuilder > streamBuilder) { > KStream aggregatedData = streamBuilder > .stream(dataTopic, dataConsumed) > .groupByKey(Grouped.with( > stringSerde, > aggregateValueSerde)) > > .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L))) > .aggregate(this::initialize, this::aggregateFields, > materializedAsWindowStore(windowedStoreName, > stringSerde, > AggregateMetricsFieldsSerde)) > > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()) > .withName(windowedSuppressNodeName)) > .toStream().map((key, aggregateMetrics) -> > { return KeyValue.pair(key.key(), aggregateMetrics); } > ); > aggregatedData.to(aggregatedDataTopic, aggregateDataProduced); > return aggregatedFlowData; > }
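Since stream-time is data-driven, a common application-level workaround (not part of the suppress() API itself) is a periodic heartbeat record that keeps stream-time advancing so the final window can close. A minimal sketch, with placeholder topic name and serializers:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Send a heartbeat record into the input topic on a schedule. Its
// timestamp advances stream-time past windowEnd + grace, letting
// suppress(untilWindowCloses) flush the last window. In practice one
// heartbeat per input partition is needed, and downstream logic must
// recognize and ignore the heartbeat key.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord("data-topic", "__heartbeat__", ""))
producer.close()
```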
[jira] [Resolved] (KAFKA-13290) My timeWindows last aggregated message never emit until a new message coming
[ https://issues.apache.org/jira/browse/KAFKA-13290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-13290. - Resolution: Not A Problem > My timeWindows last aggregated message never emit until a new message coming > - > > Key: KAFKA-13290 > URL: https://issues.apache.org/jira/browse/KAFKA-13290 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.2 > Environment: Development >Reporter: Steve Zhou >Priority: Major > > I have Kafka Streams event processing code which aggregates 1 minute of data. > It works as expected if data comes continuously. > If we stop the producer, then I found the last aggregated message does not emit > until a new message comes. > > Following is my sample code, @Bean > public KStream kStream(StreamsBuilder > streamBuilder) { > KStream aggregatedData = streamBuilder > .stream(dataTopic, dataConsumed) > .groupByKey(Grouped.with( > stringSerde, > aggregateValueSerde)) > > .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L))) > .aggregate(this::initialize, this::aggregateFields, > materializedAsWindowStore(windowedStoreName, > stringSerde, > AggregateMetricsFieldsSerde)) > > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()) > .withName(windowedSuppressNodeName)) > .toStream().map((key, aggregateMetrics) -> > { return KeyValue.pair(key.key(), aggregateMetrics); } > ); > aggregatedData.to(aggregatedDataTopic, aggregateDataProduced); > return aggregatedFlowData; > }
[jira] [Resolved] (KAFKA-13288) Transaction find-hanging command with --broker-id excludes internal topics
[ https://issues.apache.org/jira/browse/KAFKA-13288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13288. - Resolution: Fixed > Transaction find-hanging command with --broker-id excludes internal topics > -- > > Key: KAFKA-13288 > URL: https://issues.apache.org/jira/browse/KAFKA-13288 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > We use the vanilla `Admin.listTopics()` in this command if `--broker-id` is > specified. By default, this excludes internal topics.
[GitHub] [kafka] hachikuji merged pull request #11319: KAFKA-13288; Include internal topics when searching hanging transactions
hachikuji merged pull request #11319: URL: https://github.com/apache/kafka/pull/11319
[jira] [Comment Edited] (KAFKA-10338) Support PEM format for SSL certificates and private key
[ https://issues.apache.org/jira/browse/KAFKA-10338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413395#comment-17413395 ] Rajini Sivaram edited comment on KAFKA-10338 at 9/10/21, 9:27 PM: -- [~teabot] We currently don't have a way of reconfiguring PEM configs for clients unless they are stored externally in a file and the file is reloaded. It may be possible to add a custom `ssl.engine.factory.class` that does reconfiguration for clients. For brokers, we can use standard dynamic broker configs for PEM. was (Author: rsivaram): [~teabot] We currently don't have a way of updating PEM configs for clients unless they are stored externally in a file and the file is reloaded. It may be possible to add a custom `ssl.engine.factory.class` that does reconfiguration for clients. For brokers, we can use standard dynamic broker configs for PEM. > Support PEM format for SSL certificates and private key > --- > > Key: KAFKA-10338 > URL: https://issues.apache.org/jira/browse/KAFKA-10338 > Project: Kafka > Issue Type: New Feature > Components: security >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.7.0 > > > We currently support only file-based JKS/PKCS12 format for SSL key stores and > trust stores. It will be good to add support for PEM as configuration values > that fits better with config externalization. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key
[jira] [Commented] (KAFKA-10338) Support PEM format for SSL certificates and private key
[ https://issues.apache.org/jira/browse/KAFKA-10338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413395#comment-17413395 ] Rajini Sivaram commented on KAFKA-10338: [~teabot] We currently don't have a way of updating PEM configs for clients unless they are stored externally in a file and the file is reloaded. It may be possible to add a custom `ssl.engine.factory.class` that does reconfiguration for clients. For brokers, we can use standard dynamic broker configs for PEM. > Support PEM format for SSL certificates and private key > --- > > Key: KAFKA-10338 > URL: https://issues.apache.org/jira/browse/KAFKA-10338 > Project: Kafka > Issue Type: New Feature > Components: security >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.7.0 > > > We currently support only file-based JKS/PKCS12 format for SSL key stores and > trust stores. It will be good to add support for PEM as configuration values > that fits better with config externalization. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key
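For reference, a sketch of the inline PEM client configuration that KIP-651 introduced: certificates and key supplied as PEM strings instead of JKS/PKCS12 file paths. The PEM bodies below are elided placeholders:

```scala
import java.util.Properties

// SSL client config using inline PEM values (KIP-651). Because the key
// material is a config value rather than a keystore file, it fits
// better with config externalization and secret providers.
val props = new Properties()
props.put("security.protocol", "SSL")
props.put("ssl.truststore.certificates",
  "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----")
props.put("ssl.keystore.certificate.chain",
  "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----")
props.put("ssl.keystore.key",
  "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----")
```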
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r706478220 ## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ## @@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], val cleanableLogs = dirtyLogs.filter { ltc => (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio } + if(cleanableLogs.isEmpty) { -None +val logsWithTombstonesExpired = dirtyLogs.filter { + case ltc => +// in this case, we are probably in a low throughput situation +// therefore, we should take advantage of this fact and remove tombstones if we can +// under the condition that the log's latest delete horizon is less than the current time +// tracked +ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds() Review comment: @junrao @hachikuji @lbradstreet I've removed the logic for tracking the latestDeleteHorizon and the deleteHorizon-triggered cleaning in grabFilthiestCompactedLog since this part of the PR is not a part of KIP-534
[GitHub] [kafka] mumrah commented on a change in pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals
mumrah commented on a change in pull request #11312: URL: https://github.com/apache/kafka/pull/11312#discussion_r706419226 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1385,10 +1385,22 @@ object KafkaConfig { } if (maybeSensitive) Password.HIDDEN else value } + + def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = { +val output = new util.HashMap[Any, Any](input) Review comment: Is it safe to make a new map here? Are there any callers of KafkaConfig() which might expect to modify the props map after the object has been constructed? ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1385,10 +1385,22 @@ object KafkaConfig { } if (maybeSensitive) Password.HIDDEN else value } + + def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = { Review comment: A comment or docstring would be useful here explaining the motivation for this method
[jira] [Commented] (KAFKA-13272) KStream offset stuck after brokers outage
[ https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413335#comment-17413335 ] Guozhang Wang commented on KAFKA-13272: --- I looked into the source code and I have a suspicion it is a broker-side bug. Here's my theory: * In the transaction coordinator, when we want to send markers to the data partition hosts, there's a condition that if the leader of that data partition is unknown, then we would skip sending this marker since the data partition is likely deleted (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala#L372). * When one coordinator is down, other brokers would take over as the coordinator and usually have the up-to-date metadata, hence would still proceed to send the markers. * However, when the whole cluster goes down ungracefully, upon restarting, the newly elected coordinators are less likely to have the up-to-date metadata while reading the txn logs and trying to complete the transactions. If they hit the above condition, they would skip sending the markers, and note that the markers would never be retried. * So from the newly elected coordinator's point of view, the txn has been completed, while it actually has not. If this is indeed the bug, it would be a bit tricky to fix. Maybe one way is to only start loading the txn markers after metadata is refreshed upon starting? With KRaft, this should not happen in the new world since the brokers can easily tell whether their metadata is stale or not. [~hachikuji] WDYT? > KStream offset stuck after brokers outage > - > > Key: KAFKA-13272 > URL: https://issues.apache.org/jira/browse/KAFKA-13272 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 > Environment: Kafka running on Kubernetes > centos >Reporter: F Méthot >Priority: Major > > Our KStream app's offset stays stuck on 1 partition after an outage, possibly when > exactly_once is enabled. > Running with KStream 2.8, kafka broker 2.8, > 3 brokers. > commands topic is 10 partitions (replication 2, min-insync 2) > command-expiry-store-changelog topic is 10 partitions (replication 2, > min-insync 2) > events topic is 10 partitions (replication 2, min-insync 2) > with this topology > Topologies: > > {code:java} > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [commands]) > --> KSTREAM-TRANSFORM-01 > Processor: KSTREAM-TRANSFORM-01 (stores: []) > --> KSTREAM-TRANSFORM-02 > <-- KSTREAM-SOURCE-00 > Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store]) > --> KSTREAM-SINK-03 > <-- KSTREAM-TRANSFORM-01 > Sink: KSTREAM-SINK-03 (topic: events) > <-- KSTREAM-TRANSFORM-02 > {code} > h3. Attempt 1 at reproducing this issue > > Our stream app runs with processing.guarantee *exactly_once* > After a Kafka test outage where all 3 broker pods were deleted at the same > time, > Brokers restarted and initialized successfully. > When restarting the topology above, one of the tasks would never initialize > fully, the restore phase would keep outputting these messages every few > minutes: > > {code:java} > 2021-08-16 14:20:33,421 INFO stream-thread > [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] > Restoration in progress for 1 partitions.
> {commands-processor-expiry-store-changelog-8: position=11775908, > end=11775911, totalRestored=2002076} > [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] > (org.apache.kafka.streams.processor.internals.StoreChangelogReader) > {code} > The task for partition 8 would never initialize; no more data would be read from > the source commands topic for that partition. > > In an attempt to recover, we restarted the stream app with stream > processing.guarantee back to at_least_once, then it proceeded with reading the > changelog and restoring partition 8 fully. > But we noticed afterward, for the next hour until we rebuilt the system, that > partition 8 from command-expiry-store-changelog would not be > cleaned/compacted by the log cleaner/compactor compared to other partitions. > (could be unrelated, because we have seen that before) > So we resorted to deleting/recreating our command-expiry-store-changelog topic > and events topic and regenerating them from the commands, reading from the beginning. > Things went back to normal. > h3. Attempt 2 at reproducing this issue > kstream runs with *exactly-once* > We force-deleted all 3 pods running Kafka. > After that, one of the partitions can't be restored. (like reported in the > previous attempt) > For that partition, we noticed these logs on the broker > {code:java} > [2021-08-27 17:45:32,799] INFO
[GitHub] [kafka] cmccabe commented on a change in pull request #11313: MINOR: GroupMetadataManager#shutdown should remove metrics
cmccabe commented on a change in pull request #11313: URL: https://github.com/apache/kafka/pull/11313#discussion_r706387699 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -977,6 +977,9 @@ class GroupMetadataManager(brokerId: Int, shuttingDown.set(true) if (scheduler.isStarted) scheduler.shutdown() +metrics.removeSensor(GroupMetadataManager.LoadTimeSensor) +metrics.removeSensor("OffsetCommits") Review comment: good idea. will do
[GitHub] [kafka] hachikuji opened a new pull request #11319: KAFKA-13288; Include internal topics when searching hanging transactions
hachikuji opened a new pull request #11319: URL: https://github.com/apache/kafka/pull/11319 This patch ensures that internal topics are included when searching for hanging transactions with the `--broker-id` argument in `kafka-transactions.sh`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
guozhangwang commented on pull request #10798: URL: https://github.com/apache/kafka/pull/10798#issuecomment-917118246 Hi @vamossagar12 Thanks again for your patience! I know it has been dragging a bit long, and I had some discussions with @cadonna and @vamossagar12 regarding how we can move forward to eventually merge in your work to Apache Kafka. As a result of that I created this ticket trying to summarize what we want to do (which would incorporate this JIRA ticket): https://issues.apache.org/jira/browse/KAFKA-13286 At the moment, our main concerns are that: 1) Direct byte buffers, which rely on native memory, are not fully managed by the JVM (GC), and hence we need to be very careful about their side effects regarding memory pressure. 2) The current implementation without the API changes on serdes has a relatively small improvement on throughput, whereas in order to use the byte buffer without API changes, we also would need to "guess" the allocated size before serialization. So we think it's probably better to consider tackling the larger scope of https://issues.apache.org/jira/browse/KAFKA-13286 as a whole, which would mean we would hold off on merging this PR until we have the serde / byte-buffer supplier work in place. Does that sound reasonable to you? Would love to hear your thoughts.
[jira] [Updated] (KAFKA-13286) Revisit Streams State Store and Serde Implementation
[ https://issues.apache.org/jira/browse/KAFKA-13286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-13286: -- Description: The Kafka Streams state store is built in hierarchical layers as metered -> cached -> logged -> [convert] -> raw stores (rocksDB, in-memory), and it leverages the built-in Serde libraries for serialization / deserialization. There are several inefficiencies in the current design: * The API only supports serde using byte arrays. This means we generate a lot of garbage and spend unnecessary time copying bytes, especially when working with windowed state stores that rely on composite keys. In many places in the code we have to extract parts of the composite key to deserialize either the timestamp or the message key from the state store key (e.g. the methods in WindowStoreUtils). * The serde operation could happen on multiple layers of the state store hierarchy, which means we need extra byte array copies as we move along doing serdes. For example, we do serde in the metered layer, but then again in the cached layer with cache functions, and also in logged stores to generate the key/value bytes to send to Kafka. To improve on this, we can consider adding support for serde into/from ByteBuffers, which would allow us to reuse the underlying byte arrays and just pass around slices of the underlying Buffers to avoid the unnecessary copying. 1) More specifically, e.g. the serialize interface could be refactored to: {code} ByteBuffer serialize(String topic, T data, ByteBuffer); {code} Where the serialized bytes would be appended to the ByteBuffer. When a series of serialize functions are called alongside the state store hierarchy, we then just need to make sure that what should be appended first to the ByteBuffer is serialized first. E.g. if the serialized bytes format of a WindowSchema is Then we would need to call the serialize as in: {code} serialize(key, serialize(leftRightBoolean, serialize(timestamp, buffer))); {code} 2) In addition, we can consider having a pool of ByteBuffers representing a set of byte arrays that can be re-used. This can be captured as an intelligent {{ByteBufferSupplier}}, which provides: {code} ByteBuffer ByteBufferSupplier#allocate(long size) {code} Its implementation can choose to either create new byte arrays, or re-use existing ones in the pool; the gotcha though is that we may usually not know the serialized byte length for raw keys (think: in practice the keys would be in json/avro etc), and hence would not know how to pass in {{size}} for serialization, and hence may need to be conservative, or use trial and error etc. Of course callers then would be responsible for returning the used ByteBuffer back to the Supplier via {code} ByteBufferSupplier#deallocate(ByteBuffer buffer) {code} Some quick notes here regarding concurrency and sharing of the byte-buffer pools: * For pull query handling threads, if we do not do any deserialization then we would not need to access the ByteBufferSuppliers, hence there's no concurrent access. * For multiple streaming threads, my intention is to have each thread get its own isolated byte-buffer pool to avoid any concurrency. 3) With RocksDB's direct byte-buffer (KAFKA-9168) we can optionally also use the allocated ByteBuffer via RocksDB's direct API so that using them for puts/gets would not go through JNI, hence is more efficient. The Supplier then would need to be careful to deallocate these direct byte-buffers since they would not be GC'ed by the JVM.
There's a catch though with direct ByteBuffers that we'd need to be careful about: though direct byte buffers are also managed by the JVM (and hence GC) like heap byte buffers, they are not managed in the same way [1][2] and collection seems to be more conservative. It was suggested that sometimes users need to manually deallocate them if GC does not work promptly. I think the most effective way is to try our best to re-use allocated direct byte-buffers; that means we probably want to allocate a conservatively large size (if we do not know the serialized length), so that they can be reused. [1] https://www.fusion-reactor.com/blog/evangelism/understanding-java-buffer-pool-memory-space/#:~:text=A%20direct%20buffer%20is%20a,allocateDirect()%20factory%20method [2] https://stackoverflow.com/questions/3496508/deallocating-direct-buffer-native-memory-in-java-for-jogl/26777380#26777380 was: The Kafka Streams state store is built in hierarchical layers as metered -> cached -> logged -> [convert] -> raw stores (rocksDB, in-memory), and it leverages the built-in Serde libraries for serialization / deserialization. There are several inefficiencies in the current design: * The API only supports serde using byte arrays. This means we generate a lot of garbage and spend unnecessary time copying bytes, especially
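A minimal sketch of the `ByteBufferSupplier` idea from the description, assuming one pool per stream thread (hence no synchronization); the reuse policy and naming here are illustrative, not a committed API:

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ListBuffer

// Pools direct ByteBuffers so native memory is reused rather than
// waiting on the JVM's conservative collection of direct buffers.
// Deliberately not thread-safe: the intent is one supplier per
// stream thread. Capacity is Int here since ByteBuffer capacities
// are Int-sized.
class ByteBufferSupplier(useDirect: Boolean = true) {
  private val pool = ListBuffer.empty[ByteBuffer]

  def allocate(size: Int): ByteBuffer = {
    val idx = pool.indexWhere(_.capacity >= size)
    if (idx >= 0) {
      val buf = pool.remove(idx)
      buf.clear() // reset position/limit for reuse
      buf
    } else if (useDirect) ByteBuffer.allocateDirect(size)
    else ByteBuffer.allocate(size)
  }

  // Callers must hand buffers back; otherwise the pool degenerates
  // into plain allocation and direct memory is reclaimed only by GC.
  def deallocate(buffer: ByteBuffer): Unit = pool += buffer
}
```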
[jira] [Updated] (KAFKA-13286) Revisit Streams State Store and Serde Implementation
[ https://issues.apache.org/jira/browse/KAFKA-13286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-13286: -- Description: The Kafka Streams state store is built in hierarchical layers as metered -> cached -> logged -> [convert] -> raw stores (rocksDB, in-memory), and it leverages the built-in Serde libraries for serialization / deserialization. There are several inefficiencies in the current design: * The API only supports serde using byte arrays. This means we generate a lot of garbage and spend unnecessary time copying bytes, especially when working with windowed state stores that rely on composite keys. In many places in the code we have to extract parts of the composite key to deserialize either the timestamp or the message key from the state store key (e.g. the methods in WindowStoreUtils). * The serde operation could happen on multiple layers of the state store hierarchy, which means we need extra byte array copies as we move along doing serdes. For example, we do serde in the metered layer, but then again in the cached layer with cache functions, and also in logged stores to generate the key/value bytes to send to Kafka. To improve on this, we can consider adding support for serde into/from ByteBuffers, which would allow us to reuse the underlying byte arrays and just pass around slices of the underlying Buffers to avoid the unnecessary copying. 1) More specifically, e.g. the serialize interface could be refactored to: {code} ByteBuffer serialize(String topic, T data, ByteBuffer); {code} Where the serialized bytes would be appended to the ByteBuffer. When a series of serialize functions are called alongside the state store hierarchy, we then just need to make sure that what should be appended first to the ByteBuffer is serialized first. E.g. if the serialized bytes format of a WindowSchema is Then we would need to call the serialize as in: {code} serialize(key, serialize(leftRightBoolean, serialize(timestamp, buffer))); {code} 2) In addition, we can consider having a pool of ByteBuffers representing a set of byte arrays that can be re-used. This can be captured as an intelligent {{ByteBufferSupplier}}, which provides: {code} ByteBuffer ByteBufferSupplier#allocate(long size) {code} Its implementation can choose to either create new byte arrays, or re-use existing ones in the pool; the gotcha though is that we may usually not know the serialized byte length for raw keys (think: in practice the keys would be in json/avro etc), and hence would not know how to pass in {{size}} for serialization, and hence may need to be conservative, or use trial and error etc. Of course callers then would be responsible for returning the used ByteBuffer back to the Supplier via {code} ByteBufferSupplier#deallocate(ByteBuffer buffer) {code} Some quick notes here regarding concurrency and sharing of the byte-buffer pools: * For pull query handling threads, if we do not do any deserialization then we would not need to access the ByteBufferSuppliers, hence there's no concurrent access. * For multiple streaming threads, my intention is to have each thread get its own isolated byte-buffer pool to avoid any concurrency. 3) With RocksDB's direct byte-buffer (KAFKA-9168) we can optionally also use the allocated ByteBuffer via RocksDB's direct API so that using them for puts/gets would not go through JNI, hence is more efficient. The Supplier then would need to be careful to deallocate these direct byte-buffers since they would not be GC'ed by the JVM.
was: The Kafka Streams state store is built in hierarchical layers as metered -> cached -> logged -> [convert] -> raw stores (rocksDB, in-memory), and it leverages the built-in Serde libraries for serialization / deserialization. There are several inefficiencies in the current design: * The API only supports serde using byte arrays. This means we generate a lot of garbage and spend unnecessary time copying bytes, especially when working with windowed state stores that rely on composite keys. In many places in the code we have to extract parts of the composite key to deserialize either the timestamp or the message key from the state store key (e.g. the methods in WindowStoreUtils). * The serde operation could happen on multiple layers of the state store hierarchy, which means we need extra byte array copies as we move along doing serdes. For example, we do serde in the metered layer, but then again in the cached layer with cache functions, and also in logged stores to generate the key/value bytes to send to Kafka. To improve on this, we can consider adding support for serde into/from ByteBuffers, which would allow us to reuse the underlying byte arrays and just pass around slices of the underlying Buffers to avoid the unnecessary copying. 1) More specifically, e.g. the serialize interface could be refactored to:
[jira] [Commented] (KAFKA-13288) Transaction find-hanging command with --broker-id excludes internal topics
[ https://issues.apache.org/jira/browse/KAFKA-13288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413278#comment-17413278 ] Jason Gustafson commented on KAFKA-13288: - Note that it is still possible to use it with the --topic option instead. For example: {code} kafka-transactions.sh --bootstrap-server localhost:9092 find-hanging --topic __consumer_offsets {code} > Transaction find-hanging command with --broker-id excludes internal topics > -- > > Key: KAFKA-13288 > URL: https://issues.apache.org/jira/browse/KAFKA-13288 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > We use the vanilla `Admin.listTopics()` in this command if `--broker-id` is > specified. By default, this excludes internal topics.
[GitHub] [kafka] teabot commented on pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)
teabot commented on pull request #9345: URL: https://github.com/apache/kafka/pull/9345#issuecomment-916994728 How is PEM certificate renewal possible on the producer/consumer client? Is this documented anywhere?
[jira] [Comment Edited] (KAFKA-10338) Support PEM format for SSL certificates and private key
[ https://issues.apache.org/jira/browse/KAFKA-10338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413243#comment-17413243 ] Elliot West edited comment on KAFKA-10338 at 9/10/21, 3:29 PM: --- How is PEM certificate renewal possible on the producer/consumer client? Is this documented anywhere? was (Author: teabot): How is PEM certificate renewal possible on the producer/consumer client? > Support PEM format for SSL certificates and private key > --- > > Key: KAFKA-10338 > URL: https://issues.apache.org/jira/browse/KAFKA-10338 > Project: Kafka > Issue Type: New Feature > Components: security >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.7.0 > > > We currently support only file-based JKS/PKCS12 format for SSL key stores and > trust stores. It will be good to add support for PEM as configuration values > that fits better with config externalization. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key
[jira] [Commented] (KAFKA-10338) Support PEM format for SSL certificates and private key
[ https://issues.apache.org/jira/browse/KAFKA-10338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413243#comment-17413243 ] Elliot West commented on KAFKA-10338: - How is PEM certificate renewal possible on the producer/consumer client? > Support PEM format for SSL certificates and private key > --- > > Key: KAFKA-10338 > URL: https://issues.apache.org/jira/browse/KAFKA-10338 > Project: Kafka > Issue Type: New Feature > Components: security >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.7.0 > > > We currently support only file-based JKS/PKCS12 format for SSL key stores and > trust stores. It will be good to add support for PEM as configuration values > that fits better with config externalization. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key
[jira] [Comment Edited] (KAFKA-13272) KStream offset stuck after brokers outage
[ https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413197#comment-17413197 ] F Méthot edited comment on KAFKA-13272 at 9/10/21, 3:17 PM: Hi [~guozhang] Thanks for your feedback. We were puzzled by this error that kept coming back on partition 9 on our consumer {code:java} The following partitions still have unstable offsets which are not cleared on the broker side: [commands-9], this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log {code} Even after we had deleted and recreated that source topic, got rid of Exactly_once, even deleted the __transaction topic. The pattern was: * We had that issue above. * Clear the group in Kafka, restart the kstream app, reprocess from the beginning, and it would run fine. * Restart Kafka, and the issue would come back. So my suspicions shifted to the __consumer-offset topic. *When we initially ran with exactly_once and got the issue on that partition-9, is it possible that the __consumer-offset kept a faulty transaction-marker as well, and that this faulty marker came back to haunt our consumer, even after we disabled exactly_once?* I also saw the consumer offset topic with data from months ago, which should have been compacted, very similar to https://issues.apache.org/jira/browse/KAFKA-13174 *Also, we were running exactly_once with 2 replicas; [older] documentation stipulates that 3 replicas are needed for exactly_once. Is it still the case?* was (Author: fmethot): Hi [~guozhang] Thanks for your feedback. We were puzzled by this error that kept coming back on partition 9 on our consumer {code:java} The following partitions still have unstable offsets which are not cleared on the broker side: [commands-9], this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log {code} Even after we had deleted and recreated that source topic, got rid of Exactly_once, even deleted the __transaction topic. The pattern was: * We had that issue above. * Clear the group in Kafka, restart the kstream app, reprocess from the beginning, and it would run fine. * Restart Kafka, and the issue would come back. So my suspicions shifted to the __consumer-offset topic. When we initially ran with exactly_once and got the issue on that partition-9, is it possible that the __consumer-offset kept a faulty transaction-marker as well, and that this faulty marker came back to haunt our consumer, even after we disabled exactly_once? I also saw the consumer offset topic with data from months ago, which should have been compacted, very similar to https://issues.apache.org/jira/browse/KAFKA-13174 > KStream offset stuck after brokers outage > - > > Key: KAFKA-13272 > URL: https://issues.apache.org/jira/browse/KAFKA-13272 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 > Environment: Kafka running on Kubernetes > centos >Reporter: F Méthot >Priority: Major > > Our KStream app's offset stays stuck on 1 partition after an outage, possibly when > exactly_once is enabled. > Running with KStream 2.8, kafka broker 2.8, > 3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2) > command-expiry-store-changelog topic is 10 partitions (replication 2, > min-insync 2) > events topic is 10 partitions (replication 2, min-insync 2) > with this topology > Topologies: > > {code:java} > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [commands]) > --> KSTREAM-TRANSFORM-01 > Processor: KSTREAM-TRANSFORM-01 (stores: []) > --> KSTREAM-TRANSFORM-02 > <-- KSTREAM-SOURCE-00 > Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store]) > --> KSTREAM-SINK-03 > <-- KSTREAM-TRANSFORM-01 > Sink: KSTREAM-SINK-03 (topic: events) > <-- KSTREAM-TRANSFORM-02 > {code} > h3. Attempt 1 at reproducing this issue > > Our stream app runs with processing.guarantee *exactly_once* > After a Kafka test outage where all 3 broker pods were deleted at the same > time, > Brokers restarted and initialized successfully. > When restarting the topology above, one of the tasks would never initialize > fully, the restore phase would keep outputting these messages every few > minutes: > > {code:java} > 2021-08-16 14:20:33,421 INFO stream-thread > [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] > Restoration in progress for 1 partitions. > {commands-processor-expiry-store-changelog-8: position=11775908, > end=11775911, totalRestored=2002076} >
[jira] [Updated] (KAFKA-13290) My timeWindows last aggregated message never emit until a new message coming
[ https://issues.apache.org/jira/browse/KAFKA-13290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Zhou updated KAFKA-13290: --- Description: I have Kafka Streams event processing code which aggregates 1 minute of data. It works as expected if data comes continuously. If we stop the producer, then I found the last aggregated message does not emit until a new message comes. Following is my sample code, @Bean public KStream kStream(StreamsBuilder streamBuilder) { KStream aggregatedData = streamBuilder .stream(dataTopic, dataConsumed) .groupByKey(Grouped.with( stringSerde, aggregateValueSerde)) .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L))) .aggregate(this::initialize, this::aggregateFields, materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde)) .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()) .withName(windowedSuppressNodeName)) .toStream().map((key, aggregateMetrics) -> { return KeyValue.pair(key.key(), aggregateMetrics); } ); aggregatedData.to(aggregatedDataTopic, aggregateDataProduced); return aggregatedFlowData; } was: I have stream code which aggregates 1 minute of data. It works as expected if data comes continuously. If we stop the producer, then I found the last aggregated message does not emit until a new message comes, even if the new message has a different key. Following is my sample code, @Bean public KStream kStream(StreamsBuilder streamBuilder) { KStream aggregatedData = streamBuilder .stream(dataTopic, dataConsumed) .groupByKey(Grouped.with( stringSerde, aggregateValueSerde)) .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L))) .aggregate(this::initialize, this::aggregateFields, materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde)) .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()) .withName(windowedSuppressNodeName)) .toStream().map((key, aggregateMetrics) -> { return KeyValue.pair(key.key(), aggregateMetrics); } ); aggregatedData.to(aggregatedDataTopic, aggregateDataProduced); return aggregatedFlowData; } > My timeWindows last aggregated message never emit until a new message coming > - > > Key: KAFKA-13290 > URL: https://issues.apache.org/jira/browse/KAFKA-13290 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.2 > Environment: Development >Reporter: Steve Zhou >Priority: Major > > I have Kafka Streams event processing code which aggregates 1 minute of data. > It works as expected if data comes continuously. > If we stop the producer, then I found the last aggregated message does not emit > until a new message comes. > > Following is my sample code, @Bean > public KStream kStream(StreamsBuilder > streamBuilder) { > KStream aggregatedData = streamBuilder > .stream(dataTopic, dataConsumed) > .groupByKey(Grouped.with( > stringSerde, > aggregateValueSerde)) > > .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L))) > .aggregate(this::initialize, this::aggregateFields, > materializedAsWindowStore(windowedStoreName, > stringSerde, > AggregateMetricsFieldsSerde)) > > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()) > .withName(windowedSuppressNodeName)) > .toStream().map((key, aggregateMetrics) -> > { return KeyValue.pair(key.key(), aggregateMetrics); } > ); > aggregatedData.to(aggregatedDataTopic, aggregateDataProduced); > return aggregatedFlowData; > }
[jira] [Created] (KAFKA-13290) My timeWindows last aggregated message never emit until a new message coming
Steve Zhou created KAFKA-13290: -- Summary: My timeWindows last aggregated message never emit until a new message coming Key: KAFKA-13290 URL: https://issues.apache.org/jira/browse/KAFKA-13290 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.6.2 Environment: Development Reporter: Steve Zhou

I have stream code which aggregates one minute of data. It works as expected if data arrives continuously. If we stop the producer, the last aggregated message is not emitted until a new message arrives, even when the new message has a different key. Following is my sample code:
{code:java}
@Bean
public KStream kStream(StreamsBuilder streamBuilder) {
    KStream aggregatedData = streamBuilder
        .stream(dataTopic, dataConsumed)
        .groupByKey(Grouped.with(stringSerde, aggregateValueSerde))
        .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
        .aggregate(this::initialize, this::aggregateFields,
            materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
            .withName(windowedFlowSuppressNodeName))
        .toStream()
        .map((key, aggregateMetrics) -> KeyValue.pair(key.key(), aggregateMetrics));
    aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
    return aggregatedFlowData;
}
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
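The behavior described is the expected semantics of suppress(): untilWindowCloses emits a window only once stream time passes the window end plus grace, and stream time advances only when new records arrive, so the final window stays buffered after the producer stops. A common application-side workaround is a lightweight "tick" producer that keeps stream time moving; the sketch below is illustrative only (topic name, tick key/value, interval, and broker address are assumptions, and the aggregator must recognise and drop the sentinel records):
{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;

public class WindowTicker {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (true) {
                // Stream time is tracked per task/partition, so send one tick to every partition.
                List<PartitionInfo> partitions = producer.partitionsFor("dataTopic");
                for (PartitionInfo p : partitions) {
                    producer.send(new ProducerRecord<>("dataTopic", p.partition(), "tick", "TICK"));
                }
                Thread.sleep(30_000L);
            }
        }
    }
}
{code}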
[jira] [Comment Edited] (KAFKA-13272) KStream offset stuck after brokers outage
[ https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413197#comment-17413197 ] F Méthot edited comment on KAFKA-13272 at 9/10/21, 1:57 PM: Hi [~guozhang], thanks for your feedback. We were puzzled by this error, which kept coming back on partition 9 on our consumer:
{code:java}
The following partitions still have unstable offsets which are not cleared on the broker side: [commands-9], this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log
{code}
It kept coming back even after we had deleted and recreated that source topic, got rid of exactly_once, and even deleted the __transaction topic. The pattern was:
* We hit the issue above.
* Clear the group in Kafka, restart the KStream app, reprocess from the beginning, and it would run fine.
* Restart Kafka, and the issue would come back.

So my suspicions shifted to the __consumer_offsets topic. When we initially ran with exactly_once and hit the issue on partition 9, is it possible that __consumer_offsets kept a faulty transaction marker as well, and that this faulty marker came back to haunt our consumer even after we disabled exactly_once? I also saw the consumer offsets topic holding data from months ago that should have been compacted, very similar to https://issues.apache.org/jira/browse/KAFKA-13174

was (Author: fmethot): Hi [~guozhang], thanks for your feedback. We were puzzled by this error, which kept coming back on partition 9 on our consumer:
{code:java}
The following partitions still have unstable offsets which are not cleared on the broker side: [commands-9], this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log
{code}
It kept coming back even after we had deleted and recreated that source topic, got rid of exactly_once, and even deleted the __transaction topic. The pattern was:
* We hit the issue above.
* Clear the group in Kafka, restart the KStream app, reprocess from the beginning, and it would run fine.
* Restart Kafka, and the issue would come back.

So my suspicions shifted to the __consumer_offsets topic. When we initially ran with exactly_once and hit the issue on partition 9, is it possible that __consumer_offsets kept a faulty transaction marker as well, and that this faulty marker came back to haunt our consumer even after we disabled exactly_once? I also saw the consumer offsets topic holding data from months ago that should have been compacted, very similar to

> KStream offset stuck after brokers outage
> -
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
> Reporter: F Méthot
> Priority: Major
>
> Our KStream app offset stays stuck on 1 partition after an outage, possibly when exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
> 3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
> command-expiry-store-changelog topic is 10 partitions (replication 2, min-insync 2)
> events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
> {code:java}
> Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [commands])
> --> KSTREAM-TRANSFORM-01
> Processor: KSTREAM-TRANSFORM-01 (stores: [])
> --> KSTREAM-TRANSFORM-02
> <-- KSTREAM-SOURCE-00
> Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
> --> KSTREAM-SINK-03
> <-- KSTREAM-TRANSFORM-01
> Sink: KSTREAM-SINK-03 (topic: events)
> <-- KSTREAM-TRANSFORM-02
> {code}
> h3. Attempt 1 at reproducing this issue
> Our stream app runs with processing.guarantee *exactly_once*
> After a Kafka test outage where all 3 broker pods were deleted at the same time,
> brokers restarted and initialized successfully.
> When restarting the topology above, one of the tasks would never initialize fully; the restore phase would keep outputting these messages every few minutes:
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] Restoration in progress for 1 partitions. {commands-processor-expiry-store-changelog-8: position=11775908, end=11775911, totalRestored=2002076} [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> The task for partition 8 would never initialize; no more data would be read from the source commands topic for that partition.
[jira] [Commented] (KAFKA-13272) KStream offset stuck after brokers outage
[ https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413197#comment-17413197 ] F Méthot commented on KAFKA-13272: -- Hi [~guozhang], thanks for your feedback. We were puzzled by this error, which kept coming back on partition 9 on our consumer:
{code:java}
The following partitions still have unstable offsets which are not cleared on the broker side: [commands-9], this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log
{code}
It kept coming back even after we had deleted and recreated that source topic, got rid of exactly_once, and even deleted the __transaction topic. The pattern was:
* We hit the issue above.
* Clear the group in Kafka, restart the service, reprocess from the beginning, and it would run fine.
* Restart Kafka, and the issue would come back.

So my suspicions shifted to the __consumer_offsets topic. When we initially ran with exactly_once and hit the issue on partition 9, is it possible that __consumer_offsets kept a faulty transaction marker as well, and that this faulty marker came back to haunt our consumer even after we disabled exactly_once?

> KStream offset stuck after brokers outage
> -
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
> Reporter: F Méthot
> Priority: Major
>
> Our KStream app offset stays stuck on 1 partition after an outage, possibly when exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
> 3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
> command-expiry-store-changelog topic is 10 partitions (replication 2, min-insync 2)
> events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
> {code:java}
> Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [commands])
> --> KSTREAM-TRANSFORM-01
> Processor: KSTREAM-TRANSFORM-01 (stores: [])
> --> KSTREAM-TRANSFORM-02
> <-- KSTREAM-SOURCE-00
> Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
> --> KSTREAM-SINK-03
> <-- KSTREAM-TRANSFORM-01
> Sink: KSTREAM-SINK-03 (topic: events)
> <-- KSTREAM-TRANSFORM-02
> {code}
> h3. Attempt 1 at reproducing this issue
> Our stream app runs with processing.guarantee *exactly_once*
> After a Kafka test outage where all 3 broker pods were deleted at the same time,
> brokers restarted and initialized successfully.
> When restarting the topology above, one of the tasks would never initialize fully; the restore phase would keep outputting these messages every few minutes:
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] Restoration in progress for 1 partitions. {commands-processor-expiry-store-changelog-8: position=11775908, end=11775911, totalRestored=2002076} [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> The task for partition 8 would never initialize; no more data would be read from the source commands topic for that partition.
> In an attempt to recover, we restarted the stream app with processing.guarantee back to at_least_once; then it proceeded with reading the changelog and restoring partition 8 fully.
> But we noticed afterward, for the next hour until we rebuilt the system, that partition 8 from command-expiry-store-changelog would not be cleaned/compacted by the log cleaner/compactor compared to other partitions (could be unrelated, because we have seen that before).
> So we resorted to deleting/recreating our command-expiry-store-changelog and events topics and regenerating them from the commands topic, reading from the beginning. Things went back to normal.
> h3. Attempt 2 at reproducing this issue
> kstream runs with *exactly-once*
> We force-deleted all 3 pods running kafka.
> After that, one of the partitions can't be restored (like reported in the previous attempt).
> For that partition, we noticed these logs on the broker:
> {code:java}
> [2021-08-27 17:45:32,799] INFO [Transaction Marker Channel Manager 1002]: Couldn’t find leader endpoint for partitions Set(__consumer_offsets-11, command-expiry-store-changelog-9) while trying to send transaction markers for commands-processor-0_9, these partitions are likely deleted already and hence can be skipped (kafka.coordinator.transaction.TransactionMarkerChannelManager)
> {code}
> Then
> - we stop
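The "unstable offsets" warning concerns the last stable offset (LSO): under read_committed, a consumer cannot advance past an open transaction, so a dangling transaction marker pins the partition. A hedged diagnostic sketch (bootstrap address, group id, and topic/partition are illustrative assumptions) that compares end offsets under the two isolation levels; a persistent gap points at a hanging transaction:
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LsoCheck {
    private static long endOffset(String isolation, TopicPartition tp) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "lso-check");
        props.put("isolation.level", isolation);
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Under read_committed this returns the LSO; under read_uncommitted, the log end offset.
            Map<TopicPartition, Long> end = consumer.endOffsets(List.of(tp), Duration.ofSeconds(10));
            return end.get(tp);
        }
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("commands", 9);
        long lso = endOffset("read_committed", tp);
        long logEnd = endOffset("read_uncommitted", tp);
        System.out.printf("partition %s: LSO=%d, log end=%d, gap=%d%n", tp, lso, logEnd, logEnd - lso);
    }
}
{code}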
[GitHub] [kafka] chia7712 commented on a change in pull request #10976: KAFKA-13036 Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest
chia7712 commented on a change in pull request #10976: URL: https://github.com/apache/kafka/pull/10976#discussion_r706140746

## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java

## @@ -378,261 +347,189 @@ public void shouldRecordStatisticsBasedMetrics() {
         reset(statisticsToAdd1);
         reset(statisticsToAdd2);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(1L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(2L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(1L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(2L);
         bytesWrittenToDatabaseSensor.record(1 + 2, 0L);
-        replay(bytesWrittenToDatabaseSensor);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(3L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(2L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(3L);
         bytesReadFromDatabaseSensor.record(2 + 3, 0L);
-        replay(bytesReadFromDatabaseSensor);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(3L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(4L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(3L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(4L);
         memtableBytesFlushedSensor.record(3 + 4, 0L);
-        replay(memtableBytesFlushedSensor);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(1L);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(3L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(4L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(1L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(2L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(3L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(4L);
         memtableHitRatioSensor.record((double) 4 / (4 + 6), 0L);
-        replay(memtableHitRatioSensor);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(4L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(5L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(4L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(5L);
         writeStallDurationSensor.record(4 + 5, 0L);
-        replay(writeStallDurationSensor);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(5L);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(4L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(3L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(2L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(5L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(4L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(3L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(2L);
         blockCacheDataHitRatioSensor.record((double) 8 / (8 + 6), 0L);
-        replay(blockCacheDataHitRatioSensor);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(4L);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(4L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(4L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(2L);
[GitHub] [kafka] showuon commented on pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
showuon commented on pull request #11227: URL: https://github.com/apache/kafka/pull/11227#issuecomment-916861911 @patrickstuedi, yes, the fix PR (https://github.com/apache/kafka/pull/11292) is under review (should be close to done). Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] patrickstuedi commented on pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
patrickstuedi commented on pull request #11227: URL: https://github.com/apache/kafka/pull/11227#issuecomment-916855214 @showuon Any new updates on this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
[ https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413135#comment-17413135 ] Bruno Cadonna commented on KAFKA-13128: --- And also here: [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11250/9/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread/]
{code:java}
java.lang.AssertionError: Expected: is not null but: was null
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
    at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$15(StoreQueryIntegrationTest.java:494)
    at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:545)
    at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:488)
{code}

> Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.0.0, 2.8.1
> Reporter: A. Sophie Blee-Goldman
> Assignee: Walker Carlson
> Priority: Blocker
> Labels: flaky-test
> Fix For: 3.1.0
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
> at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cadonna commented on pull request #11317: KAFKA-13287: Upgrade RocksDB to 6.22.1.1
cadonna commented on pull request #11317: URL: https://github.com/apache/kafka/pull/11317#issuecomment-916843098 I even found the PR: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11250/9/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #11317: KAFKA-13287: Upgrade RocksDB to 6.22.1.1
cadonna commented on pull request #11317: URL: https://github.com/apache/kafka/pull/11317#issuecomment-916842108 @ableegoldman I think I have already seen this specific failure in another PR. But I agree with you that we need to keep an eye on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
ableegoldman commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r706092654 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -359,15 +362,24 @@ public final InternalTopologyBuilder setApplicationId(final String applicationId return this; } -public synchronized final InternalTopologyBuilder setStreamsConfig(final StreamsConfig config) { -Objects.requireNonNull(config, "config can't be null"); -this.config = config; +public synchronized final void setTopologyProperties(final Properties props) { +this.topologyProperties = props; +} -return this; +public synchronized final void setStreamsConfig(final StreamsConfig config) { Review comment: @guozhangwang Played around with this a bit and was able to clean things up nicely, plus address some other awkwardness that was bugging me. Worked out great (just need to do some cleanup of the tests now...) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13014) KAFKA-Stream stucked when the offset is no more existing
[ https://issues.apache.org/jira/browse/KAFKA-13014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413090#comment-17413090 ] Ahmed Toumi commented on KAFKA-13014: - Thank you [~mjsax]. I think the person I contacted at Confluent gave me wrong information; I didn't find any bug fix for this issue in 2.7.1. I will check whether 2.7.2 fixes it, because it is exactly the same issue as https://issues.apache.org/jira/browse/KAFKA-12951, but with a state store and not a GlobalKTable.

> KAFKA-Stream stucked when the offset is no more existing
> 
>
> Key: KAFKA-13014
> URL: https://issues.apache.org/jira/browse/KAFKA-13014
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer, offset manager, streams
> Affects Versions: 2.7.0
> Environment: PROD
> Reporter: Ahmed Toumi
> Priority: Major
> Attachments: image-2021-06-30-11-10-31-028.png
>
> We have a kafka-stream app with multiple instances and threads.
> This kafka-stream consumes from a lot of topics.
> One of the topic partitions wasn't accessible for a day, and the retention of the topic is 4 hours.
> After fixing the problem, the kafka-stream is trying to consume from an offset that does not exist anymore:
> * Kafka-consumer-group describe:
> !image-2021-06-30-11-10-31-028.png!
> We can see that the current offset that the KS is waiting for is *59754934*, but the new first offset of this partition is *264896001*.
> The problem is that the kafka-stream does not throw any exception;
> this is the only log I'm seeing:
> {code:java}
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] Updating assignment with
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] Updating assignment with Assigned partitions: [adm__article_ean_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_stock_repartition_v3-10] Current owned partitions: [adm__article_ean_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_stock_repartition_v3-10] Added partitions (assigned - owned): [] Revoked partitions (owned - assigned): []
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] Notifying assignor about the new Assignment(partitions=[adm__article_stock_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_ean_repartition_v3-10], userDataSize=398)
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer] No followup rebalance was requested, resetting the rebalance schedule.
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO o.a.k.s.p.internals.TaskManager - stream-thread
[talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] > Handle new assignment with: New active tasks: [0_10] New standby tasks: > [0_17, 0_21] Existing active tasks: [0_10] Existing standby tasks: [0_17, > 0_21]08:44:53.924 > [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] > INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer > clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, > groupId=talaria-data-mixed-prod] Adding newly assigned partitions: > {code} > > PI: version broker kafka : 5.3.4-ccs -- This message was sent by Atlassian Jira (v8.3.4#803005)
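When committed offsets fall below the log start offset (for example because retention deleted them), the consumer's auto.offset.reset policy is what normally decides where to resume; the report above suggests that reset never kicked in. For reference, Kafka Streams lets the policy be pinned per source topic via the public API. A minimal sketch under stated assumptions (topic name, serdes, and the trivial processing are illustrative, not the reporter's topology):
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class ResetPolicyExample {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(
            "some-input-topic",
            Consumed.with(Serdes.String(), Serdes.String())
                    // Jump to the earliest available offset instead of stalling when
                    // the committed offset no longer exists on the broker.
                    .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
        stream.foreach((k, v) -> { });
        return builder.build();
    }
}
{code}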
[GitHub] [kafka] wycccccc commented on pull request #10976: KAFKA-13036 Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest
wycccccc commented on pull request #10976: URL: https://github.com/apache/kafka/pull/10976#issuecomment-916776585 @ijuma I think I survived this round of reviews. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wycccccc commented on pull request #10976: KAFKA-13036 Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest
wycccccc commented on pull request #10976: URL: https://github.com/apache/kafka/pull/10976#issuecomment-916775979 @chia7712 I think I have solved most of the problems, except merging `setUpMetricsStubMock`. To avoid the already-registered static mock problem caused by repeatedly calling the `setUpMetricsStubMock` method, I put `mockStatic` in `@Before`. So I think the current handling around `tearDown()` is not bad. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
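For context, the constraint being worked around is that Mockito allows only one active static mock per class per thread. A minimal sketch of the @Before/@After lifecycle being described (SomeUtil is a hypothetical helper, not the class from the PR, and static mocking requires a Mockito version that supports it, e.g. mockito-inline):
{code:java}
import static org.mockito.Mockito.mockStatic;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;

public class StaticMockLifecycleTest {

    // Hypothetical helper standing in for the real static dependency.
    static class SomeUtil {
        static String defaultValue() {
            return "real";
        }
    }

    private MockedStatic<SomeUtil> someUtilMock;

    @Before
    public void setUp() {
        // Register the static mock exactly once per test; a second mockStatic call
        // for the same class on this thread fails with "already registered".
        someUtilMock = mockStatic(SomeUtil.class);
        someUtilMock.when(SomeUtil::defaultValue).thenReturn("stubbed");
    }

    @After
    public void tearDown() {
        // Deregister so the next test can register its own static mock.
        someUtilMock.close();
    }

    @Test
    public void shouldUseStub() {
        org.junit.Assert.assertEquals("stubbed", SomeUtil.defaultValue());
    }
}
{code}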
[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
ableegoldman commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r706014148 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -359,15 +362,24 @@ public final InternalTopologyBuilder setApplicationId(final String applicationId return this; } -public synchronized final InternalTopologyBuilder setStreamsConfig(final StreamsConfig config) { -Objects.requireNonNull(config, "config can't be null"); -this.config = config; +public synchronized final void setTopologyProperties(final Properties props) { +this.topologyProperties = props; +} -return this; +public synchronized final void setStreamsConfig(final StreamsConfig config) { Review comment: That all makes sense to me -- initially I was trying to make this just work with the existing public API, but ultimately figured that even the concept of "topology-level overrides" would require a KIP. If we're only dealing with named topologies then we're free to do things the right way from the start -- thanks for the suggestions :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
ableegoldman commented on a change in pull request #11211: URL: https://github.com/apache/kafka/pull/11211#discussion_r706010750 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ## @@ -292,13 +408,46 @@ public V fetch(final K key, time); } +private long getActualWindowStartTime(final long timeFrom) { +return Math.max(timeFrom, ((PersistentWindowStore) wrapped()).getObservedStreamTime() - retentionPeriod + 1); +} + +private KeyValueIterator, V> filterExpiredRecords(final boolean forward) { +final KeyValueIterator, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll(); + +final long observedStreamTime = ((PersistentWindowStore) wrapped()).getObservedStreamTime(); +if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP) +return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time); + +final long windowStartBoundary = observedStreamTime - retentionPeriod + 1; +final List, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>(); + +while (allWindowedKeyValueIterator.hasNext()) { +final KeyValue, byte[]> next = allWindowedKeyValueIterator.next(); +if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) { +continue; +} +windowedKeyValuesInBoundary.add(next); +} +return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time); +} Review comment: > it's failing for test cases like shouldNotThrowConcurrentModificationException . This seems to be because the put() call while iterating You don't mean that the test itself is failing, just that the filter isn't being applied to the `put` records, right? > ...the put() call while iterating is appending to the wrapped instance of iterator and hence it's not visible Can you also expand a bit on what you mean by this? Not sure I have the whole picture here since I haven't been following this too closely, thanks in advance for catching me up > Looking at this, do you think it would be a good idea to move this logic in the actual RocksDB implementations? Or do you think there's a better way to do it here in MeteredStore class itself? Personally I think the benefits of keeping this in the metering layer outweigh any downsides, especially if it comes down to just some weird edge case(s) that don't play nicely with the filtering. But I'm also interested in hearing what @showuon makes of this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j
[ https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-12774: Fix Version/s: (was: 2.8.1)

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j
> 
>
> Key: KAFKA-12774
> URL: https://issues.apache.org/jira/browse/KAFKA-12774
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.8.0
> Reporter: Jørgen
> Priority: Minor
> Fix For: 3.1.0, 3.0.1
>
> When an exception is handled in the uncaught-exception handler introduced in KS 2.8, the logging of the stacktrace doesn't seem to go through the logging framework configured by the application (log4j2 in our case), but gets printed to the console "line-by-line".
> All other exceptions logged by kafka-streams go through log4j2 and get formatted properly according to the log4j2 appender (json in our case).
> Haven't tested this on other frameworks like logback.
> Application setup:
> * Spring-boot 2.4.5
> * Log4j 2.13.3
> * Slf4j 1.7.30
> Log4j2 appender config:
> {code:java}
> stacktraceAsString="true" properties="true">
> value="$${date:-MM-dd'T'HH:mm:ss.SSSZ}"/>
> {code}
> Uncaught exception handler config:
> {code:java}
> kafkaStreams.setUncaughtExceptionHandler { exception ->
>     logger.warn("Uncaught exception handled - replacing thread", exception) // logged properly
>     StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
> } {code}
> Stacktrace that gets printed line-by-line:
> {code:java}
> Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic xxx-repartition for task 3_2 due to:org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.Exception handler choose to FAIL the processing, no more records would be sent.
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
>     at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783)
>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. {code}
> It's a little bit hard to reproduce as I haven't found any way to trigger the uncaught-exception handler through junit tests.
> Link to discussion on slack: https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700
-- This message was sent by Atlassian Jira (v8.3.4#803005)
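As an application-side workaround (a hedged sketch, not the Streams-side fix): the line-by-line output is consistent with the JVM's default uncaught-exception handler writing to stderr, so installing a default handler that routes through slf4j sends those stack traces to the configured appender instead. The logger name is an illustrative assumption:
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JvmLogHandler {
    private static final Logger LOG = LoggerFactory.getLogger("uncaught");

    public static void install() {
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) ->
            // Formatted by the logging framework (e.g. as JSON) instead of raw stderr lines.
            LOG.error("Uncaught exception in thread {}", thread.getName(), throwable));
    }
}
{code}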
[jira] [Updated] (KAFKA-12622) Automate LICENSE file validation
[ https://issues.apache.org/jira/browse/KAFKA-12622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-12622: Fix Version/s: (was: 2.8.1) 3.1.0

> Automate LICENSE file validation
> 
>
> Key: KAFKA-12622
> URL: https://issues.apache.org/jira/browse/KAFKA-12622
> Project: Kafka
> Issue Type: Task
> Reporter: John Roesler
> Priority: Major
> Fix For: 3.1.0, 3.0.1
>
> In https://issues.apache.org/jira/browse/KAFKA-12602, we manually constructed a correct license file for 2.8.0. This file will certainly become wrong again in later releases, so we need to write some kind of script to automate a check.
> It crossed my mind to automate the generation of the file, but it seems to be an intractable problem, considering that each dependency may change licenses, may package license files, link to them from their poms, link to them from their repos, etc. I've also found multiple URLs listed with various delimiters, broken links that I have to chase down, etc.
> Therefore, it seems like the solution to aim for is simply: list all the jars that we package, and print out a report of each jar that's extra or missing vs. the ones in our `LICENSE-binary` file.
> The check should be part of the release script at least, if not part of the regular build (so we keep it up to date as dependencies change).
>
> Here's how I do this manually right now:
> {code:java}
> // build the binary artifacts
> $ ./gradlewAll releaseTarGz
> // unpack the binary artifact
> $ tar xf core/build/distributions/kafka_2.13-X.Y.Z.tgz
> $ cd kafka_2.13-X.Y.Z
> // list the packaged jars
> // (you can ignore the jars for our own modules, like kafka, kafka-clients, etc.)
> $ ls libs/
> // cross check the jars with the packaged LICENSE
> // make sure all dependencies are listed with the right versions
> $ cat LICENSE
> // also double check all the mentioned license files are present
> $ ls licenses {code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
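A minimal sketch of the jar-vs-LICENSE report described above, assuming the unpacked distribution layout (a libs/ directory next to the LICENSE file); it only flags jars whose name-with-version string is absent from the LICENSE text, and Kafka's own modules would still need to be filtered out:
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LicenseCheck {
    public static void main(String[] args) throws IOException {
        Path dist = Path.of(args.length > 0 ? args[0] : "kafka_2.13-X.Y.Z");
        String license = Files.readString(dist.resolve("LICENSE"));
        try (Stream<Path> jars = Files.list(dist.resolve("libs"))) {
            List<String> missing = jars
                .map(p -> p.getFileName().toString())
                .filter(name -> name.endsWith(".jar"))
                // Strip ".jar" so "foo-1.2.3" can be matched against the LICENSE text.
                .map(name -> name.substring(0, name.length() - 4))
                .filter(dep -> !license.contains(dep))
                .sorted()
                .collect(Collectors.toList());
            missing.forEach(dep -> System.out.println("missing from LICENSE: " + dep));
        }
    }
}
{code}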
[jira] [Updated] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology
[ https://issues.apache.org/jira/browse/KAFKA-13164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-13164: Fix Version/s: (was: 2.8.1)

> State store is attached to wrong node in the Kafka Streams topology
> ---
>
> Key: KAFKA-13164
> URL: https://issues.apache.org/jira/browse/KAFKA-13164
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.8.0
> Environment: local development (MacOS Big Sur 11.4)
> Reporter: Ralph Matthias Debusmann
> Assignee: Hao Li
> Priority: Major
> Fix For: 3.0.1
> Attachments: 1.jpg, 3.jpg
>
> Hi,
> mjsax and I noticed a bug where a state store is attached to the wrong node in the Kafka Streams topology.
> The issue arose when I tried to read a topic into a KTable, then continued with a mapValues(), and then joined this KTable with a KStream, like so:
> var kTable = this.streamsBuilder.table(...).mapValues(...);
> and then later:
> var joinedKStream = kstream.leftJoin(kTable, ...);
> The join didn't work, and neither did it work when I added Materialized.as(...) to mapValues(), like so:
> var kTable = this.streamsBuilder.table(...).mapValues(..., *Materialized.as(...)*);
> Interestingly, I could get the join to work when I first read the topic into a *KStream*, then continued with the mapValues(), then turned the KStream into a KTable, and then joined the KTable with the other KStream, like so:
> var kTable = this.streamsBuilder.stream(...).mapValues(...).toTable();
> (the join worked the same as above)
> When mjsax and I had a look at the topology, we could see that in the former, non-working code, the state store (required for the join) is attached to the pre-final "KTABLE-SOURCE" node, and not the final "KTABLE-MAPVALUES" node (see attachment "1.jpg"). In the working code, the state store is (correctly) attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg").
>
> Best regards,
> xdgrulez
-- This message was sent by Atlassian Jira (v8.3.4#803005)
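A quick way to see where a store lands is the public Topology#describe() output the reporters compared; a minimal sketch (topic, store name, and mapper are illustrative assumptions):
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class DescribeTopology {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> table = builder
            .table("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .mapValues(v -> v.toUpperCase(), Materialized.as("mapped-store"));
        // Each processor node lists the stores connected to it, which is where the
        // misattachment described above shows up.
        System.out.println(builder.build().describe());
    }
}
{code}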
[jira] [Updated] (KAFKA-12644) Add Missing Class-Level Javadoc to Descendants of org.apache.kafka.common.errors.ApiException
[ https://issues.apache.org/jira/browse/KAFKA-12644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-12644: Fix Version/s: (was: 2.8.1) > Add Missing Class-Level Javadoc to Descendants of > org.apache.kafka.common.errors.ApiException > - > > Key: KAFKA-12644 > URL: https://issues.apache.org/jira/browse/KAFKA-12644 > Project: Kafka > Issue Type: Improvement > Components: clients, documentation >Affects Versions: 3.0.0, 2.8.1 >Reporter: Israel Ekpo >Assignee: Israel Ekpo >Priority: Major > Labels: documentation > Fix For: 3.1.0, 3.0.1 > > > I noticed that class-level Javadocs are missing from some classes in the > org.apache.kafka.common.errors package. This issue is for tracking the work > of adding the missing class-level javadocs for those Exception classes. > https://kafka.apache.org/27/javadoc/org/apache/kafka/common/errors/package-summary.html > https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/errors > Basic class-level documentation could be derived by mapping the error > conditions documented in the protocol > https://kafka.apache.org/protocol#protocol_constants -- This message was sent by Atlassian Jira (v8.3.4#803005)
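To illustrate the shape of the requested documentation, here is a hypothetical descendant with a class-level Javadoc; the class name and wording are invented for this example, and real entries would derive their text from the protocol's error descriptions, as suggested above:
{code:java}
package org.apache.kafka.common.errors;

/**
 * Indicates that the broker rejected a request for a condition described by a
 * protocol-level error code. A class-level Javadoc like this one should state
 * when the error is returned and whether the operation is safe to retry,
 * mirroring the descriptions at
 * https://kafka.apache.org/protocol#protocol_constants
 */
public class ExampleApiException extends ApiException {

    public ExampleApiException(String message) {
        super(message);
    }

    public ExampleApiException(String message, Throwable cause) {
        super(message, cause);
    }
}
{code}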
[jira] [Commented] (KAFKA-13256) Possible NPE in ConfigDef when rendering (enriched) RST or HTML when documentation is not set/NULL
[ https://issues.apache.org/jira/browse/KAFKA-13256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17413010#comment-17413010 ] Mickael Maison commented on KAFKA-13256: While this does not affect Apache Kafka itself, ConfigDef is public API and is used by Connect connectors and third-party code like Debezium. In a perfect world, you could argue the documentation should always be set, but as it was a small fix, it made sense to merge it. > Possible NPE in ConfigDef when rendering (enriched) RST or HTML when > documentation is not set/NULL > -- > > Key: KAFKA-13256 > URL: https://issues.apache.org/jira/browse/KAFKA-13256 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.0, 2.8.0 >Reporter: René Kerner >Assignee: René Kerner >Priority: Major > Fix For: 3.1.0 > > Original Estimate: 0.5h > Remaining Estimate: 0.5h > > While working on Debezium I discovered the following issue: > When Kafka's ConfigDef renders the HTML or RST documentation representation > of the config definition, it requires the `ConfigKey.documentation` member > variable to be a java.lang.String instance set to an actual value other than > NULL, otherwise an NPE occurs: > {code:java} > b.append(key.documentation.replaceAll("\n", "")); > {code} > {code:java} > for (String docLine : key.documentation.split("\n")) { > {code} > > When `documentation` is not set/NULL, I suggest either setting a valid String > like "No documentation available" or skipping that config key. > > I could provide a PR to fix this soon. -- This message was sent by Atlassian Jira (v8.3.4#803005)
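A minimal sketch of the null guard the reporter proposes, written as a standalone helper rather than as a patch to ConfigDef itself; the fallback string is the reporter's suggestion, not committed behavior:
{code:java}
import org.apache.kafka.common.config.ConfigDef;

final class ConfigDocRendering {

    // ConfigKey.documentation is a public field on ConfigDef.ConfigKey and may
    // legitimately be null for third-party ConfigDefs (e.g. Connect connectors).
    static String docOrFallback(ConfigDef.ConfigKey key) {
        return key.documentation == null ? "No documentation available" : key.documentation;
    }
}
{code}
The rendering code would then call, for example, `docOrFallback(key).replaceAll("\n", "")` instead of dereferencing `key.documentation` directly.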
[jira] [Updated] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew Sheppard updated KAFKA-13289: - Description: When pushing bulk data through a kafka-streams app, I see it log the following message many times... {noformat} WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment. {noformat} ...and data which I expect to have been joined through a leftJoin step appears to be lost. I've seen this in practice either when my application has been shut down for a while and then is brought back up, or when I've used something like the [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) in an attempt to have the application reprocess past data. I was able to reproduce this behaviour in isolation by generating 1000 messages to two topics spaced an hour apart (with the original timestamps in order), then having kafka streams select a key for them and try to leftJoin the two rekeyed streams. Self-contained source code for that reproduction is available at https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java The actual kafka-streams topology in there looks like this. {code:java} final StreamsBuilder builder = new StreamsBuilder(); final KStream<String, String> leftStream = builder.stream(leftTopic); final KStream<String, String> rightStream = builder.stream(rightTopic); final KStream<String, String> rekeyedLeftStream = leftStream .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); final KStream<String, String> rekeyedRightStream = rightStream .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)); final KStream<String, String> joined = rekeyedLeftStream.leftJoin( rekeyedRightStream, (left, right) -> left + "/" + right, joinWindow ); {code} ...and the eventual output I produce looks like this... {code} ... 523 [523,left/null] 524 [524,left/null, 524,left/524,right] 525 [525,left/525,right] 526 [526,left/null] 527 [527,left/null] 528 [528,left/528,right] 529 [529,left/null] 530 [530,left/null] 531 [531,left/null, 531,left/531,right] 532 [532,left/null] 533 [533,left/null] 534 [534,left/null, 534,left/534,right] 535 [535,left/null] 536 [536,left/null] 537 [537,left/null, 537,left/537,right] 538 [538,left/null] 539 [539,left/null] 540 [540,left/null] 541 [541,left/null] 542 [542,left/null] 543 [543,left/null] ... {code} ...whereas, given the input data, I expect to see every row end with the two values joined, rather than the right value being null. Note that I understand it's expected that we initially get the left/null values for many values since that's the expected semantics of kafka-streams left join, at least until https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious I've noticed that if I set a very large grace value on the join window the problem is solved, but since the input I provide is not out of order I did not expect to need to do that, and I'm wary of the resource requirements of doing so in practice on an application with a lot of volume. My suspicion is that something is happening such that when one partition is processed it causes the stream time to be pushed forward to the newest message in that partition, meaning when the next partition is then examined it is found to contain many records which are 'too old' compared to the stream time. 
I ran across this discussion thread, which seems to cover the same issue: http://mail-archives.apache.org/mod_mbox/kafka-users/202002.mbox/%3cCAB0tB9p_vijMS18jWXBqp7TQozL__ANoo3=h57q6z3y4hzt...@mail.gmail.com%3e It includes a request from [~cadonna] for a reproduction case, so I'm hoping my example above might make the issue easier to tackle! was: When pushing bulk data through a kafka-streams app, I see it log the following message many times... {noformat} WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment. {noformat} ...and data which I expect to have been joined through a leftJoin step appears to be lost. I've seen this in practice either when my application has been shut down for a while and then is brought back up, or when I've used something like the [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) in an attempt to have the application reprocess past data. I was able to reproduce this behaviour in isolation by generating 1000 messages to two topics spaced an hour apart (with the original timestamps in order), then having kafka streams
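For reference, a minimal sketch of the grace-period workaround mentioned in the description, using the 2.8-era JoinWindows API; the two-hour grace value is only a placeholder sized to the report's hour-apart bulk data, not a recommendation:
{code:java}
import java.time.Duration;

import org.apache.kafka.streams.kstream.JoinWindows;

public class GraceWorkaroundSketch {
    public static void main(String[] args) {
        // Same 5-second join window as the reproduction, plus an explicit grace
        // period so records that lag far behind observed stream time are not
        // dropped as belonging to an expired segment during bulk reprocessing.
        JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5))
                                            .grace(Duration.ofHours(2));
        System.out.println("grace ms: " + joinWindow.gracePeriodMs());
    }
}
{code}
The trade-off called out in the description applies: a larger grace period keeps window segments around longer, so state store usage grows accordingly.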
[GitHub] [kafka] chia7712 commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream
chia7712 commented on a change in pull request #11017: URL: https://github.com/apache/kafka/pull/11017#discussion_r705913852 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -977,6 +977,11 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin return streamThread; } +// Ensure Mockito stub construct with capture argument for KafkaStreamsTest. +public static Metrics createThisMetrics(final MetricConfig metricConfig, final List<MetricsReporter> reporters, final Time time, final MetricsContext metricsContext) { Review comment: Why do we need the word "this"? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -246,10 +246,15 @@ State setState(final State newState) { public boolean isRunning() { synchronized (stateLock) { -return state.isAlive(); +return isStateAlive(); } } +// Ensure Mockito can stub method for KafkaStreamTest. +public boolean isStateAlive() { Review comment: Personally, I think it is an anti-pattern. There are two public methods returning the same "value", but one of them holds a lock. It is hard to use them correctly. At any rate, this is unrelated to this PR. ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -903,20 +897,15 @@ public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() { streams.start(); } -PowerMock.verify(Executors.class); Review comment: please add `verify` to make sure `rocksDBMetricsRecordingTriggerThread` is used to create the thread in this test. ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -335,41 +332,37 @@ private void prepareStreamThread(final StreamThread thread, "", Collections.emptySet(), Collections.emptySet() -) -).anyTimes(); - EasyMock.expect(thread.waitOnThreadState(EasyMock.isA(StreamThread.State.class), anyLong())).andStubReturn(true); -EasyMock.expect(thread.isAlive()).andReturn(true).times(0, 1); -thread.resizeCache(EasyMock.anyLong()); -EasyMock.expectLastCall().anyTimes(); -thread.requestLeaveGroupDuringShutdown(); -EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(thread.getName()).andStubReturn("processId-StreamThread-" + threadId); -thread.shutdown(); -EasyMock.expectLastCall().andAnswer(() -> { +)); +when(thread.waitOnThreadState(isA(StreamThread.State.class), anyLong())).thenReturn(true); +when(thread.isStateAlive()).thenReturn(true); +verify(thread, atMostOnce()).isStateAlive(); Review comment: It seems to me this check is a no-op since the thread is not ready to be used yet. Hence, the invocation count is always 0. 
## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -335,41 +332,37 @@ private void prepareStreamThread(final StreamThread thread, "", Collections.emptySet(), Collections.emptySet() -) -).anyTimes(); - EasyMock.expect(thread.waitOnThreadState(EasyMock.isA(StreamThread.State.class), anyLong())).andStubReturn(true); -EasyMock.expect(thread.isAlive()).andReturn(true).times(0, 1); -thread.resizeCache(EasyMock.anyLong()); -EasyMock.expectLastCall().anyTimes(); -thread.requestLeaveGroupDuringShutdown(); -EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(thread.getName()).andStubReturn("processId-StreamThread-" + threadId); -thread.shutdown(); -EasyMock.expectLastCall().andAnswer(() -> { +)); +when(thread.waitOnThreadState(isA(StreamThread.State.class), anyLong())).thenReturn(true); +when(thread.isStateAlive()).thenReturn(true); +verify(thread, atMostOnce()).isStateAlive(); +when(thread.getName()).thenReturn("processId-StreamThread-" + threadId); + +doAnswer(invocation -> { supplier.consumer.close(); supplier.restoreConsumer.close(); for (final MockProducer producer : supplier.producers) { producer.close(); } state.set(StreamThread.State.DEAD); -threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING); -threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN); +threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING); +threadStateListenerCapture.getValue().onChange(thread,
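To make the ordering point above concrete, here is a self-contained sketch of the stub-then-verify pattern using a stand-in interface rather than the real StreamThread; verifying before the mock has been exercised, as flagged in the review, always observes zero invocations and so passes trivially:
```java
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class StubThenVerifySketch {

    // Stand-in for StreamThread; the method names mirror the diff above.
    interface WorkerThread {
        boolean isStateAlive();
        void shutdown();
    }

    public static void main(String[] args) {
        WorkerThread thread = mock(WorkerThread.class);
        when(thread.isStateAlive()).thenReturn(true);

        // verify(thread, atMostOnce()).isStateAlive() placed here would pass
        // trivially: nothing has called the mock yet, so the count is zero.

        // Exercise the mock first...
        if (thread.isStateAlive()) {
            thread.shutdown();
        }

        // ...then verify that the interactions actually happened.
        verify(thread, atMostOnce()).isStateAlive();
        verify(thread).shutdown();
    }
}
```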
[GitHub] [kafka] dajac merged pull request #11318: MINOR: Bump version in upgrade guide to 2.8.1
dajac merged pull request #11318: URL: https://github.com/apache/kafka/pull/11318 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #11318: MINOR: Bump version in upgrade guide to 2.8.1
dajac commented on pull request #11318: URL: https://github.com/apache/kafka/pull/11318#issuecomment-916669933 @omkreddy Yes, we should also fix trunk. I will do a separate one for it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew Sheppard updated KAFKA-13289: - Summary: Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment` (was: Bulk processing data through a join with kafka-streams results in `Skipping record for expired segment`) > Bulk processing correctly ordered input data through a join with > kafka-streams results in `Skipping record for expired segment` > --- > > Key: KAFKA-13289 > URL: https://issues.apache.org/jira/browse/KAFKA-13289 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Matthew Sheppard >Priority: Major > > When pushing bulk data through a kafka-streams app, I see it log the following > message many times... > {noformat} > WARN > org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - > Skipping record for expired segment. > {noformat} > ...and data which I expect to have been joined through a leftJoin step > appears to be lost. > I've seen this in practice either when my application has been shut down for > a while and then is brought back up, or when I've used something like the > [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) > in an attempt to have the application reprocess past data. > I was able to reproduce this behaviour in isolation by generating 1000 > messages to two topics spaced an hour apart (with the original timestamps in > order), then having kafka streams select a key for them and try to leftJoin > the two rekeyed streams. > Self-contained source code for that reproduction is available at > https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java > The actual kafka-streams topology in there looks like this. > {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > final KStream<String, String> leftStream = > builder.stream(leftTopic); > final KStream<String, String> rightStream = > builder.stream(rightTopic); > final KStream<String, String> rekeyedLeftStream = leftStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > final KStream<String, String> rekeyedRightStream = rightStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)); > final KStream<String, String> joined = rekeyedLeftStream.leftJoin( > rekeyedRightStream, > (left, right) -> left + "/" + right, > joinWindow > ); > {code} > ...and the eventual output I produce looks like this... > {code} > ... > 523 [523,left/null] > 524 [524,left/null, 524,left/524,right] > 525 [525,left/525,right] > 526 [526,left/null] > 527 [527,left/null] > 528 [528,left/528,right] > 529 [529,left/null] > 530 [530,left/null] > 531 [531,left/null, 531,left/531,right] > 532 [532,left/null] > 533 [533,left/null] > 534 [534,left/null, 534,left/534,right] > 535 [535,left/null] > 536 [536,left/null] > 537 [537,left/null, 537,left/537,right] > 538 [538,left/null] > 539 [539,left/null] > 540 [540,left/null] > 541 [541,left/null] > 542 [542,left/null] > 543 [543,left/null] > ... > {code} > ...whereas, given the input data, I expect to see every row end with the two > values joined, rather than the right value being null. 
> Note that I understand it's expected that we initially get the left/null > values for many values since that's the expected semantics of kafka-streams > left join, at least until > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious > I've noticed that if I set a very large grace value on the join window the > problem is solved, but since the input I provide is not out of order I did > not expect to need to do that, and I'm wary of the resource requirements of > doing so in practice on an application with a lot of volume. > My suspicion is that something is happening such that when one partition is > processed it causes the stream time to be pushed forward to the newest > message in that partition, meaning when the next partition is then examined > it is found to contain many records which are 'too old' compared to the > stream time. > I ran across > https://kafkacommunity.blogspot.com/2020/02/re-skipping-record-for-expired-segment_88.html > from a year and a half ago which seems to describe the same problem, but I'm > hoping the self-contained reproduction might make the issue easier to
[jira] [Updated] (KAFKA-13289) Bulk processing data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew Sheppard updated KAFKA-13289: - Description: When pushing bulk data through a kafka-streams app, I see it log the following message many times... {noformat} WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment. {noformat} ...and data which I expect to have been joined through a leftJoin step appears to be lost. I've seen this in practice either when my application has been shut down for a while and then is brought back up, or when I've used something like the [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) in an attempt to have the application reprocess past data. I was able to reproduce this behaviour in isolation by generating 1000 messages to two topics spaced an hour apart (with the original timestamps in order), then having kafka streams select a key for them and try to leftJoin the two rekeyed streams. Self-contained source code for that reproduction is available at https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java The actual kafka-streams topology in there looks like this. {code:java} final StreamsBuilder builder = new StreamsBuilder(); final KStream<String, String> leftStream = builder.stream(leftTopic); final KStream<String, String> rightStream = builder.stream(rightTopic); final KStream<String, String> rekeyedLeftStream = leftStream .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); final KStream<String, String> rekeyedRightStream = rightStream .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)); final KStream<String, String> joined = rekeyedLeftStream.leftJoin( rekeyedRightStream, (left, right) -> left + "/" + right, joinWindow ); {code} ...and the eventual output I produce looks like this... {code} ... 523 [523,left/null] 524 [524,left/null, 524,left/524,right] 525 [525,left/525,right] 526 [526,left/null] 527 [527,left/null] 528 [528,left/528,right] 529 [529,left/null] 530 [530,left/null] 531 [531,left/null, 531,left/531,right] 532 [532,left/null] 533 [533,left/null] 534 [534,left/null, 534,left/534,right] 535 [535,left/null] 536 [536,left/null] 537 [537,left/null, 537,left/537,right] 538 [538,left/null] 539 [539,left/null] 540 [540,left/null] 541 [541,left/null] 542 [542,left/null] 543 [543,left/null] ... {code} ...whereas, given the input data, I expect to see every row end with the two values joined, rather than the right value being null. Note that I understand it's expected that we initially get the left/null values for many values since that's the expected semantics of kafka-streams left join, at least until https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious I've noticed that if I set a very large grace value on the join window the problem is solved, but since the input I provide is not out of order I did not expect to need to do that, and I'm wary of the resource requirements of doing so in practice on an application with a lot of volume. My suspicion is that something is happening such that when one partition is processed it causes the stream time to be pushed forward to the newest message in that partition, meaning when the next partition is then examined it is found to contain many records which are 'too old' compared to the stream time. 
I ran across https://kafkacommunity.blogspot.com/2020/02/re-skipping-record-for-expired-segment_88.html from a year and a half ago, which seems to describe the same problem, but I'm hoping the self-contained reproduction might make the issue easier to tackle! was: When pushing bulk data through a kafka-streams app, I see it log the following message many times... `WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment.` ...and data which I expect to have been joined through a leftJoin step appears to be lost. I've seen this in practice either when my application has been shut down for a while and then is brought back up, or when I've used something like the [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) in an attempt to have the application reprocess past data. I was able to reproduce this behaviour in isolation by generating 1000 messages to two topics spaced an hour apart (with the original timestamps in order), then having kafka streams select a key for them and try to leftJoin the two rekeyed streams. Self-contained source code for that
[jira] [Updated] (KAFKA-13289) Bulk processing data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew Sheppard updated KAFKA-13289: - Description: When pushing bulk data through a kafka-streams app, I see it log the following message many times... `WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment.` ...and data which I expect to have been joined through a leftJoin step appears to be lost. I've seen this in practice either when my application has been shut down for a while and then is brought back up, or when I've used something like the [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) in an attempt to have the application reprocess past data. I was able to reproduce this behaviour in isolation by generating 1000 messages to two topics spaced an hour apart (with the original timestamps in order), then having kafka streams select a key for them and try to leftJoin the two rekeyed streams. Self-contained source code for that reproduction is available at https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java The actual kafka-streams topology in there looks like this. {code:java} final StreamsBuilder builder = new StreamsBuilder(); final KStream<String, String> leftStream = builder.stream(leftTopic); final KStream<String, String> rightStream = builder.stream(rightTopic); final KStream<String, String> rekeyedLeftStream = leftStream .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); final KStream<String, String> rekeyedRightStream = rightStream .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)); final KStream<String, String> joined = rekeyedLeftStream.leftJoin( rekeyedRightStream, (left, right) -> left + "/" + right, joinWindow ); {code} ...and the eventual output I produce looks like this... {code} ... 523 [523,left/null] 524 [524,left/null, 524,left/524,right] 525 [525,left/525,right] 526 [526,left/null] 527 [527,left/null] 528 [528,left/528,right] 529 [529,left/null] 530 [530,left/null] 531 [531,left/null, 531,left/531,right] 532 [532,left/null] 533 [533,left/null] 534 [534,left/null, 534,left/534,right] 535 [535,left/null] 536 [536,left/null] 537 [537,left/null, 537,left/537,right] 538 [538,left/null] 539 [539,left/null] 540 [540,left/null] 541 [541,left/null] 542 [542,left/null] 543 [543,left/null] ... {code} ...whereas, given the input data, I expect to see every row end with the two values joined, rather than the right value being null. Note that I understand it's expected that we initially get the left/null values for many values since that's the expected semantics of kafka-streams left join, at least until https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious I've noticed that if I set a very large grace value on the join window the problem is solved, but since the input I provide is not out of order I did not expect to need to do that, and I'm wary of the resource requirements of doing so in practice on an application with a lot of volume. My suspicion is that something is happening such that when one partition is processed it causes the stream time to be pushed forward to the newest message in that partition, meaning when the next partition is then examined it is found to contain many records which are 'too old' compared to the stream time. 
I ran across https://kafkacommunity.blogspot.com/2020/02/re-skipping-record-for-expired-segment_88.html from a year and a half ago, which seems to describe the same problem, but I'm hoping the self-contained reproduction might make the issue easier to tackle! was: When pushing bulk data through a kafka-streams app, I see it log the following message many times... `WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment.` ...and data which I expect to have been joined through a leftJoin step appears to be lost. I've seen this in practice either when my application has been shut down for a while and then is brought back up, or when I've used something like the [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) in an attempt to have the application reprocess past data. I was able to reproduce this behaviour in isolation by generating 1000 messages to two topics spaced an hour apart (with the original timestamps in order), then having kafka streams select a key for them and try to leftJoin the two rekeyed streams. Self-contained source code for that reproduction is
[jira] [Created] (KAFKA-13289) Bulk processing data through a join with kafka-streams results in `Skipping record for expired segment`
Matthew Sheppard created KAFKA-13289: Summary: Bulk processing data through a join with kafka-streams results in `Skipping record for expired segment` Key: KAFKA-13289 URL: https://issues.apache.org/jira/browse/KAFKA-13289 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.8.0 Reporter: Matthew Sheppard When pushing bulk data through a kafka-streams app, I see it log the following message many times... `WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment.` ...and data which I expect to have been joined through a leftJoin step appears to be lost. I've seen this in practice either when my application has been shut down for a while and then is brought back up, or when I've used something like the [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) in an attempt to have the application reprocess past data. I was able to reproduce this behaviour in isolation by generating 1000 messages to two topics spaced an hour apart (with the original timestamps in order), then having kafka streams select a key for them and try to leftJoin the two rekeyed streams. Self-contained source code for that reproduction is available at https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java The actual kafka-streams topology in there looks like this. ``` final StreamsBuilder builder = new StreamsBuilder(); final KStream<String, String> leftStream = builder.stream(leftTopic); final KStream<String, String> rightStream = builder.stream(rightTopic); final KStream<String, String> rekeyedLeftStream = leftStream .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); final KStream<String, String> rekeyedRightStream = rightStream .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)); final KStream<String, String> joined = rekeyedLeftStream.leftJoin( rekeyedRightStream, (left, right) -> left + "/" + right, joinWindow ); ``` ...and the eventual output I produce looks like this... ``` ... 523 [523,left/null] 524 [524,left/null, 524,left/524,right] 525 [525,left/525,right] 526 [526,left/null] 527 [527,left/null] 528 [528,left/528,right] 529 [529,left/null] 530 [530,left/null] 531 [531,left/null, 531,left/531,right] 532 [532,left/null] 533 [533,left/null] 534 [534,left/null, 534,left/534,right] 535 [535,left/null] 536 [536,left/null] 537 [537,left/null, 537,left/537,right] 538 [538,left/null] 539 [539,left/null] 540 [540,left/null] 541 [541,left/null] 542 [542,left/null] 543 [543,left/null] ... ``` ...whereas, given the input data, I expect to see every row end with the two values joined, rather than the right value being null. Note that I understand it's expected that we initially get the left/null values for many values since that's the expected semantics of kafka-streams left join, at least until https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious I've noticed that if I set a very large grace value on the join window the problem is solved, but since the input I provide is not out of order I did not expect to need to do that, and I'm wary of the resource requirements of doing so in practice on an application with a lot of volume. 
My suspicion is that something is happening such that when one partition is processed it causes the stream time to be pushed forward to the newest message in that partition, meaning when the next partition is then examined it is found to contain many records which are 'too old' compared to the stream time. I ran across https://kafkacommunity.blogspot.com/2020/02/re-skipping-record-for-expired-segment_88.html from a year and a half ago which seems to describe the same problem, but I'm hoping the self-contained reproduction might make the issue easier to tackle! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] omkreddy commented on a change in pull request #11318: MINOR: Bump version in upgrade guide to 2.8.1
omkreddy commented on a change in pull request #11318: URL: https://github.com/apache/kafka/pull/11318#discussion_r705926622 ## File path: docs/upgrade.html ## @@ -39,7 +39,7 @@ Notable changes in 2 -Upgrading to 2.7.0 from any version 0.8.x through 2.6.x +Upgrading to 2.8.1 from any version 0.8.x through 2.7.x Review comment: Looks like we missed adding an `Upgrading to 2.8.0` section earlier. I think we should add a new section for the `2.8.0` release and retain the existing "Upgrading to 2.7.0" section. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac opened a new pull request #11318: MINOR: Bump version in upgrade guide to 2.8.1
dajac opened a new pull request #11318: URL: https://github.com/apache/kafka/pull/11318 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org