[GitHub] [kafka] dajac commented on pull request #12308: KAFKA-14009: update rebalance timeout in memory when consumers use st…
dajac commented on PR #12308: URL: https://github.com/apache/kafka/pull/12308#issuecomment-1308349599

@Stephan14 Many tests related to your change failed in the last build. Could you please take a look?

```
Build / JDK 11 and Scala 2.13 / staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchangedPersistenceFailure() – kafka.coordinator.group.GroupCoordinatorTest <1s
Build / JDK 11 and Scala 2.13 / staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchangedPersistenceFailure() – kafka.coordinator.group.GroupCoordinatorTest 4s
Build / JDK 8 and Scala 2.12 / staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchangedPersistenceFailure() – kafka.coordinator.group.GroupCoordinatorTest <1s
Build / JDK 8 and Scala 2.12 / staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchangedPersistenceFailure() – kafka.coordinator.group.GroupCoordinatorTest 3s
Build / JDK 17 and Scala 2.13 / staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchangedPersistenceFailure() – kafka.coordinator.group.GroupCoordinatorTest <1s
Build / JDK 17 and Scala 2.13 / staticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchangedPersistenceFailure() – kafka.coordinator.group.GroupCoordinatorTest
```
[jira] [Resolved] (KAFKA-14363) Add new `group-coordinator` module
[ https://issues.apache.org/jira/browse/KAFKA-14363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Jacot resolved KAFKA-14363.
---------------------------------
    Resolution: Fixed

> Add new `group-coordinator` module
> ----------------------------------
>
>                 Key: KAFKA-14363
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14363
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: David Jacot
>            Assignee: David Jacot
>            Priority: Major
[GitHub] [kafka] dajac merged pull request #12827: KAFKA-14363; Add new `group-coordinator` module (KIP-848)
dajac merged PR #12827: URL: https://github.com/apache/kafka/pull/12827
[GitHub] [kafka] clolov commented on pull request #12809: [KAFKA-14324] Upgrade RocksDB to 7.1.2
clolov commented on PR #12809: URL: https://github.com/apache/kafka/pull/12809#issuecomment-1308324630 Thank you for the review! All of these are valid suggestions, I will aim to get them done today.
[GitHub] [kafka] cmccabe commented on a diff in pull request #12776: KAFKA-14327: Unify KRaft snapshot generation between broker and controller
cmccabe commented on code in PR #12776: URL: https://github.com/apache/kafka/pull/12776#discussion_r1017514589

## core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala:
## @@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util.Properties
+import kafka.server.ConfigAdminManager.toLoggableProps
+import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType, KafkaConfig}
+import kafka.utils.Logging
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC}
+import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
+import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.server.fault.FaultHandler
+
+
+class DynamicConfigPublisher(

Review Comment: Here's a PR for just JointServer and nothing else: https://github.com/apache/kafka/pull/12837 I would do #2 and #3 differently, as "new code" versus "changes to old code." Let me know if that makes sense.
[GitHub] [kafka] cmccabe opened a new pull request, #12837: MINOR: extract jointly owned parts of BrokerServer and ControllerServer
cmccabe opened a new pull request, #12837: URL: https://github.com/apache/kafka/pull/12837 Extract jointly owned parts of BrokerServer and ControllerServer into JointServer. Shut down JointServer when the last component using it shuts down. (But make sure to stop the raft manager before closing the ControllerServer's sockets.)
[GitHub] [kafka] showuon commented on pull request #12748: KAFKA-13715: add generationId field in subscription
showuon commented on PR #12748: URL: https://github.com/apache/kafka/pull/12748#issuecomment-1308231893 @dajac, I've addressed all comments. Please take a look when available. Thanks.
[GitHub] [kafka] showuon commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription
showuon commented on code in PR #12748: URL: https://github.com/apache/kafka/pull/12748#discussion_r1017442642

## clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java:
## @@ -78,6 +80,26 @@ public void testDecodeGeneration() {
         assertFalse(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent());
     }

+    @Test
+    public void testCooperativeStickyAssignorHonorSubscriptionUserdataIfNoGenerationIdInField() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 2);
+        int higherGenerationId = 2;
+        int lowerGenerationId = 1;
+
+        assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(partitions(tp1)), new ConsumerGroupMetadata(groupId, higherGenerationId, consumer1, Optional.empty()));
+        ByteBuffer userDataWithHigherGenerationId = assignor.subscriptionUserData(new HashSet<>(topics(topic)));

Review Comment: We can use `buildSubscriptionWithGeneration` for consumer2 here, but not for consumer1. Please note that consumer1 provides different owned partitions and a different generation id than the ones provided in the subscription. I'm trying to test which data we should honor in different cases. I've updated the comment to make it clear.
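For context on the precedence this test exercises, here is a minimal sketch, assuming the KIP-792-style behavior under discussion: prefer the generation id carried in the new top-level subscription field, and fall back to the assignor's own userData only when that field was never set. `MemberData`, `DEFAULT_GENERATION`, and `deserializeUserData` are simplified stand-ins, not the exact assignor internals.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;

// Sketch only: honor the top-level generation id when present, otherwise
// fall back to whatever generation (if any) the legacy userData encodes.
MemberData memberData(final ConsumerPartitionAssignor.Subscription subscription) {
    if (subscription.generationId() != DEFAULT_GENERATION) {
        // The new field was populated, so trust it (and the top-level
        // ownedPartitions) over anything embedded in userData.
        return new MemberData(subscription.ownedPartitions(), Optional.of(subscription.generationId()));
    }
    final ByteBuffer userData = subscription.userData();
    return userData == null
        ? new MemberData(Collections.emptyList(), Optional.empty())
        : deserializeUserData(userData); // hypothetical helper for the legacy encoding
}
```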
[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630754#comment-17630754 ]

A. Sophie Blee-Goldman commented on KAFKA-12679:

Ah ok, thanks for the update. Regarding the retry.backoff.ms config, that's technically speaking a client config, meaning it's intended to give Streams users control over the backoff between requests in the embedded consumer, producer, or admin client(s). This is what the Streams docs for that config mean by "The amount of time in milliseconds, before a request is retried" – it's not a universal config for backing off any and every operation within Streams.

There is one case where it's currently used by Streams itself, however, rather than just being passed through to the client configuration: when the assignor is validating and/or setting up internal topics during a rebalance. This is still kind of a client/request config since it controls how long to wait between various admin calls, but my point is it's not entirely out of the question to reuse this config for other things within Streams. Still, I would probably advocate for adding a new config for backing off something like this that's completely unrelated to any client requests.

All that is to say, we could probably improve the docs to clarify what this config actually controls, and possibly introduce a new config to address things like the busy loop while initializing/locking a task. I'll take a look at the current code on trunk to see if we've improved things since 2.6, as [~divijvaidya] was experiencing. Honestly it's probably sufficient to just hard-code a short sleep rather than bother with a configurable backoff.

By the way, there is actually ongoing work to move the restoration process to a separate thread, which will presumably include this part where we try to lock the task. cc [~cadonna] – in case this isn't on your radar already, we should try and do this in a better way in the new restoration architecture.

> Rebalancing a restoring or running task may cause directory livelocking with newly created task
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12679
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12679
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.1
>         Environment: Broker and client version 2.6.1
>                      Multi-node broker cluster
>                      Multi-node, auto scaling streams app instances
>            Reporter: Peter Nahas
>            Priority: Major
>             Fix For: 3.4.0
>
>         Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running state and the task gets rebalanced to a separate thread on the same instance, the newly created task will attempt to lock the state store directory while the first thread is continuing to use it. This is totally normal and expected behavior when the first thread is not yet aware of the rebalance. However, that newly created task is effectively running a while loop with no backoff waiting to lock the directory:
> # TaskManager tells the task to restore in `tryToCompleteRestoration`
> # The task attempts to lock the directory
> # The lock attempt fails and throws a `org.apache.kafka.streams.errors.LockException`
> # TaskManager catches the exception, stops further processing on the task and reports that not all tasks have restored
> # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff when this condition occurs, but there does not appear to be any in the code. The result is that if this goes on for long enough, the lock-loop may dominate CPU usage in the process and starve out the old stream thread task processing.
>
> When in this state, the DEBUG level logging for TaskManager will produce a steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager : stream-thread [StreamThread-10] Could not initialize 0_34 due to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread [StreamThread-10] standby-task [0_34] Failed to lock the state directory for task 0_34
> {noformat}
>
> I've attached a git formatted patch to resolve the issue. Simply detect the scenario and sleep for the backoff time in the appropriate StreamThread.
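As an illustration of the hard-coded sleep suggested above, here is a minimal sketch; the `Task` type, logger field, and backoff constant are hypothetical stand-ins, not the actual TaskManager code:

{code:java}
import org.apache.kafka.streams.errors.LockException;

// Hypothetical sketch: back off briefly instead of spinning when the old
// thread still holds the task directory lock.
private static final long LOCK_BACKOFF_MS = 100L;

private void tryInitializeTask(final Task task) {
    try {
        task.initializeIfNeeded();
    } catch (final LockException e) {
        log.debug("Could not initialize {} due to {}; will retry", task.id(), e.toString());
        try {
            // Short sleep so the lock loop cannot starve the thread that
            // still legitimately owns the directory.
            Thread.sleep(LOCK_BACKOFF_MS);
        } catch (final InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}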
[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630746#comment-17630746 ]

BIRINDER TIWANA commented on KAFKA-12679:
------------------------------------------

Actually, the root cause of our issue turned out to be in some custom code that we implemented by inheriting some Kafka Streams API interfaces. We were running a huge loop there which was not needed. But to clarify, the retry.backoff.ms config parameter is still not being used to do a retry in case of LockException here: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L679]. I am not sure if it is documented anywhere whether this has to be done in a KIP etc. or not.

> Rebalancing a restoring or running task may cause directory livelocking with newly created task
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12679
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12679
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.1
>         Environment: Broker and client version 2.6.1
>                      Multi-node broker cluster
>                      Multi-node, auto scaling streams app instances
>            Reporter: Peter Nahas
>            Priority: Major
>             Fix For: 3.4.0
>
>         Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running state and the task gets rebalanced to a separate thread on the same instance, the newly created task will attempt to lock the state store directory while the first thread is continuing to use it. This is totally normal and expected behavior when the first thread is not yet aware of the rebalance. However, that newly created task is effectively running a while loop with no backoff waiting to lock the directory:
> # TaskManager tells the task to restore in `tryToCompleteRestoration`
> # The task attempts to lock the directory
> # The lock attempt fails and throws a `org.apache.kafka.streams.errors.LockException`
> # TaskManager catches the exception, stops further processing on the task and reports that not all tasks have restored
> # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff when this condition occurs, but there does not appear to be any in the code. The result is that if this goes on for long enough, the lock-loop may dominate CPU usage in the process and starve out the old stream thread task processing.
>
> When in this state, the DEBUG level logging for TaskManager will produce a steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager : stream-thread [StreamThread-10] Could not initialize 0_34 due to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread [StreamThread-10] standby-task [0_34] Failed to lock the state directory for task 0_34
> {noformat}
>
> I've attached a git formatted patch to resolve the issue. Simply detect the scenario and sleep for the backoff time in the appropriate StreamThread.
[GitHub] [kafka] showuon commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription
showuon commented on code in PR #12748: URL: https://github.com/apache/kafka/pull/12748#discussion_r1017395682

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java:
## @@ -146,43 +165,22 @@ public void deserializeNewSubscriptionWithOldVersion() {
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
         assertFalse(parsedSubscription.groupInstanceId().isPresent());
+        assertEquals(DEFAULT_GENERATION, parsedSubscription.generationId());
     }

-    @Test
-    public void deserializeFutureSubscriptionVersion() {
-        // verify that a new version which adds a field is still parseable
-        short version = 100;
-
-        Schema subscriptionSchemaV100 = new Schema(
-            new Field("topics", new ArrayOf(Type.STRING)),
-            new Field("user_data", Type.NULLABLE_BYTES),
-            new Field("owned_partitions", new ArrayOf(
-                ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)),
-            new Field("foo", Type.STRING));
-
-        Struct subscriptionV100 = new Struct(subscriptionSchemaV100);
-        subscriptionV100.set("topics", new Object[]{"topic"});
-        subscriptionV100.set("user_data", ByteBuffer.wrap(new byte[0]));
-        subscriptionV100.set("owned_partitions", new Object[]{new Struct(
-            ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)
-            .set("topic", tp2.topic())
-            .set("partitions", new Object[]{tp2.partition()})});
-        subscriptionV100.set("foo", "bar");
-
-        Struct headerV100 = new Struct(new Schema(new Field("version", Type.INT16)));
-        headerV100.set("version", version);
-
-        ByteBuffer buffer = ByteBuffer.allocate(subscriptionV100.sizeOf() + headerV100.sizeOf());
-        headerV100.writeTo(buffer);
-        subscriptionV100.writeTo(buffer);
-
-        buffer.flip();
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void deserializeFutureSubscriptionVersion(boolean hasGenerationId) {
+        ByteBuffer buffer = generateFutureSubscriptionVersionData(hasGenerationId);

         Subscription subscription = ConsumerProtocol.deserializeSubscription(buffer);
         subscription.setGroupInstanceId(groupInstanceId);
         assertEquals(Collections.singleton("topic"), toSet(subscription.topics()));
         assertEquals(Collections.singleton(tp2), toSet(subscription.ownedPartitions()));
         assertEquals(groupInstanceId, subscription.groupInstanceId());
+        if (hasGenerationId) {
+            assertEquals(generationId, subscription.generationId());
+        }

Review Comment: test updated, no `hasGenerationId` anymore.
[GitHub] [kafka] showuon commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription
showuon commented on code in PR #12748: URL: https://github.com/apache/kafka/pull/12748#discussion_r1017395458

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java:
## @@ -247,4 +245,42 @@ public void deserializeFutureAssignmentVersion() {
         Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer);
         assertEquals(toSet(Collections.singletonList(tp1)), toSet(assignment.partitions()));
     }
+
+    private ByteBuffer generateFutureSubscriptionVersionData(boolean hasGenerationId) {
+        // verify that a new version which adds a field is still parseable
+        short version = 100;
+
+        Schema subscriptionSchemaV100 = new Schema(
+            new Field("topics", new ArrayOf(Type.STRING)),
+            new Field("user_data", Type.NULLABLE_BYTES),
+            new Field("owned_partitions", new ArrayOf(
+                ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)),
+            hasGenerationId ? new Field("generation_id", Type.INT32) : new Field("foo", Type.STRING),

Review Comment: I think you're right. The generation_id should always be there. I think this test is just to verify that we will always get the correct data no matter what new fields are added. Updated the test.
[GitHub] [kafka] showuon commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription
showuon commented on code in PR #12748: URL: https://github.com/apache/kafka/pull/12748#discussion_r1017394471

## clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java:
## @@ -78,6 +80,26 @@ public void testDecodeGeneration() {
+    @Test

Review Comment: I agree we should add tests for the latest mechanism, but I think we should still keep the tests for the old schema. These are unit tests, which will not take too long. I added a variable to test both the old and new schemas.
[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630721#comment-17630721 ]

A. Sophie Blee-Goldman commented on KAFKA-12679:

Huh, I thought we had already fixed this a while ago, but if you're still seeing it on 2.8.1 then maybe I'm misremembering.

If you're interested in submitting a patch for this it would be welcome – I can backport it to older branches once it's merged, but just a heads up, it's unlikely for there to be another bugfix release for 2.8. If you're willing/able to build from source then of course this should be fine. I'm pretty sure not much has changed in the initialization loop logic between now and 2.8, so it should hopefully be a smooth cherrypick from trunk to 2.8, but it might be worth poking around both branches when you are looking at a fix to see if that fix will apply without conflicts to both.

Anyways, if/when you have a patch ready, feel free to ping me directly on the PR.

> Rebalancing a restoring or running task may cause directory livelocking with newly created task
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12679
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12679
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.1
>         Environment: Broker and client version 2.6.1
>                      Multi-node broker cluster
>                      Multi-node, auto scaling streams app instances
>            Reporter: Peter Nahas
>            Priority: Major
>             Fix For: 3.4.0
>
>         Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running state and the task gets rebalanced to a separate thread on the same instance, the newly created task will attempt to lock the state store directory while the first thread is continuing to use it. This is totally normal and expected behavior when the first thread is not yet aware of the rebalance. However, that newly created task is effectively running a while loop with no backoff waiting to lock the directory:
> # TaskManager tells the task to restore in `tryToCompleteRestoration`
> # The task attempts to lock the directory
> # The lock attempt fails and throws a `org.apache.kafka.streams.errors.LockException`
> # TaskManager catches the exception, stops further processing on the task and reports that not all tasks have restored
> # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff when this condition occurs, but there does not appear to be any in the code. The result is that if this goes on for long enough, the lock-loop may dominate CPU usage in the process and starve out the old stream thread task processing.
>
> When in this state, the DEBUG level logging for TaskManager will produce a steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager : stream-thread [StreamThread-10] Could not initialize 0_34 due to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread [StreamThread-10] standby-task [0_34] Failed to lock the state directory for task 0_34
> {noformat}
>
> I've attached a git formatted patch to resolve the issue. Simply detect the scenario and sleep for the backoff time in the appropriate StreamThread.
[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630717#comment-17630717 ]

A. Sophie Blee-Goldman commented on KAFKA-14362:

Hey [~Carlstedt], you're right about why this is happening: because of the restart, a rebalance is kicked off, which means that any further attempts to commit offsets by other members of the group will fail. After a rebalance, if for example a partition is reassigned from consumer A to consumer B, then consumer B knows where to pick up and resume processing by seeking to the position that corresponds with the latest committed offsets for that partition. If consumer A had processed a message right before the rebalance but then failed to commit an offset for it, this message will be reprocessed by consumer B – essentially, to Kafka it looks like this message was never fully processed by A and therefore B should make sure to process it again.

In other words, this is the intended behavior – there's no guarantee that a message will only be _consumed_ once; instead, Kafka guarantees _at least once_ processing semantics. This is to make sure that every record is fully processed and handled by your application logic before moving on to the next record. Otherwise it wouldn't be fault tolerant, i.e. you might lose a record completely, for example if the app crashed immediately after the record was polled from the consumer.

> Same message consumed by two consumers in the same group after client restart
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-14362
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14362
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.1.1
>         Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
>            Reporter: Mikael
>            Priority: Major
>
> Trigger scenario:
> Two Kafka client application instances on separate EC2 instances with one consumer each, consuming from the same 8 partition topic using the same group ID. Duplicate consumption of a handful of messages sometimes happens right after one of the application instances has been restarted.
> Additional information:
> Messages are produced to the topic by a Kafka Streams topology deployed on four application instances. I have verified that each message is only produced once by enabling debug logging in the topology flow right before producing each message to the topic.
> Example logs below are from a test run when a batch of 11 messages were consumed at 10:28:26,771 on the restarted instance and 9 of them were consumed as part of a larger batch at 10:28:23,824 on the other instance. Application shutdown was initiated at 10:27:13,086 and completed at 10:27:15,164; startup was initiated at 10:28:05,029 and completed at 10:28:37,491.
> Kafka consumer group logs after restart on the instance that was restarted:
>
> {code:java}
> 2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.Metadata [Metadata.java:287] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Cluster ID: B7Q1-xJpQW6EGbp35t2VnA
> 2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:853] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Discovered group coordinator b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 2147483646 rack: null)
> 2022-11-07 10:28:23,688 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:535] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] (Re-)joining group
> 2022-11-07 10:28:23,736 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1000] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Request joining group due to: need to re-join with the given member-id
> 2022-11-07 10:28:23,737 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:535] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] (Re-)joining group
> 2022-11-07 10:28:23,797 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:595] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Successfully joined group with generation Generation{generationId=676, memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243', protocol='cooperative-sticky'}
> 2022-11-07 10:28:23,835 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:761] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback]
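To make the at-least-once behavior described above concrete, here is a minimal consumer-loop sketch; the broker address, topic, group id, and `process` method are placeholders. Offsets are committed only after `process` has run, so a record processed just before a rebalance whose commit is then rejected will be redelivered to the partition's new owner.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "example-group");           // placeholder group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // side effects happen before the commit below
                }
                // If a rebalance began in the meantime (e.g. another instance
                // restarted), this commit fails and the partition's new owner
                // re-reads from the last committed offset, reprocessing the
                // records handled above: at-least-once, not exactly-once.
                consumer.commitSync();
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        // application logic placeholder
    }
}
{code}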
[jira] [Commented] (KAFKA-14282) RecordCollector throws exception on message processing
[ https://issues.apache.org/jira/browse/KAFKA-14282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630710#comment-17630710 ]

A. Sophie Blee-Goldman commented on KAFKA-14282:

Here we go: [https://github.com/apache/kafka/pull/12836]

> RecordCollector throws exception on message processing
> -------------------------------------------------------
>
>                 Key: KAFKA-14282
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14282
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.1
>            Reporter: Sebastian Bruckner
>            Priority: Major
>
> Since we upgraded from version 3.2.0 to 3.3.1 we see a lot of exceptions thrown by the RecordCollector
> {code:java}
> stream-thread [XXX-StreamThread-1] task [2_8] Unable to records bytes produced to topic XXX by sink node KSTREAM-SINK-33 as the node is not recognized.
> Known sink nodes are [KSTREAM-SINK-57, XXX-joined-fk-subscription-registration-sink].
> {code}
> Restarting the application did not help.
> I think this is related to [KIP-846|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211886093] which was introduced in 3.3.0 with the ticket https://issues.apache.org/jira/browse/KAFKA-13945.
[jira] [Assigned] (KAFKA-14282) RecordCollector throws exception on message processing
[ https://issues.apache.org/jira/browse/KAFKA-14282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman reassigned KAFKA-14282:
----------------------------------------------

    Assignee: A. Sophie Blee-Goldman

> RecordCollector throws exception on message processing
> -------------------------------------------------------
>
>                 Key: KAFKA-14282
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14282
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.1
>            Reporter: Sebastian Bruckner
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>
> Since we upgraded from version 3.2.0 to 3.3.1 we see a lot of exceptions thrown by the RecordCollector
> {code:java}
> stream-thread [XXX-StreamThread-1] task [2_8] Unable to records bytes produced to topic XXX by sink node KSTREAM-SINK-33 as the node is not recognized.
> Known sink nodes are [KSTREAM-SINK-57, XXX-joined-fk-subscription-registration-sink].
> {code}
> Restarting the application did not help.
> I think this is related to [KIP-846|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211886093] which was introduced in 3.3.0 with the ticket https://issues.apache.org/jira/browse/KAFKA-13945.
[jira] [Updated] (KAFKA-14282) RecordCollector throws exception on message processing
[ https://issues.apache.org/jira/browse/KAFKA-14282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-14282:
-------------------------------------------
    Fix Version/s: 3.4.0

> RecordCollector throws exception on message processing
> -------------------------------------------------------
>
>                 Key: KAFKA-14282
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14282
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.1
>            Reporter: Sebastian Bruckner
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>             Fix For: 3.4.0
>
>
> Since we upgraded from version 3.2.0 to 3.3.1 we see a lot of exceptions thrown by the RecordCollector
> {code:java}
> stream-thread [XXX-StreamThread-1] task [2_8] Unable to records bytes produced to topic XXX by sink node KSTREAM-SINK-33 as the node is not recognized.
> Known sink nodes are [KSTREAM-SINK-57, XXX-joined-fk-subscription-registration-sink].
> {code}
> Restarting the application did not help.
> I think this is related to [KIP-846|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211886093] which was introduced in 3.3.0 with the ticket https://issues.apache.org/jira/browse/KAFKA-13945.
[GitHub] [kafka] ableegoldman opened a new pull request, #12836: KAFKA-14282: stop tracking Produced sensors by processor node id
ableegoldman opened a new pull request, #12836: URL: https://github.com/apache/kafka/pull/12836

Users have been seeing a large number of these error messages being logged by the RecordCollectorImpl:
```
Unable to records bytes produced to topic XXX by sink node YYY as the node is not recognized.
```
It seems like we try to save all known sink nodes when the record collector is constructed, but we do so by going through the known sink topics, which means we could miss some nodes, for example if dynamic topic routing is used. Previously we were logging an error and would skip recording the metric if we tried to `send` a record from a sink node it didn't recognize, but there's not really any reason to have been tracking the sensors by node in the first place -- we can just track the actual sink topics themselves.
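A rough sketch of the direction described in this PR: key the produced-bytes sensors by sink topic rather than by a sink-node map computed up front. `createTopicSensor` and the surrounding names are illustrative stand-ins, not the actual RecordCollectorImpl code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.metrics.Sensor;

// Illustrative: one sensor per sink topic, created lazily on first send, so a
// topic chosen at runtime (e.g. via dynamic topic routing) is still recorded
// even though it was unknown when the record collector was constructed.
private final Map<String, Sensor> producedSensorByTopic = new ConcurrentHashMap<>();

private void recordBytesProduced(final String topic, final long bytes) {
    producedSensorByTopic
        .computeIfAbsent(topic, this::createTopicSensor) // hypothetical sensor factory
        .record(bytes);
}
```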
[jira] [Commented] (KAFKA-14282) RecordCollector throws exception on message processing
[ https://issues.apache.org/jira/browse/KAFKA-14282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630707#comment-17630707 ]

A. Sophie Blee-Goldman commented on KAFKA-14282:

Oh wait, I think I know what's going on here. Super dumb bug, I'll try to put out a fix shortly. Sorry for the inconvenience everyone!

> RecordCollector throws exception on message processing
> -------------------------------------------------------
>
>                 Key: KAFKA-14282
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14282
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.1
>            Reporter: Sebastian Bruckner
>            Priority: Major
>
> Since we upgraded from version 3.2.0 to 3.3.1 we see a lot of exceptions thrown by the RecordCollector
> {code:java}
> stream-thread [XXX-StreamThread-1] task [2_8] Unable to records bytes produced to topic XXX by sink node KSTREAM-SINK-33 as the node is not recognized.
> Known sink nodes are [KSTREAM-SINK-57, XXX-joined-fk-subscription-registration-sink].
> {code}
> Restarting the application did not help.
> I think this is related to [KIP-846|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211886093] which was introduced in 3.3.0 with the ticket https://issues.apache.org/jira/browse/KAFKA-13945.
[jira] [Assigned] (KAFKA-14360) Documentation: Streams Security page has broken links
[ https://issues.apache.org/jira/browse/KAFKA-14360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman reassigned KAFKA-14360:
----------------------------------------------

    Assignee: A. Sophie Blee-Goldman

> Documentation: Streams Security page has broken links
> ------------------------------------------------------
>
>                 Key: KAFKA-14360
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14360
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Adam
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>
> A number of links on the 'Streams Security' page are 404-ing
> https://kafka.apache.org/documentation/streams/developer-guide/security.html
> * Kafka’s security features https://kafka.apache.org/documentation/documentation.html#security
> * Java Producer and Consumer API https://kafka.apache.org/documentation/clients/index.html#kafka-clients
[jira] [Updated] (KAFKA-14360) Documentation: Streams Security page has broken links
[ https://issues.apache.org/jira/browse/KAFKA-14360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-14360:
-------------------------------------------
    Fix Version/s: 3.4.0

> Documentation: Streams Security page has broken links
> ------------------------------------------------------
>
>                 Key: KAFKA-14360
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14360
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Adam
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>             Fix For: 3.4.0
>
>
> A number of links on the 'Streams Security' page are 404-ing
> https://kafka.apache.org/documentation/streams/developer-guide/security.html
> * Kafka’s security features https://kafka.apache.org/documentation/documentation.html#security
> * Java Producer and Consumer API https://kafka.apache.org/documentation/clients/index.html#kafka-clients
[jira] [Commented] (KAFKA-14360) Documentation: Streams Security page has broken links
[ https://issues.apache.org/jira/browse/KAFKA-14360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630702#comment-17630702 ]

A. Sophie Blee-Goldman commented on KAFKA-14360:

Weird, thanks for finding this – I'm guessing the first one is broken due to an extra '/documentation' in the link; it should presumably be directing to [https://kafka.apache.org/documentation/#security]

As for the 2nd one, it's a little less clear what it should be pointing to – oddly, there does not seem to be a dedicated "clients" page on the actual AK docs site; if you go into the drop-down menu under "Docs" for example and click on "Clients" there, it actually sends you to a wiki page on the non-java clients. I don't think that's what was intended here. Since it mentions APIs, my best guess is that it should be linking to this: [https://kafka.apache.org/documentation/#api]

> Documentation: Streams Security page has broken links
> ------------------------------------------------------
>
>                 Key: KAFKA-14360
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14360
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Adam
>            Priority: Major
>
> A number of links on the 'Streams Security' page are 404-ing
> https://kafka.apache.org/documentation/streams/developer-guide/security.html
> * Kafka’s security features https://kafka.apache.org/documentation/documentation.html#security
> * Java Producer and Consumer API https://kafka.apache.org/documentation/clients/index.html#kafka-clients
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
jeffkbkim commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1017328926

## core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
## @@ -135,6 +151,13 @@ class ReplicaFetcherThread(name: String,
     logAppendInfo
   }

+  private def completeDelayedFetchRequests(): Unit = {
+    if (partitionsWithNewHighWatermark.nonEmpty) {
+      replicaMgr.completeDelayedFetchRequests(partitionsWithNewHighWatermark.toSeq)

Review Comment: I'm concerned that copying the buffer will affect performance. Should we use `toIndexedSeq`?
[GitHub] [kafka] ableegoldman opened a new pull request, #12835: KAFKA-14294: check whether a transaction is in flight before skipping a commit
ableegoldman opened a new pull request, #12835: URL: https://github.com/apache/kafka/pull/12835

Add a new `#transactionInFlight` API to the StreamsProducer to expose the flag of the same name, then check whether there is an open transaction when we determine whether or not to perform a commit in TaskExecutor. This is to avoid unnecessarily dropping out of the group on transaction timeout in the case a transaction was begun outside of regular processing, e.g. when a punctuator forwards records but there are no newly consumed records and thus no new offsets to commit.
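A condensed sketch of the commit decision this PR changes; `transactionInFlight()` is the new accessor mentioned above, while the other names are simplified stand-ins for the real TaskExecutor logic:

```java
// Commit if any task has progress to commit OR a transaction is already open,
// e.g. because a punctuator forwarded records without any new input offsets.
final boolean somethingToCommit =
    tasks.stream().anyMatch(Task::commitNeeded)   // regular case: new offsets to commit
        || producer.transactionInFlight();        // punctuation-only case: the new check
if (somethingToCommit) {
    // Completing the open transaction here keeps it from hitting the
    // transaction timeout and fencing the producer, which would otherwise
    // force the member to drop out of the group and trigger a rebalance.
    commitOffsetsOrTransaction(consumedOffsetsPerTask);
}
```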
[jira] [Assigned] (KAFKA-14294) Kafka Streams should commit transaction when no records are processed
[ https://issues.apache.org/jira/browse/KAFKA-14294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman reassigned KAFKA-14294:
----------------------------------------------

    Assignee: A. Sophie Blee-Goldman

> Kafka Streams should commit transaction when no records are processed
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14294
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14294
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.2.1
>            Reporter: Vicky Papavasileiou
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>
> Currently, if there are no records to process in the input topic, a transaction does not commit. If custom punctuator code is writing to a state store (which is common practice), the producer gets fenced when trying to write to the changelog topic. This throws a TaskMigratedException and causes a rebalance.
> A better approach would be to commit a transaction even when there are no records processed, so as to allow the punctuator to make progress.
[jira] [Commented] (KAFKA-14294) Kafka Streams should commit transaction when no records are processed
[ https://issues.apache.org/jira/browse/KAFKA-14294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630678#comment-17630678 ]

A. Sophie Blee-Goldman commented on KAFKA-14294:

I guess we should just remove that check? i.e. first go through all the tasks and check whether any of them have commitNeeded = true, and if so, we still need to commit whether or not there are actually "new" offsets to be committed.

I do worry a bit about the potential impact of excessively committing; maybe we should try to expose/check whether the producer has an open transaction OR there are offsets to commit, rather than relying on the commitNeeded flag, which would be true regardless of whether the punctuation actually tried to forward new records/open a new transaction.

> Kafka Streams should commit transaction when no records are processed
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14294
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14294
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.2.1
>            Reporter: Vicky Papavasileiou
>            Priority: Major
>
> Currently, if there are no records to process in the input topic, a transaction does not commit. If custom punctuator code is writing to a state store (which is common practice), the producer gets fenced when trying to write to the changelog topic. This throws a TaskMigratedException and causes a rebalance.
> A better approach would be to commit a transaction even when there are no records processed, so as to allow the punctuator to make progress.
[jira] [Updated] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive
[ https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-13891:
-------------------------------------------
    Fix Version/s: 3.4.0
                       (was: 3.3.0)
                       (was: 3.2.4)

> sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13891
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13891
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.0.0
>            Reporter: Shawn Wang
>            Priority: Major
>             Fix For: 3.4.0
>
>
> This issue was first found in [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]
> But the previous PR forgot to reset the generation when sync group failed with a rebalanceInProgress error. So the previous bug still exists, and it may cause the consumer to rebalance many rounds before finally becoming stable.
> Here's the example ({*}bold is added{*}):
> # consumer A joined and synced the group successfully with generation 1 *(with ownedPartition P1/P2)*
> # a new rebalance started with generation 2; consumer A joined successfully, but somehow consumer A doesn't send out the sync group immediately
> # other consumers completed the sync group successfully in generation 2, except consumer A
> # after consumer A sent out its sync group, a new rebalance started, with generation 3, so consumer A got a REBALANCE_IN_PROGRESS error in the sync group response
> # when receiving REBALANCE_IN_PROGRESS, we re-join the group with generation 3, but with the assignment (ownedPartition) from generation 1
> # so now we have an out-of-date ownedPartition sent, with unexpected results
> # *after the generation-3 rebalance, consumer A got partitions P3/P4; the ownedPartition is ignored because of the old generation*
> # *consumer A revokes P1/P2 and re-joins to start a new round of rebalance*
> # *if some other consumer C failed to syncGroup before consumer A's joinGroup, the same issue will happen again and result in many rounds of rebalance before the group stabilizes*
[jira] [Reopened] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive
[ https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman reopened KAFKA-13891:

Reopening – the original fix was reverted; we should instead fix this assignor-side by making it smarter about partition ownership across generations. Basically, it should take as the previous owner whichever consumer has the highest generation and claims it among their owned partitions.

[~showuon] I probably won't be able to get to this within the next few days, so if you're interested in picking up this fix, go ahead and I'll find time to review – otherwise I will try to get to it in time for the 3.4 release.

> sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13891
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13891
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.0.0
>            Reporter: Shawn Wang
>            Priority: Major
>             Fix For: 3.3.0, 3.2.4
>
>
> This issue was first found in [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]
> But the previous PR forgot to reset the generation when sync group failed with a rebalanceInProgress error. So the previous bug still exists, and it may cause the consumer to rebalance many rounds before finally becoming stable.
> Here's the example ({*}bold is added{*}):
> # consumer A joined and synced the group successfully with generation 1 *(with ownedPartition P1/P2)*
> # a new rebalance started with generation 2; consumer A joined successfully, but somehow consumer A doesn't send out the sync group immediately
> # other consumers completed the sync group successfully in generation 2, except consumer A
> # after consumer A sent out its sync group, a new rebalance started, with generation 3, so consumer A got a REBALANCE_IN_PROGRESS error in the sync group response
> # when receiving REBALANCE_IN_PROGRESS, we re-join the group with generation 3, but with the assignment (ownedPartition) from generation 1
> # so now we have an out-of-date ownedPartition sent, with unexpected results
> # *after the generation-3 rebalance, consumer A got partitions P3/P4; the ownedPartition is ignored because of the old generation*
> # *consumer A revokes P1/P2 and re-joins to start a new round of rebalance*
> # *if some other consumer C failed to syncGroup before consumer A's joinGroup, the same issue will happen again and result in many rounds of rebalance before the group stabilizes*
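A sketch of the assignor-side fix described in the comment above: among all members claiming a partition, treat the one with the highest generation as its previous owner and drop stale claims. `MemberData` here is a simplified stand-in with plain `ownedPartitions` and `generation` fields, not the real assignor type:

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class HighestGenerationOwnerResolver {

    // Simplified stand-in for the assignor's per-member subscription data.
    static final class MemberData {
        final List<TopicPartition> ownedPartitions;
        final int generation;
        MemberData(final List<TopicPartition> ownedPartitions, final int generation) {
            this.ownedPartitions = ownedPartitions;
            this.generation = generation;
        }
    }

    // For each partition, keep only the claim from the highest generation seen,
    // so an out-of-date ownedPartitions list from an older generation cannot
    // steal ownership and trigger extra rounds of rebalancing.
    static Map<TopicPartition, String> resolvePreviousOwners(final Map<String, MemberData> membersById) {
        final Map<TopicPartition, String> ownerByPartition = new HashMap<>();
        final Map<TopicPartition, Integer> generationByPartition = new HashMap<>();
        membersById.forEach((memberId, member) -> {
            for (final TopicPartition tp : member.ownedPartitions) {
                final Integer bestSoFar = generationByPartition.get(tp);
                if (bestSoFar == null || member.generation > bestSoFar) {
                    generationByPartition.put(tp, member.generation);
                    ownerByPartition.put(tp, memberId);
                }
            }
        });
        return ownerByPartition;
    }
}
{code}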
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12809: [KAFKA-14324] Upgrade RocksDB to 7.1.2
ableegoldman commented on code in PR #12809: URL: https://github.com/apache/kafka/pull/12809#discussion_r1017262697

## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java:
## @@ -338,14 +338,13 @@ public Statistics statistics() {
     }

     @Deprecated
-    @Override
     public void setBaseBackgroundCompactions(final int baseBackgroundCompactions) {
-        dbOptions.setBaseBackgroundCompactions(baseBackgroundCompactions);
+        // no-op

Review Comment: we should still log a warning imo (and maybe point users to the new way of doing this?) Ditto all the others like this.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12776: KAFKA-14327: Unify KRaft snapshot generation between broker and controller
hachikuji commented on code in PR #12776: URL: https://github.com/apache/kafka/pull/12776#discussion_r1017244308

## core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala:
## @@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util.Properties
+import kafka.server.ConfigAdminManager.toLoggableProps
+import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType, KafkaConfig}
+import kafka.utils.Logging
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC}
+import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
+import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.server.fault.FaultHandler
+
+
+class DynamicConfigPublisher(

Review Comment: Can we pull this into a separate PR? It is asking a lot from reviewers to be rigorous with a 4k diff. As I see it, there are at least 3 separate PRs here:
1. General refactor of server startup/shutdown
2. Implementation of time-based snapshots
3. Implementation of dynamic controller configuration
[GitHub] [kafka] hachikuji commented on a diff in pull request #12776: KAFKA-14327: Unify KRaft snapshot generation between broker and controller
hachikuji commented on code in PR #12776: URL: https://github.com/apache/kafka/pull/12776#discussion_r1017238973 ## core/src/main/scala/kafka/server/KafkaRaftServer.scala: ## @@ -69,95 +64,54 @@ class KafkaRaftServer( private val controllerQuorumVotersFuture = CompletableFuture.completedFuture( RaftConfig.parseVoterConnections(config.quorumVoters)) - private val raftManager = new KafkaRaftManager[ApiMessageAndVersion]( -metaProps, + private val jointServer = new JointServer( config, -new MetadataRecordSerde, -KafkaRaftServer.MetadataPartition, -KafkaRaftServer.MetadataTopicId, +metaProps, time, metrics, threadNamePrefix, -controllerQuorumVotersFuture +controllerQuorumVotersFuture, +new StandardFaultHandlerFactory(), ) private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) { -val brokerMetrics = BrokerServerMetrics(metrics) -val fatalFaultHandler = new ProcessExitingFaultHandler() -val metadataLoadingFaultHandler = new LoggingFaultHandler("metadata loading", -() => brokerMetrics.metadataLoadErrorCount.getAndIncrement()) -val metadataApplyingFaultHandler = new LoggingFaultHandler("metadata application", - () => brokerMetrics.metadataApplyErrorCount.getAndIncrement()) Some(new BrokerServer( - config, - metaProps, - raftManager, - time, - metrics, - brokerMetrics, - threadNamePrefix, + jointServer, offlineDirs, - controllerQuorumVotersFuture, - fatalFaultHandler, - metadataLoadingFaultHandler, - metadataApplyingFaultHandler )) } else { None } private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) { -val controllerMetrics = new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), time) -val metadataFaultHandler = new LoggingFaultHandler("controller metadata", - () => controllerMetrics.incrementMetadataErrorCount()) -val fatalFaultHandler = new ProcessExitingFaultHandler() Some(new ControllerServer( - metaProps, - config, - raftManager, - time, - metrics, - controllerMetrics, - threadNamePrefix, - controllerQuorumVotersFuture, + jointServer, KafkaRaftServer.configSchema, - raftManager.apiVersions, bootstrapMetadata, - metadataFaultHandler, - fatalFaultHandler )) } else { None } override def startup(): Unit = { Mx4jLoader.maybeLoad() -// Note that we startup `RaftManager` first so that the controller and broker -// can register listeners during initialization. -raftManager.startup() controller.foreach(_.startup()) broker.foreach(_.startup()) AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds()) info(KafkaBroker.STARTED_MESSAGE) } override def shutdown(): Unit = { +// The last component to be shut down will stop JointServer. broker.foreach(_.shutdown()) -// The order of shutdown for `RaftManager` and `ControllerServer` is backwards Review Comment: It looks like we lost this in the updated implementation? The socket server of the controller is shutdown before `JointServer`. By the time we get to `RaftManager` shutdown, the socket server is down and we cannot do graceful resignation from the quorum anymore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
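To make the ordering constraint in the review comment concrete, here is a minimal sketch in Java (hypothetical `Component` abstraction and class names, not the actual KafkaRaftServer/JointServer code) of a shutdown sequence that keeps the controller's listeners alive until the raft manager has resigned from the quorum:
```java
// Hypothetical sketch of the ordering constraint described above; names do
// not match the real classes. The raft manager needs a working network to
// resign leadership gracefully, so the socket server must be closed last.
class ShutdownOrderSketch {
    interface Component { void shutdown(); }

    private final Component broker;       // stops serving client requests
    private final Component controller;   // stops the quorum controller logic
    private final Component raftManager;  // resigns from the quorum over the network
    private final Component socketServer; // the controller's listeners

    ShutdownOrderSketch(Component broker, Component controller,
                        Component raftManager, Component socketServer) {
        this.broker = broker;
        this.controller = controller;
        this.raftManager = raftManager;
        this.socketServer = socketServer;
    }

    void shutdown() {
        broker.shutdown();
        controller.shutdown();
        raftManager.shutdown();   // graceful resignation is still possible here...
        socketServer.shutdown();  // ...because the listeners are closed only now
    }
}
```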
[GitHub] [kafka] jsancio commented on a diff in pull request #12833: KAFKA-14370: Properly close ImageWriter objects
jsancio commented on code in PR #12833: URL: https://github.com/apache/kafka/pull/12833#discussion_r1017191691 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -398,8 +398,11 @@ class BrokerMetadataListener( build() try { _image.write(writer, options) - } finally { -writer.close() +writer.close(true) Review Comment: `MetadataImage::write` calls `ImageWriter::close(true)`. Can we remove that call? It is very difficult to read and understand code that closes `AutoCloseable` in different layers of the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
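For illustration, a minimal sketch (hypothetical `Writer` interface, not the real `ImageWriter` API) of the single-ownership pattern the comment asks for, where the write layer never closes and exactly one layer, the caller, owns `close`:
```java
// Hypothetical sketch of single-ownership close semantics; not the real
// ImageWriter API. writeAll() only writes, so the caller alone owns close().
class CloseOwnershipSketch {
    interface Writer extends AutoCloseable {
        void write(String record);
        void close(boolean complete); // complete=true marks the snapshot as finished
        @Override
        default void close() { close(false); } // abort path; assumed a no-op after close(true)
    }

    // The write layer does not close: ownership stays with the caller.
    static void writeAll(Writer writer, Iterable<String> records) {
        for (String record : records) {
            writer.write(record);
        }
    }

    static void snapshot(Writer writer, Iterable<String> records) {
        try (Writer w = writer) {  // aborts via close(false) if writeAll throws
            writeAll(w, records);
            w.close(true);         // single, explicit completion point
        }
    }
}
```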
[GitHub] [kafka] kirktrue closed pull request #12816: KAFKA-14274: take 2
kirktrue closed pull request #12816: KAFKA-14274: take 2 URL: https://github.com/apache/kafka/pull/12816 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue closed pull request #12831: KAFKA-14365: Refactor Fetcher to allow different implementations
kirktrue closed pull request #12831: KAFKA-14365: Refactor Fetcher to allow different implementations URL: https://github.com/apache/kafka/pull/12831 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue opened a new pull request, #12834: KAFKA-14365: Refactor Fetcher to allow different implementations 2/2
kirktrue opened a new pull request, #12834: URL: https://github.com/apache/kafka/pull/12834 (This is a stacked PR on top of #12832.) Introduce the generalized `Fetcher` interface, update `DefaultFetcher` to implement the `Fetcher` interface, and update relevant call sites to use the interface (over the concrete class) where possible. This is part of the consumer thread refactoring project. See [KAFKA-14365: Refactor Fetcher to allow different implementations](https://issues.apache.org/jira/browse/KAFKA-14365). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe opened a new pull request, #12833: KAFKA-14370: Properly close ImageWriter objects
cmccabe opened a new pull request, #12833: URL: https://github.com/apache/kafka/pull/12833 Properly close ImageWriter objects in BrokerMetadataListener and BrokerMetadataSnapshotter. Add a time limit on BrokerMetadataSnapshotterTest. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14370) Properly close ImageWriter objects
Colin McCabe created KAFKA-14370: Summary: Properly close ImageWriter objects Key: KAFKA-14370 URL: https://issues.apache.org/jira/browse/KAFKA-14370 Project: Kafka Issue Type: Bug Reporter: Colin McCabe Assignee: Colin McCabe -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mikael updated KAFKA-14362: --- Description: Trigger scenario: Two Kafka client application instances on separate EC2 instances with one consumer each, consuming from the same 8 partition topic using the same group ID. Duplicate consumption of a handful of messages sometimes happens right after one of the application instances has been restarted. Additional information: Messages are produced to the topic by a Kafka streams topology deployed on four application instances. I have verified that each message is only produced once by enabling debug logging in the topology flow right before producing each message to the topic. Example logs below are from a test run when a batch of 11 messages were consumed at 10:28:26,771 on the restarted instance and 9 of them were consumed as part of a larger batch at 10:28:23,824 on the other instance. Application shutdown was initiated at 10:27:13,086 and completed at 10:27:15,164, startup was initiated at 10:28:05,029 and completed at 10:28:37,491. Kafka consumer group logs after restart on the instance that was restarted: {code:java} 2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.Metadata [Metadata.java:287] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Cluster ID: B7Q1-xJpQW6EGbp35t2VnA 2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:853] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Discovered group coordinator b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 2147483646 rack: null) 2022-11-07 10:28:23,688 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:535] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] (Re-)joining group 2022-11-07 10:28:23,736 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1000] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Request joining group due to: need to re-join with the given member-id 2022-11-07 10:28:23,737 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:535] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] (Re-)joining group 2022-11-07 10:28:23,797 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:595] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Successfully joined group with generation Generation{generationId=676, memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243', protocol='cooperative-sticky'} 2022-11-07 10:28:23,835 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:761] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Successfully synced group in generation Generation{generationId=676, memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243', protocol='cooperative-sticky'} 2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:395] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Updating assignment with Assigned partitions: [] Current owned partitions: [] Added partitions (assigned - owned): [] Revoked partitions (owned - assigned): [] 2022-11-07 10:28:23,838 INFO 
[consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:279] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Notifying assignor about the new Assignment(partitions=[]) 2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:291] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Adding newly assigned partitions: 2022-11-07 10:28:25,315 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1000] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Request joining group due to: group is already rebalancing 2022-11-07 10:28:25,317 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:535] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] (Re-)joining group 2022-11-07 10:28:25,319 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:595] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Successfully joined group with generation Generation{generationId=677, memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243', protocol
[GitHub] [kafka] kirktrue opened a new pull request, #12832: KAFKA-14365: Refactor Fetcher to allow different implementations
kirktrue opened a new pull request, #12832: URL: https://github.com/apache/kafka/pull/12832 Rename `Fetcher` to `DefaultFetcher` as part of process to allow multiple implementations of the `Fetcher` API. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14366) Kafka consumer rebalance issue, offsets points back to very old committed offset
[ https://issues.apache.org/jira/browse/KAFKA-14366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630581#comment-17630581 ] Philip Nee commented on KAFKA-14366: Hey [~Chetu] - thanks for reporting this, could you kindly provide the steps to reproduce this issue? > Kafka consumer rebalance issue, offsets points back to very old committed > offset > > > Key: KAFKA-14366 > URL: https://issues.apache.org/jira/browse/KAFKA-14366 > Project: Kafka > Issue Type: Bug > Components: consumer, offset manager >Affects Versions: 2.8.1 > Environment: Production >Reporter: Chetan >Priority: Major > Attachments: rebalance issue.docx > > > Hi All, > We are facing an issue when a client consumer restarts (not all restarts end up with this issue): during the rebalancing scenario, one of the partition offsets sometimes goes back a long way from the committed offset. > Scenario: > Assume we have 4 consumer instances and the consumers are restarted one after the other. > # At the time the restarts start, assume the offset on partition 10 of the consumed topic is pointing to 5 (the last offset of the topic, with 0 lag). > # When the restarts (rebalancing) start, the offset suddenly points back to 2. > # While the restarts are going on, the consumer that is assigned the partition starts reading from 2 onwards. > # Once the rebalance is completed and all messages from offset 2 to 5 (where it had stopped initially) have been read, we end up with around 30K duplicates. > (The numbers here are just an example. In production we are facing huge numbers of duplicates; roughly two out of ten restart exercises end up with such duplicates, and only one or two partitions, at random, behave this way.) > This seems to be a bug. I am attaching all screenshots for reference as well. > Can someone kindly help out here? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14358) Users should not be able to create a regular topic name __cluster_metadata
[ https://issues.apache.org/jira/browse/KAFKA-14358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio reassigned KAFKA-14358: -- Assignee: José Armando García Sancio > Users should not be able to create a regular topic name __cluster_metadata > -- > > Key: KAFKA-14358 > URL: https://issues.apache.org/jira/browse/KAFKA-14358 > Project: Kafka > Issue Type: Bug > Components: controller >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Blocker > Fix For: 3.4.0, 3.3.2 > > > The following test passes and it should not: > {code:java} > $ git diff > diff --git > a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > index 57834234cc..14b1435d00 100644 > --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > @@ -102,6 +102,12 @@ class CreateTopicsRequestTest extends > AbstractCreateTopicsRequestTest { > validateTopicExists("partial-none") > } > > + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) > + @ValueSource(strings = Array("zk", "kraft")) > + def testClusterMetadataTopicFails(quorum: String): Unit = { > + createTopic("__cluster_metadata", 1, 1) > + } > + > @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) > @ValueSource(strings = Array("zk")) > def testCreateTopicsWithVeryShortTimeouts(quorum: String): Unit = {{code} > Result of this test: > {code:java} > $ ./gradlew core:test --tests > CreateTopicsRequestTest.testClusterMetadataTopicFails > > Configure project : > Starting build with version 3.4.0-SNAPSHOT (commit id bc780c7c) using Gradle > 7.5.1, Java 1.8 and Scala 2.13.8 > Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0 > > Task :core:test > Gradle Test Run :core:test > Gradle Test Executor 8 > CreateTopicsRequestTest > > testClusterMetadataTopicFails(String) > > kafka.server.CreateTopicsRequestTest.testClusterMetadataTopicFails(String)[1] > PASSED > Gradle Test Run :core:test > Gradle Test Executor 8 > CreateTopicsRequestTest > > testClusterMetadataTopicFails(String) > > kafka.server.CreateTopicsRequestTest.testClusterMetadataTopicFails(String)[2] > PASSED > BUILD SUCCESSFUL in 44s > 44 actionable tasks: 3 executed, 41 up-to-date > {code} > I think that this test should fail in both KRaft and ZK. We want this to fail > in ZK so that it can be migrated to KRaft. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14369) Docs - KRAFT controller authentication example
Domenic Bove created KAFKA-14369: Summary: Docs - KRAFT controller authentication example Key: KAFKA-14369 URL: https://issues.apache.org/jira/browse/KAFKA-14369 Project: Kafka Issue Type: Bug Components: docs Affects Versions: 3.3.1 Reporter: Domenic Bove The [Kafka Listener docs |https://kafka.apache.org/documentation/#listener_configuration]mention how to handle kafka protocols (other than PLAINTEXT) on the KRAFT controller listener, but it is not a working example and I found that I was missing this property: `sasl.mechanism.controller.protocol` when attempting to do SASL_PLAINTEXT on the controller listener. I see that property here: [https://kafka.apache.org/documentation/#brokerconfigs_sasl.mechanism.controller.protocol] But nowhere else. I wonder if a complete working example would be better. Here are my working configs for sasl plain on the controller {code:java} process.roles=controller listeners=CONTROLLER://:9093 node.id=1 controller.quorum.voters=1@localhost:9093 controller.listener.names=CONTROLLER listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT listener.name.controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice-secret"; listener.name.controller.sasl.enabled.mechanisms=PLAIN listener.name.controller.sasl.mechanism=PLAIN sasl.enabled.mechanisms=PLAIN sasl.mechanism.controller.protocol=PLAIN{code} Or maybe just a callout of that property in the existing docs -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14369) Docs - KRAFT controller authentication example
[ https://issues.apache.org/jira/browse/KAFKA-14369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Domenic Bove updated KAFKA-14369: - Description: The [Kafka Listener docs |https://kafka.apache.org/documentation/#listener_configuration]mention how to handle kafka protocols (other than PLAINTEXT) on the KRAFT controller listener, but it is not a working example and I found that I was missing this property: {code:java} sasl.mechanism.controller.protocol {code} when attempting to do SASL_PLAINTEXT on the controller listener. I see that property here: [https://kafka.apache.org/documentation/#brokerconfigs_sasl.mechanism.controller.protocol] But nowhere else. I wonder if a complete working example would be better. Here are my working configs for sasl plain on the controller {code:java} process.roles=controller listeners=CONTROLLER://:9093 node.id=1 controller.quorum.voters=1@localhost:9093 controller.listener.names=CONTROLLER listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT listener.name.controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice-secret"; listener.name.controller.sasl.enabled.mechanisms=PLAIN listener.name.controller.sasl.mechanism=PLAIN sasl.enabled.mechanisms=PLAIN sasl.mechanism.controller.protocol=PLAIN{code} Or maybe just a callout of that property in the existing docs was: The [Kafka Listener docs |https://kafka.apache.org/documentation/#listener_configuration]mention how to handle kafka protocols (other than PLAINTEXT) on the KRAFT controller listener, but it is not a working example and I found that I was missing this property: `sasl.mechanism.controller.protocol` when attempting to do SASL_PLAINTEXT on the controller listener. I see that property here: [https://kafka.apache.org/documentation/#brokerconfigs_sasl.mechanism.controller.protocol] But nowhere else. I wonder if a complete working example would be better. Here are my working configs for sasl plain on the controller {code:java} process.roles=controller listeners=CONTROLLER://:9093 node.id=1 controller.quorum.voters=1@localhost:9093 controller.listener.names=CONTROLLER listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT listener.name.controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice-secret"; listener.name.controller.sasl.enabled.mechanisms=PLAIN listener.name.controller.sasl.mechanism=PLAIN sasl.enabled.mechanisms=PLAIN sasl.mechanism.controller.protocol=PLAIN{code} Or maybe just a callout of that property in the existing docs > Docs - KRAFT controller authentication example > -- > > Key: KAFKA-14369 > URL: https://issues.apache.org/jira/browse/KAFKA-14369 > Project: Kafka > Issue Type: Bug > Components: docs >Affects Versions: 3.3.1 >Reporter: Domenic Bove >Priority: Minor > > The [Kafka Listener docs > |https://kafka.apache.org/documentation/#listener_configuration]mention how > to handle kafka protocols (other than PLAINTEXT) on the KRAFT controller > listener, but it is not a working example and I found that I was missing this > property: > {code:java} > sasl.mechanism.controller.protocol {code} > when attempting to do SASL_PLAINTEXT on the controller listener. I see that > property here: > [https://kafka.apache.org/documentation/#brokerconfigs_sasl.mechanism.controller.protocol] > But nowhere else. 
> I wonder if a complete working example would be better. Here are my working > configs for sasl plain on the controller > {code:java} > process.roles=controller > listeners=CONTROLLER://:9093 > node.id=1 > controller.quorum.voters=1@localhost:9093 > controller.listener.names=CONTROLLER > listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT > listener.name.controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule > required username="admin" password="admin-secret" user_admin="admin-secret" > user_alice="alice-secret"; > listener.name.controller.sasl.enabled.mechanisms=PLAIN > listener.name.controller.sasl.mechanism=PLAIN > sasl.enabled.mechanisms=PLAIN > sasl.mechanism.controller.protocol=PLAIN{code} > Or maybe just a callout of that property in the existing docs -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-8059) Flaky Test DynamicConnectionQuotaTest #testDynamicConnectionQuota
[ https://issues.apache.org/jira/browse/KAFKA-8059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-8059: -- Assignee: Greg Harris > Flaky Test DynamicConnectionQuotaTest #testDynamicConnectionQuota > - > > Key: KAFKA-8059 > URL: https://issues.apache.org/jira/browse/KAFKA-8059 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.2.0, 2.1.1 >Reporter: Matthias J. Sax >Assignee: Greg Harris >Priority: Critical > Labels: flaky-test > > [https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/46/tests] > {quote}org.scalatest.junit.JUnitTestFailedError: Expected exception > java.io.IOException to be thrown, but no exception was thrown > at > org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:100) > at > org.scalatest.junit.JUnitSuite.newAssertionFailedException(JUnitSuite.scala:71) > at org.scalatest.Assertions$class.intercept(Assertions.scala:822) > at org.scalatest.junit.JUnitSuite.intercept(JUnitSuite.scala:71) > at > kafka.network.DynamicConnectionQuotaTest.testDynamicConnectionQuota(DynamicConnectionQuotaTest.scala:82){quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-12511) Flaky test DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
[ https://issues.apache.org/jira/browse/KAFKA-12511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-12511: --- Assignee: Greg Harris > Flaky test > DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota() > -- > > Key: KAFKA-12511 > URL: https://issues.apache.org/jira/browse/KAFKA-12511 > Project: Kafka > Issue Type: Bug >Reporter: Deng Ziming >Assignee: Greg Harris >Priority: Minor > > First time: > Listener PLAINTEXT connection rate 14.419389476913636 must be below > 14.399 ==> expected: but was: > Second time: > Listener EXTERNAL connection rate 10.998243336133811 must be below > 10.799 ==> expected: but was: > details: > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10289/4/testReport/junit/kafka.network/DynamicConnectionQuotaTest/Build___JDK_11___testDynamicListenerConnectionCreationRateQuota__/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-8115) Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated
[ https://issues.apache.org/jira/browse/KAFKA-8115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-8115: -- Assignee: Greg Harris > Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated > --- > > Key: KAFKA-8115 > URL: https://issues.apache.org/jira/browse/KAFKA-8115 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Greg Harris >Priority: Critical > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3254/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/testTaskRequestWithOldStartMsGetsUpdated/] > {quote}org.junit.runners.model.TestTimedOutException: test timed out after > 12 milliseconds at java.base@11.0.1/jdk.internal.misc.Unsafe.park(Native > Method) at > java.base@11.0.1/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234) > at > java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123) > at > java.base@11.0.1/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454) > at > java.base@11.0.1/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709) > at > app//org.apache.kafka.trogdor.rest.JsonRestServer.waitForShutdown(JsonRestServer.java:157) > at app//org.apache.kafka.trogdor.agent.Agent.waitForShutdown(Agent.java:123) > at > app//org.apache.kafka.trogdor.common.MiniTrogdorCluster.close(MiniTrogdorCluster.java:285) > at > app//org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated(CoordinatorTest.java:596) > at > java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base@11.0.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@11.0.1/java.lang.reflect.Method.invoke(Method.java:566) at > app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) > at > app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) > at java.base@11.0.1/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base@11.0.1/java.lang.Thread.run(Thread.java:834){quote} > STDOUT > {quote}[2019-03-15 09:23:41,364] INFO Creating MiniTrogdorCluster with > agents: node02 and coordinator: node01 > (org.apache.kafka.trogdor.common.MiniTrogdorCluster:135) [2019-03-15 > 09:23:41,595] INFO Logging initialized @13340ms to > org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:193) > [2019-03-15 09:23:41,752] INFO Starting REST server > (org.apache.kafka.trogdor.rest.JsonRestServer:89) [2019-03-15 09:23:41,912] > INFO Registered resource > org.apache.kafka.trogdor.agent.AgentRestResource@3fa38ceb > (org.apache.kafka.trogdor.rest.JsonRestServer:94) [2019-03-15 09:23:42,178] > INFO jetty-9.4.14.v20181114; built: 2018-11-14T21:20:31.478Z; git: > 
c4550056e785fb5665914545889f21dc136ad9e6; jvm 11.0.1+13-LTS > (org.eclipse.jetty.server.Server:370) [2019-03-15 09:23:42,360] INFO > DefaultSessionIdManager workerName=node0 > (org.eclipse.jetty.server.session:365) [2019-03-15 09:23:42,362] INFO No > SessionScavenger set, using defaults (org.eclipse.jetty.server.session:370) > [2019-03-15 09:23:42,370] INFO node0 Scavenging every 66ms > (org.eclipse.jetty.server.session:149) [2019-03-15 09:23:44,412] INFO Started > o.e.j.s.ServletContextHandler@335a5293\{/,null,AVAILABLE} > (org.eclipse.jetty.server.handler.ContextHandler:855) [2019-03-15 > 09:23:44,473] INFO Started > ServerConnector@79a93bf1\{HTTP/1.1,[http/1.1]}{0.0.0.0:33477} > (org.eclipse.jetty.server.AbstractConnector:292) [2019-03-15 09:23:44,474] > INFO Started @16219ms (org.eclipse.jetty.server.Server:407) [2019-03-15 > 09:23:44,475] INFO REST server listening at [http://127.0.1.1:33477/] > (org.apache.kafka.trogdor.rest.JsonRestServer:123) [2019-03-15 09:23:44,484] > INFO Starting REST server (org.apache.kafka.trogdor.rest.JsonRestServer:89) > [2019-03-15 09:23:44,485] INFO Registered resource > org.apache.kafka.trogdor
[jira] [Commented] (KAFKA-14357) Make it possible to batch describe requests in the Kafka Admin API
[ https://issues.apache.org/jira/browse/KAFKA-14357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630534#comment-17630534 ] Mickael Maison commented on KAFKA-14357: I took a look at these APIs to see what can be done. * Scram Credentials It is already possible to retrieve credential details for multiple users in a single call: {code:java} DescribeUserScramCredentialsResult credentials = admin.describeUserScramCredentials(Arrays.asList("user1", "user2")); {code} * Quotas DescribeClientQuotasRequest/Response and Admin.describeClientQuotas() accept multiple resources, but the server side currently enforces that requests only contain one type of entity (user, clientId or IP). I think we should be able to change that logic and allow retrieving quotas for multiple similar entities. If we keep the current API, this would look like: {code:java} Collection<ClientQuotaFilterComponent> components = Arrays.asList( ClientQuotaFilterComponent.ofEntity(USER, "user1"), ClientQuotaFilterComponent.ofEntity(USER, "user2") ); DescribeClientQuotasResult quotas = admin.describeClientQuotas(ClientQuotaFilter.contains(components)); {code} * ACLs This is a bit more tricky because DescribeAclsRequest/Response only accept a single resource. So we will need to update the protocol and Admin API. From my quick test, I think we should be able to have the following API: {code:java} Collection<AclBindingFilter> filters = Arrays.asList( new AclBindingFilter( ResourcePatternFilter.ANY, new AccessControlEntryFilter("User:user1", null, AclOperation.ANY, AclPermissionType.ANY)), new AclBindingFilter( ResourcePatternFilter.ANY, new AccessControlEntryFilter("User:user2", null, AclOperation.ANY, AclPermissionType.ANY)) ); Collection<AclBinding> aclBindings = admin.describeAcls(filters).values().get(); {code} It would be nice to have AclBindingFilter behave like ClientQuotaFilter and contain multiple "components", but it looks like that will be very hard to achieve while keeping compatibility. Also, the describeAcls() API expects a principal name, so it requires the principal type prefix, "User:" in this case. > Make it possible to batch describe requests in the Kafka Admin API > -- > > Key: KAFKA-14357 > URL: https://issues.apache.org/jira/browse/KAFKA-14357 > Project: Kafka > Issue Type: Improvement >Reporter: Jakub Scholz >Priority: Major > > The Admin API has several methods to describe different objects such as ACLs, > Quotas or SCRAM-SHA users. But these APIs seem to be usable only in one of > the two modes: > * Query one user's ACLs / Quotas / SCRAM-SHA credentials > * Query all existing ACLs / Quotas / SCRAM-SHA credentials > But there seems to be no way to batch the describe requests for multiple > users. E.g. {_}describe ACLs of users Joe, John and Mike{_}. It would be nice > to have such an option as it might make it easier for applications using the > Admin API to make fewer API calls. -- This message was sent by Atlassian Jira (v8.20.10#820010)
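For context, a sketch of what callers have to do today with the existing single-filter describeAcls() API (one round trip per principal), which is the pattern the batched variant above would collapse into one request. The helper name is hypothetical; only existing Admin API calls are used: {code:java} import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.acl.AccessControlEntryFilter; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.resource.ResourcePatternFilter;

// Hypothetical helper: with today's API, describing ACLs for N principals
// costs N separate describeAcls() round trips.
class BatchDescribeSketch {
    static Collection<AclBinding> describeAclsForPrincipals(Admin admin, List<String> principals)
            throws InterruptedException, ExecutionException {
        Collection<AclBinding> all = new ArrayList<>();
        for (String principal : principals) { // e.g. "User:user1", "User:user2"
            AclBindingFilter filter = new AclBindingFilter(
                ResourcePatternFilter.ANY,
                new AccessControlEntryFilter(principal, null, AclOperation.ANY, AclPermissionType.ANY));
            all.addAll(admin.describeAcls(filter).values().get()); // one request per principal
        }
        return all;
    }
} {code}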
[GitHub] [kafka] C0urante commented on pull request #12366: KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector
C0urante commented on PR #12366: URL: https://github.com/apache/kafka/pull/12366#issuecomment-1307551086 This was proposed in the original PR for MM2, but decided against: https://github.com/apache/kafka/pull/6295#discussion_r290372816 I was on the fence about adding something now that we're changing the logic. At `TRACE` level it's probably worth the effort; will add a log message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna merged pull request #12808: KAFKA-14299: Avoid allocation & synchronization overhead in StreamThread loop
cadonna merged PR #12808: URL: https://github.com/apache/kafka/pull/12808 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #12808: KAFKA-14299: Avoid allocation & synchronization overhead in StreamThread loop
cadonna commented on PR #12808: URL: https://github.com/apache/kafka/pull/12808#issuecomment-1307526346 Build failures are not related: ``` Build / JDK 11 and Scala 2.13 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #12829: MINOR: Avoid highestSupportedVersion outside tests
cmccabe merged PR #12829: URL: https://github.com/apache/kafka/pull/12829 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #12827: KAFKA-14363; Add new `group-coordinator` module (KIP-848)
dajac commented on code in PR #12827: URL: https://github.com/apache/kafka/pull/12827#discussion_r1016821182 ## build.gradle: ## @@ -1222,6 +1225,42 @@ project(':metadata') { } } +project(':group-coordinator') { + archivesBaseName = "kafka-group-coordinator" + + dependencies { +implementation project(':server-common') +implementation project(':clients') +implementation libs.slf4jApi + +testImplementation project(':clients') Review Comment: You're right. Gradle does not complain without them. I removed them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12827: KAFKA-14363; Add new `group-coordinator` module (KIP-848)
ijuma commented on code in PR #12827: URL: https://github.com/apache/kafka/pull/12827#discussion_r1016813429 ## build.gradle: ## @@ -1222,6 +1225,42 @@ project(':metadata') { } } +project(':group-coordinator') { + archivesBaseName = "kafka-group-coordinator" + + dependencies { +implementation project(':server-common') +implementation project(':clients') +implementation libs.slf4jApi + +testImplementation project(':clients') Review Comment: Isn't this redundant given that we already have an implementation dependency on clients? Or does gradle complain if you remove it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14358) Users should not be able to create a regular topic name __cluster_metadata
[ https://issues.apache.org/jira/browse/KAFKA-14358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucia Cerchie reassigned KAFKA-14358: - Assignee: Lucia Cerchie (was: José Armando García Sancio) > Users should not be able to create a regular topic name __cluster_metadata > -- > > Key: KAFKA-14358 > URL: https://issues.apache.org/jira/browse/KAFKA-14358 > Project: Kafka > Issue Type: Bug > Components: controller >Reporter: José Armando García Sancio >Assignee: Lucia Cerchie >Priority: Blocker > Fix For: 3.4.0, 3.3.2 > > > The following test passes and it should not: > {code:java} > $ git diff > diff --git > a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > index 57834234cc..14b1435d00 100644 > --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > @@ -102,6 +102,12 @@ class CreateTopicsRequestTest extends > AbstractCreateTopicsRequestTest { > validateTopicExists("partial-none") > } > > + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) > + @ValueSource(strings = Array("zk", "kraft")) > + def testClusterMetadataTopicFails(quorum: String): Unit = { > + createTopic("__cluster_metadata", 1, 1) > + } > + > @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) > @ValueSource(strings = Array("zk")) > def testCreateTopicsWithVeryShortTimeouts(quorum: String): Unit = {{code} > Result of this test: > {code:java} > $ ./gradlew core:test --tests > CreateTopicsRequestTest.testClusterMetadataTopicFails > > Configure project : > Starting build with version 3.4.0-SNAPSHOT (commit id bc780c7c) using Gradle > 7.5.1, Java 1.8 and Scala 2.13.8 > Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0 > > Task :core:test > Gradle Test Run :core:test > Gradle Test Executor 8 > CreateTopicsRequestTest > > testClusterMetadataTopicFails(String) > > kafka.server.CreateTopicsRequestTest.testClusterMetadataTopicFails(String)[1] > PASSED > Gradle Test Run :core:test > Gradle Test Executor 8 > CreateTopicsRequestTest > > testClusterMetadataTopicFails(String) > > kafka.server.CreateTopicsRequestTest.testClusterMetadataTopicFails(String)[2] > PASSED > BUILD SUCCESSFUL in 44s > 44 actionable tasks: 3 executed, 41 up-to-date > {code} > I think that this test should fail in both KRaft and ZK. We want this to fail > in ZK so that it can be migrated to KRaft. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14128) Kafka Streams terminates on topic check
[ https://issues.apache.org/jira/browse/KAFKA-14128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucia Cerchie reassigned KAFKA-14128: - Assignee: Lucia Cerchie > Kafka Streams terminates on topic check > --- > > Key: KAFKA-14128 > URL: https://issues.apache.org/jira/browse/KAFKA-14128 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0 > Environment: Any >Reporter: Patrik Kleindl >Assignee: Lucia Cerchie >Priority: Major > > Our streams application shut down unexpectedly after some network issues that > should have been easily recoverable. > Logs: > > {code:java} > 2022-07-29 13:39:37.854 INFO 25843 --- [348aefeff-admin] > org.apache.kafka.clients.NetworkClient : [AdminClient > clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting > from node 3 due to request timeout. > 2022-07-29 13:39:37.854 INFO 25843 --- [348aefeff-admin] > org.apache.kafka.clients.NetworkClient : [AdminClient > clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled > in-flight METADATA request with correlation id 985 due to node 3 being > disconnected (elapsed time since creation: 60023ms, elapsed time since send: > 60023ms, request timeout: 3ms) > 2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] > o.a.k.s.p.i.InternalTopicManager : stream-thread [main] Unexpected > error during topic description for > L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-03-changelog. > Error message was: org.apache.kafka.common.errors.TimeoutException: > Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, > nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s) > 2022-07-29 13:39:37.869 INFO 25843 --- [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State > transition from RUNNING to PENDING_SHUTDOWN > {code} > I think the relevant code is in > [https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L524|https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L523-L550] > {code:java} > topicFuture.getValue().get();{code} > without a timeout value cannot throw a TimeoutException, so the > TimeoutException of the AdminClient will be an ExecutionException and hit the > last else branch where the StreamsException is thrown. > Possible fix: > Use the KafkaFuture method with timeout: > {code:java} > public abstract T get(long timeout, TimeUnit unit) throws > InterruptedException, ExecutionException, > TimeoutException;{code} > instead of > {code:java} > public abstract T get() throws InterruptedException, ExecutionException;{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
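A minimal sketch of the fix the ticket proposes: bounding the wait makes a broker-side timeout surface as a java.util.concurrent.TimeoutException that can be retried, instead of an ExecutionException that falls through to the fatal branch. The class and method names, the timeout value, and the null-means-retry convention are illustrative, not the actual patch: {code:java} import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.streams.errors.StreamsException;

// Hypothetical helper sketching the proposed fix; not the actual patch.
class TopicDescribeSketch {
    static TopicDescription describeOrRetry(KafkaFuture<TopicDescription> topicFuture)
            throws InterruptedException {
        try {
            // Bounded wait: a broker timeout now surfaces as TimeoutException.
            return topicFuture.get(30_000, TimeUnit.MILLISECONDS);
        } catch (TimeoutException retriable) {
            return null; // caller treats null as "retry later" instead of shutting down
        } catch (ExecutionException fatal) {
            throw new StreamsException(fatal.getCause()); // genuinely unexpected errors
        }
    }
} {code}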
[jira] [Assigned] (KAFKA-14358) Users should not be able to create a regular topic name __cluster_metadata
[ https://issues.apache.org/jira/browse/KAFKA-14358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucia Cerchie reassigned KAFKA-14358: - Assignee: (was: Lucia Cerchie) > Users should not be able to create a regular topic name __cluster_metadata > -- > > Key: KAFKA-14358 > URL: https://issues.apache.org/jira/browse/KAFKA-14358 > Project: Kafka > Issue Type: Bug > Components: controller >Reporter: José Armando García Sancio >Priority: Blocker > Fix For: 3.4.0, 3.3.2 > > > The following test passes and it should not: > {code:java} > $ git diff > diff --git > a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > index 57834234cc..14b1435d00 100644 > --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala > @@ -102,6 +102,12 @@ class CreateTopicsRequestTest extends > AbstractCreateTopicsRequestTest { > validateTopicExists("partial-none") > } > > + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) > + @ValueSource(strings = Array("zk", "kraft")) > + def testClusterMetadataTopicFails(quorum: String): Unit = { > + createTopic("__cluster_metadata", 1, 1) > + } > + > @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) > @ValueSource(strings = Array("zk")) > def testCreateTopicsWithVeryShortTimeouts(quorum: String): Unit = {{code} > Result of this test: > {code:java} > $ ./gradlew core:test --tests > CreateTopicsRequestTest.testClusterMetadataTopicFails > > Configure project : > Starting build with version 3.4.0-SNAPSHOT (commit id bc780c7c) using Gradle > 7.5.1, Java 1.8 and Scala 2.13.8 > Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0 > > Task :core:test > Gradle Test Run :core:test > Gradle Test Executor 8 > CreateTopicsRequestTest > > testClusterMetadataTopicFails(String) > > kafka.server.CreateTopicsRequestTest.testClusterMetadataTopicFails(String)[1] > PASSED > Gradle Test Run :core:test > Gradle Test Executor 8 > CreateTopicsRequestTest > > testClusterMetadataTopicFails(String) > > kafka.server.CreateTopicsRequestTest.testClusterMetadataTopicFails(String)[2] > PASSED > BUILD SUCCESSFUL in 44s > 44 actionable tasks: 3 executed, 41 up-to-date > {code} > I think that this test should fail in both KRaft and ZK. We want this to fail > in ZK so that it can be migrated to KRaft. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on a diff in pull request #12827: KAFKA-14363; Add new `group-coordinator` module (KIP-848)
dajac commented on code in PR #12827: URL: https://github.com/apache/kafka/pull/12827#discussion_r1016794520 ## build.gradle: ## @@ -1222,6 +1225,41 @@ project(':metadata') { } } +project(':group-coordinator') { + archivesBaseName = "kafka-group-coordinator" + + dependencies { +implementation project(':server-common') +implementation project(':clients') +compileOnly libs.log4j +testImplementation libs.junitJupiter +testImplementation libs.jqwik +testImplementation libs.hamcrest +testImplementation libs.mockitoCore +testImplementation libs.mockitoInline Review Comment: I removed all the unnecessary ones and I kept the minimal that I need for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #12827: KAFKA-14363; Add new `group-coordinator` module (KIP-848)
dajac commented on code in PR #12827: URL: https://github.com/apache/kafka/pull/12827#discussion_r1016794025 ## build.gradle: ## @@ -1222,6 +1225,41 @@ project(':metadata') { } } +project(':group-coordinator') { + archivesBaseName = "kafka-group-coordinator" + + dependencies { +implementation project(':server-common') +implementation project(':clients') +compileOnly libs.log4j Review Comment: Likely not. I fixed this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Stephan14 commented on pull request #12308: KAFKA-14009: update rebalance timeout in memory when consumers use st…
Stephan14 commented on PR #12308: URL: https://github.com/apache/kafka/pull/12308#issuecomment-1307399716 @dajac I have fixed it, can you review it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Stephan14 commented on a diff in pull request #12308: KAFKA-14009: update rebalance timeout in memory when consumers use st…
Stephan14 commented on code in PR #12308: URL: https://github.com/apache/kafka/pull/12308#discussion_r1016779023 ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala: ## @@ -1036,6 +1036,52 @@ class GroupCoordinatorTest { assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error) } + @Test + def staticMemberRejoinWithUpdatedSessionAndRebalanceTimeoutsButCannotPersistChange(): Unit = { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) + val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout, appendRecordError = Errors.MESSAGE_TOO_LARGE) + checkJoinGroupResult(joinGroupResult, + Errors.UNKNOWN_SERVER_ERROR, + rebalanceResult.generation, + Set.empty, + Stable, + Some(protocolType)) + assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined) + val group = groupCoordinator.groupManager.getGroup(groupId).get + group.allMemberMetadata.foreach { member => + assertEquals(member.sessionTimeoutMs, DefaultSessionTimeout) + assertEquals(member.rebalanceTimeoutMs, DefaultRebalanceTimeout) + } + } + + + @Test + def staticMemberRejoinWithUpdatedSessionAndRebalanceTimeoutsAndPersistChange(): Unit = { +val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) +val followerJoinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout) +checkJoinGroupResult(followerJoinGroupResult, + Errors.NONE, + rebalanceResult.generation, + Set.empty, + Stable, + Some(protocolType)) +val leaderJoinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout) +checkJoinGroupResult(leaderJoinGroupResult, + Errors.NONE, + rebalanceResult.generation, + Set[String](leaderInstanceId, followerInstanceId), + Stable, + Some(protocolType), + leaderJoinGroupResult.leaderId, + leaderJoinGroupResult.memberId, + true) +assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined) +val group = groupCoordinator.groupManager.getGroup(groupId).get +group.allMemberMetadata.foreach{ member => Review Comment: fixed ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala: ## @@ -1036,6 +1036,52 @@ class GroupCoordinatorTest { assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error) } + @Test + def staticMemberRejoinWithUpdatedSessionAndRebalanceTimeoutsButCannotPersistChange(): Unit = { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) + val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout, appendRecordError = Errors.MESSAGE_TOO_LARGE) + checkJoinGroupResult(joinGroupResult, + Errors.UNKNOWN_SERVER_ERROR, + rebalanceResult.generation, + Set.empty, + Stable, + Some(protocolType)) + assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined) + val group = groupCoordinator.groupManager.getGroup(groupId).get + group.allMemberMetadata.foreach { member => + assertEquals(member.sessionTimeoutMs, 
DefaultSessionTimeout) + assertEquals(member.rebalanceTimeoutMs, DefaultRebalanceTimeout) + } + } + + + @Test + def staticMemberRejoinWithUpdatedSessionAndRebalanceTimeoutsAndPersistChange(): Unit = { +val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) +val followerJoinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout) +checkJoinGroupResult(followerJoinGroupResult, + Errors.NONE, + rebalanceResult.generation, + Set.empty, + Stable, + Some(protocolType)) +val leaderJoinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout) +checkJoinGroupResult(leaderJoinGroupResult, + Errors.NONE, + rebalanceResult.generation, + Set[String](leaderInstanceId, followerInstanceId), Review Comment: fixed -- This is an automated message from the Apache Git Service.
[GitHub] [kafka] C0urante commented on pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers
C0urante commented on PR #12544: URL: https://github.com/apache/kafka/pull/12544#issuecomment-1307392191 Thanks Mickael! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers
C0urante merged PR #12544: URL: https://github.com/apache/kafka/pull/12544 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12827: KAFKA-14363; Add new `group-coordinator` module (KIP-848)
ijuma commented on code in PR #12827: URL: https://github.com/apache/kafka/pull/12827#discussion_r1016751779 ## build.gradle: ## @@ -1222,6 +1225,41 @@ project(':metadata') { } } +project(':group-coordinator') { + archivesBaseName = "kafka-group-coordinator" + + dependencies { +implementation project(':server-common') +implementation project(':clients') +compileOnly libs.log4j +testImplementation libs.junitJupiter +testImplementation libs.jqwik +testImplementation libs.hamcrest +testImplementation libs.mockitoCore +testImplementation libs.mockitoInline Review Comment: Do we need all these test dependencies? I'd consider removing `mockitoInline` and `hamcrest`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12827: KAFKA-14363; Add new `group-coordinator` module (KIP-848)
ijuma commented on code in PR #12827: URL: https://github.com/apache/kafka/pull/12827#discussion_r1016750711

## build.gradle:
## @@ -1222,6 +1225,41 @@ project(':metadata') { } }

```
+project(':group-coordinator') {
+  archivesBaseName = "kafka-group-coordinator"
+
+  dependencies {
+    implementation project(':server-common')
+    implementation project(':clients')
+    compileOnly libs.log4j
```

Review Comment: Do we need this?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #12795: KAFKA-14299: Initialize tasks in state updater
lucasbru commented on code in PR #12795: URL: https://github.com/apache/kafka/pull/12795#discussion_r1016657641

## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
## @@ -1908,6 +1888,7 @@ public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {

```java
         expectLastCall();
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
         expectLastCall().andThrow(new RuntimeException("KABOOM!"));
+        expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andReturn(mkSet());
```

Review Comment: Sorry, that is a leftover

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #12795: KAFKA-14299: Initialize tasks in state updater
lucasbru commented on code in PR #12795: URL: https://github.com/apache/kafka/pull/12795#discussion_r1016657214

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
## @@ -277,6 +294,9 @@ private void addTask(final Task task) {

```java
             }
         }
     }
+        if (task.state() == Task.State.CREATED) {
+            tasksToInitialize.offer(task);
+        }
```

Review Comment: Well, only that the order here doesn't matter, because I use the queue for the reason I mention above. In principle, I can restore the original order (to be safe) by moving all the logic in this method into `initializeTasksIfNeeded` (except for the addition to the queue, of course). wdyt?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #12795: KAFKA-14299: Initialize tasks in state updater
lucasbru commented on code in PR #12795: URL: https://github.com/apache/kafka/pull/12795#discussion_r1016644371

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
## @@ -115,6 +117,7 @@ public void run() {

```java
     private void runOnce() throws InterruptedException {
         performActionsOnTasks();
+        initializeTasksIfNeeded();
```

Review Comment: I wanted to avoid holding the `tasksAndActionsLock` for too long. Based on what you said, initialization can take some time, so there is some potential for blocking the main thread here. This is more or less defensive programming; I don't have a very concrete situation where this becomes a problem. One can imagine that if you get two rebalances in a row, the second rebalance will be blocked in `handleAssignment` if the initialization from the first rebalance is still holding `tasksAndActionsLock`.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
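To make the trade-off in this thread concrete, here is a minimal sketch of the pattern being discussed, with hypothetical names standing in for the `DefaultStateUpdater` internals: `addTask()` only enqueues under the shared lock, and the potentially slow initialization is drained and run by the updater thread outside that lock, so a concurrent `handleAssignment` is not blocked behind it.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified stand-in for the state updater; names are illustrative only.
public class DeferredInitializer {
    interface Task { boolean isCreated(); void initializeIfNeeded(); }

    private final Object tasksAndActionsLock = new Object();
    private final Queue<Task> tasksAndActions = new ArrayDeque<>();
    private final Queue<Task> tasksToInitialize = new ArrayDeque<>();

    public void addTask(final Task task) {
        synchronized (tasksAndActionsLock) {
            tasksAndActions.offer(task); // cheap: only enqueue under the lock
        }
    }

    // Called from the updater thread's loop. The slow initializeIfNeeded()
    // calls happen here, outside tasksAndActionsLock, so callers of
    // addTask() are never blocked behind initialization.
    public void runOnce() {
        synchronized (tasksAndActionsLock) {
            Task task;
            while ((task = tasksAndActions.poll()) != null) {
                if (task.isCreated()) {
                    tasksToInitialize.offer(task);
                }
            }
        }
        Task task;
        while ((task = tasksToInitialize.poll()) != null) {
            task.initializeIfNeeded(); // may be slow; lock not held
        }
    }
}
```

The queue also preserves arrival order, which is why, per the comment above, the exact position of the enqueue inside `addTask()` does not matter.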
[jira] [Created] (KAFKA-14368) Add an offset write REST API to Kafka Connect
Yash Mayya created KAFKA-14368:
----------------------------------

             Summary: Add an offset write REST API to Kafka Connect
                 Key: KAFKA-14368
                 URL: https://issues.apache.org/jira/browse/KAFKA-14368
             Project: Kafka
          Issue Type: New Feature
            Reporter: Yash Mayya
            Assignee: Yash Mayya

[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] for https://issues.apache.org/jira/browse/KAFKA-4107 proposes to add an offset reset API which will allow resetting the offsets for source and sink connectors so that they can consume from the beginning of the stream. However, an offset API to write arbitrary offsets would also be useful for certain connectors in order to go back in time but not to the beginning, or to skip some problematic record and move forward. Based on the discussion thread for KIP-875 [here|https://lists.apache.org/thread/m5bklnh5w4mwr9nbzrmfk0pftpxfjd02], it was determined that this could be done through a follow-up KIP if/when KIP-875 is adopted.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
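As a purely hypothetical illustration of the kind of API the ticket proposes (the endpoint, HTTP method, connector name, and body shape below are invented for this sketch; the KIP referenced above only covers resetting offsets), a write-offsets request might look like:

```
PATCH /connectors/my-source-connector/offsets
{
  "offsets": [
    {
      "partition": { "filename": "/data/input-1.log" },
      "offset": { "position": 42 }
    }
  ]
}
```

Such a request would let an operator rewind a connector to an arbitrary position, or skip just past a poison record, rather than only restarting from the beginning of the stream.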
[GitHub] [kafka] cadonna commented on a diff in pull request #12795: KAFKA-14299: Initialize tasks in state updater
cadonna commented on code in PR #12795: URL: https://github.com/apache/kafka/pull/12795#discussion_r1016598938

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
## @@ -115,6 +117,7 @@ public void run() {

```java
     private void runOnce() throws InterruptedException {
         performActionsOnTasks();
+        initializeTasksIfNeeded();
```

Review Comment: Why not initialize the task during `addTask()`?

## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
## @@ -1908,6 +1888,7 @@ public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {

```java
         expectLastCall();
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
         expectLastCall().andThrow(new RuntimeException("KABOOM!"));
+        expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andReturn(mkSet());
```

Review Comment: How is this related to the change?

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
## @@ -277,6 +294,9 @@ private void addTask(final Task task) {

```java
             }
         }
     }
+        if (task.state() == Task.State.CREATED) {
+            tasksToInitialize.offer(task);
+        }
```

Review Comment: I would have expected that you initialize the task before any other action, i.e. as the first step in this method. Before this PR, we also initialized the task before we passed it to the state updater. Are there any reasons to do it this way?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OmniaGM commented on a diff in pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
OmniaGM commented on code in PR #12577: URL: https://github.com/apache/kafka/pull/12577#discussion_r1015481020

## connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java:
## @@ -69,6 +74,17 @@ public ReplicationPolicy replicationPolicy() {

```java
 public ReplicationPolicy replicationPolicy() {
     return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class);
 }
+@SuppressWarnings("unchecked")
+ForwardingAdmin getForwardingAdmin(Map config) {
+    try {
+        return Utils.newParameterizedInstance(
```

Review Comment: Yes, however, based on my understanding of the `Configurable` class:

1. `Configurable` usually has no logic in the constructor and doesn't have any parameters passed to the constructor either. Instead, configurations are passed to `Configurable.configure(Map props)`, which `getConfiguredInstance` calls after initializing an instance of the `Configurable`.
2. Subclasses of `Configurable` aren't forced to implement `configure` and also aren't forced to call `super.configure` as part of their implementation of `configure`.

I can change it to `Configurable` if you prefer this, but my point is that `Configurable` doesn't force subclasses to inherit the initialization or configuration logic, as these are part of `configure` and not the constructor.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
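For reference, a minimal sketch of the lifecycle described in points 1 and 2 above: a `Configurable` is created with a no-arg constructor and only then receives its settings via `configure(Map)`, which a subclass may override without calling `super.configure()`. The class name below is hypothetical; it does not stand for the actual `ForwardingAdmin` from the PR.

```java
import java.util.Map;
import org.apache.kafka.common.Configurable;

// Hypothetical implementation illustrating the Configurable lifecycle:
// a parameterless constructor, then configure(Map) is called by
// getConfiguredInstance() after the instance is created.
public class ExampleForwardingAdmin implements Configurable {
    private Map<String, ?> configs;

    public ExampleForwardingAdmin() {
        // no constructor logic and no constructor parameters
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // a subclass overriding this is not obliged to call super.configure()
        this.configs = configs;
    }
}
```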
[GitHub] [kafka] OmniaGM commented on pull request #12366: KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector
OmniaGM commented on PR #12366: URL: https://github.com/apache/kafka/pull/12366#issuecomment-1307124622

> Thanks @OmniaGM, good idea. I've updated the README and added an integration test that verifies that MM2 can still run with exactly-once support enabled.
>
> I should note that the `testReplication` test case is currently failing locally with timeout issues, but succeeds when I bump the timeout. Going to see how the Jenkins build goes; we may choose to increase the timeout in these tests if they also fail during CI.

I had a similar issue with the timeouts.

> It turns out that the `testReplication` flakiness persisted in Jenkins, and was not solved by increasing timeouts.
>
> Instead, the root of the problem was a change in the Connect framework's behavior when exactly-once support is enabled. Without exactly-once support, `SourceTask::commitRecord` is invoked [as soon as the record is ack'd](https://github.com/apache/kafka/blob/c1bb307a361b7b6e17261cc84ea2b108bacca84d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L148) by the Kafka cluster, which usually causes those calls to be spread out over time. With exactly-once support, `SourceTask::commitRecord` is invoked [for every record in a transaction](https://github.com/apache/kafka/blob/c1bb307a361b7b6e17261cc84ea2b108bacca84d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L312) once that transaction is committed, which causes a rapid series of calls to take place one after the other.
>
> MirrorMaker 2 triggers a (potential) offset sync after every call to `commitRecord`, but it has [logic to prevent too many outstanding offset syncs from accruing](https://github.com/apache/kafka/blob/c1bb307a361b7b6e17261cc84ea2b108bacca84d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L208-L211). The exact limit on the number of outstanding offset requests [is ten](https://github.com/apache/kafka/blob/c1bb307a361b7b6e17261cc84ea2b108bacca84d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L53), which is less than the total number of topic partitions being replicated during the integration test. As a result, the test became flaky, since sometimes MM2 would drop an offset sync for partition 0 of `test-topic-1` and then fail when [checking for offset syncs for that topic partition](https://github.com/apache/kafka/blob/c1bb307a361b7b6e17261cc84ea2b108bacca84d/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java#L319-L320).
>
> Since the behavior change in the Connect framework may have an impact on MirrorMaker 2 outside of testing environments, I've tweaked the offset sync limit to apply on a per-topic-partition basis. This way, if a flurry of calls to `commitRecord` takes place when a transaction is committed, every topic partition should still get a chance for an offset sync, but there is still an upper bound on the number of outstanding offset syncs (although that bound is now proportional to the number of topic partitions being replicated, instead of a constant).

Good find. Should we add logs in `sendOffsetSync` to make it easier to find out if MirrorMaker is hitting the limit of `MAX_OUTSTANDING_OFFSET_SYNCS` in the future?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
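A rough sketch of the per-topic-partition bounding strategy described in the quoted comment. This is an illustration of the idea only, not `MirrorSourceTask`'s actual code; the class name and the per-partition budget of 10 are stand-ins:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import org.apache.kafka.common.TopicPartition;

// Instead of one global limit on outstanding offset syncs, each
// topic-partition gets its own small budget, so a burst of
// commitRecord() calls after a transaction commit cannot starve any
// single partition of its offset sync.
public class PerPartitionSyncBudget {
    private static final int MAX_OUTSTANDING_PER_PARTITION = 10; // illustrative value
    private final Map<TopicPartition, Semaphore> budgets = new ConcurrentHashMap<>();

    /** Returns true if a sync for this partition may be sent now. */
    public boolean tryAcquire(final TopicPartition partition) {
        return budgets
            .computeIfAbsent(partition, p -> new Semaphore(MAX_OUTSTANDING_PER_PARTITION))
            .tryAcquire();
    }

    /** Called once the sync for this partition has been ack'd. */
    public void release(final TopicPartition partition) {
        final Semaphore budget = budgets.get(partition);
        if (budget != null) {
            budget.release();
        }
    }
}
```

With a per-partition budget, the total number of outstanding syncs stays bounded, but the bound grows with the number of replicated topic partitions instead of being a constant, which is exactly the trade-off described above.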
[GitHub] [kafka] soarez commented on pull request #12752: KAFKA-14303 Fix batch.size=0 in BuiltInPartitioner
soarez commented on PR #12752: URL: https://github.com/apache/kafka/pull/12752#issuecomment-1307037576 Could you have another look @ijuma? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14294) Kafka Streams should commit transaction when no records are processed
[ https://issues.apache.org/jira/browse/KAFKA-14294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Brutschy reassigned KAFKA-14294:
--------------------------------------

    Assignee: (was: Lucas Brutschy)

> Kafka Streams should commit transaction when no records are processed
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14294
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14294
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.2.1
>            Reporter: Vicky Papavasileiou
>            Priority: Major
>
> Currently, if there are no records to process in the input topic, a transaction does not commit. If custom punctuator code is writing to a state store (which is common practice), the producer gets fenced when trying to write to the changelog topic. This throws a TaskMigratedException and causes a rebalance.
> A better approach would be to commit a transaction even when no records have been processed, so as to allow the punctuator to make progress.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14294) Kafka Streams should commit transaction when no records are processed
[ https://issues.apache.org/jira/browse/KAFKA-14294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Brutschy reassigned KAFKA-14294:
--------------------------------------

    Assignee: Lucas Brutschy

> Kafka Streams should commit transaction when no records are processed
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14294
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14294
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.2.1
>            Reporter: Vicky Papavasileiou
>            Assignee: Lucas Brutschy
>            Priority: Major
>
> Currently, if there are no records to process in the input topic, a transaction does not commit. If custom punctuator code is writing to a state store (which is common practice), the producer gets fenced when trying to write to the changelog topic. This throws a TaskMigratedException and causes a rebalance.
> A better approach would be to commit a transaction even when no records have been processed, so as to allow the punctuator to make progress.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in cooperative
[ https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630331#comment-17630331 ]

Bruno Cadonna commented on KAFKA-13891:
---------------------------------------

[~jdrean] [~ableegoldman] [~dajac] [~showuon] Should we reopen this ticket then?

> sync group failed with rebalanceInProgress error cause rebalance many rounds in cooperative
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13891
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13891
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.0.0
>            Reporter: Shawn Wang
>            Priority: Major
>             Fix For: 3.3.0, 3.2.4
>
> This issue was first found in [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]
> But the previous PR forgot to reset the generation when sync group failed with the rebalanceInProgress error. So the previous bug still exists, and it may cause the consumer to rebalance for many rounds before it finally stabilizes.
> Here's the example ({*}bold is added{*}):
> # consumer A joined and synced the group successfully with generation 1 *(with ownedPartition P1/P2)*
> # a new rebalance started with generation 2; consumer A joined successfully, but somehow consumer A doesn't send out sync group immediately
> # the other consumers completed sync group successfully in generation 2, except consumer A
> # after consumer A sends out sync group, a new rebalance starts, with generation 3. So consumer A gets the REBALANCE_IN_PROGRESS error in the sync group response
> # when receiving REBALANCE_IN_PROGRESS, we re-join the group with generation 3, but with the assignment (ownedPartition) from generation 1
> # so now an out-of-date ownedPartition is sent, with unexpected results
> # *after the generation-3 rebalance, consumer A got the P3/P4 partitions. The ownedPartition is ignored because of the old generation.*
> # *consumer A revokes P1/P2 and re-joins to start a new round of rebalance*
> # *if some other consumer C fails to syncGroup before consumer A's joinGroup, the same issue happens again, resulting in many rounds of rebalance before the group is stable*

-- This message was sent by Atlassian Jira (v8.20.10#820010)
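To make the fix direction concrete, here is a hypothetical condensation of the client-side handling the ticket calls for; the class, fields, and method below are illustrative stand-ins, not Kafka's actual `AbstractCoordinator` code:

```java
import org.apache.kafka.common.protocol.Errors;

// Hypothetical sketch; names do not match Kafka's real consumer internals.
public class SyncGroupHandler {
    static final int NO_GENERATION = -1;

    private int generationId = 1;         // stale generation from the last successful join
    private boolean rejoinNeeded = false;

    void onSyncGroupResponse(final Errors error) {
        if (error == Errors.REBALANCE_IN_PROGRESS) {
            // Forget the stale generation (and with it the stale
            // ownedPartitions) before re-joining, so the next JoinGroup
            // does not carry generation-1 assignments into generation 3.
            generationId = NO_GENERATION;
            rejoinNeeded = true;
        }
    }
}
```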
[jira] [Created] (KAFKA-14367) Introduce `GroupCoordinator` interface
David Jacot created KAFKA-14367:
-----------------------------------

             Summary: Introduce `GroupCoordinator` interface
                 Key: KAFKA-14367
                 URL: https://issues.apache.org/jira/browse/KAFKA-14367
             Project: Kafka
          Issue Type: Sub-task
            Reporter: David Jacot
            Assignee: David Jacot

The goal is to introduce a new GroupCoordinator interface and to convert the existing coordinator to use it. The new coordinator will use it later on.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14294) Kafka Streams should commit transaction when no records are processed
[ https://issues.apache.org/jira/browse/KAFKA-14294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630328#comment-17630328 ]

Bruno Cadonna commented on KAFKA-14294:
---------------------------------------

Streams does not commit a transaction if no records have been consumed from the input topic. See https://github.com/apache/kafka/blob/cd4a1cb4101abcc7cdd1d2d0d73662114108f3e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L182

It does not matter whether {{commitNeeded}} is set to true or not. If a punctuator writes to a state store but no records are consumed from the input topic, a transaction is started but it is never committed.

> Kafka Streams should commit transaction when no records are processed
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14294
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14294
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.2.1
>            Reporter: Vicky Papavasileiou
>            Priority: Major
>
> Currently, if there are no records to process in the input topic, a transaction does not commit. If custom punctuator code is writing to a state store (which is common practice), the producer gets fenced when trying to write to the changelog topic. This throws a TaskMigratedException and causes a rebalance.
> A better approach would be to commit a transaction even when no records have been processed, so as to allow the punctuator to make progress.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
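For illustration, a minimal sketch of the punctuator pattern the ticket describes, written against the Streams processor API; the store name and the schedule are assumptions for this example. Under EOS, the `put` below goes into the producer transaction, which, per the comment above, is never committed if no input records are consumed:

```java
import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical processor whose punctuator writes to a state store on
// wall-clock time, independently of whether any input records arrive.
public class PunctuatingProcessor implements Processor<String, String, String, String> {
    private KeyValueStore<String, Long> store;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        store = context.getStateStore("counts"); // assumes a store named "counts" is attached
        context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
            // this write goes to the changelog topic inside the transaction
            timestamp -> store.put("last-run", timestamp));
    }

    @Override
    public void process(final Record<String, String> record) {
        // input records may never arrive; the punctuator still fires
    }
}
```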
[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1016427264

## core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
## @@ -1100,6 +1102,67 @@ class ReplicaFetcherThreadTest {

```scala
   assertEquals(Collections.singletonList(tid1p0), fetchRequestBuilder2.removed())
 }

+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testLocalFetchCompletionIfHighWatermarkUpdated(highWatermarkUpdated: Boolean): Unit = {
+    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+    val config = KafkaConfig.fromProps(props)
+    val highWatermarkReceivedFromLeader = 100L
+
+    val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend])
+    when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint)
+
+    var maybeNewHighWatermark: Option[Long] = None
+    if (highWatermarkUpdated) {
+      maybeNewHighWatermark = Some(highWatermarkReceivedFromLeader)
+    }
```

Review Comment: small nit: In Scala, we tend to write this as follows:

```scala
val maybeNewHighWatermark = if (highWatermarkUpdated) {
  Some(highWatermarkReceivedFromLeader)
} else {
  None
}
```

It may also fit on one line.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on code in PR #12783: URL: https://github.com/apache/kafka/pull/12783#discussion_r1016425287

## core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
## @@ -135,6 +151,13 @@ class ReplicaFetcherThread(name: String,

```scala
+  private def completeDelayedFetchRequests(): Unit = {
+    if (partitionsWithNewHighWatermark.nonEmpty) {
+      replicaMgr.completeDelayedFetchRequests(partitionsWithNewHighWatermark.toSeq)
```

Review Comment: I think that the failed test with Java 1.8 and Scala 2.12 is due to this line. In Scala 2.13, `toSeq` creates an immutable copy, where it does not in 2.12. Hence, in 2.12, the collection is cleared on the next line before the mock is verified. We should force the copy here (and add a comment).

## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
## @@ -3455,6 +3455,21 @@ class UnifiedLogTest {

```scala
   assertFalse(newDir.exists())
 }

+  @Test
+  def testMaybeUpdateHighWatermarkAsFollower(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig()
+    val log = createLog(logDir, logConfig)
+
+    for (i <- 0 until 100) {
+      val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
+      log.appendAsLeader(records, leaderEpoch = 0)
+    }
+
+    assertEquals(Some(100L), log.maybeUpdateHighWatermark(100L))
```

Review Comment: The sequence is a bit weird. I would have added `99L` just before `100L`. Something like this:

```scala
assertEquals(Some(99L), log.maybeUpdateHighWatermark(99L))
assertEquals(None, log.maybeUpdateHighWatermark(99L))
assertEquals(Some(100L), log.maybeUpdateHighWatermark(100L))
assertEquals(None, log.maybeUpdateHighWatermark(100L))

// Bound by the log end offset.
assertEquals(None, log.maybeUpdateHighWatermark(101L))
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.
mimaison commented on PR #12577: URL: https://github.com/apache/kafka/pull/12577#issuecomment-1306884230 Thanks @OmniaGM for the updates. A few test failures seem related: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12577/16/testReport/org.apache.kafka.connect.mirror.integration/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org