[jira] [Created] (KAFKA-17441) Add RETRY option to other exception handlers
A. Sophie Blee-Goldman created KAFKA-17441:
-------------------------------------------

Summary: Add RETRY option to other exception handlers
Key: KAFKA-17441
URL: https://issues.apache.org/jira/browse/KAFKA-17441
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: A. Sophie Blee-Goldman

In KIP-1065 we added a RETRY response option to the ProductionExceptionHandler. However, retrying an action instead of either crashing or dropping records would be useful for the other exception handlers as well. We should consider a follow-up KIP to add a RETRY response option to the ProcessingExceptionHandler and/or DeserializationExceptionHandler.
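For context, a minimal sketch of a handler using the RETRY option that KIP-1065 added to the ProductionExceptionHandler, assuming the classic two-argument handle signature; the class name and the retry policy are invented for illustration, not taken from the ticket:

{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Hypothetical handler: retry transient errors, fail on everything else.
public class RetryOnTransientErrorHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // RETRY was introduced by KIP-1065; CONTINUE and FAIL predate it
        return exception instanceof RetriableException
            ? ProductionExceptionHandlerResponse.RETRY
            : ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
{code}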
[jira] [Created] (KAFKA-16875) Replace ClientState with TaskAssignment when creating individual consumer Assignments
A. Sophie Blee-Goldman created KAFKA-16875:
-------------------------------------------

Summary: Replace ClientState with TaskAssignment when creating individual consumer Assignments
Key: KAFKA-16875
URL: https://issues.apache.org/jira/browse/KAFKA-16875
Project: Kafka
Issue Type: Sub-task
Reporter: A. Sophie Blee-Goldman

In the initial implementation of KIP-924 in version 3.8, we converted from the new TaskAssignor's output type (TaskAssignment) into the old ClientState-based assignment representation. This allowed us to plug in a custom assignor without converting all the internal machinery that runs after the KafkaStreams client-level assignment and processes it into consumer-level assignments. However, we ultimately want to get rid of ClientState altogether, so we need to invert this logic: convert the ClientState into a TaskAssignment, and then use the TaskAssignment to process the assigned tasks into consumer Assignments.
[jira] [Created] (KAFKA-16874) Remove old TaskAssignor interface
A. Sophie Blee-Goldman created KAFKA-16874:
-------------------------------------------

Summary: Remove old TaskAssignor interface
Key: KAFKA-16874
URL: https://issues.apache.org/jira/browse/KAFKA-16874
Project: Kafka
Issue Type: Sub-task
Reporter: A. Sophie Blee-Goldman

Once we have the new HAAssignor that implements the new TaskAssignor interface, we can remove the old TaskAssignor interface.
[jira] [Created] (KAFKA-16873) Remove StreamsConfig.INTERNAL_TASK_ASSIGNOR_CLASS
A. Sophie Blee-Goldman created KAFKA-16873:
-------------------------------------------

Summary: Remove StreamsConfig.INTERNAL_TASK_ASSIGNOR_CLASS
Key: KAFKA-16873
URL: https://issues.apache.org/jira/browse/KAFKA-16873
Project: Kafka
Issue Type: Sub-task
Reporter: A. Sophie Blee-Goldman

Once we have all the out-of-the-box assignors implementing the new TaskAssignor interface that corresponds to the new public task assignor config, we can remove the old internal task assignor config altogether.
[jira] [Created] (KAFKA-16872) Remove ClientState class
A. Sophie Blee-Goldman created KAFKA-16872:
-------------------------------------------

Summary: Remove ClientState class
Key: KAFKA-16872
URL: https://issues.apache.org/jira/browse/KAFKA-16872
Project: Kafka
Issue Type: Sub-task
Reporter: A. Sophie Blee-Goldman

One of the end-state goals of KIP-924 is to remove the ClientState class altogether. There are some blockers to this, such as the removal of the old internal task assignor config and the old HAAssignor, so this ticket will probably be one of the very last KAFKA-16868 subtasks to be tackled.
[jira] [Created] (KAFKA-16871) Clean up internal AssignmentConfigs class in Streams
A. Sophie Blee-Goldman created KAFKA-16871:
-------------------------------------------

Summary: Clean up internal AssignmentConfigs class in Streams
Key: KAFKA-16871
URL: https://issues.apache.org/jira/browse/KAFKA-16871
Project: Kafka
Issue Type: Sub-task
Reporter: A. Sophie Blee-Goldman

In KIP-924 we added a new public AssignmentConfigs class to hold all of the, you guessed it, assignment-related configs. However, there is an existing config class of the same name and largely the same contents, but it's in an internal package, specifically inside the AssignorConfiguration class. We should remove the old AssignmentConfigs class that's in AssignorConfiguration and replace any usages of it with the new public AssignmentConfigs that we added in KIP-924.
[jira] [Created] (KAFKA-16869) Rewrite HighAvailabilityTaskAssignor to implement the new TaskAssignor interface
A. Sophie Blee-Goldman created KAFKA-16869:
-------------------------------------------

Summary: Rewrite HighAvailabilityTaskAssignor to implement the new TaskAssignor interface
Key: KAFKA-16869
URL: https://issues.apache.org/jira/browse/KAFKA-16869
Project: Kafka
Issue Type: Sub-task
Components: streams
Reporter: A. Sophie Blee-Goldman

We need to add a new HighAvailabilityTaskAssignor that implements the new TaskAssignor interface. Once we have that, we need to remember to also make these related changes:

# Change the StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG default from null to the new HAAssignor
# Check for this new HAAssignor type when evaluating the OptionalInt rack-aware assignment configs in the public AssignmentConfigs class. If these configs are Optional.empty() and the new HAAssignor is used, they should be overridden to the HAAssignor-specific default values. This code already exists but should be updated to check for the new HAAssignor class name instead of "null"
# Until the old HAAssignor and old internal task assignor config can be removed completely, make sure the new HAAssignor is used by default when a TaskAssignor is selected in StreamsPartitionAssignor
[jira] [Created] (KAFKA-16868) Post KIP-924 StreamsPartitionAssignor code cleanup
A. Sophie Blee-Goldman created KAFKA-16868:
-------------------------------------------

Summary: Post KIP-924 StreamsPartitionAssignor code cleanup
Key: KAFKA-16868
URL: https://issues.apache.org/jira/browse/KAFKA-16868
Project: Kafka
Issue Type: Improvement
Reporter: A. Sophie Blee-Goldman

Making an umbrella task for all of the tech debt and code consolidation cleanup work that can/should be done following the implementation of [KIP-924: customizable task assignment for Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams].

Most of this revolves around deduplicating code once it's no longer needed, including classes like the ClientState, StandbyTaskAssignor and related elements, and the old TaskAssignor interface along with its implementations.

Note that in 3.8, the first version in which KIP-924 was released, we just added the new public config and new TaskAssignor interface but did not get rid of the old internal config or old TaskAssignor interface. If neither config is set in 3.8 we still default to the old HAAssignor, as a kind of opt-in feature flag, and internally will convert the output of the new TaskAssignor into the old style of ClientState-based assignment tracking. We intend to clean up all of the old code and eventually support only the new TaskAssignor interface, as well as converting everything internally from the ClientState to the TaskAssignment/KafkaStreamsAssignment style output.
[jira] [Created] (KAFKA-16867) Streams should run tag-based standby assignment based on rack ids
A. Sophie Blee-Goldman created KAFKA-16867:
-------------------------------------------

Summary: Streams should run tag-based standby assignment based on rack ids
Key: KAFKA-16867
URL: https://issues.apache.org/jira/browse/KAFKA-16867
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: A. Sophie Blee-Goldman

In KIP-708, we introduced a tag-based standby task assignment algorithm that runs if the user has configured their clients with "rack aware assignment tags". If no tags are configured, the default load-based standby task assignment algorithm is run instead.

In KIP-924 we introduced a different kind of rack-aware assignment logic, which is based on the "rack.id" of the consumers and topic partitions. While this did not replace the tag-based rack-aware assignment of KIP-708, which had different (and opposing) goals, we realized that Streams could leverage the rack.ids to run the tag-based standby task assignment algorithm even if clients were not configured with assignment tags.

Unfortunately, during the implementation of KIP-924, a bug in the logic meant that the tag-based algorithm was never actually being run based on the rack ids. This bug is present to this day and was carried over (intentionally) during the task assignor refactoring of KIP-924.

We should still fix this bug so that users can benefit from the resiliency of KIP-708 based on consumer rack ids, even if they did not explicitly opt in by configuring clients with assignment tags, since KIP-708 is a net benefit with no downside.
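For reference, a minimal configuration sketch of the two opt-in paths in play here; the tag name and values are invented. The KIP-708 client tags are the explicit opt-in that works today; the consumer rack id is the signal that, per this ticket, was meant to trigger the same tag-based standby algorithm on its own but currently does not:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RackConfigSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();

        // Explicit KIP-708 opt-in: define a client tag and list it as rack-aware
        props.put(StreamsConfig.CLIENT_TAG_PREFIX + "zone", "eu-west-1a");
        props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "zone");

        // Rack id on the embedded consumers (the KIP-924 signal), which should,
        // per this ticket, eventually trigger the same algorithm on its own
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.CLIENT_RACK_CONFIG), "eu-west-1a");
    }
}
{code}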
[jira] [Resolved] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly
[ https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-15170.
--------------------------------------------
    Resolution: Fixed

> CooperativeStickyAssignor cannot adjust assignment correctly
> -------------------------------------------------------------
>
> Key: KAFKA-15170
> URL: https://issues.apache.org/jira/browse/KAFKA-15170
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.5.0
> Reporter: li xiangyuan
> Assignee: li xiangyuan
> Priority: Major
> Fix For: 3.8.0, 3.7.1
>
> AbstractStickyAssignor uses ConstrainedAssignmentBuilder to build the assignment when all consumers in the group subscribe to the same topic list, but it fails to add all partitions whose ownership moved to another consumer to `partitionsWithMultiplePreviousOwners`.
>
> The reason is that assignOwnedPartitions does not add partitions that are rack-mismatched with their previous owner to allRevokedPartitions, and only partitions in that list are added to partitionsWithMultiplePreviousOwners.
>
> In a cooperative rebalance, partitions that have changed owner must be removed from the final assignment or they will lead to incorrect consume behavior. I have already raised a PR, please take a look, thanks:
> https://github.com/apache/kafka/pull/13965
[jira] [Created] (KAFKA-16758) Extend Consumer#close with option to leave the group or not
A. Sophie Blee-Goldman created KAFKA-16758:
-------------------------------------------

Summary: Extend Consumer#close with option to leave the group or not
Key: KAFKA-16758
URL: https://issues.apache.org/jira/browse/KAFKA-16758
Project: Kafka
Issue Type: New Feature
Components: consumer
Reporter: A. Sophie Blee-Goldman

See the comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the full context. Essentially, we would get rid of the "internal.leave.group.on.close" config that is used as a backdoor by Kafka Streams right now to prevent closed consumers from leaving the group, thus reducing unnecessary task movements after a simple bounce.

This would be replaced by an actual public API that would allow the caller to opt in or out of the LeaveGroup when close is called. This would be similar to the KafkaStreams#close(CloseOptions) API, and in fact would be how that API will be implemented (since it only works for static groups at the moment, as noted in KAFKA-16514).

This has several benefits over the current situation:

# It allows plain consumer apps to opt out of leaving the group when closed, which is currently not possible through any public API (only an internal backdoor config)
# It enables the caller to dynamically select the appropriate action depending on why the client is being closed – for example, you would not want the consumer to leave the group during a simple restart, but would want it to leave the group when shutting down the app or when scaling down the node. This is not possible today, even with the internal config, since configs are immutable
# It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so that the user's choice to leave the group during close will be respected for non-static members
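To make the proposal concrete: the Streams call below is the existing KIP-812-style API the ticket references (effective only for static members today, per KAFKA-16514); the commented-out consumer call is a purely hypothetical sketch of the proposed analogue, with the CloseOptions shape for Consumer#close invented for illustration:

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

public class CloseOptionsSketch {

    // The existing Streams API this ticket uses as its model.
    static void shutdownForScaleDown(final KafkaStreams streams) {
        streams.close(new KafkaStreams.CloseOptions()
            .timeout(Duration.ofSeconds(30))
            .leaveGroup(true)); // scale-down: give up the member's tasks immediately
    }

    // Hypothetical consumer analogue proposed here (NOT a committed API):
    //
    // consumer.close(new CloseOptions()
    //     .timeout(Duration.ofSeconds(30))
    //     .leaveGroup(false)); // simple bounce: stay in the group, avoid a rebalance
}
{code}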
[jira] [Resolved] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-16277.
--------------------------------------------
    Resolution: Fixed

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> -----------------------------------------------------------------------------
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Cameron Redpath
> Priority: Major
> Fix For: 3.8.0
>
> Attachments: image-2024-02-19-13-00-28-306.png
>
> Consider the following scenario:
> `topic-1`: 12 partitions
> `topic-2`: 12 partitions
>
> Of note, `topic-1` gets approximately 10 times more messages through it than `topic-2`.
>
> Both of these topics are consumed by a single application, single consumer group, which scales under load. Each member of the consumer group subscribes to both topics. The `partition.assignment.strategy` being used is `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The application may start with one consumer, which consumes all partitions from both topics.
>
> The problem begins when the application scales up to two consumers. What is seen is that all partitions from `topic-1` go to one consumer, and all partitions from `topic-2` go to the other consumer. With one topic receiving more messages than the other, this results in a very imbalanced group where one consumer receives 10x the traffic of the other due to partition assignment.
>
> This is the issue being seen in our cluster at the moment. See this graph of the number of messages being processed by each consumer as the group scales from one to four consumers:
> !image-2024-02-19-13-00-28-306.png|width=537,height=612!
> Things to note from this graphic:
> * With two consumers, the partitions for a topic all go to a single consumer each
> * With three consumers, the partitions for a topic are split between two consumers each
> * With four consumers, the partitions for a topic are split between three consumers each
> * The total number of messages being processed by each consumer in the group is very imbalanced throughout the entire period
>
> With regard to the number of _partitions_ being assigned to each consumer, the group is balanced. However, the assignment appears to be biased so that partitions from the same topic go to the same consumer. In our scenario, this leads to very undesirable partition assignment.
>
> I question whether the behaviour of the assignor should be revised, so that each topic has its partitions maximally spread across all available members of the consumer group. In the above scenario, this would result in a much more even distribution of load. The behaviour would then be:
> * With two consumers, 6 partitions from each topic go to each consumer
> * With three consumers, 4 partitions from each topic go to each consumer
> * With four consumers, 3 partitions from each topic go to each consumer
>
> Of note, we only saw this behaviour after migrating to the `CooperativeStickyAssignor`. It was not an issue with the default partition assignment strategy.
>
> It is possible this may be intended behaviour. If so, what is the preferred workaround for our scenario? Our current workaround, if we decide to go ahead with the update to `CooperativeStickyAssignor`, may be to limit our consumers so they only subscribe to one topic, and have two consumer threads per instance of the application.
[jira] [Created] (KAFKA-15782) Establish concrete project conventions to define public APIs that require a KIP
A. Sophie Blee-Goldman created KAFKA-15782:
-------------------------------------------

Summary: Establish concrete project conventions to define public APIs that require a KIP
Key: KAFKA-15782
URL: https://issues.apache.org/jira/browse/KAFKA-15782
Project: Kafka
Issue Type: Improvement
Reporter: A. Sophie Blee-Goldman

There seems to be no concrete definition that establishes project-specific conventions for what is and is not considered a public API change that requires a KIP. This results in frequent drawn-out debates that revisit the same topic and slow things down, and often ends up forcing trivial changes through the KIP process.

For a recent example, KIP-998 was required for a one-line change just to add the "protected" access modifier to an otherwise package-private class. See [this comment thread|https://github.com/apache/kafka/pull/14681#discussion_r1378591228] for the full debate on this subject.

It would be beneficial, and in the long run save us all time, to just sit down and hash out the project conventions, such as whether a package-private/protected method on a non-final Java class is to be considered a public API, even if the method itself is/was never a public method. This will of course require a KIP, but should help to establish some ground rules to avoid any more superfluous KIPs in the future.
[jira] [Created] (KAFKA-15781) Change ProducerConfig(props, doLog) constructor to protected
A. Sophie Blee-Goldman created KAFKA-15781:
-------------------------------------------

Summary: Change ProducerConfig(props, doLog) constructor to protected
Key: KAFKA-15781
URL: https://issues.apache.org/jira/browse/KAFKA-15781
Project: Kafka
Issue Type: Improvement
Components: producer
Reporter: A. Sophie Blee-Goldman
Assignee: A. Sophie Blee-Goldman

See https://github.com/apache/kafka/pull/14681
[jira] [Resolved] (KAFKA-15116) Kafka Streams processing blocked during rebalance
[ https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-15116.
--------------------------------------------
    Resolution: Not A Problem

> Kafka Streams processing blocked during rebalance
> --------------------------------------------------
>
> Key: KAFKA-15116
> URL: https://issues.apache.org/jira/browse/KAFKA-15116
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.5.0
> Reporter: David Gammon
> Priority: Major
>
> We have a Kafka Streams application that simply takes a message, processes it and then produces an event out the other side. The complexity is that there is a requirement that all events with the same partition key must be committed before the next message is processed.
>
> This works flawlessly most of the time, but we have started to see problems during deployments where the first message blocks the second message during a rebalance, because the first message isn't committed before the second message is processed. This ultimately results in transactions timing out and more rebalancing.
>
> We've tried lots of configuration to get the behaviour we require with no luck. We've now put in a temporary fix so that Kafka Streams works with our framework, but it feels like this might be a missing feature or potentially a bug.
>
> +Example+
> Given:
> * We have two messages (InA and InB).
> * Both messages have the same partition key.
> * A rebalance is in progress so Streams is no longer able to commit.
> When:
> # Message InA -> processor -> OutA (not committed)
> # Message InB -> processor -> blocked because #1 has not been committed
[jira] [Resolved] (KAFKA-12550) Introduce RESTORING state to the KafkaStreams FSM
[ https://issues.apache.org/jira/browse/KAFKA-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-12550.
--------------------------------------------
    Resolution: Won't Fix

Closing this out since its usefulness is preempted by the StateUpdaterThread and having moved restoration out of the main StreamThread.

> Introduce RESTORING state to the KafkaStreams FSM
> --------------------------------------------------
>
> Key: KAFKA-12550
> URL: https://issues.apache.org/jira/browse/KAFKA-12550
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Priority: Major
> Labels: needs-kip
>
> We should consider adding a new state to the KafkaStreams FSM: RESTORING.
>
> This would cover the time between the completion of a stable rebalance and the completion of restoration across the client. Currently, Streams will report the state during this time as REBALANCING, even though it is generally spending much more time restoring than rebalancing in most cases.
>
> There are a few motivations/benefits behind this idea:
> # Observability is a big one: using the umbrella REBALANCING state to cover all aspects of rebalancing -> task initialization -> restoring has been a common source of confusion in the past. It's also proved to be a time sink for us, during escalations, incidents, mailing list questions, and bug reports. It often adds latency to escalations in particular, as we have to go through GTS and wait for the customer to clarify whether their "Kafka Streams is stuck rebalancing" ticket means that it's literally rebalancing, or just in the REBALANCING state and actually stuck elsewhere in Streams.
> # Prereq for global thread improvements: for example, [KIP-406: GlobalStreamThread should honor custom reset policy|https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy] was ultimately blocked on this, as we needed to pause the Streams app while the global thread restored from the appropriate offset. Since there's absolutely no rebalancing involved in this case, piggybacking on the REBALANCING state would just be shooting ourselves in the foot.
[jira] [Resolved] (KAFKA-15463) StreamsException: Accessing from an unknown node
[ https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-15463.
--------------------------------------------
    Resolution: Not A Problem

> StreamsException: Accessing from an unknown node
> -------------------------------------------------
>
> Key: KAFKA-15463
> URL: https://issues.apache.org/jira/browse/KAFKA-15463
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.2.1
> Reporter: Yevgeny
> Priority: Major
>
> After the application had been working fine for some time, we started to get the exception below. This is a Spring Boot application that runs in Kubernetes as a stateful pod.
>
> {code:java}
> Exception in thread "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown node
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162)
>   at myclass1.java:28)
>   at myclass2.java:48)
>   at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
>   at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
>   at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
>   at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
>   at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
>   at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>   at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>   at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>   at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
>   at myclass3.java:48)
>   at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>   at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>   at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>   at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780)
>   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
>   at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780)
>   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711)
>   at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
>   at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
>   at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
> {code}
>
> stream-thread [-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
>
> The Transformer is a prototype bean; the supplier supplies a new instance of the Transformer:
>
> {code:java}
> @Override
> public Transformer<…> get() {
>     return ctx.getBean(MyTransformer.class);
> }
> {code}
>
> The only way to recover is to delete all topics used by Kafka Streams; even if the application is restarted, the same exception is thrown.
>
> If messages in internal 'store-changelog' topics are deleted or offsets manipulated, can that cause the issue?
[jira] [Resolved] (KAFKA-15571) StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended
[ https://issues.apache.org/jira/browse/KAFKA-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-15571.
--------------------------------------------
    Resolution: Fixed

> StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended
> --------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-15571
> URL: https://issues.apache.org/jira/browse/KAFKA-15571
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.5.0, 3.6.0, 3.5.1
> Reporter: Levani Kokhreidze
> Assignee: Levani Kokhreidze
> Priority: Major
>
> With https://issues.apache.org/jira/browse/KAFKA-10575, `StateRestoreListener#onRestoreSuspended` was added. But local tests show that it is never called, because `DelegatingStateRestoreListener` was not updated to call the new method.
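To illustrate the bug class here (a delegating wrapper silently dropping a default interface method it forgot to override), a minimal sketch; the class name is invented and the onRestoreSuspended signature is assumed to be the (partition, store, totalRestored) form added by KAFKA-10575:

{code:java}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// A forwarding wrapper must override default methods too; otherwise calls to
// them land on the interface default instead of reaching the user's listener.
public class ForwardingRestoreListener implements StateRestoreListener {
    private final StateRestoreListener delegate;

    public ForwardingRestoreListener(final StateRestoreListener delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onRestoreStart(TopicPartition tp, String store, long start, long end) {
        delegate.onRestoreStart(tp, store, start, end);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String store, long endOffset, long numRestored) {
        delegate.onBatchRestored(tp, store, endOffset, numRestored);
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String store, long totalRestored) {
        delegate.onRestoreEnd(tp, store, totalRestored);
    }

    @Override // the override that the wrapper in this ticket was missing
    public void onRestoreSuspended(TopicPartition tp, String store, long totalRestored) {
        delegate.onRestoreSuspended(tp, store, totalRestored);
    }
}
{code}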
[jira] [Created] (KAFKA-15215) The default.dsl.store config is not compatible with custom state stores
A. Sophie Blee-Goldman created KAFKA-15215:
-------------------------------------------

Summary: The default.dsl.store config is not compatible with custom state stores
Key: KAFKA-15215
URL: https://issues.apache.org/jira/browse/KAFKA-15215
Project: Kafka
Issue Type: New Feature
Components: streams
Reporter: A. Sophie Blee-Goldman
Assignee: Almog Gavra

Sort of a bug, sort of a new/missing feature. When we added the long-awaited default.dsl.store config, it was decided to scope the initial KIP to just the two out-of-the-box state store types offered by Streams: rocksdb and in-memory. The reasoning was that this would address a large number of the relevant use cases, and could always be followed up with another KIP for custom state stores if/when the demand arose.

Of course, since rocksdb is the default anyway, the only beneficiaries of this KIP right now are the people who specifically want only in-memory stores – yet custom state store users are probably by far the ones with the greatest need for an easier way to configure the store type across an entire application. And unfortunately, because the config currently relies on enum definitions for the known OOTB store types, there's not really any way to extend this feature as it is to work with custom implementations.

I think this is a great feature, which is why I hope to see it extended to the broader user base. Most likely we'll want to introduce a new config for this, though whether it replaces the old default.dsl.store config or complements it will have to be decided during the KIP discussion.
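For reference, a sketch of the config as it exists today, which shows the enum limitation the ticket describes: the value must be one of the two built-in constants, so there is no way to name a custom store implementation:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class DefaultDslStoreSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        // Only the two OOTB values are accepted:
        props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);
        // props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.ROCKS_DB);
    }
}
{code}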
[jira] [Created] (KAFKA-15045) Move Streams task assignor to public configs
A. Sophie Blee-Goldman created KAFKA-15045:
-------------------------------------------

Summary: Move Streams task assignor to public configs
Key: KAFKA-15045
URL: https://issues.apache.org/jira/browse/KAFKA-15045
Project: Kafka
Issue Type: New Feature
Components: streams
Reporter: A. Sophie Blee-Goldman
Assignee: A. Sophie Blee-Goldman
[jira] [Created] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable
A. Sophie Blee-Goldman created KAFKA-14976:
-------------------------------------------

Summary: Left/outer stream-stream joins create KV stores that aren't customizable
Key: KAFKA-14976
URL: https://issues.apache.org/jira/browse/KAFKA-14976
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: A. Sophie Blee-Goldman

It appears that we only give the illusion of full customizability when it comes to the state stores of a windowed join. This arose due to an [optimization|https://github.com/apache/kafka/pull/11252] for the performance of the spurious results fix, and means that these joins now come with one additional, and possibly unexpected, state store:

{code:java}
final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder =
    new ListValueStoreBuilder<>(
        persistent ?
            Stores.persistentKeyValueStore(storeName) :
            Stores.inMemoryKeyValueStore(storeName),
        timestampedKeyAndJoinSideSerde,
        leftOrRightValueSerde,
        Time.SYSTEM
    );
{code}

where persistent is defined above that as

{code:java}
final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null || streamJoinedInternal.thisStoreSupplier().get().persistent();
{code}

This means that regardless of whether a custom state store implementation was passed in to the join, we will still insert one of our RocksDB or InMemory state stores. This might be very surprising, since the API makes it seem like the underlying stores are fully configurable. I'm adding a warning line for this in PR [#13682|https://github.com/apache/kafka/pull/13682/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R334-R336], but we should really make this hidden state store fully configurable like the window stores currently are (which will require a KIP).
[jira] [Created] (KAFKA-14650) IQv2 can throw ConcurrentModificationException when accessing Tasks
A. Sophie Blee-Goldman created KAFKA-14650:
-------------------------------------------

Summary: IQv2 can throw ConcurrentModificationException when accessing Tasks
Key: KAFKA-14650
URL: https://issues.apache.org/jira/browse/KAFKA-14650
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.4.0
Reporter: A. Sophie Blee-Goldman

From a failure in *[PositionRestartIntegrationTest.verifyStore[cache=false, log=true, supplier=IN_MEMORY_WINDOW, kind=PAPI]|https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/63/testReport/junit/org.apache.kafka.streams.integration/PositionRestartIntegrationTest/Build___JDK_11_and_Scala_2_13___verifyStore_cache_false__log_true__supplier_IN_MEMORY_WINDOW__kind_PAPI_/]*:

java.util.ConcurrentModificationException
  at java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
  at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
  at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
  at java.base/java.util.HashMap.putMapEntries(HashMap.java:508)
  at java.base/java.util.HashMap.putAll(HashMap.java:781)
  at org.apache.kafka.streams.processor.internals.Tasks.allTasksPerId(Tasks.java:361)
  at org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1537)
  at org.apache.kafka.streams.processor.internals.StreamThread.allTasks(StreamThread.java:1278)
  at org.apache.kafka.streams.KafkaStreams.query(KafkaStreams.java:1921)
  at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.iqv2WaitForResult(IntegrationTestUtils.java:168)
  at org.apache.kafka.streams.integration.PositionRestartIntegrationTest.shouldReachExpectedPosition(PositionRestartIntegrationTest.java:438)
  at org.apache.kafka.streams.integration.PositionRestartIntegrationTest.verifyStore(PositionRestartIntegrationTest.java:423)
[jira] [Resolved] (KAFKA-13602) Allow to broadcast a result record
[ https://issues.apache.org/jira/browse/KAFKA-13602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-13602.
--------------------------------------------
    Resolution: Fixed

> Allow to broadcast a result record
> -----------------------------------
>
> Key: KAFKA-13602
> URL: https://issues.apache.org/jira/browse/KAFKA-13602
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Sagar Rao
> Priority: Major
> Labels: kip, newbie++
> Fix For: 3.4.0
>
> From time to time, users ask how they can send a record to more than one partition in a sink topic. Currently, this is only possible by replicating the message N times before the sink and using a custom partitioner to write the N messages into the N different partitions.
>
> It might be worth making this easier and adding a new feature for it. There are multiple options:
> * extend `to()` / `addSink()` with a "broadcast" option/config
> * add `toAllPartitions()` / `addBroadcastSink()` methods
> * allow StreamPartitioner to return `-1` for "all partitions"
> * extend `StreamPartitioner` to allow returning more than one partition (ie a list/collection of integers instead of a single int)
>
> The first three options imply that only a "full broadcast" is supported, so it's less flexible. On the other hand, it's easier to use (especially the first two options, which are easy as they do not require implementing a custom partitioner).
>
> The last option would be the most flexible and would also allow for a "partial broadcast" (aka multi-cast) pattern. It might also be possible to combine two options, or maybe even a totally different idea.
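For reference, this was ultimately delivered via the last option (per KIP-837, referenced in KAFKA-14454 below): StreamPartitioner gained a multi-partition return. A minimal multi-cast sketch, with the class name and partition choice invented:

{code:java}
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.processor.StreamPartitioner;

// Multi-cast sketch: send every record to the first two partitions.
// Returning a set of all partitions would give a full broadcast.
public class MultiCastPartitioner<K, V> implements StreamPartitioner<K, V> {

    @Override
    @Deprecated
    public Integer partition(String topic, K key, V value, int numPartitions) {
        return null; // unused once partitions() is implemented
    }

    @Override
    public Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        return Optional.of(numPartitions > 1 ? Set.of(0, 1) : Set.of(0));
    }
}
{code}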
[jira] [Created] (KAFKA-14539) Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
A. Sophie Blee-Goldman created KAFKA-14539:
-------------------------------------------

Summary: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
Key: KAFKA-14539
URL: https://issues.apache.org/jira/browse/KAFKA-14539
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: A. Sophie Blee-Goldman

We can clean up the StreamsMetadataState class a bit by removing the #onChange invocation that currently occurs within StreamsPartitionAssignor#assign, which then lets us remove the `Cluster` parameter in that callback.

Instead of building a fake Cluster object from the map of partition info when we invoke #onChange inside the StreamsPartitionAssignor#onAssignment method, we can just directly pass in the `Map` and replace the usage of `Cluster` everywhere in StreamsMetadataState.

(I believe the current system is a historical artifact from when we used to require passing in a {{Cluster}} for the default partitioning strategy, which the StreamsMetadataState needs to compute the partition for a key. At some point in the past we provided a better way to get the default partition, so we no longer need a {{Cluster}} parameter/field at all.)
[jira] [Reopened] (KAFKA-13602) Allow to broadcast a result record
[ https://issues.apache.org/jira/browse/KAFKA-13602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman reopened KAFKA-13602:
--------------------------------------------

Reverting in 3.4 due to a logging-related perf/security issue, and some open semantic questions. Retargeting for 3.5.

> Allow to broadcast a result record
> -----------------------------------
>
> Key: KAFKA-13602
> URL: https://issues.apache.org/jira/browse/KAFKA-13602
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Sagar Rao
> Priority: Major
> Labels: needs-kip, newbie++
> Fix For: 3.4.0
>
> From time to time, users ask how they can send a record to more than one partition in a sink topic. Currently, this is only possible by replicating the message N times before the sink and using a custom partitioner to write the N messages into the N different partitions.
>
> It might be worth making this easier and adding a new feature for it. There are multiple options:
> * extend `to()` / `addSink()` with a "broadcast" option/config
> * add `toAllPartitions()` / `addBroadcastSink()` methods
> * allow StreamPartitioner to return `-1` for "all partitions"
> * extend `StreamPartitioner` to allow returning more than one partition (ie a list/collection of integers instead of a single int)
>
> The first three options imply that only a "full broadcast" is supported, so it's less flexible. On the other hand, it's easier to use (especially the first two options, which are easy as they do not require implementing a custom partitioner).
>
> The last option would be the most flexible and would also allow for a "partial broadcast" (aka multi-cast) pattern. It might also be possible to combine two options, or maybe even a totally different idea.
[jira] [Resolved] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
[ https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-13736.
--------------------------------------------
    Resolution: Fixed

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ------------------------------------------------------------------------
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
> Issue Type: Bug
> Reporter: Guozhang Wang
> Priority: Blocker
> Labels: flakey, flaky-test
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}
[jira] [Resolved] (KAFKA-4852) ByteBufferSerializer not compatible with offsets
[ https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-4852.
-------------------------------------------
    Resolution: Fixed

> ByteBufferSerializer not compatible with offsets
> -------------------------------------------------
>
> Key: KAFKA-4852
> URL: https://issues.apache.org/jira/browse/KAFKA-4852
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.10.1.1
> Environment: all
> Reporter: Werner Daehn
> Assignee: LinShunkang
> Priority: Minor
> Fix For: 3.4.0
>
> Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the ByteBuffer was created with an offset, e.g. new ByteBuffer(data, 3, 10)? The ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.
>
> Solution: no rewind(), but flip() for reading a ByteBuffer. That's what flip is meant for.
>
> Story:
> Imagine the incoming data comes from a byte[], e.g. a network stream containing topicname, partition, key, value, ... and you want to create a new ProducerRecord for that. As the constructor of ProducerRecord requires (topic, partition, key, value), you have to copy the key and value out of that byte[]. That means there is a memcopy taking place. Since the payload can be potentially large, that introduces a lot of overhead: twice the memory.
>
> A nice solution to this problem is to simply wrap the network byte[] into new ByteBuffers:
> ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
> ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
> and then use the ByteBufferSerializer instead of the ByteArraySerializer.
>
> But that does not work, as the ByteBufferSerializer does a rewind(), hence both key and value will start at position=0 of the data[]:
>
> public class ByteBufferSerializer implements Serializer<ByteBuffer> {
>     public byte[] serialize(String topic, ByteBuffer data) {
>         data.rewind();
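A small standalone demonstration of the reported behavior (the data is invented): wrapping with an offset sets the buffer's position to that offset, but rewind() resets the position to zero, so the old serializer read from the start of the backing array instead of from the wrapped slice:

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RewindDemo {
    public static void main(String[] args) {
        byte[] network = "headerKEYvalue".getBytes(StandardCharsets.UTF_8);
        ByteBuffer key = ByteBuffer.wrap(network, 6, 3); // the "KEY" slice: position=6, limit=9

        key.rewind(); // what the old serializer did: position -> 0, limit stays 9
        byte[] sent = new byte[key.remaining()];
        key.get(sent);
        System.out.println(new String(sent, StandardCharsets.UTF_8)); // prints "headerKEY", not "KEY"
    }
}
{code}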
[jira] [Resolved] (KAFKA-14291) KRaft: ApiVersionsResponse doesn't have finalizedFeatures and finalizedFeatureEpoch in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-14291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-14291.
--------------------------------------------
    Resolution: Duplicate

Marking this as a duplicate of KAFKA-13990 as described above

> KRaft: ApiVersionsResponse doesn't have finalizedFeatures and finalizedFeatureEpoch in KRaft mode
> --------------------------------------------------------------------------------------------------
>
> Key: KAFKA-14291
> URL: https://issues.apache.org/jira/browse/KAFKA-14291
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Reporter: Akhilesh Chaganti
> Priority: Critical
>
> https://github.com/apache/kafka/blob/d834947ae7abc8a9421d741e742200bb36f51fb3/core/src/main/scala/kafka/server/ApiVersionManager.scala#L53
> ```
> class SimpleApiVersionManager(
>   val listenerType: ListenerType,
>   val enabledApis: collection.Set[ApiKeys],
>   brokerFeatures: Features[SupportedVersionRange]
> ) extends ApiVersionManager {
>
>   def this(listenerType: ListenerType) = {
>     this(listenerType, ApiKeys.apisForListener(listenerType).asScala, BrokerFeatures.defaultSupportedFeatures())
>   }
>
>   private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava)
>
>   override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
>     ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
>   }
> }
> ```
> The ApiVersionManager for KRaft doesn't add the finalizedFeatures and finalizedFeatureEpoch to the ApiVersionsResponse.
[jira] [Created] (KAFKA-14460) In-memory store iterators can return results with null values
A. Sophie Blee-Goldman created KAFKA-14460:
-------------------------------------------

Summary: In-memory store iterators can return results with null values
Key: KAFKA-14460
URL: https://issues.apache.org/jira/browse/KAFKA-14460
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: A. Sophie Blee-Goldman

Due to the thread-safety model we adopted in our in-memory stores to avoid scaling issues, we synchronize all read/write methods and then, during range scans, copy the keyset of all results rather than returning a direct iterator over the underlying map. When users call #next to read out the iterator results, we issue a point lookup on the next key and then simply return a new KeyValue<>(key, get(key)).

This lets the range scan return results without blocking access to the store by other threads and without risk of ConcurrentModification, as a writer can modify the real store without affecting the keyset copy of the iterator. This also means that those changes won't be reflected in what the iterator sees or returns, which in itself is fine, as we don't guarantee consistency semantics of any kind. However, we _do_ guarantee that range scans "must not return null values", and this contract may be violated if the StreamThread deletes a record that the iterator was going to return.

tl;dr we should check get(key) for null and skip to the next result if necessary in the in-memory store iterators. See for example InMemoryKeyValueIterator (note that we'll probably need to buffer one record in advance before we return true from #hasNext).
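A sketch of the suggested fix with invented names: iterate the keyset snapshot, do the point lookup per key, and buffer one non-null result ahead so hasNext() never admits a record whose value was deleted concurrently:

{code:java}
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;
import org.apache.kafka.streams.KeyValue;

class SkippingKeyValueIterator<K, V> implements Iterator<KeyValue<K, V>> {
    private final Iterator<K> keys;       // snapshot of the store's keyset
    private final Function<K, V> lookup;  // the store's get(key)
    private KeyValue<K, V> next;          // one-record lookahead buffer

    SkippingKeyValueIterator(final Iterator<K> keys, final Function<K, V> lookup) {
        this.keys = keys;
        this.lookup = lookup;
    }

    @Override
    public boolean hasNext() {
        while (next == null && keys.hasNext()) {
            final K key = keys.next();
            final V value = lookup.apply(key);
            if (value != null) {          // skip records deleted since the snapshot
                next = KeyValue.pair(key, value);
            }
        }
        return next != null;
    }

    @Override
    public KeyValue<K, V> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        final KeyValue<K, V> result = next;
        next = null;
        return result;
    }
}
{code}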
[jira] [Created] (KAFKA-14459) Document how to use and close a 'Statistics' in the example RocksDBConfigSetter
A. Sophie Blee-Goldman created KAFKA-14459:
-------------------------------------------

Summary: Document how to use and close a 'Statistics' in the example RocksDBConfigSetter
Key: KAFKA-14459
URL: https://issues.apache.org/jira/browse/KAFKA-14459
Project: Kafka
Issue Type: Improvement
Components: docs, streams
Reporter: A. Sophie Blee-Goldman

We fixed a memory leak in KAFKA-14432 where we were sometimes failing to close the `Statistics` object used for rocksdb metrics. Since users can define their own Statistics as well, we should make sure they know that this has to be closed, like we do for other `RocksDBObject` classes such as the Cache. It might also be useful to provide an example of how to use Statistics and what can be done with it.

We currently have two sample RocksDBConfigSetter implementations in the docs, both of which could be updated here:

# [rocksdb memory management docs](https://kafka.apache.org/documentation/streams/developer-guide/memory-mgmt.html#rocksdb) -- consider including a Statistics in this example to highlight that it needs to be closed? This one could arguably be skipped, although the formatting of this sample config setter seems to be messed up, so this might be a good opportunity to fix that on the side
# [rocksdb.config.setter config docs](https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id20): this would be a good place to include an example that actually uses the Statistics for something (assuming there's some reason for users to define their own Statistics in the first place, which I personally do not know). We can potentially link to this example from the metrics docs
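A sketch of the kind of example the ticket asks for (not the official docs text; the class name is invented): a config setter that attaches its own Statistics and, crucially, closes it again, since Statistics is backed by native memory:

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;
import org.rocksdb.Statistics;

public class StatisticsConfigSetter implements RocksDBConfigSetter {
    private Statistics statistics;

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        statistics = new Statistics();
        options.setStatistics(statistics);
    }

    @Override
    public void close(final String storeName, final Options options) {
        if (statistics != null) {
            // release the native handle, the step whose omission caused KAFKA-14432
            statistics.close();
            statistics = null;
        }
    }
}
{code}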
[jira] [Resolved] (KAFKA-14454) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individually but not when run as part of the IT
[ https://issues.apache.org/jira/browse/KAFKA-14454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-14454.
--------------------------------------------
    Resolution: Fixed

> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individually but not when run as part of the IT
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-14454
> URL: https://issues.apache.org/jira/browse/KAFKA-14454
> Project: Kafka
> Issue Type: Bug
> Reporter: Sagar Rao
> Assignee: Sagar Rao
> Priority: Major
>
> The newly added test KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions, added as part of KIP-837, passes when run individually but fails when run as part of the IT class, and hence is marked as Ignored.
[jira] [Resolved] (KAFKA-14432) RocksDBStore relies on finalizers to not leak memory
[ https://issues.apache.org/jira/browse/KAFKA-14432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-14432.
--------------------------------------------
    Resolution: Fixed

> RocksDBStore relies on finalizers to not leak memory
> -----------------------------------------------------
>
> Key: KAFKA-14432
> URL: https://issues.apache.org/jira/browse/KAFKA-14432
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Lucas Brutschy
> Assignee: Lucas Brutschy
> Priority: Blocker
> Fix For: 3.4.0
>
> Relying on finalizers in RocksDB has been deprecated for a long time, and starting with rocksdb 7, finalizers are removed completely (see https://github.com/facebook/rocksdb/pull/9523).
>
> Kafka Streams currently relies on finalizers in parts to not leak memory. This needs to be resolved before we can upgrade to RocksDB 7. See https://github.com/apache/kafka/pull/12809.
>
> This is a native heap profile after running Kafka Streams without finalizers for a few hours:
> {code:java}
> Total: 13547.5 MB
> 12936.3  95.5%  95.5%  12936.3  95.5%  rocksdb::port::cacheline_aligned_alloc
>   438.5   3.2%  98.7%    438.5   3.2%  rocksdb::BlockFetcher::ReadBlockContents
>    84.0   0.6%  99.3%     84.2   0.6%  rocksdb::Arena::AllocateNewBlock
>    45.9   0.3%  99.7%     45.9   0.3%  prof_backtrace_impl
>     8.1   0.1%  99.7%     14.6   0.1%  rocksdb::BlockBasedTable::PutDataBlockToCache
>     6.4   0.0%  99.8%  12941.4  95.5%  Java_org_rocksdb_Statistics_newStatistics___3BJ
>     6.1   0.0%  99.8%      6.9   0.1%  rocksdb::LRUCacheShard::Insert@2d8b20
>     5.1   0.0%  99.9%      6.5   0.0%  rocksdb::VersionSet::ProcessManifestWrites
>     3.9   0.0%  99.9%      3.9   0.0%  rocksdb::WritableFileWriter::WritableFileWriter
>     3.2   0.0%  99.9%      3.2   0.0%  std::string::_Rep::_S_create
> {code}
[jira] [Resolved] (KAFKA-14260) InMemoryKeyValueStore iterator still throws ConcurrentModificationException
[ https://issues.apache.org/jira/browse/KAFKA-14260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-14260.
--------------------------------------------
    Resolution: Fixed

> InMemoryKeyValueStore iterator still throws ConcurrentModificationException
> ----------------------------------------------------------------------------
>
> Key: KAFKA-14260
> URL: https://issues.apache.org/jira/browse/KAFKA-14260
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.3.1, 3.2.3
> Reporter: Avi Cherry
> Assignee: Lucia Cerchie
> Priority: Major
> Fix For: 3.4.0
>
> This is the same bug as KAFKA-7912, which was then re-introduced by KAFKA-8802.
>
> Any iterator returned from {{InMemoryKeyValueStore}} may end up throwing a ConcurrentModificationException because the backing map is not concurrency safe. I expect that this only happens when the store is retrieved from {{KafkaStreams.store()}} from outside of the topology, since any usage of the store from inside of the topology should be naturally single-threaded.
>
> To start off, a reminder that this behaviour explicitly violates the interface contract for {{ReadOnlyKeyValueStore}}, which states:
> {quote}The returned iterator must be safe from java.util.ConcurrentModificationExceptions{quote}
> It is often complicated to write code that demonstrates concurrency bugs, but thankfully it is trivial to reason through the source code in {{InMemoryKeyValueStore.java}} to show why this happens:
> * All of the InMemoryKeyValueStore methods that return iterators do so by passing a keySet based on the backing TreeMap to the InMemoryKeyValueIterator constructor.
> * These keySets are all VIEWS of the backing map, not copies.
> * The InMemoryKeyValueIterator then makes a private copy of the keySet by passing the original keySet into the constructor for TreeSet. This copying was implemented in KAFKA-8802, incorrectly intending it to fix the concurrency problem.
> * TreeSet then iterates over the keySet to make a copy. If the original backing TreeMap in InMemoryKeyValueStore is changed while this copy is being created, it will fail fast with a ConcurrentModificationException.
>
> This bug should be able to be trivially fixed by replacing the backing TreeMap with a ConcurrentSkipListMap, but here's the rub: this bug was already found in KAFKA-7912 and the TreeMap was replaced with a ConcurrentSkipListMap. It was then reverted back to a TreeMap in KAFKA-8802 because of the performance regression. I can [see from one of the PRs|https://github.com/apache/kafka/pull/7212/commits/384c12e40f3a59591f897d916f92253e126820ed] that it was believed the concurrency problem with the TreeMap implementation was fixed by copying the keyset when the iterator is created, but the problem remains, plus the fix creates an extra copy of the iterated portion of the set in memory.
>
> For what it's worth, the performance difference between TreeMap and ConcurrentSkipListMap does not extend into complexity. TreeMap enjoys a similar ~2x speed through all operations with any size of data, but at the cost of what turned out to be an easy-to-encounter bug.
>
> This is all unfortunate since the only time the state stores ever get accessed concurrently is through the `KafkaStreams.store()` mechanism, but I would imagine that "correct and slightly slower" is better than "incorrect and faster".
>
> Too bad BoilerBay's AirConcurrentMap is closed-source and patented.
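A standalone demonstration of the fix direction the ticket proposes (data invented): a ConcurrentSkipListMap's iterators are weakly consistent, so a reader can iterate a range while a writer mutates the map, with no upfront keyset copy and no ConcurrentModificationException; a TreeMap iterator in the same situation fails fast:

{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConcurrentRangeDemo {
    public static void main(String[] args) {
        final ConcurrentSkipListMap<String, Long> store = new ConcurrentSkipListMap<>();
        store.put("a", 1L);
        store.put("b", 2L);
        store.put("c", 3L);

        final Iterator<Map.Entry<String, Long>> range =
            store.subMap("a", true, "c", true).entrySet().iterator();

        store.remove("b"); // concurrent write while the iterator is open

        while (range.hasNext()) {
            // never throws CME; the removed entry may or may not be observed
            System.out.println(range.next());
        }
    }
}
{code}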
[jira] [Resolved] (KAFKA-13602) Allow to broadcast a result record
[ https://issues.apache.org/jira/browse/KAFKA-13602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-13602.
--------------------------------------------
    Resolution: Fixed

> Allow to broadcast a result record
> -----------------------------------
>
> Key: KAFKA-13602
> URL: https://issues.apache.org/jira/browse/KAFKA-13602
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Sagar Rao
> Priority: Major
> Labels: needs-kip, newbie++
> Fix For: 3.4.0
>
> From time to time, users ask how they can send a record to more than one partition in a sink topic. Currently, this is only possible by replicating the message N times before the sink and using a custom partitioner to write the N messages into the N different partitions.
>
> It might be worth making this easier and adding a new feature for it. There are multiple options:
> * extend `to()` / `addSink()` with a "broadcast" option/config
> * add `toAllPartitions()` / `addBroadcastSink()` methods
> * allow StreamPartitioner to return `-1` for "all partitions"
> * extend `StreamPartitioner` to allow returning more than one partition (ie a list/collection of integers instead of a single int)
>
> The first three options imply that only a "full broadcast" is supported, so it's less flexible. On the other hand, it's easier to use (especially the first two options, which are easy as they do not require implementing a custom partitioner).
>
> The last option would be the most flexible and would also allow for a "partial broadcast" (aka multi-cast) pattern. It might also be possible to combine two options, or maybe even a totally different idea.
[jira] [Resolved] (KAFKA-14415) ThreadCache is getting slower with every additional state store
[ https://issues.apache.org/jira/browse/KAFKA-14415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman resolved KAFKA-14415.
--------------------------------------------
    Resolution: Fixed

> ThreadCache is getting slower with every additional state store
> ----------------------------------------------------------------
>
> Key: KAFKA-14415
> URL: https://issues.apache.org/jira/browse/KAFKA-14415
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Lucas Brutschy
> Assignee: Lucas Brutschy
> Priority: Major
> Fix For: 3.4.0
>
> There are a few lines in `ThreadCache` that I think should be optimized. `sizeBytes` is called at least once, and potentially many times, in every `put`, and is linear in the number of caches (= number of state stores, so typically proportional to the number of tasks). That means that with every additional task, every put gets a little slower.
>
> Compare the throughput of TIME_ROCKS on trunk (green graph):
> http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-3-4-0-51b7eb7937-jenkins-20221113214104-streamsbench/
>
> The throughput of TIME_ROCKS is 20% higher when a constant-time `sizeBytes` implementation is used:
> http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASCOMPARE-lucas-20221122140846-streamsbench/
>
> The same seems to apply for the MEM backend (initial throughput >8000 instead of 6000); however, I cannot run the same benchmark here because the memory is filled too quickly:
> http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASSTATE-lucas-20221121231632-streamsbench/
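A sketch of the constant-time direction described above, using an invented abstraction rather than the actual ThreadCache internals: maintain a running byte total that is updated on every insert and eviction, so reading the total size is O(1) regardless of how many per-store caches exist, instead of re-summing every cache on each put:

{code:java}
import java.util.concurrent.atomic.AtomicLong;

class CacheSizeTracker {
    private final AtomicLong sizeBytes = new AtomicLong();

    void recordInsert(final long entryBytes) {
        sizeBytes.addAndGet(entryBytes);
    }

    void recordEviction(final long entryBytes) {
        sizeBytes.addAndGet(-entryBytes);
    }

    long sizeBytes() {
        return sizeBytes.get(); // O(1), independent of the number of state stores
    }
}
{code}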
[jira] [Created] (KAFKA-14444) Simplify user experience of customizing partitioning strategy in Streams
A. Sophie Blee-Goldman created KAFKA-14444: -- Summary: Simplify user experience of customizing partitioning strategy in Streams Key: KAFKA-14444 URL: https://issues.apache.org/jira/browse/KAFKA-14444 Project: Kafka Issue Type: New Feature Components: streams Reporter: A. Sophie Blee-Goldman The current process of plugging a custom partitioning scheme across a Streams application is fairly labor-intensive and extremely error-prone. While defining their topology, users must pay close attention to every operator/node that may be connected to, or may create, a topic that will be produced to, or else print out their topology description and try to locate all sink nodes that way. If they miss passing in their custom partitioner to one or more such locations in the topology, everything downstream will be affected by the inconsistent/unintended partitioning scheme. It can also be easy for users to miss this process entirely and try to customize the partitioning scheme via the producer config. This does not work, and unfortunately results in a runtime exception that's difficult for users to interpret. Ideally we would provide a similar config for Streams where users could define a default implementation of the StreamPartitioner interface. ...unfortunately, this is not so straightforward. Unlike the case of the Producer config, where there is a clearly defined key and value type, there's no guarantee each sink node requiring the custom partitioner deals with the same key/value type as the others. We could utilize the default.key/value serde configs for this, and only require users to plug in their partitioner where the key/value types differ from the default, but this would likely limit the usefulness of a default partitioner significantly. We could push this to the user to write a generic implementation class with type checking and handling (see the sketch below), but this would be pretty awkward and error-prone as well. Either way this will take some thought, which is why the idea was pulled from the proposal in KIP-878 and left for a follow-up KIP -- This message was sent by Atlassian Jira (v8.20.10#820010)
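As a sketch of the typing difficulty described above, a user-supplied "default" partitioner would have to accept arbitrary types and do its own runtime checks, roughly like this (class name and fallback behaviour are illustrative assumptions):
{code:java}
import org.apache.kafka.streams.processor.StreamPartitioner;

// Hypothetical "default" partitioner: since different sink nodes may have
// different key/value types, it must type-check at runtime
public class DefaultStringKeyPartitioner implements StreamPartitioner<Object, Object> {

    @Override
    public Integer partition(final String topic, final Object key, final Object value, final int numPartitions) {
        if (key instanceof String) {
            return Math.floorMod(key.hashCode(), numPartitions);
        }
        // Assumed fallback: null defers the choice to default hash partitioning
        return null;
    }
}
{code}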
[jira] [Resolved] (KAFKA-14406) Double iteration of records in batches to be restored
[ https://issues.apache.org/jira/browse/KAFKA-14406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-14406. Resolution: Fixed > Double iteration of records in batches to be restored > - > > Key: KAFKA-14406 > URL: https://issues.apache.org/jira/browse/KAFKA-14406 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > Fix For: 3.4.0 > > > While restoring a batch of records, {{RocksDBStore}} was iterating the > {{ConsumerRecord}}s, building a list of {{KeyValue}}s, and then > iterating _that_ list of {{KeyValue}}s to add them to the RocksDB batch. > Simply adding the key and value directly to the RocksDB batch prevents this > unnecessary second iteration and the creation of intermediate {{KeyValue}} > objects, improving the performance of state restoration and reducing > unnecessary object allocation. > (thanks to Nick Telford for finding this) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14406) Double iteration of records in batches to be restored
A. Sophie Blee-Goldman created KAFKA-14406: -- Summary: Double iteration of records in batches to be restored Key: KAFKA-14406 URL: https://issues.apache.org/jira/browse/KAFKA-14406 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman Fix For: 3.4.0 While restoring a batch of records, {{RocksDBStore}} was iterating the {{ConsumerRecord}}s, building a list of {{KeyValue}}s, and then iterating _that_ list of {{KeyValue}}s to add them to the RocksDB batch. Simply adding the key and value directly to the RocksDB batch prevents this unnecessary second iteration and the creation of intermediate {{KeyValue}} objects, improving the performance of state restoration and reducing unnecessary object allocation. (thanks to Nick Telford for finding this) -- This message was sent by Atlassian Jira (v8.20.10#820010)
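A minimal sketch of the single-pass restore described above, assuming the RocksDB JNI API (this is not the actual RocksDBStore code):
{code:java}
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public class DirectRestoreSketch {
    // Add each restored key/value to the RocksDB batch directly: one pass over
    // the records, no intermediate KeyValue list allocated
    public static void restoreBatch(final RocksDB db,
                                    final List<ConsumerRecord<byte[], byte[]>> records) throws RocksDBException {
        try (WriteBatch batch = new WriteBatch();
             WriteOptions options = new WriteOptions()) {
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                if (record.value() == null) {
                    batch.delete(record.key()); // tombstone: delete the key
                } else {
                    batch.put(record.key(), record.value());
                }
            }
            db.write(options, batch);
        }
    }
}
{code}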
[jira] [Created] (KAFKA-14405) Log a warning when users attempt to set a config controlled by Streams
A. Sophie Blee-Goldman created KAFKA-14405: -- Summary: Log a warning when users attempt to set a config controlled by Streams Key: KAFKA-14405 URL: https://issues.apache.org/jira/browse/KAFKA-14405 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman Related to https://issues.apache.org/jira/browse/KAFKA-14404 It's too easy for users to try overriding one of the client configs that Streams hardcodes, and since we just silently ignore it, there's no good way for them to tell that their config is not being used. Sometimes this may be harmless, but in cases like the Producer's partitioner there could be important application logic that's never being invoked. When processing user configs in StreamsConfig, we should check for all of these configs and log a warning when any of them have been set (see the sketch below) -- This message was sent by Atlassian Jira (v8.20.10#820010)
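A rough sketch of the proposed check (the config names listed are examples of values Streams hardcodes; the authoritative list would live in StreamsConfig):
{code:java}
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ControlledConfigCheck {
    private static final Logger log = LoggerFactory.getLogger(ControlledConfigCheck.class);

    // Example entries only; the real list is maintained by Streams
    private static final Set<String> CONTROLLED_CONFIGS = Set.of(
        "group.id",
        "partitioner.class",
        "enable.auto.commit"
    );

    public static void warnOnControlledConfigs(final Map<String, Object> userConfigs) {
        for (final String config : CONTROLLED_CONFIGS) {
            if (userConfigs.containsKey(config)) {
                log.warn("Config '{}' is hardcoded by Kafka Streams; the user-provided value will be ignored", config);
            }
        }
    }
}
{code}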
[jira] [Created] (KAFKA-14404) Improve docs & warn that the ProducerConfig partitioner.class cannot be used in Streams
A. Sophie Blee-Goldman created KAFKA-14404: -- Summary: Improve docs & warn that the ProducerConfig partitioner.class cannot be used in Streams Key: KAFKA-14404 URL: https://issues.apache.org/jira/browse/KAFKA-14404 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman There are a handful of client configs that can't be set by Streams users for various reasons, such as the group id, but we seem to have missed a few of them in the documentation [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]: the partition assignor (Consumer) and partitioner (Producer). This section of the docs also just needs to be cleaned up in general, as there is overlap between the [Default Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values] and [Parameters controlled by Kafka Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26] sections, and the table of contents is messed up, presumably due to an issue with the section headers. We should separate these into one section covering (only) configs where Streams sets a different default that can still be overridden by the user, and another section covering the configs that Streams hardcodes and users can never override. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14385) Flaky Test QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable
A. Sophie Blee-Goldman created KAFKA-14385: -- Summary: Flaky Test QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable Key: KAFKA-14385 URL: https://issues.apache.org/jira/browse/KAFKA-14385 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman Failed twice on the same build (Java 8 & 11) h3. Stacktrace java.lang.AssertionError: KafkaStreams did not transit to RUNNING state within 15000 milli seconds. Expected: but: was at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(StreamsTestUtils.java:134) at org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(StreamsTestUtils.java:121) at org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable(QueryableStateIntegrationTest.java:1038) https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12836/3/testReport/org.apache.kafka.streams.integration/QueryableStateIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldNotMakeStoreAvailableUntilAllStoresAvailable/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14384) Flaky Test SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff
A. Sophie Blee-Goldman created KAFKA-14384: -- Summary: Flaky Test SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff Key: KAFKA-14384 URL: https://issues.apache.org/jira/browse/KAFKA-14384 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman h3. Stacktrace java.lang.AssertionError: Did not receive all 5 records from topic selfjoin-outputSelfJoinUpgradeIntegrationTestshouldUpgradeWithTopologyOptimizationOff within 6 ms Expected: is a value equal to or greater than <5> but: <0> was less than <5> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueWithTimestampRecordsReceived$2(IntegrationTestUtils.java:763) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:382) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:350) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(IntegrationTestUtils.java:759) at org.apache.kafka.streams.integration.SelfJoinUpgradeIntegrationTest.processKeyValueAndVerifyCount(SelfJoinUpgradeIntegrationTest.java:244) at org.apache.kafka.streams.integration.SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff(SelfJoinUpgradeIntegrationTest.java:155) https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12835/4/testReport/org.apache.kafka.streams.integration/SelfJoinUpgradeIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldUpgradeWithTopologyOptimizationOff/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
[ https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12935. Assignee: Lucas Brutschy Fixed again via https://github.com/apache/kafka/commit/ce5faa222b3f58a74994190e3a6267ac87ee21a8 > Flaky Test > RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore > > > Key: KAFKA-12935 > URL: https://issues.apache.org/jira/browse/KAFKA-12935 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Lucas Brutschy >Priority: Critical > Labels: flaky-test > Attachments: > RestoreIntegrationTest#shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore[true].rtf > > > {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at > org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374) > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14382) StreamThreads can miss rebalance events when processing records during a rebalance
A. Sophie Blee-Goldman created KAFKA-14382: -- Summary: StreamThreads can miss rebalance events when processing records during a rebalance Key: KAFKA-14382 URL: https://issues.apache.org/jira/browse/KAFKA-14382 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman One of the main improvements introduced by the cooperative protocol was the ability to continue processing records during a rebalance. In Streams, we take advantage of this by polling with a timeout of 0 when a rebalance is/has been in progress, so it can return immediately and continue on through the main loop to process new records. The main poll loop uses an algorithm based on max.poll.interval.ms to ensure the StreamThread returns to call #poll in time to stay in the consumer group (see the sketch below). Generally speaking, it should exit the processing loop and invoke poll within a few minutes at most based on the poll interval, though typically it will break out much sooner once it's used up all the records from the last poll (based on the max.poll.records config, which Streams sets to 1,000 by default). However, if doing heavy processing or setting a higher max.poll.records, the thread may continue processing for more than a few seconds. If it had sent out a JoinGroup request before going on to process and was waiting for its JoinGroup response, then once it does return to invoke #poll it will process this response and send out a SyncGroup – but if the processing took too long, this SyncGroup may immediately fail with the REBALANCE_IN_PROGRESS error. Essentially, while the thread was processing, the group leader will itself be processing the JoinGroup subscriptions of all members and generating an assignment, then sending this back in its SyncGroup. This may take only a few seconds or less, and the group coordinator will not yet have noticed (or care) that one of the consumers hasn't sent a SyncGroup – it will just return the assigned partitions in the SyncGroup responses of the members who have responded in time, and "complete" the rebalance in their eyes. But if the assignment involved moving any partitions from one consumer to another, then it will need to trigger a followup rebalance right away to finish assigning those partitions which were revoked in the previous rebalance. This is what causes a new rebalance to be kicked off just seconds after the first one began. If the consumer that was stuck processing was among those who needed to revoke partitions, this can lead to repeating rebalances – since it fails the SyncGroup of the 1st rebalance, it never receives the assignment for it and never knows to revoke those partitions, meaning it will rejoin for the new rebalance still claiming them among its ownedPartitions. When the assignor generates the same assignment for the 2nd rebalance, it will again see that some partitions need to be revoked and will therefore trigger yet another new rebalance after finishing the 2nd. This can go on for as long as the StreamThreads are struggling to finish the JoinGroup phase in time due to processing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
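A simplified sketch of the loop structure described above (illustrative only, not the actual StreamThread code): poll with timeout zero during a rebalance, then process buffered records only until a deadline derived from max.poll.interval.ms so the thread returns to poll() in time:
{code:java}
import java.time.Duration;
import java.util.function.BooleanSupplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class PollLoopSketch {
    public static void runOnce(final Consumer<byte[], byte[]> consumer,
                               final boolean rebalanceInProgress,
                               final long maxPollIntervalMs,
                               final BooleanSupplier hasBufferedRecords,
                               final Runnable processOneRecord) {
        // During a rebalance, poll with timeout 0 so processing can continue
        final ConsumerRecords<byte[], byte[]> records =
            consumer.poll(rebalanceInProgress ? Duration.ZERO : Duration.ofMillis(100));
        // (buffering of `records` elided in this sketch)

        // Bound processing time so we return to poll() well within max.poll.interval.ms;
        // if we overrun, a pending SyncGroup may fail with REBALANCE_IN_PROGRESS
        final long deadline = System.currentTimeMillis() + maxPollIntervalMs / 2;
        while (hasBufferedRecords.getAsBoolean() && System.currentTimeMillis() < deadline) {
            processOneRecord.run();
        }
    }
}
{code}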
[jira] [Reopened] (KAFKA-13891) sync group failed with rebalanceInProgress error causes many rounds of rebalancing in cooperative mode
[ https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reopened KAFKA-13891: Reopening – the original fix was reverted; we should instead fix this assignor-side by making it smarter about partition ownership across generations. Basically, it should take as the previous owner whichever consumer has the highest generation and claims the partition among their owned partitions (see the sketch below) [~showuon] I probably won't be able to get to this within the next few days so if you're interested in picking up this fix go ahead and I'll find time to review – otherwise I will try to get to it in time for the 3.4 release > sync group failed with rebalanceInProgress error causes many rounds of rebalancing in cooperative mode > --- > > Key: KAFKA-13891 > URL: https://issues.apache.org/jira/browse/KAFKA-13891 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.0 >Reporter: Shawn Wang >Priority: Major > Fix For: 3.3.0, 3.2.4 > > > This issue was first found in > [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419] > But the previous PR forgot to reset the generation when sync group failed with > the rebalanceInProgress error. So the previous bug still exists and it may cause the > consumer to rebalance many rounds before finally stabilizing. > Here's the example ({*}bold is added{*}): > # consumer A joined and synced group successfully with generation 1 *( with > ownedPartitions P1/P2 )* > # New rebalance started with generation 2, consumer A joined successfully, > but somehow, consumer A doesn't send out sync group immediately > # other consumers completed sync group successfully in generation 2, except > consumer A. > # After consumer A sends out sync group, a new rebalance starts, with > generation 3. So consumer A got the REBALANCE_IN_PROGRESS error in its sync group > response > # When receiving REBALANCE_IN_PROGRESS, we re-join the group, with > generation 3, with the assignment (ownedPartitions) from generation 1. > # So, now, we have sent out-of-date ownedPartitions, with unexpected results > # *After the generation-3 rebalance, consumer A got the P3/P4 partitions. The > ownedPartitions are ignored because of the old generation.* > # *consumer A revokes P1/P2 and re-joins to start a new round of rebalance* > # *if some other consumer C fails to syncGroup before consumer A's > joinGroup, the same issue will happen again and result in many rounds of > rebalance before the group stabilizes* > -- This message was sent by Atlassian Jira (v8.20.10#820010)
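A sketch of the assignor-side fix proposed above (types and names are illustrative): when multiple consumers claim the same partition among their ownedPartitions, take the claimant with the highest generation as the previous owner:
{code:java}
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class PreviousOwnerResolver {

    public static final class Claim {
        final String consumerId;
        final int generation;

        public Claim(final String consumerId, final int generation) {
            this.consumerId = consumerId;
            this.generation = generation;
        }
    }

    // Out-of-date claims (like consumer A's generation-1 ownedPartitions in the
    // example above) lose to any claim from a newer generation
    public static Optional<String> previousOwner(final List<Claim> claimants) {
        return claimants.stream()
            .max(Comparator.comparingInt(c -> c.generation))
            .map(c -> c.consumerId);
    }
}
{code}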
[jira] [Resolved] (KAFKA-14211) Streams log message has partition and offset transposed
[ https://issues.apache.org/jira/browse/KAFKA-14211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-14211. Resolution: Fixed Resolving this since it's apparently fixed by PR (see Bruno's comment) – [~cadonna] can you fill out the "Fix Version" for this? > Streams log message has partition and offset transposed > --- > > Key: KAFKA-14211 > URL: https://issues.apache.org/jira/browse/KAFKA-14211 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.1.1 >Reporter: Matt Allwood >Priority: Minor > > The log warning message for out-of-order KTable update has partition and > offset the wrong way around. > For example: > {noformat} > [...-StreamThread-1] WARN > org.apache.kafka.streams.kstream.internals.KTableSource - Detected > out-of-order KTable update for KTABLE-FK-JOIN-OUTPUT-STATE-STORE-000274, > old timestamp=[1649245600022] new timestamp=[1642429126882]. > topic=[...-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-000269-topic] > partition=[2813] offset=[0].{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14123) Delete with null value not supported in Streams PersistentWindowStore
[ https://issues.apache.org/jira/browse/KAFKA-14123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-14123. Resolution: Not A Problem > Delete with null value not supported in Streams PersistentWindowStore > -- > > Key: KAFKA-14123 > URL: https://issues.apache.org/jira/browse/KAFKA-14123 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0 >Reporter: Pawan Sharma >Priority: Major > > Unable to delete a window entry from the persistent window store by passing a > null value in the body. > > The put in this class does not check whether the value is null and invoke the > remove method: > [https://github.com/apache/kafka/blob/3.0.0/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java] > > Whereas the same feature works in InMemoryWindowStore, where null > values are treated as deletes (line 126): > [https://github.com/apache/kafka/blob/3.0.0/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java] > > This behaviour is in contrast to all other stores, including KV stores, > where a null value is treated as a delete, and also complies with the behaviour > of compacted Kafka topics. -- This message was sent by Atlassian Jira (v8.20.10#820010)
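A minimal sketch of the delete-on-null convention the reporter expected, as the in-memory window store implements it (simplified; not the actual store code):
{code:java}
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class NullIsDeleteSketch {
    private final ConcurrentNavigableMap<Long, byte[]> segment = new ConcurrentSkipListMap<>();

    public void put(final long windowStartTimestamp, final byte[] value) {
        if (value == null) {
            // a null value is a delete, matching KV-store semantics and
            // the tombstone convention of compacted topics
            segment.remove(windowStartTimestamp);
        } else {
            segment.put(windowStartTimestamp, value);
        }
    }
}
{code}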
[jira] [Created] (KAFKA-14318) KIP-878: Autoscaling for Statically Partitioned Streams
A. Sophie Blee-Goldman created KAFKA-14318: -- Summary: KIP-878: Autoscaling for Statically Partitioned Streams Key: KAFKA-14318 URL: https://issues.apache.org/jira/browse/KAFKA-14318 Project: Kafka Issue Type: New Feature Components: streams Reporter: A. Sophie Blee-Goldman Assignee: A. Sophie Blee-Goldman Fix For: 3.4.0 [KIP-878: Autoscaling for Statically Partitioned Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14008) Add docs for new metrics introduced in KIP-846
A. Sophie Blee-Goldman created KAFKA-14008: -- Summary: Add docs for new metrics introduced in KIP-846 Key: KAFKA-14008 URL: https://issues.apache.org/jira/browse/KAFKA-14008 Project: Kafka Issue Type: Task Components: docs, streams Reporter: A. Sophie Blee-Goldman Fix For: 3.3.0 Need to write docs for [KIP-846|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211886093] # Note in upgrade guide # Add the following to the [Streams metric docs|https://kafka.apache.org/documentation/#kafka_streams_monitoring]: ## bytes-consumed-total ## records-consumed-total ## bytes-produced-total ## records-produced-total -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-13957) Flaky Test StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores
A. Sophie Blee-Goldman created KAFKA-13957: -- Summary: Flaky Test StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores Key: KAFKA-13957 URL: https://issues.apache.org/jira/browse/KAFKA-13957 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman Failed on a local build so I have the full logs (attached) {code:java} java.lang.AssertionError: Unexpected exception thrown while getting the value from store. Expected: is (a string containing "Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING" or a string containing "The state store, source-table, may have migrated to another instance" or a string containing "Cannot get state store source-table because the stream thread is STARTING, not RUNNING") but: was "The specified partition 1 for store source-table does not exist." at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.verifyRetrievableException(StoreQueryIntegrationTest.java:539) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQuerySpecificActivePartitionStores$5(StoreQueryIntegrationTest.java:241) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:557) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:183) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.lang.Thread.run(Thread.java:833) {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-13945) Add task-level metrics to Streams for bytes/records Produced
A. Sophie Blee-Goldman created KAFKA-13945: -- Summary: Add task-level metrics to Streams for bytes/records Produced Key: KAFKA-13945 URL: https://issues.apache.org/jira/browse/KAFKA-13945 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Resolved] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-13152. Resolution: Fixed > Replace "buffered.records.per.partition" with "input.buffer.max.bytes" > --- > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > Fix For: 3.3.0 > > > The current config "buffered.records.per.partition" controls the maximum number of > records to bookkeep per partition; when it is exceeded, we pause fetching from > that partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed depends on > the dynamic number of partitions assigned. > * Record size can vary from case to case. > Hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", > which controls how many bytes in total are allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.20.1#820001)
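Assuming the replacement ships under the name proposed in this ticket, usage would look something like this sketch (config name taken from the ticket; the value and app identifiers are arbitrary placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class InputBufferConfigExample {
    public static Properties config() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Global bound on buffered input bytes, replacing the per-partition record count
        props.put("input.buffer.max.bytes", 512L * 1024 * 1024); // 512 MB
        return props;
    }
}
{code}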
[jira] [Reopened] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams
[ https://issues.apache.org/jira/browse/KAFKA-13542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reopened KAFKA-13542: Reopening this since it was reverted pending investigation into a benchmark regression > Utilize the new Consumer#enforceRebalance(reason) API in Streams > > > Key: KAFKA-13542 > URL: https://issues.apache.org/jira/browse/KAFKA-13542 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Hao Li >Priority: Blocker > Fix For: 3.2.0 > > > KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance > API, which will be passed in to a new field of the JoinGroup protocol. We > invoke this API throughout Streams for various reasons, which are very useful > for debugging the cause of rebalancing. Passing in the reason to this new API > would make it possible to figure out why a Streams client triggered a > rebalance from the broker logs, which are often the only logs available when > the client logs cannot be retrieved for whatever reason -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13713) Tech Debt: keep StreamThread and TopologyMetadata's view of the topology in sync
A. Sophie Blee-Goldman created KAFKA-13713: -- Summary: Tech Debt: keep StreamThread and TopologyMetadata's view of the topology in sync Key: KAFKA-13713 URL: https://issues.apache.org/jira/browse/KAFKA-13713 Project: Kafka Issue Type: Sub-task Components: streams Reporter: A. Sophie Blee-Goldman Currently, when the topology is modified via an add/remove request, we immediately update the TopologyMetadata with the new/removed topology and then register listeners for each request so we can complete it once all threads have ack'ed the corresponding update, i.e. upgraded to that minimum topology version. For consistency, we should consider trying to keep the topology on the minimum common version across all (live/active group member) threads (see the sketch below). Once a thread notices a topology update has been queued, it will update its own view and bump itself to the latest topology version. We then check whether the minimum common topology version has increased, and if so, upgrade the official topology as tracked by the TopologyMetadata. -- This message was sent by Atlassian Jira (v8.20.1#820001)
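The version bookkeeping proposed here reduces to a minimum over the live threads' acked versions; a trivial sketch (names and the empty-group convention are illustrative):
{code:java}
import java.util.Collection;

public class TopologyVersionSketch {
    // The "official" topology version tracked by TopologyMetadata is the
    // minimum version acked across all live/active StreamThreads
    public static int minimumCommonVersion(final Collection<Integer> threadAckedVersions) {
        return threadAckedVersions.stream()
            .mapToInt(Integer::intValue)
            .min()
            .orElse(0); // assumed convention: no live threads means version 0
    }
}
{code}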
[jira] [Created] (KAFKA-13712) Make topology addition/removal atomic so we can roll back if request fails
A. Sophie Blee-Goldman created KAFKA-13712: -- Summary: Make topology addition/removal atomic so we can roll back if request fails Key: KAFKA-13712 URL: https://issues.apache.org/jira/browse/KAFKA-13712 Project: Kafka Issue Type: Sub-task Components: streams Reporter: A. Sophie Blee-Goldman -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13711) Fix bugs with input topic management to support pattern subscription fully
A. Sophie Blee-Goldman created KAFKA-13711: -- Summary: Fix bugs with input topic management to support pattern subscription fully Key: KAFKA-13711 URL: https://issues.apache.org/jira/browse/KAFKA-13711 Project: Kafka Issue Type: Sub-task Components: streams Reporter: A. Sophie Blee-Goldman See https://github.com/apache/kafka/pull/11601 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13690) Flaky test EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[at_least_once]
A. Sophie Blee-Goldman created KAFKA-13690: -- Summary: Flaky test EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[at_least_once] Key: KAFKA-13690 URL: https://issues.apache.org/jira/browse/KAFKA-13690 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman The _at_least_once_ version of the "{*}EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown"{*} test is occasionally failing with h3. Error Message java.lang.AssertionError: The committed records do not match what expected Expected: <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), KeyValue(0, 45)]> but: was <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 10), KeyValue(0, 11), KeyValue(0, 13), KeyValue(0, 16), KeyValue(0, 20), KeyValue(0, 25), KeyValue(0, 31), KeyValue(0, 38)]> Seems we are receiving more than the expected records. ...of course, this is an ALOS flavor of the {*}EOS{*}IntegrationTest, so perhaps we shouldn't be running this variant at all? Not sure if this explains the exact output we receive but it certainly seems suspicious Added at_least_once in [https://github.com/apache/kafka/pull/11283] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13645) Support the TopologyTestDriver with modular topologies
A. Sophie Blee-Goldman created KAFKA-13645: -- Summary: Support the TopologyTestDriver with modular topologies Key: KAFKA-13645 URL: https://issues.apache.org/jira/browse/KAFKA-13645 Project: Kafka Issue Type: Sub-task Components: streams Reporter: A. Sophie Blee-Goldman Currently the TTD accepts only a single Topology. Users can technically just use one TTD per Topology, but for a complete simulation of the actual KafkaStreams app we'll need to add support for processing multiple modular topologies with the TopologyTestDriver -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13644) Support global state stores with modular topologies
A. Sophie Blee-Goldman created KAFKA-13644: -- Summary: Support global state stores with modular topologies Key: KAFKA-13644 URL: https://issues.apache.org/jira/browse/KAFKA-13644 Project: Kafka Issue Type: Sub-task Components: streams Reporter: A. Sophie Blee-Goldman -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13643) Replace "NamedTopology" with "ModularTopology" in the codebase
A. Sophie Blee-Goldman created KAFKA-13643: -- Summary: Replace "NamedTopology" with "ModularTopology" in the codebase Key: KAFKA-13643 URL: https://issues.apache.org/jira/browse/KAFKA-13643 Project: Kafka Issue Type: Sub-task Components: streams Reporter: A. Sophie Blee-Goldman -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13590) Rename InternalTopologyBuilder's #topicGroups method to be more descriptive
A. Sophie Blee-Goldman created KAFKA-13590: -- Summary: Rename InternalTopologyBuilder's #topicGroups method to be more descriptive Key: KAFKA-13590 URL: https://issues.apache.org/jira/browse/KAFKA-13590 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman Pretty much what the title says: it can be difficult to figure out what this method is actually returning based on the method name. At least javadocs were added in a recent PR, but ideally you wouldn't need to visit the method's implementation at all to understand its function. See https://github.com/apache/kafka/pull/11600/files#r768947553 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13543) Consumer may pass stale cluster metadata to the assignor following a subscription update
A. Sophie Blee-Goldman created KAFKA-13543: -- Summary: Consumer may pass stale cluster metadata to the assignor following a subscription update Key: KAFKA-13543 URL: https://issues.apache.org/jira/browse/KAFKA-13543 Project: Kafka Issue Type: Bug Components: consumer Reporter: A. Sophie Blee-Goldman A consumer only ever tracks metadata corresponding to its subscribed topics, which can cause a race condition during a rebalance immediately after a change to the consumer's subscription. In particular, when new topics are added to the subscription but a rebalance is kicked off before the consumer's metadata is updated with the new topics, it will pass a stale copy of the cluster metadata into the ConsumerPartitionAssignor#assign method, which may not include the newly subscribed topics regardless of whether they do or do not exist. Most apps are likely unaffected by this, including any consumer client apps using OOTB assignors, since a new rebalance will be kicked off when the metadata is updated and any partitions from the new topics will be assigned at that time. But in Kafka Streams, we do a check during each rebalance to ensure that any user input topics are created ahead of time. This race condition can result in Streams incorrectly identifying user topics as missing and throwing a MissingSourceTopicException when a new topology subscribed to new topics is added to the application. We can work around this for now, but it's unfortunate that we can't distinguish between truly missing source topics and a transient lack of these topics in the metadata. Some more advanced users with plain consumer client apps and custom assignors might run into this as well. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams
A. Sophie Blee-Goldman created KAFKA-13542: -- Summary: Utilize the new Consumer#enforceRebalance(reason) API in Streams Key: KAFKA-13542 URL: https://issues.apache.org/jira/browse/KAFKA-13542 Project: Kafka Issue Type: Task Components: streams Reporter: A. Sophie Blee-Goldman Fix For: 3.2.0 KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance API, which will be passed in to a new field of the JoinGroup protocol. We invoke this API throughout Streams for various reasons, which are very useful for debugging the cause of rebalancing. Passing in the reason to this new API would make it possible to figure out why a Streams client triggered a rebalance from the broker logs, which are often the only logs available when the client logs cannot be retrieved for whatever reason -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13439) Deprecate EAGER rebalancing in Kafka Streams
A. Sophie Blee-Goldman created KAFKA-13439: -- Summary: Deprecate EAGER rebalancing in Kafka Streams Key: KAFKA-13439 URL: https://issues.apache.org/jira/browse/KAFKA-13439 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman Fix For: 3.1.0 Cooperative rebalancing has been the default since 2.4, but we have always had to keep the logic for eager rebalancing around to allow users a live upgrade path. The current upgrade path involves two rolling bounces: the first one to upgrade the byte code and set the UPGRADE_FROM config to keep Kafka Streams on the old EAGER protocol until everyone has been upgraded, and a second rolling bounce to remove the config and start enabling COOPERATIVE (see the config sketch below). We'd like to finally remove the EAGER protocol and tackle some of the tech debt its presence has accrued, but we should first give users a warning that we intend to remove it, and that removal will require a slight change to the upgrade path for any users who want to upgrade from 2.3 or below: going through a "bridge" version between 2.4 and 3.1 in the first rolling bounce, before upgrading to the final version. We should also prepare by logging a warning in 3.1 if we see the UPGRADE_FROM config set, informing users that they will need to make sure to remove it before the EAGER protocol is removed. Then in version 3.2 (or whenever we remove it) we can throw an exception and shut down if a user has set the UPGRADE_FROM flag to a pre-2.4 version. -- This message was sent by Atlassian Jira (v8.20.1#820001)
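For reference, the first rolling bounce of the two-bounce path sets the UPGRADE_FROM config; a minimal sketch (application id and bootstrap servers are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class UpgradeFromExample {
    // First bounce: keep the group on the EAGER protocol until all instances
    // have the new byte code; the second bounce simply removes this config
    public static Properties firstBounceConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23);
        return props;
    }
}
{code}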
[jira] [Resolved] (KAFKA-13381) Wrap all uncaught exceptions as StreamsException with TaskId field
[ https://issues.apache.org/jira/browse/KAFKA-13381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-13381. Resolution: Fixed > Wrap all uncaught exceptions as StreamsException with TaskId field > -- > > Key: KAFKA-13381 > URL: https://issues.apache.org/jira/browse/KAFKA-13381 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Major > Labels: kip > Fix For: 3.1.0 > > > KIP-783: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-783%3A+Add+TaskId+field+to+StreamsException] > > Currently only some exceptions that occur during processing are wrapped as a > StreamsException, which complicates the logic required by a user custom > StreamsUncaughtExceptionHandler. It would be cleaner to ensure that all > exceptions thrown to the user/handler are wrapped (exactly once) as a > StreamsException. > Further, many exceptions can be traced back to a particular task: eg due to a > timeout of that task, or thrown during Task#process, or while > closing/suspending/etc that task. It can be helpful both to debugging as well > as to handling to have that information, so we can add a TaskId field to the > StreamsException class to help users identify the source of an exception -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12994. Resolution: Fixed > Migrate all Tests to New API and Remove Suppression for Deprecation Warnings > related to KIP-633 > --- > > Key: KAFKA-12994 > URL: https://issues.apache.org/jira/browse/KAFKA-12994 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 3.0.0 >Reporter: Israel Ekpo >Assignee: Andrew Patterson >Priority: Major > Labels: kip-633, newbie, newbie++ > Fix For: 3.1.0 > > > Due to the API changes for KIP-633, a lot of deprecation warnings have been > generated in tests that are using the old deprecated APIs. There are a lot of > tests using the deprecated methods. We should absolutely migrate them all to > the new APIs and then get rid of all the applicable annotations for > suppressing the deprecation warnings. > This applies to all Java and Scala examples and tests using the deprecated > APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows > classes. > > This is based on the feedback from reviewers in this PR: > > https://github.com/apache/kafka/pull/10926 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13381) Wrap all uncaught exceptions as StreamsException with TaskId field
A. Sophie Blee-Goldman created KAFKA-13381: -- Summary: Wrap all uncaught exceptions as StreamsException with TaskId field Key: KAFKA-13381 URL: https://issues.apache.org/jira/browse/KAFKA-13381 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman Currently only some exceptions that occur during processing are wrapped as a StreamsException, which complicates the logic required by a user custom StreamsUncaughtExceptionHandler. It would be cleaner to ensure that all exceptions thrown to the user/handler are wrapped (exactly once) as a StreamsException. Further, many exceptions can be traced back to a particular task: eg due to a timeout of that task, or thrown during Task#process, or while closing/suspending/etc that task. It can be helpful both to debugging as well as to handling to have that information, so we can add a TaskId field to the StreamsException class to help users identify the source of an exception -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13336) Migrate StreamsBuilder class to interface with factory method on KafkaStreams
A. Sophie Blee-Goldman created KAFKA-13336: -- Summary: Migrate StreamsBuilder class to interface with factory method on KafkaStreams Key: KAFKA-13336 URL: https://issues.apache.org/jira/browse/KAFKA-13336 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman In order to refactor and improve the Streams physical plan generation, we'll need to clean up the DSL builder API a bit and in particular enforce that the configs be passed in from the beginning, rather than only when calling #build. We can also use this opportunity to improve the disconnect between the builder, the resulting Topology, and the Kafka Streams application that ultimately runs this topology – at the moment these are all completely uncoupled on the surface, so it's easy to think that a StreamsBuilder can be reused to build multiple Topology objects, or that a Topology object could be passed in to different KafkaStreams. However, there is internal state that is shared and modified during StreamsBuilder#build and in the KafkaStreams constructor, and they are actually very coupled under the hood, meaning there must be a 1:1:1 ratio of builder to topology to KafkaStreams. So we need a new API that # Forces users to pass in the configs (Properties) when constructing the builder # Clarifies the relationship of the builder object to the topology, and to the app itself I think a good API for this might look something like this: # Move the StreamsBuilder class to an internal one (technically we would need to keep it where it is for now until a full deprecation cycle) # Introduce a TopologyBuilder interface to replace the functionality of the current StreamsBuilder class, and have StreamsBuilder implement this. All the current methods on StreamsBuilder will be moved to the TopologyBuilder interface # Add a factory method on KafkaStreams for users to get instances of the TopologyBuilder, and have this accept a Properties. For example
{code:java}
class KafkaStreams {
    public TopologyBuilder newTopologyBuilder(final Properties props) {
        // convert to StreamsConfig to validate configs & check for application.id
        final StreamsConfig config = new StreamsConfig(props);
        return new StreamsBuilder(config);
    }
}
{code}
This should satisfy both of the requirements, and imo provides a cleaner API anyway. Getting the builder through a factory method on the KafkaStreams object should make it clear that this builder is tied to that particular KafkaStreams instance. And we can enforce that it isn't reused for a different application by parsing the Properties passed in to KafkaStreams#newTopologyBuilder, specifically the application.id -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13333) Optimize condition for triggering rebalance after wiping out corrupted task
A. Sophie Blee-Goldman created KAFKA-13333: -- Summary: Optimize condition for triggering rebalance after wiping out corrupted task Key: KAFKA-13333 URL: https://issues.apache.org/jira/browse/KAFKA-13333 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman Just filing a ticket to list some thoughts I had on optimizing https://issues.apache.org/jira/browse/KAFKA-12486. The idea there is to trigger a rebalance upon detecting corruption of some task. This task may have had a large amount of state that had to be wiped out under eos, so we might be able to avoid a long downtime due to restoration if we can utilize the HA TaskAssignor to temporarily move that active task to another node that has some state for it already (e.g. had a standby task for it). Right now, we trigger that rebalance under the condition that (a) eos is enabled, and (b) at least one of the corrupted tasks was an active task. This is a pretty safe bet, but it's worth jotting down some potential optimizations of this condition so we can trim down the occurrences of unnecessary rebalances that wouldn't have helped. For example: 1) Don't kick off a rebalance if the corrupted task is in CREATED or RESTORING and is not within the acceptable.recovery.lag from the end of the changelog. If the task wasn't caught up on this host but was assigned to it anyway, that indicates there wasn't any other host with enough state for this task, and therefore no one to temporarily take it over. 2) Only trigger a rebalance if standbys are configured, and/or parse the standby host info to verify whether this task has a standby copy on another live client. It's still possible to have a copy of this task's state on another host even without standbys, but the odds are greatly reduced. 3) If we want to get really fancy (and I'm not quite sure we do), we could have the assignor report not just the names but also the lag of each standby task on another host, and then trigger the rebalance depending on whether this task has a hot standby within the acceptable.recovery.lag -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12486) Utilize HighAvailabilityTaskAssignor to avoid downtime on corrupted task
[ https://issues.apache.org/jira/browse/KAFKA-12486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12486. Resolution: Fixed > Utilize HighAvailabilityTaskAssignor to avoid downtime on corrupted task > > > Key: KAFKA-12486 > URL: https://issues.apache.org/jira/browse/KAFKA-12486 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Critical > Fix For: 3.1.0 > > > In KIP-441, we added the HighAvailabilityTaskAssignor to address certain > common scenarios which tend to lead to heavy downtime for tasks, such as > scaling out. The new assignor will always place an active task on a client > which has a "caught-up" copy of that tasks' state, if any exists, while the > intended recipient will instead get a standby task to warm up the state in > the background. This way we keep tasks live as much as possible, and avoid > the long downtime imposed by state restoration on active tasks. > We can actually expand on this to reduce downtime due to restoring state: > specifically, we may throw a TaskCorruptedException on an active task which > leads to wiping out the state stores of that task and restoring from scratch. > There are a few cases where this may be thrown: > # No checkpoint found with EOS > # TimeoutException when processing a StreamTask > # TimeoutException when committing offsets under eos > # RetriableException in RecordCollectorImpl > (There is also the case of OffsetOutOfRangeException, but that is excluded > here since it only applies to standby tasks). > We should consider triggering a rebalance when we hit TaskCorruptedException > on an active task, after we've wiped out the corrupted state stores. This > will allow the assignor to temporarily redirect this task to another client > who can resume work on the task while the original owner works on restoring > the state from scratch. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13332) New pattern-matched topic with more partitions than existing matched topics can crash Kafka Streams
A. Sophie Blee-Goldman created KAFKA-13332: -- Summary: New pattern-matched topic with more partitions than existing matched topics can crash Kafka Streams Key: KAFKA-13332 URL: https://issues.apache.org/jira/browse/KAFKA-13332 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman The partition count resolution logic in Streams is used to determine the number of partitions for any repartition topics that don't already exist. This is done by parsing the topology to find the number of partitions of all upstream topics, and taking the max. For Pattern-subscribed subtopologies, this means you need to ensure that at least one topic matching this pattern is created prior to starting up the app. That topic, or topics, will determine the number of partitions for any downstream repartitions. The problem is that repartition topics are created once, the first time the app is started up. After that, during each rebalance Streams will validate all repartition topics including checking for their existence, and verifying they have the correct number of partitions. This check will fail if a new topic is created after the first initialization, which matches the pattern but has more partitions than any of the existing topics. This means that unfortunately, you can't create a new input topic that matches the pattern your app is subscribed to unless it has equal or fewer partitions than the existing matching topics. If you do, you would need to stop all instances and delete the existing repartition topics before creating this new topic -- This message was sent by Atlassian Jira (v8.3.4#803005)
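The resolution logic described above amounts to a max over the upstream topics' partition counts, computed once at first startup; a trivial sketch (names illustrative):
{code:java}
import java.util.Collection;

public class RepartitionSizingSketch {
    // A repartition topic is sized to the max partition count across all
    // upstream (here: pattern-matched) input topics that exist at first startup
    public static int repartitionPartitionCount(final Collection<Integer> upstreamPartitionCounts) {
        return upstreamPartitionCounts.stream()
            .mapToInt(Integer::intValue)
            .max()
            .orElseThrow(() -> new IllegalStateException("no matching input topics exist yet"));
    }
}
{code}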
[jira] [Created] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts
A. Sophie Blee-Goldman created KAFKA-13295: -- Summary: Long restoration times for new tasks can lead to transaction timeouts Key: KAFKA-13295 URL: https://issues.apache.org/jira/browse/KAFKA-13295 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman Fix For: 3.1.0 In some EOS applications with relatively long restoration times we've noticed a series of ProducerFencedExceptions occurring during/immediately after restoration. The broker logs were able to confirm these were due to transactions timing out. In Streams, it turns out we automatically begin a new txn when calling {{send}} (if there isn’t already one in flight). A {{send}} occurs often outside a commit during active processing (eg writing to the changelog), leaving the txn open until the next commit. And if a StreamThread has been actively processing when a rebalance results in a new stateful task without revoking any existing tasks, the thread won’t actually commit this open txn before it goes back into the restoration phase while it builds up state for the new task. So the in-flight transaction is left open during restoration, during which the StreamThread only consumes from the changelog without committing, leaving it vulnerable to timing out when restoration times exceed the configured transaction.timeout.ms for the producer client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13283) Migrate experimental feature to public API
A. Sophie Blee-Goldman created KAFKA-13283: -- Summary: Migrate experimental feature to public API Key: KAFKA-13283 URL: https://issues.apache.org/jira/browse/KAFKA-13283 Project: Kafka Issue Type: Sub-task Components: streams Reporter: A. Sophie Blee-Goldman -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13282) Draft final NamedTopology API and publish a KIP
A. Sophie Blee-Goldman created KAFKA-13282: -- Summary: Draft final NamedTopology API and publish a KIP Key: KAFKA-13282 URL: https://issues.apache.org/jira/browse/KAFKA-13282 Project: Kafka Issue Type: Sub-task Components: streams Reporter: A. Sophie Blee-Goldman The pre-KIP experimental phase has left quite a few open questions around the API of this new feature; we need to hash those out and then write them up into a KIP before introducing this in the public interface -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13281) Support upgrades with dynamic addition/removal of disjoint "named" topologies
A. Sophie Blee-Goldman created KAFKA-13281: -- Summary: Support upgrades with dynamic addition/removal of disjoint "named" topologies Key: KAFKA-13281 URL: https://issues.apache.org/jira/browse/KAFKA-13281 Project: Kafka Issue Type: New Feature Components: streams Reporter: A. Sophie Blee-Goldman -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
[ https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-13128. Resolution: Fixed > Flaky Test > StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread > > > Key: KAFKA-13128 > URL: https://issues.apache.org/jira/browse/KAFKA-13128 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0, 2.8.1 >Reporter: A. Sophie Blee-Goldman >Assignee: Walker Carlson >Priority: Blocker > Labels: flaky-test > Fix For: 3.1.0 > > > h3. Stacktrace > java.lang.AssertionError: Expected: is not null but: was null > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455) > > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12963) Improve error message for Class cast exception
[ https://issues.apache.org/jira/browse/KAFKA-12963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12963. Fix Version/s: 3.1.0 Resolution: Fixed > Improve error message for Class cast exception > -- > > Key: KAFKA-12963 > URL: https://issues.apache.org/jira/browse/KAFKA-12963 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.7.0 >Reporter: Rasmus Helbig Hansen >Assignee: Andrew Lapidas >Priority: Minor > Fix For: 3.1.0 > > > After a topology change and starting the application again, we got this type > of error message: > [g9z-StreamThread-1] ERROR > org.apache.kafka.streams.processor.internals.TaskManager - stream-thread > [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following > error: > org.apache.kafka.streams.errors.StreamsException: ClassCastException > invoking Processor. Do the Processor's input types match the deserialized > types? Check the Serde setup and change the default Serdes in StreamConfig or > provide correct Serdes via method parameters. Make sure the Processor can > accept the deserialized input of type key: org.acme.SomeKey, and value: > org.acme.SomeValue. > Note that although incorrect Serdes are a common cause of error, the cast > exception might have another cause (in user code, for example). For example, > if a processor wires in a store, but casts the generics incorrectly, a class > cast exception could be raised during processing, but the cause would not be > wrong Serdes. > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) > Caused by: java.lang.ClassCastException: class org.acme.SomeValue cannot be > cast to class org.acme.OtherValue (org.acme.SomeValue and org.acme.OtherValue > are in unnamed module of loader 'app') > at > org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:112) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > ... 20 more > [g9z-Stream
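For readers hitting this message in practice, the most common fix is to pass Serdes explicitly at the operator that consumes the topic, rather than relying on the default Serdes. A minimal sketch (the topic name and key/value types here are illustrative, not taken from the ticket):
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class SerdeSetupExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Passing Serdes via Consumed overrides the default Serdes from the
        // config, so the deserialized types match what downstream processors
        // expect and the ClassCastException above cannot occur at this node.
        KStream<String, Long> stream = builder.stream(
            "input-topic",
            Consumed.with(Serdes.String(), Serdes.Long()));
        stream.foreach((key, value) -> System.out.println(key + " -> " + value));
    }
}
{code}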
[jira] [Resolved] (KAFKA-8734) Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface
[ https://issues.apache.org/jira/browse/KAFKA-8734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-8734. --- Fix Version/s: 3.0.0 Resolution: Fixed > Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface > -- > > Key: KAFKA-8734 > URL: https://issues.apache.org/jira/browse/KAFKA-8734 > Project: Kafka > Issue Type: Task > Components: clients >Affects Versions: 3.0.0 >Reporter: A. Sophie Blee-Goldman >Priority: Blocker > Fix For: 3.0.0 > > > In 2.4 we deprecated the consumer.internal.PartitionAssignor interface and > migrated all assignors to the [new public consumer.ConsumerPartitionAssignor interface|https://github.com/apache/kafka/pull/7108]. > Although internal, we provided an [adapter|https://github.com/apache/kafka/pull/7110] for those who may have > implemented a custom PartitionAssignor to avoid breaking changes. These > should be removed in the next major release. -- This message was sent by Atlassian Jira (v8.3.4#803005)
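For anyone still on the deprecated interface, migrating means implementing the public ConsumerPartitionAssignor instead. A bare-bones sketch; the assignment logic here is deliberately trivial and purely illustrative:
{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;

public class NoOpAssignor implements ConsumerPartitionAssignor {

    // Assign every member an empty partition set; a real assignor would
    // distribute the subscribed topics' partitions across members here.
    @Override
    public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscription) {
        final Map<String, Assignment> assignments = new HashMap<>();
        for (final String memberId : groupSubscription.groupSubscription().keySet()) {
            assignments.put(memberId, new Assignment(Collections.emptyList()));
        }
        return new GroupAssignment(assignments);
    }

    // The protocol name the group coordinator uses to pick a common assignor.
    @Override
    public String name() {
        return "noop-example";
    }
}
{code}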
[jira] [Created] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so
A. Sophie Blee-Goldman created KAFKA-13217: -- Summary: Reconsider skipping the LeaveGroup on close() or add an overload that does so Key: KAFKA-13217 URL: https://issues.apache.org/jira/browse/KAFKA-13217 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman In Kafka Streams, when an instance is shut down via the close() API, we intentionally skip sending a LeaveGroup request. This is because often the shutdown is not due to a scale-down event but rather some transient closure, such as during a rolling bounce. In cases where the instance is expected to start up again shortly, we originally wanted to avoid having that member's tasks redistributed across the remaining group members, since this would disturb the stable assignment and could cause unnecessary state migration and restoration. We also hoped to limit the disruption to just a single rebalance, rather than forcing the group to rebalance once when the member shuts down and then again when it comes back up. So it's really an optimization for the case in which the shutdown is temporary. That said, many of those optimizations are no longer necessary or at least much less useful given recent features and improvements. For example, rebalances are now lightweight, so skipping the second rebalance is not as worth optimizing for, and the new assignor takes into account the actual underlying state for each task/partition assignment, rather than just the previous assignment, so the assignment should be considerably more stable across bounces and rolling restarts. Given that, it might be time to reconsider this optimization. Alternatively, we could introduce another form of the close() API that forces the member to leave the group, to be used in the event of an actual scale-down rather than a transient bounce. -- This message was sent by Atlassian Jira (v8.3.4#803005)
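To make the alternative concrete, here is one hypothetical shape such an overload could take. None of these names exist in the API as of this ticket; they are purely illustrative:
{code:java}
// Hypothetical sketch only -- no close(options) overload exists at the
// time of this ticket; all names here are illustrative.
public class CloseOptions {
    private boolean leaveGroup = false;

    // When true, close() would send a LeaveGroup so the member's tasks
    // are redistributed immediately -- appropriate for a real scale-down.
    public CloseOptions leaveGroup(final boolean leaveGroup) {
        this.leaveGroup = leaveGroup;
        return this;
    }

    public boolean shouldLeaveGroup() {
        return leaveGroup;
    }
}

// Hypothetical usage when permanently removing an instance:
//   kafkaStreams.close(new CloseOptions().leaveGroup(true));
{code}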
[jira] [Reopened] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
[ https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reopened KAFKA-13128: Assignee: (was: A. Sophie Blee-Goldman) Failed again for a different reason – just flaky, seems we need to wait for the thread to fully start up {{java.lang.AssertionError: Unexpected exception thrown while getting the value from store. Expected: is (a string containing "Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING" or a string containing "The state store, source-table, may have migrated to another instance") but: was "Cannot get state store source-table because the stream thread is STARTING, not RUNNING"}} > Flaky Test > StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread > > > Key: KAFKA-13128 > URL: https://issues.apache.org/jira/browse/KAFKA-13128 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0, 2.8.1 >Reporter: A. Sophie Blee-Goldman >Priority: Blocker > Labels: flaky-test > Fix For: 3.0.0, 2.8.1 > > > h3. Stacktrace > java.lang.AssertionError: Expected: is not null but: was null > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455) > > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
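If the fix is simply to wait for startup, the test could guard the store lookup with a condition along these lines. TestUtils.waitForCondition is an existing Kafka test utility; the 30 s timeout is an arbitrary choice for this sketch:
{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.test.TestUtils;

public final class WaitForRunning {
    // Block until the Streams client reports RUNNING, i.e. all of its
    // stream threads (including a freshly added one) have started up.
    public static void await(final KafkaStreams streams) throws InterruptedException {
        TestUtils.waitForCondition(
            () -> streams.state() == KafkaStreams.State.RUNNING,
            30_000L,
            "Streams client did not reach RUNNING before the store lookup");
    }
}
{code}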
[jira] [Created] (KAFKA-13170) Flaky Test InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown
A. Sophie Blee-Goldman created KAFKA-13170: -- Summary: Flaky Test InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown Key: KAFKA-13170 URL: https://issues.apache.org/jira/browse/KAFKA-13170 Project: Kafka Issue Type: Bug Components: streams, unit tests Reporter: A. Sophie Blee-Goldman [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11176/2/testReport/org.apache.kafka.streams.processor.internals/InternalTopicManagerTest/Build___JDK_8_and_Scala_2_12___shouldRetryDeleteTopicWhenTopicUnknown_2/] {code:java} Stacktrace java.lang.AssertionError: unexpected exception type thrown; expected: but was: at org.junit.Assert.assertThrows(Assert.java:1020) at org.junit.Assert.assertThrows(Assert.java:981) at org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenRetriableException(InternalTopicManagerTest.java:526) at org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown(InternalTopicManagerTest.java:497) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13169) Flaky Test QueryableStateIntegrationTest.shouldBeAbleToQueryStateWithNonZeroSizedCache
A. Sophie Blee-Goldman created KAFKA-13169: -- Summary: Flaky Test QueryableStateIntegrationTest.shouldBeAbleToQueryStateWithNonZeroSizedCache Key: KAFKA-13169 URL: https://issues.apache.org/jira/browse/KAFKA-13169 Project: Kafka Issue Type: Bug Components: streams, unit tests Reporter: A. Sophie Blee-Goldman [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11176/2/testReport/org.apache.kafka.streams.processor.internals/InternalTopicManagerTest/Build___JDK_8_and_Scala_2_12___shouldRetryDeleteTopicWhenTopicUnknown/] {code:java} Stacktrace java.lang.AssertionError: unexpected exception type thrown; expected: but was: at org.junit.Assert.assertThrows(Assert.java:1020) at org.junit.Assert.assertThrows(Assert.java:981) at org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenRetriableException(InternalTopicManagerTest.java:526) at org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown(InternalTopicManagerTest.java:497) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10246) AbstractProcessorContext topic() throws NullPointerException when modifying a state store within the DSL from a punctuator
[ https://issues.apache.org/jira/browse/KAFKA-10246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-10246. Resolution: Fixed > AbstractProcessorContext topic() throws NullPointerException when modifying a > state store within the DSL from a punctuator > -- > > Key: KAFKA-10246 > URL: https://issues.apache.org/jira/browse/KAFKA-10246 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 > Environment: linux, windows, java 11 >Reporter: Peter Pringle >Priority: Major > > NullPointerException seen when a KTable state store is modified by a > punctuated method which is added to a topology via the DSL processor/ktable > valueTransformer methods. > It seems valid for AbstractProcessorContext.topic() to return null; however > the check below throws a NullPointerException before a null can be returned. > {quote}if (topic.equals(NONEXIST_TOPIC)) { > {quote} > Made a local fix to reverse the ordering of the check (i.e. avoid the null) > and this appears to fix the issue and sends the change to the state store's > changelog topic. > {quote}if (NONEXIST_TOPIC.equals(topic)) { > {quote} > Stacktrace below > {{2020-07-02 07:29:46,829 > [ABC_aggregator-551a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] ERROR > [o.a.k.s.p.i.StreamThread]: stream-thread > [ABC_aggregator-551a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] Encountered the > following error during processing:}} > {{java.lang.NullPointerException: null}} > {{at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)}} > {{at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:141)}} > {{at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}} > {{at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}} > {{at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}} > {{at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}} > {{at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}} > {{at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}} > {{at > org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:118)}} > {{at > org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:97)}} > {{at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}} > {{at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}} > {{at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}} > {{at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}} > {{at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}} > {{at > org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:118)}} > {{at > org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:65)}} > {{at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}} > {{at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}} > {{at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}} > {{at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}} > {{at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}} > {{at > org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)}} > {{at > org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)}} > {{at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)}} >
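The essence of the fix is constant-first equals. A minimal illustration; the constant's value below is a placeholder, not the actual value in AbstractProcessorContext:
{code:java}
public final class TopicCheckSketch {
    // Placeholder value; only the null-safety of the comparison matters here.
    static final String NONEXIST_TOPIC = "__nonexistent_topic__";

    static boolean isNonExistTopic(final String topic) {
        // topic may legitimately be null when invoked from a punctuator;
        // topic.equals(NONEXIST_TOPIC) would throw an NPE in that case,
        // whereas constant-first equals simply returns false.
        return NONEXIST_TOPIC.equals(topic);
    }
}
{code}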
[jira] [Resolved] (KAFKA-13150) How is KafkaStreams configured to consume data from a specified offset?
[ https://issues.apache.org/jira/browse/KAFKA-13150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-13150. Resolution: Invalid > How is KafkaStreams configured to consume data from a specified offset? > --- > > Key: KAFKA-13150 > URL: https://issues.apache.org/jira/browse/KAFKA-13150 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: wangjh >Priority: Minor > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13021) Improve Javadocs for API Changes and address followup from KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-13021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-13021. Resolution: Fixed > Improve Javadocs for API Changes and address followup from KIP-633 > -- > > Key: KAFKA-13021 > URL: https://issues.apache.org/jira/browse/KAFKA-13021 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Affects Versions: 3.0.0 >Reporter: Israel Ekpo >Assignee: Israel Ekpo >Priority: Major > Fix For: 3.0.0 > > > There are Javadoc changes from the following PR that need to be completed > prior to the 3.0 release. This Jira item is to track that work. > [https://github.com/apache/kafka/pull/10926] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
A. Sophie Blee-Goldman created KAFKA-13128: -- Summary: Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread Key: KAFKA-13128 URL: https://issues.apache.org/jira/browse/KAFKA-13128 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.1.0 Reporter: A. Sophie Blee-Goldman h3. Stacktrace java.lang.AssertionError: Expected: is not null but: was null at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455) https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13126) Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads to missing rebalances
A. Sophie Blee-Goldman created KAFKA-13126: -- Summary: Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads to missing rebalances Key: KAFKA-13126 URL: https://issues.apache.org/jira/browse/KAFKA-13126 Project: Kafka Issue Type: Bug Components: consumer Reporter: A. Sophie Blee-Goldman Assignee: A. Sophie Blee-Goldman Fix For: 3.1.0 In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this override, users of both the plain consumer client and Kafka Streams still set the poll interval to MAX_VALUE fairly often. Unfortunately, this causes an overflow when computing the {{joinGroupTimeoutMs}}, which results in it being set to the {{request.timeout.ms}} instead, a much lower value. This can easily make consumers drop out of the group, since they must now rejoin within 30s (by default) yet have almost no obligation to ever call poll() given the high {{max.poll.interval.ms}}. We just need to check for overflow and clamp the timeout to {{Integer.MAX_VALUE}} when it occurs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
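The fix described above amounts to doing the addition in long arithmetic and clamping. A sketch, assuming a 5 second lapse constant on top of the rebalance timeout; the actual constant and surrounding coordinator logic may differ:
{code:java}
public final class JoinGroupTimeout {
    // Illustrative lapse added on top of the rebalance timeout (assumed 5 s).
    private static final int JOIN_GROUP_TIMEOUT_LAPSE_MS = 5000;

    static int joinGroupTimeoutMs(final int rebalanceTimeoutMs, final int requestTimeoutMs) {
        // Add in long arithmetic and clamp, so max.poll.interval.ms =
        // Integer.MAX_VALUE cannot wrap negative and silently lose to the
        // (much lower) request timeout in the max() below.
        final long withLapse = (long) rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE_MS;
        final int clamped = (int) Math.min(withLapse, Integer.MAX_VALUE);
        return Math.max(requestTimeoutMs, clamped);
    }
}
{code}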
[jira] [Created] (KAFKA-13121) Flaky Test TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()
A. Sophie Blee-Goldman created KAFKA-13121: -- Summary: Flaky Test TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates() Key: KAFKA-13121 URL: https://issues.apache.org/jira/browse/KAFKA-13121 Project: Kafka Issue Type: Bug Components: log Reporter: A. Sophie Blee-Goldman h4. Stack Trace {code:java} org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: No resource found for partition: TopicIdPartition{topicId=2B9rDu44TE6c8pLG8A0RAg, topicPartition=new-leader-0} at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.getRemoteLogMetadataCache(RemotePartitionMetadataStore.java:112) at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.listRemoteLogSegments(RemotePartitionMetadataStore.java:98) at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.listRemoteLogSegments(TopicBasedRemoteLogMetadataManager.java:212) at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates(TopicBasedRemoteLogMetadataManagerTest.java:99){code} https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10921/11/testReport/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13096) QueryableStoreProvider is not updated when threads are added/removed/replaced rendering IQ impossible
A. Sophie Blee-Goldman created KAFKA-13096: -- Summary: QueryableStoreProvider is not updated when threads are added/removed/replaced rendering IQ impossible Key: KAFKA-13096 URL: https://issues.apache.org/jira/browse/KAFKA-13096 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.8.0 Reporter: A. Sophie Blee-Goldman Fix For: 3.0.0, 2.8.1 The QueryableStoreProviders class is used to route queries to the correct state store on the owning StreamThread, making it a critical piece of IQ. It gets instantiated when you create a new KafkaStreams, and is passed in a list of StreamThreadStateStoreProviders which it then copies and stores. Because it only stores a copy it only ever contains a provider for the StreamThreads that were created during the app's startup, and unfortunately is never updated during an add/remove/replace thread event. This means that IQ can’t get a handle on any stores that belong to a thread that wasn’t in the original set. If the app is starting up new threads through the #addStreamThread API or following a REPLACE_THREAD event, none of the data in any of the stores owned by that new thread will be accessible by IQ. If a user is removing threads through #removeStreamThread, or threads die and get replaced, you can fall into an endless loop of {{InvalidStateStoreException}} from doing a lookup into stores that have been closed since the thread was removed/died. If over time all of the original threads are removed or replaced, then IQ won’t work at all. -- This message was sent by Atlassian Jira (v8.3.4#803005)
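One possible shape for a fix is to keep the provider collection live, with threads registering and deregistering themselves instead of the client copying the list once at startup. The class and method names below are illustrative, not the actual internals:
{code:java}
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LiveStoreProviderRegistry<P> {
    private final Map<String, P> providersByThread = new ConcurrentHashMap<>();

    // Called when a StreamThread starts, including via #addStreamThread
    // or a REPLACE_THREAD event.
    public void register(final String threadName, final P provider) {
        providersByThread.put(threadName, provider);
    }

    // Called when a thread is removed via #removeStreamThread or dies.
    public void deregister(final String threadName) {
        providersByThread.remove(threadName);
    }

    // Queries always see the providers of the threads that exist right now,
    // instead of a stale copy from startup.
    public Collection<P> providers() {
        return providersByThread.values();
    }
}
{code}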
[jira] [Resolved] (KAFKA-12896) Group rebalance loop caused by repeated group leader JoinGroups
[ https://issues.apache.org/jira/browse/KAFKA-12896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12896. Resolution: Fixed > Group rebalance loop caused by repeated group leader JoinGroups > --- > > Key: KAFKA-12896 > URL: https://issues.apache.org/jira/browse/KAFKA-12896 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.6.0 >Reporter: Lucas Bradstreet >Assignee: David Jacot >Priority: Blocker > Fix For: 3.0.0 > > > We encountered a strange case of a rebalance loop with the > "cooperative-sticky" assignor. The logs show the following for several hours: > > {{Apr 7, 2021 @ 03:58:36.040 [GroupCoordinator 7]: Stabilized group mygroup > generation 19830137 (__consumer_offsets-7)}} > {{Apr 7, 2021 @ 03:58:35.992 [GroupCoordinator 7]: Preparing to rebalance > group mygroup in state PreparingRebalance with old generation 19830136 > (__consumer_offsets-7) (reason: Updating metadata for member > mygroup-1-7ad27e07-3784-4588-97e1-d796a74d4ecc during CompletingRebalance)}} > {{Apr 7, 2021 @ 03:58:35.988 [GroupCoordinator 7]: Stabilized group mygroup > generation 19830136 (__consumer_offsets-7)}} > {{Apr 7, 2021 @ 03:58:35.972 [GroupCoordinator 7]: Preparing to rebalance > group mygroup in state PreparingRebalance with old generation 19830135 > (__consumer_offsets-7) (reason: Updating metadata for member mygroup during > CompletingRebalance)}} > {{Apr 7, 2021 @ 03:58:35.965 [GroupCoordinator 7]: Stabilized group mygroup > generation 19830135 (__consumer_offsets-7)}} > {{Apr 7, 2021 @ 03:58:35.953 [GroupCoordinator 7]: Preparing to rebalance > group mygroup in state PreparingRebalance with old generation 19830134 > (__consumer_offsets-7) (reason: Updating metadata for member > mygroup-7ad27e07-3784-4588-97e1-d796a74d4ecc during CompletingRebalance)}} > {{Apr 7, 2021 @ 03:58:35.941 [GroupCoordinator 7]: Stabilized group mygroup > generation 19830134 (__consumer_offsets-7)}} > {{Apr 7, 2021 @ 03:58:35.926 [GroupCoordinator 7]: Preparing to rebalance > group mygroup in state PreparingRebalance with old generation 19830133 > (__consumer_offsets-7) (reason: Updating metadata for member mygroup during > CompletingRebalance)}} > Every single time, it was the same member that triggered the JoinGroup and it > was always the leader of the group. > The leader has the privilege of being able to trigger a rebalance by sending > `JoinGroup` even if its subscription metadata has not changed. But why would > it do so? > It is possible that this is due to the same issue or a similar bug to > https://issues.apache.org/jira/browse/KAFKA-12890. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13081) Port sticky assignor fixes (KAFKA-12984) back to 2.8
A. Sophie Blee-Goldman created KAFKA-13081: -- Summary: Port sticky assignor fixes (KAFKA-12984) back to 2.8 Key: KAFKA-13081 URL: https://issues.apache.org/jira/browse/KAFKA-13081 Project: Kafka Issue Type: Bug Reporter: A. Sophie Blee-Goldman Fix For: 2.8.1 We should make sure that fixes #1 and #2 of [#10985|https://github.com/apache/kafka/pull/10985] make it back to the 2.8 sticky assignor, since it's pretty much impossible to smoothly cherry-pick that commit from 3.0 to 2.8 due to all the recent improvements and refactoring in the AbstractStickyAssignor. Either we can just extract and apply those two fixes to 2.8 directly, or go back and port all the commits that made this cherry-pick difficult over to 2.8 as well. If we do so, then cherry-picking the original commit should be easy. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13075) Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-13075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-13075. Fix Version/s: 3.1.0 Resolution: Fixed > Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest > - > > Key: KAFKA-13075 > URL: https://issues.apache.org/jira/browse/KAFKA-13075 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Chun-Hao Tang >Priority: Major > Labels: newbie, newbie++ > Fix For: 3.1.0 > > > Looks like we have two different test classes covering pretty much the same > thing: RocksDBStore. It seems like RocksDBKeyValueStoreTest was the original > test class for RocksDBStore, but someone later added RocksDBStoreTest, most > likely because they didn't notice the RocksDBKeyValueStoreTest which didn't > follow the usual naming scheme for test classes. > We should consolidate these two into a single file, ideally retaining the > RocksDBStoreTest name since that conforms to the test naming pattern used > throughout Streams (and so this same thing doesn't happen again). It should > also extend AbstractKeyValueStoreTest like the RocksDBKeyValueStoreTest > currently does so we continue to get the benefit of all the tests in there as > well -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13075) Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest
A. Sophie Blee-Goldman created KAFKA-13075: -- Summary: Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest Key: KAFKA-13075 URL: https://issues.apache.org/jira/browse/KAFKA-13075 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman Looks like we have two different test classes covering pretty much the same thing: RocksDBStore. It seems like RocksDBKeyValueStoreTest was the original test class for RocksDBStore, but someone later added RocksDBStoreTest, most likely because they didn't notice the RocksDBKeyValueStoreTest, which didn't follow the usual naming scheme for test classes. We should consolidate these two into a single file, ideally retaining the RocksDBStoreTest name since that conforms to the test naming pattern used throughout Streams (and so this same thing doesn't happen again). It should also extend AbstractKeyValueStoreTest like the RocksDBKeyValueStoreTest currently does, so we continue to get the benefit of all the tests in there as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12993) Formatting of Streams 'Memory Management' docs is messed up
A. Sophie Blee-Goldman created KAFKA-12993: -- Summary: Formatting of Streams 'Memory Management' docs is messed up Key: KAFKA-12993 URL: https://issues.apache.org/jira/browse/KAFKA-12993 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman Fix For: 3.0.0, 2.8.1 The formatting of this page is all messed up, starting in the RocksDB section. It looks like there's a missing closing tag after the example BoundedMemoryRocksDBConfig class -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata
A. Sophie Blee-Goldman created KAFKA-12984: -- Summary: Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata Key: KAFKA-12984 URL: https://issues.apache.org/jira/browse/KAFKA-12984 Project: Kafka Issue Type: Bug Components: consumer Reporter: A. Sophie Blee-Goldman Fix For: 3.0.0, 2.8.1 Some users have reported seeing their consumer group become stuck in the CompletingRebalance phase when using the cooperative-sticky assignor. Based on the request metadata we were able to deduce that multiple consumers were reporting the same partition(s) in their "ownedPartitions" field of the consumer protocol. Since this is an invalid state, the input causes the cooperative-sticky assignor to detect that something is wrong and throw an IllegalStateException. If the consumer application is set up to simply retry, this will cause the group to appear to hang in the rebalance state. The "ownedPartitions" field is encoded based on the ConsumerCoordinator's SubscriptionState, which was assumed to always be up to date. However there may be cases where the consumer has dropped out of the group but fails to clear the SubscriptionState, allowing it to report some partitions as owned that have since been reassigned to another member. We should (a) fix the sticky assignment algorithm to resolve cases of improper input conditions by invalidating the "ownedPartitions" in cases of double ownership, and (b) shore up the ConsumerCoordinator logic to better handle rejoining the group and keeping its internal state consistent. See KAFKA-12983 for more details on (b) -- This message was sent by Atlassian Jira (v8.3.4#803005)
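Fix (a) boils down to finding partitions claimed by more than one member and treating them as unowned rather than throwing. A rough sketch of that sanitization step, with illustrative names:
{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public final class OwnedPartitionsSanitizer {
    // Drop any partition reported as owned by more than one member, so the
    // sticky assignment logic sees consistent input instead of throwing an
    // IllegalStateException on double ownership.
    public static void invalidateDoubleOwnership(final Map<String, List<TopicPartition>> ownedByMember) {
        final Map<TopicPartition, String> firstOwner = new HashMap<>();
        final Set<TopicPartition> doublyOwned = new HashSet<>();
        for (final Map.Entry<String, List<TopicPartition>> entry : ownedByMember.entrySet()) {
            for (final TopicPartition tp : entry.getValue()) {
                if (firstOwner.putIfAbsent(tp, entry.getKey()) != null) {
                    doublyOwned.add(tp);
                }
            }
        }
        // Treat doubly-owned partitions as unowned; they will simply be
        // redistributed like any other unclaimed partition.
        for (final List<TopicPartition> owned : ownedByMember.values()) {
            owned.removeIf(doublyOwned::contains);
        }
    }
}
{code}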
[jira] [Created] (KAFKA-12983) onJoinPrepare is not always invoked before joining the group
A. Sophie Blee-Goldman created KAFKA-12983: -- Summary: onJoinPrepare is not always invoked before joining the group Key: KAFKA-12983 URL: https://issues.apache.org/jira/browse/KAFKA-12983 Project: Kafka Issue Type: Bug Components: consumer Reporter: A. Sophie Blee-Goldman Fix For: 3.0.0, 2.8.1 As the title suggests, the #onJoinPrepare callback is not always invoked before a member (re)joins the group, but only once when it first enters the rebalance. This means that any updates or events that occur during the join phase can be lost in the internal state: for example, clearing the SubscriptionState (and thus the "ownedPartitions" that are used for cooperative rebalancing) after losing its memberId during a rebalance. We should reset the `needsJoinPrepare` flag inside the resetStateAndRejoin() method -- This message was sent by Atlassian Jira (v8.3.4#803005)
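A minimal sketch of the proposed one-line fix, with all surrounding coordinator logic elided; the field and method names follow the ticket's description:
{code:java}
public abstract class CoordinatorSketch {
    protected boolean needsJoinPrepare = true;

    protected void resetStateAndRejoin() {
        // ... clear generation and member id state ...
        // The fix: reset the flag here so onJoinPrepare runs again before
        // the member rejoins, rather than only once per rebalance.
        needsJoinPrepare = true;
        // ... request rejoin ...
    }
}
{code}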
[jira] [Created] (KAFKA-12936) In-memory stores are always restored from scratch after dropping out of the group
A. Sophie Blee-Goldman created KAFKA-12936: -- Summary: In-memory stores are always restored from scratch after dropping out of the group Key: KAFKA-12936 URL: https://issues.apache.org/jira/browse/KAFKA-12936 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman Whenever an in-memory store is closed, the actual store contents are garbage collected and the state will need to be restored from scratch if the task is reassigned and re-initialized. We introduced the recycling feature to prevent this from occurring when a task is transitioned from standby to active (or vice versa), but it's still possible for the in-memory state to be unnecessarily wiped out in the case the member has dropped out of the group. In this case, the onPartitionsLost callback is invoked, which will close all active tasks as dirty before the member rejoins the group. This means that all these tasks will need to be restored from scratch if they are reassigned back to this consumer. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12920) Consumer's cooperative sticky assignor needs to clear generation / assignment data upon `onPartitionsLost`
[ https://issues.apache.org/jira/browse/KAFKA-12920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12920. Resolution: Not A Bug > Consumer's cooperative sticky assignor needs to clear generation / assignment > data upon `onPartitionsLost` > - > > Key: KAFKA-12920 > URL: https://issues.apache.org/jira/browse/KAFKA-12920 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Priority: Major > Labels: bug, consumer > > Consumer's cooperative-sticky assignor does not track the owned partitions > inside the assignor --- i.e. when it resets its state in the event of > ``onPartitionsLost``, the ``memberAssignment`` and ``generation`` inside the > assignor are not cleared. This would cause a member to join with an empty > generation on the protocol while its non-empty user data still encodes the old > assignment (and hence passes the validation check on the broker side during > JoinGroup), eventually causing a single partition to be assigned to > multiple consumers within a generation. > We should let the assignor also clear its assignment/generation when > ``onPartitionsLost`` is triggered in order to avoid this scenario. > Note that 1) for the regular sticky assignor the generation would still have > an older value, and this would cause the previously owned partitions to be > discarded during the assignment, and 2) for Streams' sticky assignor, its > encoding would indeed be cleared along with ``onPartitionsLost``. Hence only > the Consumer's cooperative-sticky assignor has this issue to solve. -- This message was sent by Atlassian Jira (v8.3.4#803005)
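For illustration only (the ticket was ultimately resolved as Not A Bug), the proposed behavior would look roughly like this sketch, with illustrative names:
{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

public class CooperativeStickyStateSketch {
    static final int DEFAULT_GENERATION = -1;

    private List<TopicPartition> memberAssignment = new ArrayList<>();
    private int generation = DEFAULT_GENERATION;

    public void onPartitionsLost() {
        memberAssignment = new ArrayList<>();  // forget the owned partitions
        generation = DEFAULT_GENERATION;       // rejoin with an empty generation
    }
}
{code}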
[jira] [Created] (KAFKA-12851) Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
A. Sophie Blee-Goldman created KAFKA-12851: -- Summary: Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable Key: KAFKA-12851 URL: https://issues.apache.org/jira/browse/KAFKA-12851 Project: Kafka Issue Type: Bug Components: core Reporter: A. Sophie Blee-Goldman Fix For: 3.0.0 Failed twice on a [PR build|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10755/6/testReport/] h3. Stacktrace org.opentest4j.AssertionFailedError: expected: but was: at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at org.apache.kafka.raft.RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable(RaftEventSimulationTest.java:263) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12849) Consider migrating TaskMetadata to interface with internal implementation
A. Sophie Blee-Goldman created KAFKA-12849: -- Summary: Consider migrating TaskMetadata to interface with internal implementation Key: KAFKA-12849 URL: https://issues.apache.org/jira/browse/KAFKA-12849 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman In KIP-740 we had to go through a deprecation cycle in order to change the constructor from the original one, which accepted the taskId parameter as a string, to the new one, which takes a TaskId object directly. We had considered just changing the signature directly without deprecation, as this class was never intended to be instantiated by users; rather, it just acts as a pass-through metadata class. If there is essentially never a reason for users to instantiate it, that suggests it may be better suited as a public interface, with the implementation and constructor as internal APIs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
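A sketch of what that split could look like; the members shown are a plausible subset and the names are illustrative, not the actual API:
{code:java}
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;

// Public read-only surface: users only ever consume this.
interface TaskMetadataSketch {
    TaskId taskId();
    Set<TopicPartition> topicPartitions();
}

// Internal implementation: the constructor stays out of the public API,
// so it can change without another deprecation cycle.
class TaskMetadataImplSketch implements TaskMetadataSketch {
    private final TaskId taskId;
    private final Set<TopicPartition> topicPartitions;

    TaskMetadataImplSketch(final TaskId taskId, final Set<TopicPartition> topicPartitions) {
        this.taskId = taskId;
        this.topicPartitions = topicPartitions;
    }

    @Override public TaskId taskId() { return taskId; }
    @Override public Set<TopicPartition> topicPartitions() { return topicPartitions; }
}
{code}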