[GitHub] [kafka] ableegoldman opened a new pull request #11601: KAFKA-12648: Minor fixes for input topic management
ableegoldman opened a new pull request #11601: URL: https://github.com/apache/kafka/pull/11601 While working on [#11600](https://github.com/apache/kafka/pull/11600) I noticed a few issues with how we manage topics in the TopologyMetadata, particularly surrounding the code to update and track input topics. This PR cleans that up and adds some further verification when processing possible updates from the subscription metadata or assignment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13077) Replication failing after unclean shutdown of ZK and all brokers
[ https://issues.apache.org/jira/browse/KAFKA-13077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458927#comment-17458927 ] Kashish Bansal commented on KAFKA-13077: [~junrao] We faced the same issue when restarting Kafka: topic partitions were not replicating across brokers, and there was a constant set of under-replicated partitions (URP) for both internal and external topics. One more thing: sometimes during a ZK restart the data in ZK is lost and we are forced to restart Kafka, after which the same condition occurs and partitions won't replicate across brokers. Please advise. > Replication failing after unclean shutdown of ZK and all brokers > > > Key: KAFKA-13077 > URL: https://issues.apache.org/jira/browse/KAFKA-13077 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Christopher Auston >Priority: Minor > > I am submitting this in the spirit of what can go wrong when an operator > violates the constraints Kafka depends on. I don't know if Kafka could or > should handle this more gracefully. I decided to file this issue because it > was easy to get the problem I'm reporting with Kubernetes StatefulSets (STS). > By "easy" I mean that I did not go out of my way to corrupt anything, I just > was not careful when restarting ZK and brokers. > I violated the constraints of keeping Zookeeper stable and at least one > running in-sync replica. > I am running the bitnami/kafka helm chart on Amazon EKS. > {quote}% kubectl get po kaf-kafka-0 -ojson |jq .spec.containers'[].image' > "docker.io/bitnami/kafka:2.8.0-debian-10-r43" > {quote} > I started with 3 ZK instances and 3 brokers (both STS). I changed the > cpu/memory requests on both STS and Kubernetes proceeded to restart ZK and > Kafka instances at the same time. If I recall correctly there were some > crashes and several restarts but eventually all the instances were running > again. It's possible all ZK nodes and all brokers were unavailable at various > points.
> The problem I noticed was that two of the brokers were just continually > spitting out messages like: > {quote}% kubectl logs kaf-kafka-0 --tail 10 > [2021-07-13 14:26:08,871] INFO [ProducerStateManager > partition=__transaction_state-0] Loading producer state from snapshot file > 'SnapshotFile(/bitnami/kafka/data/__transaction_state-0/0001.snapshot,1)' > (kafka.log.ProducerStateManager) > [2021-07-13 14:26:08,871] WARN [Log partition=__transaction_state-0, > dir=/bitnami/kafka/data] *Non-monotonic update of high watermark from > (offset=2744 segment=[0:1048644]) to (offset=1 segment=[0:169])* > (kafka.log.Log) > [2021-07-13 14:26:08,874] INFO [Log partition=__transaction_state-10, > dir=/bitnami/kafka/data] Truncating to offset 2 (kafka.log.Log) > [2021-07-13 14:26:08,877] INFO [Log partition=__transaction_state-10, > dir=/bitnami/kafka/data] Loading producer state till offset 2 with message > format version 2 (kafka.log.Log) > [2021-07-13 14:26:08,877] INFO [ProducerStateManager > partition=__transaction_state-10] Loading producer state from snapshot file > 'SnapshotFile(/bitnami/kafka/data/__transaction_state-10/0002.snapshot,2)' > (kafka.log.ProducerStateManager) > [2021-07-13 14:26:08,877] WARN [Log partition=__transaction_state-10, > dir=/bitnami/kafka/data] Non-monotonic update of high watermark from > (offset=2930 segment=[0:1048717]) to (offset=2 segment=[0:338]) > (kafka.log.Log) > [2021-07-13 14:26:08,880] INFO [Log partition=__transaction_state-20, > dir=/bitnami/kafka/data] Truncating to offset 1 (kafka.log.Log) > [2021-07-13 14:26:08,882] INFO [Log partition=__transaction_state-20, > dir=/bitnami/kafka/data] Loading producer state till offset 1 with message > format version 2 (kafka.log.Log) > [2021-07-13 14:26:08,882] INFO [ProducerStateManager > partition=__transaction_state-20] Loading producer state from snapshot file > 'SnapshotFile(/bitnami/kafka/data/__transaction_state-20/0001.snapshot,1)' > (kafka.log.ProducerStateManager) > [2021-07-13 14:26:08,883] WARN [Log partition=__transaction_state-20, > dir=/bitnami/kafka/data] Non-monotonic update of high watermark from > (offset=2956 segment=[0:1048608]) to (offset=1 segment=[0:169]) > (kafka.log.Log) > {quote} > If I describe that topic I can see that several partitions have a leader of 2 > and the ISR is just 2 (NOTE I added two more brokers and tried to reassign > the topic onto brokers 2,3,4 which you can see below). The new brokers also > spit out the messages about "non-monotonic update" just like the original > followers. This describe output is from the following day. > {{% kafka-topics.sh ${=BS} -topic __transaction_state -describe}} > {{Topic: __transaction_state TopicId: i7bBNCeuQMWl-ZMpzrnMAw
[GitHub] [kafka] ableegoldman opened a new pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies
ableegoldman opened a new pull request #11600: URL: https://github.com/apache/kafka/pull/11600 Another source of flakiness we found in the NamedTopologyIntegrationTest was an occasional MissingSourceTopicException that was causing the application to shut down. We created all source topics ahead of time in the tests, leading us to discover this [race condition](https://issues.apache.org/jira/browse/KAFKA-13543) in the consumer client which can lead to spurious MissingSourceTopicExceptions when the metadata hasn't finished updating after a change in the consumer's subscription. In addition to finding a workaround for this bug, throwing this MissingSourceTopicException and shutting down the entire app is itself a bug in the NamedTopology feature -- we should not stop all clients and prevent any further processing of the completely valid topologies just because one (or more) topologies were added that are missing their source topics. We can just remove those topologies from the assignment for the time being, and wait until the metadata has finished updating or the user has created the input topics to start assigning tasks from them. So, this PR does two things: a) Avoid throwing a MissingSourceTopicException inside the #assign method when named topologies are used, and just remove those topologies which are missing any of their input topics from the assignment. b) Trigger the uncaught exception handler with a MissingSourceTopicException for each of the topologies that are missing topics, but don't shut down the thread -- we just want to make sure this issue is made visible to the user. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
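For illustration, here is a minimal, self-contained sketch of the filtering pattern described in (a) and (b). All names here are hypothetical stand-ins, not the actual TopologyMetadata internals from this PR:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public class MissingTopicFilterSketch {

    /**
     * Keeps only the topologies whose source topics all exist in the cluster
     * metadata; dropped topologies are reported via the callback instead of
     * failing the whole application.
     */
    static Map<String, List<String>> resolvableTopologies(
            final Map<String, List<String>> sourceTopicsByTopology,
            final Set<String> topicsInMetadata,
            final BiConsumer<String, List<String>> onMissingTopics) {

        final Map<String, List<String>> resolvable = new HashMap<>();
        sourceTopicsByTopology.forEach((topology, sourceTopics) -> {
            final List<String> missing = sourceTopics.stream()
                .filter(topic -> !topicsInMetadata.contains(topic))
                .collect(Collectors.toList());
            if (missing.isEmpty()) {
                resolvable.put(topology, sourceTopics); // safe to assign tasks from it
            } else {
                onMissingTopics.accept(topology, missing); // surface, but keep running
            }
        });
        return resolvable;
    }
}
```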
[jira] [Commented] (KAFKA-13543) Consumer may pass stale cluster metadata to the assignor following a subscription update
[ https://issues.apache.org/jira/browse/KAFKA-13543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458923#comment-17458923 ] A. Sophie Blee-Goldman commented on KAFKA-13543: One possible fix would be to block inside Consumer#subscribe until the metadata has been updated, as we currently just issue a non-blocking call to update the metadata and then return. Of course there could be some unintended repercussions to making this previously async API suddenly a blocking call, so we should definitely discuss that further. If waiting on the metadata update is off the table, another option is to just mitigate the issue by at least giving the assignor a way to tell that the metadata has not been updated since the subscription was, and therefore that the cluster metadata passed into #assign cannot necessarily be trusted. > Consumer may pass stale cluster metadata to the assignor following a > subscription update > > > Key: KAFKA-13543 > URL: https://issues.apache.org/jira/browse/KAFKA-13543 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: A. Sophie Blee-Goldman >Priority: Major > > A consumer only ever tracks metadata corresponding to its subscribed topics, > which can cause a race condition during a rebalance immediately after a > change to the consumer's subscription. Particularly, when new topics are > added to the subscription but a rebalance is kicked off before the consumer's > metadata is updated with the new topics, it will pass a stale copy of the > cluster metadata into the ConsumerPartitionAssignor#assign method, which may > not include the newly subscribed topics regardless of whether they do or do > not exist. > Most apps are likely unaffected by this, including any consumer client apps > using OOTB assignors, since a new rebalance will be kicked off when the > metadata is updated and any partitions from the new topics will be assigned > at that time. But in Kafka Streams, we do a check during each rebalance to > ensure that any user input topics are created ahead of time. This race > condition can result in Streams incorrectly identifying user topics as > missing and throwing a MissingSourceTopicException when a new topology > subscribed to new topics is added to the application. > We can work around this for now, but it's unfortunate that we can't > distinguish between true missing source topics and a transient lack of these > topics in the metadata. There might also be some plain consumer client apps > with custom assignors that run into this as well, for more advanced users. -- This message was sent by Atlassian Jira (v8.20.1#820001)
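As a rough illustration of the mitigation option (a sketch only; no such hook exists in the consumer today), an assignor could at least detect that the Cluster snapshot may predate the subscription change:

{code:java}
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.Cluster;

public class MetadataStalenessCheck {

    /**
     * Illustrative only: an assignor can detect that the Cluster snapshot may
     * predate the latest subscription change when subscribed topics are absent
     * from it. The race described above means the assignor cannot tell a truly
     * missing topic from a not-yet-propagated one using the Cluster alone.
     */
    static boolean metadataMayBeStale(final Set<String> subscribedTopics, final Cluster metadata) {
        final Set<String> absentTopics = subscribedTopics.stream()
            .filter(topic -> !metadata.topics().contains(topic))
            .collect(Collectors.toSet());
        return !absentTopics.isEmpty();
    }
}
{code}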
[jira] [Created] (KAFKA-13543) Consumer may pass stale cluster metadata to the assignor following a subscription update
A. Sophie Blee-Goldman created KAFKA-13543: -- Summary: Consumer may pass stale cluster metadata to the assignor following a subscription update Key: KAFKA-13543 URL: https://issues.apache.org/jira/browse/KAFKA-13543 Project: Kafka Issue Type: Bug Components: consumer Reporter: A. Sophie Blee-Goldman A consumer only ever tracks metadata corresponding to its subscribed topics, which can cause a race condition during a rebalance immediately after a change to the consumer's subscription. Particularly, when new topics are added to the subscription but a rebalance is kicked off before the consumer's metadata is updated with the new topics, it will pass a stale copy of the cluster metadata into the ConsumerPartitionAssignor#assign method, which may not include the newly subscribed topics regardless of whether they do or do not exist. Most apps are likely unaffected by this, including any consumer client apps using OOTB assignors, since a new rebalance will be kicked off when the metadata is updated and any partitions from the new topics will be assigned at that time. But in Kafka Streams, we do a check during each rebalance to ensure that any user input topics are created ahead of time. This race condition can result in Streams incorrectly identifying user topics as missing and throwing a MissingSourceTopicException when a new topology subscribed to new topics is added to the application. We can work around this for now, but it's unfortunate that we can't distinguish between true missing source topics and a transient lack of these topics in the metadata. There might also be some plain consumer client apps with custom assignors that run into this as well, for more advanced users. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13281) Support live upgrades with dynamic addition/removal of modular topologies
[ https://issues.apache.org/jira/browse/KAFKA-13281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458847#comment-17458847 ] Luke Chen commented on KAFKA-13281: --- Thanks! [~ableegoldman] ! :) > Support live upgrades with dynamic addition/removal of modular topologies > - > > Key: KAFKA-13281 > URL: https://issues.apache.org/jira/browse/KAFKA-13281 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: needs-kip > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams
A. Sophie Blee-Goldman created KAFKA-13542: -- Summary: Utilize the new Consumer#enforceRebalance(reason) API in Streams Key: KAFKA-13542 URL: https://issues.apache.org/jira/browse/KAFKA-13542 Project: Kafka Issue Type: Task Components: streams Reporter: A. Sophie Blee-Goldman Fix For: 3.2.0 KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance API, which will be passed into a new field of the JoinGroup protocol. We invoke this API throughout Streams for various reasons, which are very useful for debugging the cause of rebalancing. Passing the reason in to this new API would make it possible to figure out from the broker logs why a Streams client triggered a rebalance, which is valuable since the broker logs are often the only logs available when the client logs cannot be retrieved for whatever reason. -- This message was sent by Atlassian Jira (v8.20.1#820001)
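A minimal sketch of how a plain consumer application could use the new API once KIP-800 lands (the single-argument enforceRebalance(String reason) overload is the one proposed by the KIP; a broker at localhost:9092 and an existing "input-topic" are assumed):

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EnforceRebalanceWithReason {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumes a local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic"));
            consumer.poll(Duration.ofMillis(100));
            // the reason is forwarded in the JoinGroup request, so the cause of
            // the rebalance shows up in the broker logs
            consumer.enforceRebalance("stateful task migration scheduled by Streams");
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
{code}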
[jira] [Updated] (KAFKA-13281) Support live upgrades with dynamic addition/removal of modular topologies
[ https://issues.apache.org/jira/browse/KAFKA-13281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-13281: --- Summary: Support live upgrades with dynamic addition/removal of modular topologies (was: Support upgrades with dynamic addition/removal of disjoint "named" topologies) > Support live upgrades with dynamic addition/removal of modular topologies > - > > Key: KAFKA-13281 > URL: https://issues.apache.org/jira/browse/KAFKA-13281 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: needs-kip > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13282) Draft final NamedTopology API and publish a KIP
[ https://issues.apache.org/jira/browse/KAFKA-13282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-13282: --- Description: The pre-KIP experimental phase has left quite a few open questions around the API of this new feature; we need to hash that out and then write it up into a KIP before introducing this in the public interface [KIP-809 |https://cwiki.apache.org/confluence/x/7ovkCw] was:The pre-KIP experimental phase has left quite a few open questions around the API of this new feature; we need to hash that out and then write it up into a KIP before introducing this in the public interface > Draft final NamedTopology API and publish a KIP > --- > > Key: KAFKA-13282 > URL: https://issues.apache.org/jira/browse/KAFKA-13282 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > > The pre-KIP experimental phase has left quite a few open questions around the > API of this new feature; we need to hash that out and then write it up > into a KIP before introducing this in the public interface > [KIP-809 |https://cwiki.apache.org/confluence/x/7ovkCw] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13281) Support upgrades with dynamic addition/removal of disjoint "named" topologies
[ https://issues.apache.org/jira/browse/KAFKA-13281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458831#comment-17458831 ] A. Sophie Blee-Goldman commented on KAFKA-13281: Sorry, I'm super behind on my email inbox – thanks for responding Walker. Yeah we don't have a target date for the overall NamedTopology feature itself just yet (I only just reserved a KIP for it today) but the TopologyConfig definitely has a use outside of this work so I'm happy to start pulling it into the public API > Support upgrades with dynamic addition/removal of disjoint "named" topologies > - > > Key: KAFKA-13281 > URL: https://issues.apache.org/jira/browse/KAFKA-13281 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: needs-kip > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] vvcephei commented on pull request #11598: feat: Implement range and scan queries
vvcephei commented on pull request #11598: URL: https://github.com/apache/kafka/pull/11598#issuecomment-992981391 Oh, one other thing I just noticed, @vpapavas , can you add JavaDoc explaining what the RangeQuery is for and what each one of its methods does? That's a public API, so it should be well documented. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
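As a sketch of the kind of usage such JavaDoc might describe, based on the KIP-805 proposal (the API may still change while this PR is in review; `kafkaStreams` is assumed to be an already-running KafkaStreams instance with a store named "my-store"):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;

public class RangeQueryUsageSketch {

    // Queries all values for keys between "a" and "f" (inclusive) from the
    // store named "my-store" of a running KafkaStreams instance.
    static StateQueryResult<KeyValueIterator<String, Integer>> queryRange(final KafkaStreams kafkaStreams) {
        final RangeQuery<String, Integer> bounded = RangeQuery.withRange("a", "f");
        // other variants proposed by the KIP:
        //   RangeQuery.withLowerBound("a");  // everything from "a" upward
        //   RangeQuery.withUpperBound("f");  // everything up to "f"
        //   RangeQuery.withNoBounds();       // full scan
        final StateQueryRequest<KeyValueIterator<String, Integer>> request =
            StateQueryRequest.inStore("my-store").withQuery(bounded);
        return kafkaStreams.query(request);
    }
}
```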
[GitHub] [kafka] vvcephei commented on a change in pull request #11598: feat: Implement range and scan queries
vvcephei commented on a change in pull request #11598: URL: https://github.com/apache/kafka/pull/11598#discussion_r768122088 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java ## @@ -175,6 +194,7 @@ public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) { if (from == null && to == null) { return getKeyValueIterator(map.keySet(), forward); } else if (from == null) { +System.out.println("---> range upper bound"); Review comment: looks like this was left over. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,85 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener, return false; } +@SuppressWarnings("unchecked") +@Override +public <R> QueryResult<R> query(final Query<R> query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult<R> result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); +} +} else { +result = (QueryResult<R>) handler.apply( +query, +positionBound, +collectExecutionInfo, +this +); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " with serdes " ++ serdes + " in " + (System.nanoTime() - start) + "ns"); +} +} +return result; +} + +@SuppressWarnings("unchecked") +private <R> QueryResult<R> runRangeQuery( +final Query<R> query, final PositionBound positionBound, final boolean collectExecutionInfo) { + +final QueryResult<R> result; +final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query; +final RangeQuery<Bytes, byte[]> rawRangeQuery; +if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) { +rawRangeQuery = RangeQuery.withRange(keyBytes(typedQuery.getLowerBound().get()), +keyBytes(typedQuery.getUpperBound().get())); +} else if (typedQuery.getLowerBound().isPresent()) { +rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get())); +} else if (typedQuery.getUpperBound().isPresent()) { +rawRangeQuery = RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get())); +} else { +rawRangeQuery = RangeQuery.withNoBounds(); +} Review comment: At the risk of being too fancy, what do you think about this instead?
```suggestion rawRangeQuery = RangeQuery.withRange(typedQuery.getLowerBound().map(this::keyBytes), typedQuery.getUpperBound().map(this::keyBytes)); ``` ## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -279,34 +344,34 @@ public static void before() final RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES); assertThat(recordMetadata.hasOffset(), is(true)); INPUT_POSITION.withComponent( -recordMetadata.topic(), -recordMetadata.partition(), -recordMetadata.offset() +recordMetadata.topic(), +recordMetadata.partition(), +recordMetadata.offset() ); } } assertThat(INPUT_POSITION, equalTo( -Position -.emptyPosition() -.withComponent(INPUT_TOPIC_NAME, 0, 1L) -.withComponent(INPUT_TOPIC_NAME, 1, 0L) +Position +.emptyPosition() +.withComponent(INPUT_TOPIC_NAME, 0, 1L) +.withComponent(INPUT_TOPIC_NAME, 1, 1L) )); } @Before public void beforeTest() { final StoreSupplier supplier = storeToTest.supplier(); final Properties streamsConfig = streamsConfiguration( -cache, -log, -storeToTest.name() +cache, +log, +storeToTest.name() Review comment: I'm sorry to sound picky, but do you mind backing out these formatting changes? I'm only concerned because there's a lot of them. Otherwise, we'll
[jira] [Commented] (KAFKA-13476) Streams crashes when non Base64 Offset Metadata is found
[ https://issues.apache.org/jira/browse/KAFKA-13476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458730#comment-17458730 ] Guozhang Wang commented on KAFKA-13476: --- Thanks [~RBosch81], that clears up my questions then. I saw [~mjsax] is already on your PR so I'll leave it to him to review and merge it. > Streams crashes when non Base64 Offset Metadata is found > > > Key: KAFKA-13476 > URL: https://issues.apache.org/jira/browse/KAFKA-13476 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Richard Bosch >Assignee: Richard Bosch >Priority: Minor > Original Estimate: 0.5h > Remaining Estimate: 0.5h > > Kafka Streams applications use the metadata stored with the committed offsets > from previous running instances to extract timestamps. > But when the metadata field contains other data the Base64 decoder will throw > an exception causing the Streams application to fail. > A new offset commit is then required to stop this failure. > I've included the part of the log when we started a Kafka Streams app after > setting the offsets using a third party tool. This tool adds some tracing > metadata so developers and operators could debug who performed this custom > offset commit. > > {noformat} > 2021-11-16 12:56:36.020 INFO 25 --- [-StreamThread-2] > o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=example-app-3, > groupId=axual-demo-example-example-app] Unsubscribed all topics or patterns > and assigned partitions > at java.base/java.util.Base64$Decoder.decode(Unknown Source) ~[na:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:1039) > ~[kafka-streams-2.7.0.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > ~[kafka-streams-2.7.0.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:837) > ~[kafka-streams-2.7.0.jar:na] > java.lang.IllegalArgumentException: Illegal base64 character 7b > at > org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:728) > ~[kafka-streams-2.7.0.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:818) > ~[kafka-streams-2.7.0.jar:na] > 2021-11-16 12:56:36.127 ERROR 25 --- [-StreamThread-1] > org.apache.kafka.streams.KafkaStreams: stream-client > [streams-example-app-1] All stream threads have died. The instance will be in > error state and should be closed. > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > ~[kafka-streams-2.7.0.jar:na] > java.lang.IllegalArgumentException: Illegal base64 character 7b > {noformat} > I recommend adding a try/catch block around the Base64 decode in the > StreamTask.decodeTimestamp method and returning the Unknown value when this > occurs. > This is purely for resilience when bad data is encountered. > After the Streams application performs a new offset commit the error should > not occur again, limiting the chance of frequently recurring warnings in the > logs. > I've already made the changes and added a test for this issue, as I would > like to contribute to Kafka. -- This message was sent by Atlassian Jira (v8.20.1#820001)
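A rough sketch of the recommended fix (illustrative only; the real StreamTask#decodeTimestamp has more structure, and RecordQueue.UNKNOWN is stood in for by a local constant):

{code:java}
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Base64;

public class DecodeTimestampSketch {

    static final long UNKNOWN = -1L; // stand-in for RecordQueue.UNKNOWN

    /** Treat undecodable offset metadata as "unknown" instead of crashing the thread. */
    static long decodeTimestamp(final String encodedMetadata) {
        if (encodedMetadata.isEmpty()) {
            return UNKNOWN;
        }
        try {
            final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encodedMetadata));
            buffer.get(); // skip the version byte
            return buffer.getLong();
        } catch (final IllegalArgumentException | BufferUnderflowException e) {
            // e.g. JSON tracing data written by a third-party offset tool is not
            // Base64 ("Illegal base64 character 7b" is '{') -- fall back instead
            // of killing the application
            return UNKNOWN;
        }
    }

    public static void main(final String[] args) {
        System.out.println(decodeTimestamp("{\"tool\":\"offset-setter\"}")); // prints -1
    }
}
{code}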
[GitHub] [kafka] vvcephei commented on pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on pull request #11582: URL: https://github.com/apache/kafka/pull/11582#issuecomment-992922162 Note to reviewers, upon reflection, and based on the discussions on the other ongoing KIPs to add IQv2 queries, I've decided to drop the "RawKeyQuery" that I had originally proposed. It really just saved us from one extra cast in an execution path that already has a ton of other casts. It was an attempt to be a little elegant, but I don't think it was successful. I'm hoping that once we have several queries in place, we'll be able to golf it a bit and come up with an actually more elegant approach to the internal code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13540) UniformStickyPartitioner leads to uneven Kafka partitions
[ https://issues.apache.org/jira/browse/KAFKA-13540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Artem Livshits resolved KAFKA-13540. Resolution: Duplicate See also https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner > UniformStickyPartitioner leads to uneven Kafka partitions > - > > Key: KAFKA-13540 > URL: https://issues.apache.org/jira/browse/KAFKA-13540 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.4.1 >Reporter: nk2242696 >Priority: Major > Attachments: MicrosoftTeams-image (1).png > > > Kafka topic with 20 partitions, 24-hour TTL, replication factor of 3. > Using UniformStickyPartitioner, we expected each partition to be roughly > the same size, but the realised size of some partitions is almost double. > !MicrosoftTeams-image (1).png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] mimaison opened a new pull request #11599: KAFKA-13527: Add top-level error code field to DescribeLogDirsResponse
mimaison opened a new pull request #11599: URL: https://github.com/apache/kafka/pull/11599 Implements KIP-784 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13511) Update TimestampConverter SMT to support unix epoch as millis, micros, and seconds
[ https://issues.apache.org/jira/browse/KAFKA-13511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Julien Chanaud updated KAFKA-13511: --- Labels: connect-transformation needs-kip (was: ) > Update TimestampConverter SMT to support unix epoch as millis, micros, and > seconds > -- > > Key: KAFKA-13511 > URL: https://issues.apache.org/jira/browse/KAFKA-13511 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Julien Chanaud >Assignee: Julien Chanaud >Priority: Minor > Labels: connect-transformation, needs-kip > > Currently, the SMT TimestampConverter can convert a Timestamp from either > source String, Long or Date into target String, Long or Date. > The problem is that a Long source or target is required to be epoch in > milliseconds. > In many cases, epoch is represented with different precisions. This leads to > several Jira tickets: > * KAFKA-12364 > * KAFKA-10561 > I propose to add a new config to TimestampConverter called "epoch.precision" > which defaults to "millis" so as to not impact existing code, and allows for > more precisions: seconds, millis, micros. > {code:json} > "transforms": "TimestampConverter", > "transforms.TimestampConverter.type": > "org.apache.kafka.connect.transforms.TimestampConverter$Value", > "transforms.TimestampConverter.field": "event_date", > "transforms.TimestampConverter.epoch.precision": "micros", > "transforms.TimestampConverter.target.type": "Timestamp" > {code} > Exactly like the "format" field, which is used as input when the source is > String and as output when the target.type is String, this new field would be > used as input when the field is Long, and as output when the target.type is > "unix" -- This message was sent by Atlassian Jira (v8.20.1#820001)
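To make the precision handling concrete, here is a sketch of the scaling the proposed "epoch.precision" values imply (the config name and values come from this proposal, not from a released TimestampConverter):

{code:java}
import java.util.Date;

public class EpochPrecisionSketch {

    /** Scale an epoch value of the given precision to the milliseconds a Date expects. */
    static Date toDate(final long epoch, final String precision) {
        switch (precision) {
            case "seconds": return new Date(epoch * 1_000L);
            case "millis":  return new Date(epoch);          // today's behavior
            case "micros":  return new Date(epoch / 1_000L); // truncates sub-millisecond digits
            default: throw new IllegalArgumentException("Unknown epoch.precision: " + precision);
        }
    }

    public static void main(final String[] args) {
        // the same instant, 2021-12-14T00:00:00Z, at three precisions
        System.out.println(toDate(1_639_440_000L, "seconds"));
        System.out.println(toDate(1_639_440_000_000L, "millis"));
        System.out.println(toDate(1_639_440_000_000_000L, "micros"));
    }
}
{code}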
[jira] [Assigned] (KAFKA-13511) Update TimestampConverter SMT to support unix epoch as millis, micros, and seconds
[ https://issues.apache.org/jira/browse/KAFKA-13511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Julien Chanaud reassigned KAFKA-13511: -- Assignee: Julien Chanaud > Update TimestampConverter SMT to support unix epoch as millis, micros, and > seconds > -- > > Key: KAFKA-13511 > URL: https://issues.apache.org/jira/browse/KAFKA-13511 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Julien Chanaud >Assignee: Julien Chanaud >Priority: Minor > > Currently, the SMT TimestampConverter can convert a Timestamp from either > source String, Long or Date into target String, Long or Date. > The problem is that a Long source or target is required to be epoch in > milliseconds. > In many cases, epoch is represented with different precisions. This leads to > several Jira tickets: > * KAFKA-12364 > * KAFKA-10561 > I propose to add a new config to TimestampConverter called "epoch.precision" > which defaults to "millis" so as to not impact existing code, and allows for > more precisions: seconds, millis, micros. > {code:json} > "transforms": "TimestampConverter", > "transforms.TimestampConverter.type": > "org.apache.kafka.connect.transforms.TimestampConverter$Value", > "transforms.TimestampConverter.field": "event_date", > "transforms.TimestampConverter.epoch.precision": "micros", > "transforms.TimestampConverter.target.type": "Timestamp" > {code} > Exactly like the "format" field, which is used as input when the source is > String and as output when the target.type is String, this new field would be > used as input when the field is Long, and as output when the target.type is > "unix" -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] vpapavas opened a new pull request #11598: feat: Implement range and scan queries
vpapavas opened a new pull request #11598: URL: https://github.com/apache/kafka/pull/11598 Implement the RangeQuery as proposed in KIP-805 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on pull request #11593: KAFKA-13528: KRaft RegisterBroker should validate that the cluster ID matches
jsancio commented on pull request #11593: URL: https://github.com/apache/kafka/pull/11593#issuecomment-992715953 Thanks for the reply @cmccabe. Should we add an integration test that shows the expected behavior on the broker side? Can we add a description to the PR so that it is included in the commit message? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] AndyGee commented on a change in pull request #11426: KAFKA-13391: don't fsync directory on Windows OS
AndyGee commented on a change in pull request #11426: URL: https://github.com/apache/kafka/pull/11426#discussion_r767968720 ## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java ## @@ -948,10 +948,12 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need /** * Flushes dirty directories to guarantee crash consistency. * + * Note: We don't fsync directories on Windows OS because otherwise it'll throw AccessDeniedException (KAFKA-13391) + * * @throws IOException if flushing the directory fails. */ public static void flushDir(Path path) throws IOException { -if (path != null) { +if (path != null && !OperatingSystem.IS_WINDOWS) { try (FileChannel dir = FileChannel.open(path, StandardOpenOption.READ)) { dir.force(true); Review comment: There is a file lock on this file that causes the issue, which might be hiding another issue even on other platforms. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.
satishd commented on pull request #11390: URL: https://github.com/apache/kafka/pull/11390#issuecomment-992617350 Thanks @junrao for the review. Please find inline replies, addressed most of them with latest commits. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r767856835 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -79,6 +88,14 @@ private StreamsMetricsImpl streamsMetrics; private TaskId taskId; +private Map<Class, QueryHandler> queryHandlers = +mkMap( +mkEntry( +KeyQuery.class, +(query, positionBound, collectExecutionInfo, store) -> runKeyQuery(query, positionBound, collectExecutionInfo) +) +); + Review comment: Just trying to establish some pattern here that can let us dispatch these queries efficiently. This O(1) lookup should be faster than an O(n) if/else check or an O(log n) string switch statement, but we won't know for sure without benchmarking. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -216,9 +269,17 @@ public boolean global() { public abstract StoreSupplier supplier(); +public boolean timestamped() { +return true; // most stores are timestamped +}; + public boolean global() { return false; } + +public boolean keyValue() { +return false; +} Review comment: These help us adjust our expectations in the validations below, so that we can cover all store types in the same test. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -513,6 +590,43 @@ public void shouldHandlePingQuery() { assertThat(result.getPosition(), is(INPUT_POSITION)); } +public <V> void shouldHandleKeyQuery( +final Integer key, +final Function<V, Integer> valueExtractor, +final Integer expectedValue) { + +final KeyQuery<Integer, V> query = KeyQuery.withKey(key); +final StateQueryRequest<V> request = +inStore(STORE_NAME) +.withQuery(query) +.withPartitions(mkSet(0, 1)) +.withPositionBound(PositionBound.at(INPUT_POSITION)); + +final StateQueryResult<V> result = +IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + +final QueryResult<V> queryResult = +result.getGlobalResult() != null +? result.getGlobalResult() +: result.getOnlyPartitionResult(); +final boolean failure = queryResult.isFailure(); +if (failure) { +throw new AssertionError(queryResult.toString()); +} +assertThat(queryResult.isSuccess(), is(true)); + +assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); +assertThrows(IllegalArgumentException.class, +queryResult::getFailureMessage); + +final V result1 = queryResult.getResult(); +final Integer integer = valueExtractor.apply(result1); +assertThat(integer, is(expectedValue)); Review comment: Here's where we run that function to either get the value out of the ValueAndTimestamp or just give back the value with the identity function. ## File path: streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java ## @@ -52,5 +52,12 @@ * The requested store partition does not exist at all. For example, partition 4 was requested, * but the store in question only has 4 partitions (0 through 3). */ -DOES_NOT_EXIST; +DOES_NOT_EXIST, + +/** + * The store that handled the query got an exception during query execution. The message + * will contain the exception details. Depending on the nature of the exception, the caller + * may be able to retry this instance or may need to try a different instance. + */ +STORE_EXCEPTION; Review comment: I realized in the implementation for RocksDB that we will need to account for runtime exceptions from the stores. I'll update the KIP.
## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -426,6 +487,22 @@ public void verifyStore() { shouldHandlePingQuery(); shouldCollectExecutionInfo(); shouldCollectExecutionInfoUnderFailure(); + +if (storeToTest.keyValue()) { +if (storeToTest.timestamped()) { +shouldHandleKeyQuery( +2, +(Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value, +2 +); +} else { +shouldHandleKeyQuery( +2, +Function.identity(), +2 +); +} +} Review comment: Here's where we use those properties. KeyQueries are only implemented for
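To illustrate the dispatch pattern under discussion in isolation, here is a self-contained toy (hypothetical types; not the MeteredKeyValueStore code itself):

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class QueryDispatchSketch {

    interface Query<R> { }
    static final class KeyQuery implements Query<String> { }
    static final class RangeQuery implements Query<List<String>> { }

    // O(1) lookup by concrete query class, versus an O(n) if/else chain
    static final Map<Class<?>, Function<Query<?>, Object>> HANDLERS = Map.of(
        KeyQuery.class, query -> "value-for-key",
        RangeQuery.class, query -> List.of("a", "b")
    );

    static Object execute(final Query<?> query) {
        final Function<Query<?>, Object> handler = HANDLERS.get(query.getClass());
        if (handler == null) {
            // mirrors delegating to the wrapped store when no handler is registered
            throw new UnsupportedOperationException("Unknown query type: " + query.getClass());
        }
        return handler.apply(query);
    }

    public static void main(final String[] args) {
        System.out.println(execute(new KeyQuery()));   // value-for-key
        System.out.println(execute(new RangeQuery())); // [a, b]
    }
}
```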
[GitHub] [kafka] prat0318 commented on pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway
prat0318 commented on pull request #11552: URL: https://github.com/apache/kafka/pull/11552#issuecomment-992560873 @hachikuji @jolshan Bump on the review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] svudutala-vmware edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
svudutala-vmware edited a comment on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-992556542 > > > Will this PR solve [CVE-2021-44228](https://github.com/advisories/GHSA-jfh8-c2jp-5v3q)? > > > > > > @soumiksamanta > > https://github.com/apache/kafka/blob/bd3038383265f7bb850c09fe0a74a48c5c2e6f99/gradle/dependencies.gradle#L78 > > > > should be upgraded to 2.15.0. log4j <= 2.14.0 all have this issue. > > Initially I thought log4j 1.x is not impacted but as per [apache/logging-log4j2#608 (comment)](https://github.com/apache/logging-log4j2/pull/608#issuecomment-990494126) it is. > > Thank you for sharing the comment. Isn't that comment for log4j v1 in general. kafka by default does not use JMS appender. Do you think it is impacted under the default configuration. > > Also refer to this post: https://lists.apache.org/thread/lgbtvvmy68p0059yoyn9qxzosdmx4jdv Yeah @unverified-user . My understanding is the same. This should have no impact unless JMS is used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] svudutala-vmware commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
svudutala-vmware commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-992556542 > > > Will this PR solve [CVE-2021-44228](https://github.com/advisories/GHSA-jfh8-c2jp-5v3q)? > > > > > > @soumiksamanta > > https://github.com/apache/kafka/blob/bd3038383265f7bb850c09fe0a74a48c5c2e6f99/gradle/dependencies.gradle#L78 > > > > should be upgraded to 2.15.0. log4j <= 2.14.0 all have this issue. > > Initially I thought log4j 1.x is not impacted but as per [apache/logging-log4j2#608 (comment)](https://github.com/apache/logging-log4j2/pull/608#issuecomment-990494126) it is. > > Thank you for sharing the comment. Isn't that comment for log4j v1 in general. kafka by default does not use JMS appender. Do you think it is impacted under the default configuration. > > Also refer to this post: https://lists.apache.org/thread/lgbtvvmy68p0059yoyn9qxzosdmx4jdv Yeah @unverified-user . My understanding is the same. This should have no impact unless JMS is used. I am not an expert on Kafka Connect connectors that might use JMS; there may be a potential impact there, I guess. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13535) Workaround for mitigating CVE-2021-44228 Kafka
[ https://issues.apache.org/jira/browse/KAFKA-13535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458384#comment-17458384 ] Akansh Shandilya commented on KAFKA-13535: -- [~showuon] Thanks, I will wait as per your suggestion. Regarding the JMS Appender, I found a log4j manual, but it applies to log4j 2.x: [https://logging.apache.org/log4j/2.x/manual/appenders.html] > Workaround for mitigating CVE-2021-44228 Kafka > --- > > Key: KAFKA-13535 > URL: https://issues.apache.org/jira/browse/KAFKA-13535 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.1 >Reporter: Akansh Shandilya >Priority: Major > > Kafka v2.8.1 uses log4j v1.x. Please review the following information: > > Is Kafka v2.8.1 impacted by CVE-2021-44228? > If yes, is there any workaround/recommendation available for Kafka v2.8.1 to > mitigate CVE-2021-44228 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13540) UniformStickyPartitioner leads to uneven Kafka partitions
[ https://issues.apache.org/jira/browse/KAFKA-13540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nk2242696 updated KAFKA-13540: -- Reviewer: (was: nk2242696) > UniformStickyPartitioner leads to uneven Kafka partitions > - > > Key: KAFKA-13540 > URL: https://issues.apache.org/jira/browse/KAFKA-13540 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.4.1 >Reporter: nk2242696 >Priority: Major > Attachments: MicrosoftTeams-image (1).png > > > Kafka topic with 20 partitions, 24-hour TTL, replication factor of 3. > Using UniformStickyPartitioner, we expected each partition to be roughly > the same size, but the realised size of some partitions is almost double. > !MicrosoftTeams-image (1).png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13540) UniformStickyPartitioner leads to uneven Kafka partitions
[ https://issues.apache.org/jira/browse/KAFKA-13540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nk2242696 updated KAFKA-13540: -- Reviewer: nk2242696 > UniformStickyPartitioner leads to uneven Kafka partitions > - > > Key: KAFKA-13540 > URL: https://issues.apache.org/jira/browse/KAFKA-13540 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.4.1 >Reporter: nk2242696 >Priority: Major > Attachments: MicrosoftTeams-image (1).png > > > Kafka topic with 20 partitions, 24-hour TTL, replication factor of 3. > Using UniformStickyPartitioner, we expected each partition to be roughly > the same size, but the realised size of some partitions is almost double. > !MicrosoftTeams-image (1).png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13541) Make IQv2 query/store interface type safe
Patrick Stuedi created KAFKA-13541: -- Summary: Make IQv2 query/store interface type safe Key: KAFKA-13541 URL: https://issues.apache.org/jira/browse/KAFKA-13541 Project: Kafka Issue Type: Sub-task Reporter: Patrick Stuedi Assignee: Patrick Stuedi Currently the new IQv2 interface allows applications to query state stores using subclasses of the Query type. Unfortunately, there is currently no way to check that the type parameter of the query matches the type of the relevant store the query is executed on. As a consequence, stores have to perform a set of unsafe casts. This ticket is to explore ways to make the query interface type safe, so that type mismatches are detected at compile time. -- This message was sent by Atlassian Jira (v8.20.1#820001)
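As a sketch of one possible direction (hypothetical types only; nothing like this exists in the IQv2 API today), the store's key/value types could be threaded through the query so that a mismatch becomes a compile error rather than a ClassCastException:

{code:java}
public class TypedQuerySketch {

    interface KeyValueStore<K, V> {
        V get(K key);
    }

    /** A query parameterized by the store type it may run against. */
    interface TypedQuery<S, R> {
        R execute(S store);
    }

    static final class KeyQuery<K, V> implements TypedQuery<KeyValueStore<K, V>, V> {
        private final K key;
        KeyQuery(final K key) { this.key = key; }
        @Override
        public V execute(final KeyValueStore<K, V> store) {
            return store.get(key); // no unchecked casts needed
        }
    }

    static <S, R> R query(final S store, final TypedQuery<S, R> query) {
        return query.execute(store);
    }

    public static void main(final String[] args) {
        final KeyValueStore<String, Long> store = key -> 42L;
        final Long value = query(store, new KeyQuery<String, Long>("a")); // compiles
        // query(store, new KeyQuery<Integer, Long>(1)); // would be a compile error
        System.out.println(value);
    }
}
{code}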
[jira] [Created] (KAFKA-13540) UniformStickyPartitioner leads to uneven Kafka partitions
nk2242696 created KAFKA-13540: - Summary: UniformStickyPartitioner leads to uneven Kafka partitions Key: KAFKA-13540 URL: https://issues.apache.org/jira/browse/KAFKA-13540 Project: Kafka Issue Type: Bug Components: clients, producer Affects Versions: 2.4.1 Reporter: nk2242696 Attachments: MicrosoftTeams-image (1).png Kafka topic with 20 partitions, 24-hour TTL, replication factor of 3. Using UniformStickyPartitioner, we expected each partition to be roughly the same size, but the realised size of some partitions is almost double. !MicrosoftTeams-image (1).png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] satishd commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.
satishd commented on a change in pull request #11390: URL: https://github.com/apache/kafka/pull/11390#discussion_r767698400 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -715,6 +725,58 @@ abstract class AbstractFetcherThread(name: String, } } + /** + * Handle a partition whose offset is out of range and return a new fetch offset. + */ + protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = { +fetchOffsetAndApplyFun(topicPartition, topicId, currentLeaderEpoch, + leaderLogStartOffset => truncateFullyAndStartAt(topicPartition, leaderLogStartOffset)) + } + + /** + * Handle a partition whose offset is moved to tiered storage and return a new fetch offset. + */ + protected def fetchOffsetAndBuildRemoteLogAuxState(topicPartition: TopicPartition, topicId: Option[Uuid], + currentLeaderEpoch: Int, + leaderLogStartOffset: Long): PartitionFetchState = { +fetchOffsetAndApplyFun(topicPartition, topicId, currentLeaderEpoch, + leaderLocalLogStartOffset => +buildRemoteLogAuxState(topicPartition, currentLeaderEpoch, leaderLocalLogStartOffset, leaderLogStartOffset)) + } + + /** + * Handle the offset moved to tiered storage error. Return false if + * 1) the request succeeded or Review comment: Updated the javadoc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram opened a new pull request #11597: KAFKA-13539: Improve propagation and processing of SSL handshake failures
rajinisivaram opened a new pull request #11597: URL: https://github.com/apache/kafka/pull/11597 When the server fails an SSL handshake and closes its connection, we attempt to report this to clients on a best-effort basis. When an IOException is detected in the client, we may proceed to close the connection before processing all the data from the server if we have data pending to be sent to the server. The server attempts to send any data that has already been wrapped, but may not wrap again after a handshake failure, so the error may not be propagated to clients. However, our tests assume that clients always detect handshake failures. This PR attempts to wrap and send all data on the server side after a handshake failure, and attempts to process all data on the client side. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13539) Improve propagation and processing of SSL handshake failures
Rajini Sivaram created KAFKA-13539: -- Summary: Improve propagation and processing of SSL handshake failures Key: KAFKA-13539 URL: https://issues.apache.org/jira/browse/KAFKA-13539 Project: Kafka Issue Type: Bug Components: security Affects Versions: 3.1.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 3.2.0 When the server fails an SSL handshake and closes its connection, we attempt to report this to clients on a best-effort basis. However, our tests assume that the peer always detects the failure. This may not be the case when there are delays. It would be good to improve the reliability of handshake failure reporting. -- This message was sent by Atlassian Jira (v8.20.1#820001)
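For context, here is a sketch of how a client can observe a propagated handshake failure (assumes a broker SSL listener at localhost:9093 that rejects this client; SslAuthenticationException is the exception type Kafka clients surface for authentication/handshake errors):

{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.errors.SslAuthenticationException;

public class HandshakeFailureProbe {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); // assumed SSL listener
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL");

        try (Admin admin = Admin.create(props)) {
            admin.describeCluster().nodes().get();
        } catch (final ExecutionException e) {
            if (e.getCause() instanceof SslAuthenticationException) {
                // the failure was propagated to the client as a handshake error
                System.err.println("Handshake rejected: " + e.getCause().getMessage());
            } else {
                // without reliable propagation, the client may only see a generic disconnect
                System.err.println("Failure not surfaced as a handshake error: " + e.getCause());
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}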
[GitHub] [kafka] richard-axual commented on pull request #11535: KAFKA-13476: Increase resilience timestamp decoding Kafka Streams
richard-axual commented on pull request #11535: URL: https://github.com/apache/kafka/pull/11535#issuecomment-992330136 @mjsax Thanks for the heads-up. I was on vacation for a while, but I've replied to your question in the Jira issue -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10888) Sticky partition leads to uneven product msg, resulting in abnormal delays in some partitions
[ https://issues.apache.org/jira/browse/KAFKA-10888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458258#comment-17458258 ] nk2242696 commented on KAFKA-10888: --- [~showuon] [~hachikuji] I propose a solution based on keeping track of the number of offsets/messages written to each partition. # At the partitioner level, store the number of offsets/messages written to each partition (sizePerPartitionMap) and the total (total offsets for the topic). # Before choosing the next batch to write (onNewBatch()), use sizePerPartitionMap to blacklist the available partitions that cause skewness (using a configurable threshold %), and choose the next partition from the list of remaining whitelisted partitions. # To populate sizePerPartitionMap, use the callback of producer.send() to update sizePerPartitionMap and the total. This way, we can skip the slower (blacklisted) partitions for a few rounds and ensure all partitions stay roughly the same size. > Sticky partition leads to uneven product msg, resulting in abnormal delays > in some partitions > -- > > Key: KAFKA-10888 > URL: https://issues.apache.org/jira/browse/KAFKA-10888 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.4.1 >Reporter: jr >Assignee: Luke Chen >Priority: Major > Attachments: image-2020-12-24-21-05-02-800.png, > image-2020-12-24-21-09-47-692.png, image-2020-12-24-21-10-24-407.png > > > 110 producers, 550 partitions, 550 consumers, 5-node Kafka cluster > The producers use the null-key + sticky partitioner; the total production rate > is about 100w (1 million) TPS > The observed partition delay is abnormal and message distribution is uneven, > which leads to abnormally high maximum production and consumption delays on > the partitions with more messages. > I cannot find a reason why the sticky partitioner would make the message > distribution uneven at this production rate. > I can't switch to the round-robin partitioner, which would increase the > delay and CPU cost. Does the sticky partitioner design cause uneven message > distribution, or is this abnormal? How can it be solved? > !image-2020-12-24-21-09-47-692.png! > As shown in the picture, the uneven distribution is concentrated on some > partitions and some brokers; there seems to be a pattern. > This problem does not occur in only one cluster, but in many high-TPS > clusters. > The problem is more obvious on the test cluster we built. > !image-2020-12-24-21-10-24-407.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
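A self-contained sketch of the proposed bookkeeping (a hypothetical class, not the built-in UniformStickyPartitioner; the threshold semantics below are one possible interpretation of the proposal):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;

public class SkewAwareStickyPartitionerSketch {

    private final Map<Integer, LongAdder> sizePerPartitionMap = new ConcurrentHashMap<>();
    private final LongAdder total = new LongAdder();
    private final double threshold; // e.g. 0.20 tolerates 20% above fair share

    SkewAwareStickyPartitionerSketch(final double threshold) {
        this.threshold = threshold;
    }

    /** Step 3: call from the producer.send() callback for each acknowledged record. */
    void recordSend(final int partition) {
        sizePerPartitionMap.computeIfAbsent(partition, p -> new LongAdder()).increment();
        total.increment();
    }

    /** Step 2: when a sticky batch completes, pick the next partition from the non-skewed set. */
    int nextStickyPartition(final List<Integer> availablePartitions) {
        final double fairShare = (double) total.sum() / availablePartitions.size();
        final List<Integer> whitelisted = availablePartitions.stream()
            .filter(p -> countOf(p) <= fairShare * (1.0 + threshold))
            .collect(Collectors.toList());
        // if everything is "skewed" fall back to all partitions to stay live
        final List<Integer> candidates = whitelisted.isEmpty() ? availablePartitions : whitelisted;
        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
    }

    private long countOf(final int partition) {
        final LongAdder counter = sizePerPartitionMap.get(partition);
        return counter == null ? 0L : counter.sum();
    }
}
{code}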
[jira] [Commented] (KAFKA-13476) Streams crashes when non Base64 Offset Metadata is found
[ https://issues.apache.org/jira/browse/KAFKA-13476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458255#comment-17458255 ] Richard Bosch commented on KAFKA-13476: --- Hello [~mjsax] [~guozhang], this is triggered by a tool that can be used to set offsets for a consumer group, so that applications can start somewhere other than the beginning or end of a topic. Tracing data was added to the metadata because previous releases of the Kafka client and Streams did not use the metadata part of the OffsetAndMetadata structure. > Streams crashes when non Base64 Offset Metadata is found > > > Key: KAFKA-13476 > URL: https://issues.apache.org/jira/browse/KAFKA-13476 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Richard Bosch >Assignee: Richard Bosch >Priority: Minor > Original Estimate: 0.5h > Remaining Estimate: 0.5h > > Kafka Streams applications use the metadata stored with the committed offsets > from previous running instances to extract timestamps. > But when the metadata field contains other data the Base64 decoder will throw > an exception causing the Streams application to fail. > A new offset commit is then required to stop this failure. > I've included the part of the log when we started a Kafka Streams app after > setting the offsets using a third party tool. This tool adds some tracing > metadata so developers and operators could debug who performed this custom > offset commit. > > {noformat} > 2021-11-16 12:56:36.020 INFO 25 --- [-StreamThread-2] > o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=example-app-3, > groupId=axual-demo-example-example-app] Unsubscribed all topics or patterns > and assigned partitions > at java.base/java.util.Base64$Decoder.decode(Unknown Source) ~[na:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:1039) > ~[kafka-streams-2.7.0.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > ~[kafka-streams-2.7.0.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:837) > ~[kafka-streams-2.7.0.jar:na] > java.lang.IllegalArgumentException: Illegal base64 character 7b > at > org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:728) > ~[kafka-streams-2.7.0.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:818) > ~[kafka-streams-2.7.0.jar:na] > 2021-11-16 12:56:36.127 ERROR 25 --- [-StreamThread-1] > org.apache.kafka.streams.KafkaStreams: stream-client > [streams-example-app-1] All stream threads have died. The instance will be in > error state and should be closed. > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > ~[kafka-streams-2.7.0.jar:na] > java.lang.IllegalArgumentException: Illegal base64 character 7b > {noformat} > I recommend adding a try/catch block around the Base64 decode in the > StreamTask.decodeTimestamp method and returning the Unknown value when this > occurs. > This is purely for resilience when bad data is encountered. > After the Streams application performs a new offset commit the error should > not occur again, limiting the chance of frequently recurring warnings in the > logs. > I've already made the changes and added a test for this issue, as I would > like to contribute to Kafka. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] mimaison commented on pull request #11560: KAFKA-7589: Allow configuring network threads per listener
mimaison commented on pull request #11560: URL: https://github.com/apache/kafka/pull/11560#issuecomment-992306969 @rajinisivaram @tombentley @dajac This PR is now ready for reviews. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458219#comment-17458219 ] Luke Chen commented on KAFKA-9366: -- Please upgrade Log4j to 2.15.0 or newer for CVE-2021-44228. Thanks. > Upgrade log4j to log4j2 > --- > > Key: KAFKA-9366 > URL: https://issues.apache.org/jira/browse/KAFKA-9366 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0 >Reporter: leibo >Assignee: Dongjin Lee >Priority: Critical > Labels: needs-kip > Fix For: 3.2.0 > > > h2. CVE-2019-17571 Detail > Included in Log4j 1.2 is a SocketServer class that is vulnerable to > deserialization of untrusted data which can be exploited to remotely execute > arbitrary code when combined with a deserialization gadget when listening to > untrusted network traffic for log data. This affects Log4j versions up to 1.2 > up to 1.2.17. > > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571] > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] dengziming commented on pull request #11261: KAFKA-13228: ApiVersionRequest is not properly handled in KRaft
dengziming commented on pull request #11261: URL: https://github.com/apache/kafka/pull/11261#issuecomment-992240783 Hello @mumrah , are you interested in reviewing this? Currently, ApiVersionRequest is not properly handled in the KRaft server, which will have a bad effect on KIP-778. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #11261: KAFKA-13228: ApiVersionRequest is not properly handled in KRaft
dengziming commented on pull request #11261: URL: https://github.com/apache/kafka/pull/11261#issuecomment-992240131 Hello @mumrah , are you interested in reviewing this? Currently, ApiVersionRequest is not properly handled in the KRaft server, which will have a bad effect on KIP-778. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13535) Workaround for mitigating CVE-2021-44228 Kafka
[ https://issues.apache.org/jira/browse/KAFKA-13535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458213#comment-17458213 ] Luke Chen commented on KAFKA-13535: --- [~akansh] , I think that as long as users don't add a JMS appender config to the log4j configuration file, with the *TopicBindingName* or *TopicConnectionFactoryBindingName* set to malicious values, it's fine. I would suggest the Kafka team make an official announcement about this vulnerability. Thanks. > Workaround for mitigating CVE-2021-44228 Kafka > --- > > Key: KAFKA-13535 > URL: https://issues.apache.org/jira/browse/KAFKA-13535 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.1 >Reporter: Akansh Shandilya >Priority: Major > > Kafka v2.8.1 uses log4j v1.x. Please review the following information: > > Is Kafka v2.8.1 impacted by CVE-2021-44228? > If yes, is there any workaround/recommendation available for Kafka v2.8.1 to > mitigate CVE-2021-44228 -- This message was sent by Atlassian Jira (v8.20.1#820001)