Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1569131419 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1189,50 +1216,61 @@ class LogManager(logDirs: Seq[File], val sourceLog = currentLogs.get(topicPartition) val destLog = futureLogs.get(topicPartition) - info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition") if (sourceLog == null) throw new KafkaStorageException(s"The current replica for $topicPartition is offline") if (destLog == null) throw new KafkaStorageException(s"The future replica for $topicPartition is offline") - destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true) - // the metrics tags still contain "future", so we have to remove it. - // we will add metrics back after sourceLog remove the metrics - destLog.removeLogMetrics() - destLog.updateHighWatermark(sourceLog.highWatermark) + replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true) +} + } + + def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = { +val topicPartition = destLog.topicPartition +info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition") - // Now that future replica has been successfully renamed to be the current replica - // Update the cached map and log cleaner as appropriate. - futureLogs.remove(topicPartition) - currentLogs.put(topicPartition, destLog) - if (cleaner != null) { -cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile) -resumeCleaning(topicPartition) - } +destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true) +// the metrics tags still contain "future", so we have to remove it. 
+// we will add metrics back after sourceLog remove the metrics +destLog.removeLogMetrics() +if (updateHighWatermark && sourceLog.isDefined) { + destLog.updateHighWatermark(sourceLog.get.highWatermark) +} - try { -sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true) +// Now that future replica has been successfully renamed to be the current replica +// Update the cached map and log cleaner as appropriate. +futureLogs.remove(topicPartition) +currentLogs.put(topicPartition, destLog) +if (cleaner != null) { + cleaner.alterCheckpointDir(topicPartition, sourceLog.map(_.parentDirFile), destLog.parentDirFile) Review Comment: Addressed in [062e932](https://github.com/apache/kafka/pull/15136/commits/062e932f260ce9e1df9571b2fc982c63cbaf0f7c) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16573) Streams does not specify where a Serde is needed
Ayoub Omari created KAFKA-16573: --- Summary: Streams does not specify where a Serde is needed Key: KAFKA-16573 URL: https://issues.apache.org/jira/browse/KAFKA-16573 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 3.7.0 Reporter: Ayoub Omari Example topology: {code:java} builder .table("input", Consumed.`with`(Serdes.String(), Serdes.String())) .groupBy((key, value) => new KeyValue(value, key)) .count() .toStream() .to("output", Produced.`with`(Serdes.String(), Serdes.Long())) {code} At runtime, we get the following exception {code:java} Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857) at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92) at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47) at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63) at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code} The error does not give information about the line or the processor causing the issue. 
Here a Grouped was missing inside the groupBy. Because the groupBy API does not force the caller to define a Grouped, it is easy to miss, and the omission can be difficult to spot in a more complex topology. This also affects anyone who needs control over the serdes in the topology and does not want to define default serdes.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
mimaison commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1569073857 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -79,10 +80,10 @@ class ZkAdminManager(val config: KafkaConfig, private val configHelper = new ConfigHelper(metadataCache, config, new ZkConfigRepository(adminZkClient)) private val createTopicPolicy = - Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy])) +Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[CreateTopicPolicy])) Review Comment: This should be `CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG` ## core/src/test/scala/unit/kafka/log/LogConfigTest.scala: ## @@ -399,18 +400,18 @@ class LogConfigTest { } } - /* Verify that when the deprecated config LogMessageTimestampDifferenceMaxMsProp has non default value the new configs - * LogMessageTimestampBeforeMaxMsProp and LogMessageTimestampAfterMaxMsProp are not changed from the default we are using + /* Verify that when the deprecated config LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_PROP has non default value the new configs Review Comment: `LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_PROP` has been renamed to `LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG` Same below ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -1152,12 +1151,12 @@ class KafkaConfigTest { defaults.setProperty(KafkaConfig.BrokerIdProp, "1") defaults.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:1122") defaults.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3") -defaults.setProperty(KafkaConfig.LogDirProp, "/tmp1,/tmp2") -defaults.setProperty(KafkaConfig.LogRollTimeHoursProp, "12") -defaults.setProperty(KafkaConfig.LogRollTimeJitterHoursProp, "11") -defaults.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "10") -//For LogFlushIntervalMsProp -defaults.setProperty(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123") 
+defaults.setProperty(KafkaLogConfigs.LOG_DIR_CONFIG, "/tmp1,/tmp2") +defaults.setProperty(KafkaLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG, "12") +defaults.setProperty(KafkaLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG, "11") +defaults.setProperty(KafkaLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, "10") +//For LOG_FLUSH_INTERVAL_MS_PROP Review Comment: This has been renamed to `LOG_FLUSH_INTERVAL_MS_CONFIG`
[PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
FrankYang0529 opened a new pull request, #15745: URL: https://github.com/apache/kafka/pull/15745

We introduced `disksPerBroker` in `TestKitNodes` in https://issues.apache.org/jira/browse/KAFKA-16559. We can support configuring it in `ClusterTest`, which makes it more convenient for integration tests.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1569103206 ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode + * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined */ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = { +val controllerRegistration = getControllerRegistration match { + case Some(registration) => registration + case None => +// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode
Review Comment: I found that the integration test would fail occasionally because the config change would happen before an active controller was seen (so zkVersion was 0). Adding this exception allows the test to retry the alter config call. In production, this could only be hit if someone was actively calling alter configs while the cluster was being deployed, and the call was handled between the time the first broker came up and the time the controller became active.
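The retry behaviour described in this comment can be sketched in isolation. This is a minimal, hypothetical illustration and not code from the PR: `RetrySketch`, `ControllerNotReadyException`, and the attempt limit are names invented here. In the PR the signal is `ControllerMovedException`, and the retry is driven by the caller.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetrySketch {
    /** Hypothetical stand-in for a "no active controller yet" signal. */
    static final class ControllerNotReadyException extends RuntimeException {
        ControllerNotReadyException(String message) {
            super(message);
        }
    }

    /** Runs the action, retrying only on ControllerNotReadyException, at most maxAttempts times. */
    static <T> T retry(Supplier<T> action, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        ControllerNotReadyException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (ControllerNotReadyException e) {
                last = e; // treated as transient: a controller may register before the next attempt
            }
        }
        throw last; // every attempt failed with the transient signal
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated alter-config call that only succeeds on the third attempt.
        String result = retry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new ControllerNotReadyException("no /controller znode yet");
            }
            return "config written";
        }, 5);
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

Any other exception type propagates immediately, so only the "controller not seen yet" case is retried.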
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dajac commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1569077848 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -777,6 +778,59 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { +if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { +log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", +consumerGroup.groupId()); +return false; +} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { +log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() <= 1) { +log.info("Skip downgrading the consumer group {} to classic group because it's empty.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { +log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", +consumerGroup.groupId()); +} +return true; +} + +public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) { +consumerGroup.createGroupTombstoneRecords(records); +ClassicGroup classicGroup; +try { +classicGroup = consumerGroup.toClassicGroup( +leavingMemberId, +logContext, +time, +consumerGroupSessionTimeoutMs, +metadataImage, +records +); +} catch (SchemaException e) { +log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " + +"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e); + +throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.", +consumerGroup.groupId(), e.getMessage())); +} + +groups.put(consumerGroup.groupId(), classicGroup); Review Comment: I think 
that we should explicitly remove the previous group before adding the new one because we update metrics when the previous group is removed. We could likely call `removeGroup` for this purpose.
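The concern in this comment can be illustrated with a small standalone sketch. It assumes only what the comment states, namely that metrics are updated when a group is removed. `GroupRegistrySketch`, its methods, and the per-type counters are hypothetical names for illustration and are not the `GroupMetadataManager` API.

```java
import java.util.HashMap;
import java.util.Map;

final class GroupRegistrySketch {
    enum GroupType { CONSUMER, CLASSIC }

    private final Map<String, GroupType> groups = new HashMap<>();
    private final Map<GroupType, Integer> groupCounts = new HashMap<>();

    /** Registers a group and bumps the counter for its type. */
    void addGroup(String groupId, GroupType type) {
        groups.put(groupId, type);
        groupCounts.merge(type, 1, Integer::sum);
    }

    /** Unregisters a group and releases the counter for its type. */
    void removeGroup(String groupId) {
        GroupType removed = groups.remove(groupId);
        if (removed != null) {
            groupCounts.merge(removed, -1, Integer::sum);
        }
    }

    /** Converts a group by removing the old entry first, so the old type's count is released. */
    void replaceGroup(String groupId, GroupType newType) {
        removeGroup(groupId);
        addGroup(groupId, newType);
    }

    int count(GroupType type) {
        return groupCounts.getOrDefault(type, 0);
    }
}
```

In this sketch, calling `addGroup` over an existing id without `removeGroup` would leave the old type's counter stale, which is the kind of drift an explicit removal avoids.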
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dajac commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1569055496 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -998,4 +1094,232 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup( ); return describedGroup; } + +/** + * Create a new consumer group according to the given classic group. + * + * @param snapshotRegistry The SnapshotRegistry. + * @param metrics The GroupCoordinatorMetricsShard. + * @param classicGroup The converted classic group. + * @param topicsImage The TopicsImage for topic id and topic name conversion. + * @return The created ConsumerGruop. + */ +public static ConsumerGroup fromClassicGroup( +SnapshotRegistry snapshotRegistry, +GroupCoordinatorMetricsShard metrics, +ClassicGroup classicGroup, +TopicsImage topicsImage +) { +String groupId = classicGroup.groupId(); +ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); +consumerGroup.setGroupEpoch(classicGroup.generationId()); +consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId()); + +classicGroup.allMembers().forEach(classicGroupMember -> { +ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment( +ByteBuffer.wrap(classicGroupMember.assignment()) +); +Map> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage); + +ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription( + ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())) +); + +// The target assignment and the assigned partitions of each member are set based on the last +// assignment of the classic group. All the members are put in the Stable state. If the classic +// group was in Preparing Rebalance or Completing Rebalance states, the classic members are +// asked to rejoin the group to re-trigger a rebalance or collect their assignments. 
+ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId()) +.setMemberEpoch(classicGroup.generationId()) +.setState(MemberState.STABLE) +.setPreviousMemberEpoch(classicGroup.generationId()) + .setInstanceId(classicGroupMember.groupInstanceId().orElse(null)) +.setRackId(subscription.rackId().orElse(null)) +.setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs()) +.setClientId(classicGroupMember.clientId()) +.setClientHost(classicGroupMember.clientHost()) +.setSubscribedTopicNames(subscription.topics()) +.setAssignedPartitions(partitions) + .setSupportedClassicProtocols(classicGroupMember.supportedProtocols()) +.build(); +consumerGroup.updateTargetAssignment(newMember.memberId(), new Assignment(partitions)); +consumerGroup.updateMember(newMember); +}); + +return consumerGroup; +} + +/** + * Populate the record list with the records needed to create the given consumer group. + * + * @param records The list to which the new records are added. + */ +public void createConsumerGroupRecords( +List records +) { +members().forEach((__, consumerGroupMember) -> +records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), consumerGroupMember)) +); + +records.add(RecordHelpers.newGroupEpochRecord(groupId(), groupEpoch())); + +members().forEach((consumerGroupMemberId, consumerGroupMember) -> +records.add(RecordHelpers.newTargetAssignmentRecord( +groupId(), +consumerGroupMemberId, +targetAssignment(consumerGroupMemberId).partitions() +)) +); + +records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), groupEpoch())); + +members().forEach((__, consumerGroupMember) -> +records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), consumerGroupMember)) +); +} + +/** + * @return The map of topic id and partition set converted from the list of TopicPartition. 
+ */ +private static Map> topicPartitionMapFromList( +List partitions, +TopicsImage topicsImage +) { +Map> topicPartitionMap = new HashMap<>(); +partitions.forEach(topicPartition -> { +TopicImage topicImage = topicsImage.getTopic(topicPartition.topic()); +if (topicImage != null) { +topicPartitionMap +
Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]
C0urante commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1568959278 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ## @@ -242,6 +242,19 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto return response.entity(createdInfo.result()).build(); } +@PATCH +@Path("/{connector}/config") +public Response patchConnectorConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, + final @QueryParam("forward") Boolean forward, + final Map connectorConfigPatch) throws Throwable { +FutureCallback> cb = new FutureCallback<>(); +herder.patchConnectorConfig(connector, connectorConfigPatch, cb); +Herder.Created createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", +"PATCH", headers, connectorConfigPatch, new TypeReference() { }, new CreatedConnectorInfoTranslator(), forward); +return Response.ok().entity(createdInfo.result()).build(); Review Comment: Just realizing now that we don't actually specify the status and body of the REST response in the KIP. I agree with what's here, especially since it matches the existing `PUT /connectors/{name}/config` endpoint, but it's worth specifying in the KIP for completeness. 
## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); } +@Test +public void testPatchConnectorConfigNotFound() { +ClusterConfigState clusterConfigState = new ClusterConfigState( +0, +null, +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptyMap(), +Collections.emptySet(), +Collections.emptySet()); +expectConfigRefreshAndSnapshot(clusterConfigState); + +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "baz1"); + +FutureCallback> patchCallback = new FutureCallback<>(); +herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback); +herder.tick(); +assertTrue(patchCallback.isDone()); +ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get); +assertInstanceOf(NotFoundException.class, exception.getCause()); +} + +@Test +public void testPatchConnectorConfig() throws Exception { +when(member.memberId()).thenReturn("leader"); +expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); + +Map originalConnConfig = new HashMap<>(CONN1_CONFIG); +originalConnConfig.put("foo1", "bar1"); +originalConnConfig.put("foo2", "bar2"); Review Comment: Nit: can we add one more key/value pair that should be unchanged after the patch is applied? This would catch bugs where the set of post-patch keys is derived from the patch instead of the combination of the patch and the prior configuration. 
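To make the failure mode in this nit concrete, here is a minimal standalone sketch of patch-merge semantics. `ConfigPatchSketch.applyPatch` is a hypothetical helper, not the herder's implementation, and treating a `null` patch value as "remove this key" is an assumption of the sketch rather than something shown in the diff.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigPatchSketch {
    /**
     * Returns a new map that starts from the prior configuration and then applies the patch.
     * Assumption of this sketch: a null value in the patch removes the key.
     */
    static Map<String, String> applyPatch(Map<String, String> current, Map<String, String> patch) {
        Map<String, String> result = new HashMap<>(current);
        patch.forEach((key, value) -> {
            if (value == null) {
                result.remove(key);
            } else {
                result.put(key, value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> current = new HashMap<>();
        current.put("foo1", "bar1");
        current.put("foo2", "bar2");
        current.put("untouched", "same"); // must survive the patch unchanged

        Map<String, String> patch = new HashMap<>();
        patch.put("foo1", "baz1");

        System.out.println(applyPatch(current, patch));
    }
}
```

A test that only checks patched keys would still pass if the result were built from the patch alone. Asserting on a key such as `untouched` is what distinguishes the two behaviours.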
## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ## @@ -242,6 +242,19 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto return response.entity(createdInfo.result()).build(); } +@PATCH +@Path("/{connector}/config") +public Response patchConnectorConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, + final @QueryParam("forward") Boolean forward, Review Comment: Nit: ```suggestion final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, ``` ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(connectorConfigCb); } +@Test +public void testPatchConnectorConfigNotFound() { +Map connConfigPatch = new HashMap<>(); +connConfigPatch.put("foo1", "baz1"); + +Callback> patchCallback = mock(Callback.class); +herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback);
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dajac commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1569051446 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -775,6 +777,126 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { +if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { +log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", +consumerGroup.groupId()); +return false; +} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { +log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() <= 1) { +log.info("Skip downgrading the consumer group {} to classic group because it's empty.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { +log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", +consumerGroup.groupId()); +} +return true; +} + +public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) { +consumerGroup.createGroupTombstoneRecords(records); +ClassicGroup classicGroup; +try { +classicGroup = consumerGroup.toClassicGroup( +leavingMemberId, +logContext, +time, +consumerGroupSessionTimeoutMs, +metadataImage, +records +); +} catch (SchemaException e) { +log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " + +"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e); + +throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.", +consumerGroup.groupId(), e.getMessage())); +} + +groups.put(consumerGroup.groupId(), classicGroup); 
+metrics.onClassicGroupStateTransition(null, classicGroup.currentState()); + +CompletableFuture appendFuture = new CompletableFuture<>(); +appendFuture.whenComplete((__, t) -> { +if (t == null) { +classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member)); +prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId())); Review Comment: I do agree with the `appendFuture` part of your explanation. However, I still believe that we should schedule the session timeouts and start the rebalance before it.
Re: [PR] KAFKA-15853: Refactor org.apache.kafka.server.config.Defaults to follow Java config pattern [kafka]
OmniaGM commented on PR #15260: URL: https://github.com/apache/kafka/pull/15260#issuecomment-2061522296 I am going to close this, as we have already started moving defaults out of `Defaults` and into the scoped config classes.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1569024432 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri try { if (delete.execute()) LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath()); -else if (logIfMissing) -LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +else { +if (logIfMissing) { +LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +} + +// During alter log dir, the log segment may be moved to a new directory, so async delete may fail. +// Fallback to delete the file in the new directory to avoid orphan file. +Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");
Review Comment: I think we can create a file like `LogDirUtils.java` in the future, so we don't need to define the same variables, like `FutureDirPattern` and `FutureDirSuffix`, in both `LogSegment.java` and `LocalLog.scala`. WDYT?
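As a rough illustration of what such a shared helper might expose, the sketch below applies the same regular expression as the diff to a future-directory name. `FutureDirNameSketch` and `parse` are hypothetical names, the sample directory name is made up, and reading the three capture groups as topic, partition, and unique id is an assumption based on the shape of the pattern.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class FutureDirNameSketch {
    // Same expression as in the diff; defined once so callers can share it.
    static final Pattern FUTURE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

    /** Returns the three captured groups when the name looks like a future directory, else empty. */
    static Optional<String[]> parse(String dirName) {
        Matcher matcher = FUTURE_DIR_PATTERN.matcher(dirName);
        if (!matcher.find()) {
            return Optional.empty();
        }
        return Optional.of(new String[] {matcher.group(1), matcher.group(2), matcher.group(3)});
    }

    public static void main(String[] args) {
        // Hypothetical directory name, used only to exercise the pattern.
        parse("my-topic-0.abc123-future").ifPresent(parts ->
            System.out.println(String.join(" | ", parts)));
    }
}
```

Because the first group is greedy, a topic name that itself contains hyphens stays in the first group, and the partition is taken from the segment just before the dot.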
Re: [PR] KAFKA-15853: Move KafkaConfig properties definition out of core [kafka]
OmniaGM closed pull request #15501: KAFKA-15853: Move KafkaConfig properties definition out of core URL: https://github.com/apache/kafka/pull/15501
Re: [PR] KAFKA-15853: Move KafkaConfig properties definition out of core [kafka]
OmniaGM commented on PR #15501: URL: https://github.com/apache/kafka/pull/15501#issuecomment-2061520938 I am going to close this one, as it has drifted away from the approach we took to break the configs out.
Re: [PR] KAFKA-15853: Refactor org.apache.kafka.server.config.Defaults to follow Java config pattern [kafka]
OmniaGM closed pull request #15260: KAFKA-15853: Refactor org.apache.kafka.server.config.Defaults to follow Java config pattern URL: https://github.com/apache/kafka/pull/15260
Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]
brandboat commented on code in PR #15719: URL: https://github.com/apache/kafka/pull/15719#discussion_r1569022628 ## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ## @@ -413,7 +413,7 @@ class LogManagerTest { assertEquals(numMessages * setSize / segmentBytes, log.numberOfSegments, "Check we have the expected number of segments.") // this cleanup shouldn't find any expired segments but should delete some to reduce size -time.sleep(logManager.InitialTaskDelayMs) +time.sleep(logManager.initialTaskDelayMs) assertEquals(6, log.numberOfSegments, "Now there should be exactly 6 segments") time.sleep(log.config.fileDeleteDelayMs + 1)
Review Comment: Thanks for the explanation!
> or maybe we can directly add verification inside these tests?
I decided to follow the comment you mentioned earlier and updated `initialTaskDelayMs` to 10s in `LogManagerTest`. Please take another look :smiley:
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
johnnychhsu commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1569005655 ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode + * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined */ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = { +val controllerRegistration = getControllerRegistration match { + case Some(registration) => registration + case None => +// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode
Review Comment: May I know why the test could be flaky without this?
Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]
OmniaGM commented on PR #15728: URL: https://github.com/apache/kafka/pull/15728#issuecomment-2061493764

Hi @aaron-ai, thanks for the PR. Can you please have a look at the failing tests here: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15728/1/#showFailuresLink I believe they are related. For example, [MirrorConnectorConfigTest](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15728/1/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorConfigTest/Build___JDK_21_and_Scala_2_13___testSourceConsumerConfigWithSourcePrefix__/) is failing with
```
org.opentest4j.AssertionFailedError: source.consumer. source consumer config not matching ==> expected: <{enable.auto.commit=false, max.poll.interval.ms=100, auto.offset.reset=latest, client.id=source1->target2|ConnectorName|test}> but was: <{enable.auto.commit=false, max.poll.interval.ms=100, allow.auto.create.topics=false, auto.offset.reset=latest, client.id=source1->target2|ConnectorName|test}>
```
Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]
emitskevich-blp commented on code in PR #15722: URL: https://github.com/apache/kafka/pull/15722#discussion_r1569005969 ## clients/src/main/java/org/apache/kafka/common/network/Selector.java: ## @@ -1281,14 +1281,14 @@ private Meter createMeter(Metrics metrics, String groupName, Map metricTags, String baseName, String action) { MetricName rateMetricName = metrics.metricName(baseName + "-ratio", groupName, -String.format("*Deprecated* The fraction of time the I/O thread spent %s", action), metricTags); +String.format("The fraction of time the I/O thread spent %s", action), metricTags); Review Comment: Please find it in the PR description.
Re: [PR] KAFKA-15309: Add custom error handler to Producer [kafka]
OmniaGM commented on PR #15731: URL: https://github.com/apache/kafka/pull/15731#issuecomment-2061484219 Hi @aliehsaeedii, thanks for the PoC and the KIP. Can you please mark this PoC as a draft PR, as the KIP is still under discussion?
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dongnuo123 commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1568994033 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -998,4 +1094,232 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup( ); return describedGroup; } + +/** + * Create a new consumer group according to the given classic group. + * + * @param snapshotRegistry The SnapshotRegistry. + * @param metrics The GroupCoordinatorMetricsShard. + * @param classicGroup The converted classic group. + * @param topicsImage The TopicsImage for topic id and topic name conversion. + * @return The created ConsumerGruop. + */ +public static ConsumerGroup fromClassicGroup( +SnapshotRegistry snapshotRegistry, +GroupCoordinatorMetricsShard metrics, +ClassicGroup classicGroup, +TopicsImage topicsImage +) { +String groupId = classicGroup.groupId(); +ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); +consumerGroup.setGroupEpoch(classicGroup.generationId()); +consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId()); + +classicGroup.allMembers().forEach(classicGroupMember -> { +ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment( +ByteBuffer.wrap(classicGroupMember.assignment()) +); +Map> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage); + +ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription( + ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())) +); + +// The target assignment and the assigned partitions of each member are set based on the last +// assignment of the classic group. All the members are put in the Stable state. If the classic +// group was in Preparing Rebalance or Completing Rebalance states, the classic members are +// asked to rejoin the group to re-trigger a rebalance or collect their assignments. 
+ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId()) +.setMemberEpoch(classicGroup.generationId()) +.setState(MemberState.STABLE) +.setPreviousMemberEpoch(classicGroup.generationId()) + .setInstanceId(classicGroupMember.groupInstanceId().orElse(null)) +.setRackId(subscription.rackId().orElse(null)) +.setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs()) +.setClientId(classicGroupMember.clientId()) +.setClientHost(classicGroupMember.clientHost()) +.setSubscribedTopicNames(subscription.topics()) +.setAssignedPartitions(partitions) + .setSupportedClassicProtocols(classicGroupMember.supportedProtocols()) +.build(); +consumerGroup.updateTargetAssignment(newMember.memberId(), new Assignment(partitions)); +consumerGroup.updateMember(newMember); +}); + +return consumerGroup; +} + +/** + * Populate the record list with the records needed to create the given consumer group. + * + * @param records The list to which the new records are added. + */ +public void createConsumerGroupRecords( +List records +) { +members().forEach((__, consumerGroupMember) -> +records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), consumerGroupMember)) +); + +records.add(RecordHelpers.newGroupEpochRecord(groupId(), groupEpoch())); + +members().forEach((consumerGroupMemberId, consumerGroupMember) -> +records.add(RecordHelpers.newTargetAssignmentRecord( +groupId(), +consumerGroupMemberId, +targetAssignment(consumerGroupMemberId).partitions() +)) +); + +records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), groupEpoch())); + +members().forEach((__, consumerGroupMember) -> +records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), consumerGroupMember)) +); +} + +/** + * @return The map of topic id and partition set converted from the list of TopicPartition. 
+ */ +private static Map> topicPartitionMapFromList( +List partitions, +TopicsImage topicsImage +) { +Map> topicPartitionMap = new HashMap<>(); +partitions.forEach(topicPartition -> { +TopicImage topicImage = topicsImage.getTopic(topicPartition.topic()); +if (topicImage != null) { +topicPartitionMap +
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lianetm commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1568991555 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -228,13 +228,16 @@ private void process(final ErrorEvent event) { } private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { -ApplicationEvent invokedEvent = invokeRebalanceCallbacks( +ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future() ); applicationEventHandler.add(invokedEvent); +if (invokedEvent.error().isPresent()) { +throw invokedEvent.error().get(); Review Comment: I was expecting to see the logic for wrapping the callback error into a `KafkaException` here, but I see it is at a lower level, in `invokeRebalanceCallbacks`, which makes it a bit more obscure, I would say. Still, I see how it's deeply tied to the `ConsumerRebalanceListenerCallbackCompletedEvent`, so I'm OK with leaving it as it is if we feel it's clear enough.
[PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah opened a new pull request, #15744: URL: https://github.com/apache/kafka/pull/15744 This patch fixes two issues with IncrementalAlterConfigs and the ZK migration. First, it changes the handling of IncrementalAlterConfigs to check if the controller is ZK vs KRaft and only forward for KRaft. Second, it adds a check in KafkaZkClient#setOrCreateEntityConfigs to ensure a ZK broker is not directly modifying configs in ZK if there is a KRaft controller. This closes the race condition between KRaft taking over as the active controller and the ZK brokers learning about this. During the ZK migration, there is a time when the ZK brokers are running with migrations enabled, but KRaft has yet to take over as the controller. Prior to KRaft taking over as the controller, the ZK brokers in migration mode were unconditionally forwarding IncrementalAlterConfigs (IAC) to the ZK controller. This works for some config types, but breaks when setting BROKER and BROKER_LOGGER configs for a specific broker. The behavior in KafkaApis for IAC was to always forward if the forwarding manager was defined. Since ZK brokers in migration mode have forwarding enabled, the forwarding would happen, and the special logic for BROKER and BROKER_LOGGER would be missed, causing the request to fail. With this fix, the IAC handler will check if the controller is KRaft or ZK and only forward for KRaft. As part of KIP-500, we moved most (but not all) ZK mutations to the ZK controller. One of the things we did not move fully to the controller was entity configs. This is because there was some special logic that needed to run on the broker for certain config updates. If a broker-specific config was set, AdminClient would route the request to the proper broker. In KRaft, we have a different mechanism for handling broker-specific config updates. 
Leaving this ZK update on the broker side would be okay if we were guarding writes on the controller epoch, but it turns out KafkaZkClient#setOrCreateEntityConfigs does unprotected "last writer wins" updates to ZK. This means a ZK broker could update the contents of ZK _after_ the metadata had been migrated to KRaft. No good! To fix this, this patch adds a check on the controller epoch to KafkaZkClient#setOrCreateEntityConfigs but also adds logic to fail the update if the controller is a KRaft controller. The new logic in setOrCreateEntityConfigs adds STALE_CONTROLLER_EPOCH as a new exception that can be thrown while updating configs.
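As a rough illustration of the guard described in the PR description above, here is a minimal sketch in plain Java. It is not the actual patch: `ControllerInfo`, `EntityConfigWriteGuard`, and `ControllerMovedSketchException` are hypothetical stand-ins invented for this example, and the real change lives in KafkaZkClient#setOrCreateEntityConfigs (Scala). The sketch only shows the decision: refuse the write when no controller is registered or when the registered controller is KRaft, and otherwise return the controller epoch that the write would be conditioned on.

```java
import java.util.Optional;

// Hypothetical stand-in for the exception named in the Javadoc of the diff.
final class ControllerMovedSketchException extends RuntimeException {
    ControllerMovedSketchException(String message) {
        super(message);
    }
}

// Hypothetical view of the controller registration: its epoch and whether it is KRaft.
final class ControllerInfo {
    final int epoch;
    final boolean kraft;

    ControllerInfo(int epoch, boolean kraft) {
        this.epoch = epoch;
        this.kraft = kraft;
    }
}

final class EntityConfigWriteGuard {
    // Returns the controller epoch a ZK broker should condition its config write on,
    // or throws if the broker must not write to ZK at all.
    static int epochForWrite(Optional<ControllerInfo> controller) {
        ControllerInfo info = controller.orElseThrow(
            () -> new ControllerMovedSketchException("No controller is defined"));
        if (info.kraft) {
            throw new ControllerMovedSketchException(
                "A KRaft controller is defined; refusing to write configs to ZK");
        }
        return info.epoch;
    }
}
```

Returning the epoch rather than a boolean keeps the "is a write allowed" decision and the "which epoch guards the write" value in one place, so a caller cannot check one and forget the other.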
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lianetm commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1568972567 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1333,32 +1342,41 @@ public void testListenerCallbacksInvoke(List listenerCallbacksInvokeSource() { Optional empty = Optional.empty(); Optional error = Optional.of(new RuntimeException("Intentional error")); +Optional kafkaException = Optional.of(new KafkaException("Intentional error")); +Optional wrappedException = Optional.of(new KafkaException("User rebalance callback throws an error", error.get())); return Stream.of( // Tests if we don't have an event, the listener doesn't get called. -Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 0), +Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 0, empty), // Tests if we get an event for a revocation, that we invoke our listener. -Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), empty, empty, empty, 1, 0, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), empty, empty, empty, 1, 0, 0, empty), // Tests if we get an event for an assignment, that we invoke our listener. -Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, empty, empty, 0, 1, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, empty, empty, 0, 1, 0, empty), // Tests that we invoke our listener even if it encounters an exception. -Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, empty, 0, 0, 1), +Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, empty, 0, 0, 1, empty), // Tests that we invoke our listener even if it encounters an exception. 
-Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), error, empty, empty, 1, 0, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), error, empty, empty, 1, 0, 0, wrappedException), // Tests that we invoke our listener even if it encounters an exception. -Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, error, empty, 0, 1, 0), +Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, error, empty, 0, 1, 0, wrappedException), // Tests that we invoke our listener even if it encounters an exception. -Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, error, 0, 0, 1), +Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, error, 0, 0, 1, wrappedException), + +// Tests that we invoke our listener even if it encounters an exception. Special case to test that a kafka exception is not wrapped. +Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, kafkaException, 0, 0, 1, kafkaException), Review Comment: Nice addition. Not wrapping a `KafkaException` is indeed part of what the legacy logic does.
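The behaviour these test arguments describe (wrap an arbitrary callback error, but pass an existing `KafkaException` through unchanged) can be sketched as follows. This is only an illustration under assumptions: `SketchKafkaException` is a local stand-in so the example compiles without the Kafka client on the classpath, and `RebalanceCallbackErrors.toKafkaException` is a hypothetical helper name, not a method from the PR. The wrapper message is copied from the `wrappedException` value in the diff.

```java
// Local stand-in for org.apache.kafka.common.KafkaException, for illustration only.
final class SketchKafkaException extends RuntimeException {
    SketchKafkaException(String message) {
        super(message);
    }

    SketchKafkaException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class RebalanceCallbackErrors {
    // Wraps a callback failure unless it already is a (sketch) KafkaException.
    static SketchKafkaException toKafkaException(Throwable callbackError) {
        if (callbackError instanceof SketchKafkaException) {
            // Already the right type: return the same instance, do not wrap again.
            return (SketchKafkaException) callbackError;
        }
        return new SketchKafkaException("User rebalance callback throws an error", callbackError);
    }
}
```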
Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]
KevinZTW commented on code in PR #15714: URL: https://github.com/apache/kafka/pull/15714#discussion_r1568968759 ## storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java: ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InconsistentTopicIdException; + +import org.apache.kafka.storage.internals.log.LogDirFailureChannel; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.nio.file.Files; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class PartitionMetadataFileTest { +private final File dir = TestUtils.tempDirectory(); + +@Test +public void testSetRecordWithDifferentTopicId() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +Uuid differentTopicId = Uuid.randomUuid(); +assertThrows(InconsistentTopicIdException.class, () -> partitionMetadataFile.record(differentTopicId)); +} + +@Test +public void testSetRecordWithSameTopicId() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +} + +@Test +public void testMaybeFlushWithTopicIdPresent() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); + +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +assertDoesNotThrow(partitionMetadataFile::maybeFlush); + +assertDoesNotThrow(() -> { +List lines = Files.readAllLines(file.toPath()); +assertEquals(2, lines.size()); 
+assertEquals("version: 0", lines.get(0)); +assertEquals("topic_id: " + topicId, lines.get(1)); +}); +} + +@Test +public void testMaybeFlushWithNoTopicIdPresent() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); + +assertDoesNotThrow(partitionMetadataFile::maybeFlush); +assertEquals(0, file.length()); +} + +@Test +public void testRead() { +File file = PartitionMetadataFile.newFile(dir); +LogDirFailureChannel channel = Mockito.mock(LogDirFailureChannel.class); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, channel); + +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); Review Comment: I see, thank you very much for the detailed explanation! I've revised it as suggested.
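For readers unfamiliar with the file layout that `testMaybeFlushWithTopicIdPresent` asserts on, the sketch below encodes and decodes the same two-line shape (`version: 0` followed by `topic_id: <id>`). It is an assumption-laden illustration, not the `PartitionMetadataFile` implementation: the class name `PartitionMetadataFormatSketch` is hypothetical, the topic id is treated as an opaque string rather than a Kafka `Uuid`, and no file I/O or error channel is involved.

```java
import java.util.Arrays;
import java.util.List;

final class PartitionMetadataFormatSketch {
    private static final String VERSION_PREFIX = "version: ";
    private static final String TOPIC_ID_PREFIX = "topic_id: ";

    // Produces the two lines the test expects to find in the flushed file.
    static List<String> encode(int version, String topicId) {
        return Arrays.asList(VERSION_PREFIX + version, TOPIC_ID_PREFIX + topicId);
    }

    // Extracts the topic id, rejecting anything that is not exactly the two-line layout.
    static String decodeTopicId(List<String> lines) {
        if (lines.size() != 2
                || !lines.get(0).startsWith(VERSION_PREFIX)
                || !lines.get(1).startsWith(TOPIC_ID_PREFIX)) {
            throw new IllegalArgumentException("Malformed partition metadata: " + lines);
        }
        return lines.get(1).substring(TOPIC_ID_PREFIX.length());
    }
}
```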
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dongnuo123 commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1568962281 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -775,6 +777,126 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { +if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { +log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", +consumerGroup.groupId()); +return false; +} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { +log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() <= 1) { +log.info("Skip downgrading the consumer group {} to classic group because it's empty.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { +log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", +consumerGroup.groupId()); +} +return true; +} + +public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) { +consumerGroup.createGroupTombstoneRecords(records); +ClassicGroup classicGroup; +try { +classicGroup = consumerGroup.toClassicGroup( +leavingMemberId, +logContext, +time, +consumerGroupSessionTimeoutMs, +metadataImage, +records +); +} catch (SchemaException e) { +log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " + +"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e); + +throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.", +consumerGroup.groupId(), e.getMessage())); +} + +groups.put(consumerGroup.groupId(), classicGroup); 
+metrics.onClassicGroupStateTransition(null, classicGroup.currentState()); + +CompletableFuture appendFuture = new CompletableFuture<>(); +appendFuture.whenComplete((__, t) -> { +if (t == null) { +classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member)); +prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId())); Review Comment: If we create all the state immediately, we'll get an error when replaying the old ConsumerGroup tombstone because `group.get(groupId)` has become a ClassicGroup. We can't rely on replaying the records to update the state either, because we need the new classicGroup reference to trigger the rebalance. So the only way is to not replay the records, by setting the appendFuture.
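The pattern discussed in this comment, deferring side effects until the record append has completed, can be sketched with a plain `CompletableFuture`. This is a simplified illustration under assumptions: `AppendFutureSketch` and the string events are hypothetical, and the real code reschedules heartbeats and calls `prepareRebalance` instead of appending to a list. A callback registered with `whenComplete` before the future is completed runs when `complete` or `completeExceptionally` is called.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AppendFutureSketch {
    // Registers the follow-up work and returns the future the caller completes
    // once the records have (or have not) been appended.
    static CompletableFuture<Void> deferUntilAppended(List<String> events) {
        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
        appendFuture.whenComplete((ignored, error) -> {
            if (error == null) {
                // Append succeeded: safe to trigger the follow-up work.
                events.add("rebalance-triggered");
            } else {
                // Append failed: this is where in-memory changes would be reverted.
                events.add("reverted");
            }
        });
        return appendFuture;
    }
}
```

For example, calling `deferUntilAppended(events).complete(null)` records the success event, while `completeExceptionally(...)` records the revert event instead.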
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dongnuo123 commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1568963034 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -775,6 +777,126 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { +if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { +log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", +consumerGroup.groupId()); +return false; +} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { +log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() <= 1) { +log.info("Skip downgrading the consumer group {} to classic group because it's empty.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { +log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", +consumerGroup.groupId()); +} +return true; +} + +public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) { +consumerGroup.createGroupTombstoneRecords(records); +ClassicGroup classicGroup; +try { +classicGroup = consumerGroup.toClassicGroup( +leavingMemberId, +logContext, +time, +consumerGroupSessionTimeoutMs, +metadataImage, +records +); +} catch (SchemaException e) { +log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " + +"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e); + +throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.", +consumerGroup.groupId(), e.getMessage())); +} + +groups.put(consumerGroup.groupId(), classicGroup); 
+metrics.onClassicGroupStateTransition(null, classicGroup.currentState()); + +CompletableFuture appendFuture = new CompletableFuture<>(); +appendFuture.whenComplete((__, t) -> { +if (t == null) { +classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member)); +prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId())); Review Comment: Yes, we should revert `onClassicGroupStateTransition`.
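Separately, the chain of checks in `validateOnlineDowngrade` from the quoted hunk can be restated as a small pure function. This is a sketch under stated assumptions, not the PR's code: `OnlineDowngradeCheckSketch.canDowngrade` and its parameters are hypothetical, with plain booleans and ints standing in for the policy, the group, and `classicGroupMaxSize`. Note that the hunk as quoted logs a message in the group-size branch and then falls through to `return true`; the sketch assumes `return false` was intended there, which the thread does not confirm.

```java
final class OnlineDowngradeCheckSketch {
    // numMembers counts the leaving member too, mirroring the "- 1" in the quoted hunk.
    static boolean canDowngrade(
        boolean downgradeEnabled,
        boolean allOtherMembersUseClassicProtocol,
        int numMembers,
        int classicGroupMaxSize
    ) {
        if (!downgradeEnabled) {
            return false; // online downgrade is disabled by policy
        }
        if (!allOtherMembersUseClassicProtocol) {
            return false; // some remaining member still uses the consumer protocol
        }
        if (numMembers <= 1) {
            return false; // the group would be empty after the member leaves
        }
        if (numMembers - 1 > classicGroupMaxSize) {
            return false; // assumption: too large for a classic group, so refuse
        }
        return true;
    }
}
```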
[PR] Fix typo [kafka]
birdoplank opened a new pull request, #15743: URL: https://github.com/apache/kafka/pull/15743 Testing potential security vulnerability in the pipeline to be reported as part of Apache vulnerability disclosure program: https://apache.org/security/#vulnerability-handling ### Committer Checklist (excluded from commit message) - [ X ] Verify design and implementation - [ X ] Verify test coverage and CI build status - [ X ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1568904632 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1189,50 +1216,61 @@ class LogManager(logDirs: Seq[File], val sourceLog = currentLogs.get(topicPartition) val destLog = futureLogs.get(topicPartition) - info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition") if (sourceLog == null) throw new KafkaStorageException(s"The current replica for $topicPartition is offline") if (destLog == null) throw new KafkaStorageException(s"The future replica for $topicPartition is offline") - destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true) - // the metrics tags still contain "future", so we have to remove it. - // we will add metrics back after sourceLog remove the metrics - destLog.removeLogMetrics() - destLog.updateHighWatermark(sourceLog.highWatermark) + replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true) +} + } + + def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = { +val topicPartition = destLog.topicPartition +info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition") - // Now that future replica has been successfully renamed to be the current replica - // Update the cached map and log cleaner as appropriate. - futureLogs.remove(topicPartition) - currentLogs.put(topicPartition, destLog) - if (cleaner != null) { -cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile) -resumeCleaning(topicPartition) - } +destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true) +// the metrics tags still contain "future", so we have to remove it. 
+// we will add metrics back after sourceLog remove the metrics +destLog.removeLogMetrics() +if (updateHighWatermark && sourceLog.isDefined) { + destLog.updateHighWatermark(sourceLog.get.highWatermark) +} - try { -sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true) +// Now that future replica has been successfully renamed to be the current replica +// Update the cached map and log cleaner as appropriate. +futureLogs.remove(topicPartition) +currentLogs.put(topicPartition, destLog) +if (cleaner != null) { + cleaner.alterCheckpointDir(topicPartition, sourceLog.map(_.parentDirFile), destLog.parentDirFile) Review Comment: It seems `cleaner.alterCheckpointDir` will do nothing if `sourceLog` is empty. Maybe we can revert those changes and run `alterCheckpointDir` only if `sourceLog` is defined. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1568897953 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri try { if (delete.execute()) LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath()); -else if (logIfMissing) -LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +else { +if (logIfMissing) { +LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +} + +// During alter log dir, the log segment may be moved to a new directory, so async delete may fail. +// Fallback to delete the file in the new directory to avoid orphan file. +Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future"); Review Comment: Updated it. Thanks for the review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
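For readers following the fallback logic, here is a minimal sketch of how the regex from the diff above splits a future directory name. It assumes the name has the shape `topic-partition.uniqueId-future` and that the whole name must match; the class `FutureDirNameDemo`, the helper `currentDirName`, and the sample names are hypothetical and not part of the PR, and the pattern may have changed after the "Updated it" revision.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class, not part of LogSegment.java.
class FutureDirNameDemo {
    // Pattern copied from the diff quoted above.
    private static final Pattern DIR_PATTERN =
        Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

    /** Returns "topic-partition" if dirName looks like a future log dir, otherwise empty. */
    static Optional<String> currentDirName(String dirName) {
        Matcher m = DIR_PATTERN.matcher(dirName);
        if (!m.matches()) {
            return Optional.empty();
        }
        // group(1) = topic, group(2) = partition, group(3) = unique id
        return Optional.of(m.group(1) + "-" + m.group(2));
    }

    public static void main(String[] args) {
        System.out.println(currentDirName("my-topic-0.abc123-future")); // prints Optional[my-topic-0]
        System.out.println(currentDirName("my-topic-0"));               // prints Optional.empty
    }
}
```

Because every group is a greedy `\S+`, the split relies on backtracking to the last `-` that is still followed by a `.`; names with unusual characters may split differently than expected.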
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1568882032 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File], } } + def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = { +val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage) +abandonedFutureLogs.foreach { case (futureLog, currentLog) => + val tp = futureLog.topicPartition + + futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true) + futureLog.removeLogMetrics() + futureLogs.remove(tp) + + currentLog.foreach { log => +log.removeLogMetrics() +log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false) +addLogToBeDeleted(log) +info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion") + } + + currentLogs.put(tp, futureLog) + futureLog.newMetrics() + + info(s"Successfully renamed abandoned future log for $tp") Review Comment: I've refactored LogManager to use `replaceCurrentWithFutureLog` in [b87a21f](https://github.com/apache/kafka/pull/15136/commits/b87a21f3bd0eea6e4083a2d14b41053361f7b40a). PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1568883111 ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -81,8 +82,7 @@ public BrokerNode build( logDataDirectories = Collections. singletonList(String.format("combined_%d", id)); } else { -logDataDirectories = Collections. -singletonList(String.format("broker_%d_data0", id)); +logDataDirectories = Collections.unmodifiableList(Arrays.asList(String.format("broker_%d_data0", id), String.format("broker_%d_data1", id))); Review Comment: Rebased to use the new method in the builder -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16467) Add README to docs folder
[ https://issues.apache.org/jira/browse/KAFKA-16467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838190#comment-17838190 ] ASF GitHub Bot commented on KAFKA-16467: FrankYang0529 commented on code in PR #596: URL: https://github.com/apache/kafka-site/pull/596#discussion_r1568848118 ## README.md: ## @@ -10,4 +10,32 @@ You can run it with the following command, note that it requires docker: Then you can open [localhost:8080](http://localhost:8080) on your browser and browse the documentation. -To kill the process, just type ctrl + c \ No newline at end of file +To kill the process, just type ctrl + c. + +## How to preview the latest documentation changes in Kafka repository? + +1. Generating document from kafka repository: + +```shell +# change directory into kafka repository +cd KAFKA_REPO +./gradlew clean siteDocTar +# supposing built with scala 2.13 +tar zxvf core/build/distributions/kafka_2.13-$(./gradlew properties | grep version: | awk '{print $NF}' | head -n 1)-site-docs.tgz +``` + +2. Copying the generated documents from Kafka repository into kafka-site, and preview them (note that it requires docker): + +```shell +# change directory into kafka-site repository +cd KAFKA_SITE_REPO +# copy the generated documents into dev folder +rm -rf dev +mkdir dev +# change directory into kafka repository +cp -r KAFKA_REPO/site-docs/* dev Review Comment: I remove `# change directory into kafka repository`. Thank you. > Add README to docs folder > - > > Key: KAFKA-16467 > URL: https://issues.apache.org/jira/browse/KAFKA-16467 > Project: Kafka > Issue Type: Improvement >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > > We don't have a guide in project root folder or docs folder to show how to > run local website. It's good to provide a way to run document with kafka-site > repository. 
> > Option 1: Add links to wiki page > [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes] > and > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67634793]. > Option 2: Show how to run the document within container. For example: moving > `site-docs` from kafka to kafka-site repository and run `./start-preview.sh`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: fix duplicated return and other streams docs typos [kafka]
chia7712 merged PR #15713: URL: https://github.com/apache/kafka/pull/15713
Re: [PR] MINOR: fix duplicated return and other streams docs typos [kafka]
chia7712 commented on PR #15713: URL: https://github.com/apache/kafka/pull/15713#issuecomment-2061259436 @AyoubOm thanks for your contribution
Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]
chia7712 commented on code in PR #15722: URL: https://github.com/apache/kafka/pull/15722#discussion_r1568836486 ## clients/src/main/java/org/apache/kafka/common/network/Selector.java: ## @@ -1281,14 +1281,14 @@ private Meter createMeter(Metrics metrics, String groupName, Map metricTags, String baseName, String action) { MetricName rateMetricName = metrics.metricName(baseName + "-ratio", groupName, -String.format("*Deprecated* The fraction of time the I/O thread spent %s", action), metricTags); +String.format("The fraction of time the I/O thread spent %s", action), metricTags); Review Comment: Nice find! Could you add a comment explaining why we remove the `*Deprecated*` prefix?
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1568770827 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1388,6 +1393,31 @@ public void commitSync(Map offsets, Duration } } +private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) { +if (lastPendingAsyncCommit == null) { +return; +} + +try { +final CompletableFuture futureToAwait = new CompletableFuture<>(); +// We don't want the wake-up trigger to complete our pending async commit future, +// so create new future here. Any errors in the pending async commit will be handled +// by the async commit future / the commit callback - here, we just want to wait for it to complete. +lastPendingAsyncCommit.whenComplete((v, t) -> futureToAwait.complete(null)); +if (!disableWakeup) { +wakeupTrigger.setActiveTask(futureToAwait); +} +ConsumerUtils.getResult(futureToAwait, timer); Review Comment: Yes. I think if `lastPendingAsyncCommit` is completed before entering here, the `whenComplete` will execute immediately. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -158,7 +158,11 @@ public class AsyncKafkaConsumerTest { public void resetAll() { backgroundEventQueue.clear(); if (consumer != null) { -consumer.close(Duration.ZERO); +try { +consumer.close(Duration.ZERO); +} catch (Exception e) { +assertInstanceOf(KafkaException.class, e); +} Review Comment: `resetAll` isn't supposed to test anything, so this also shouldn't mask anything. It's purely for cleanup. In this case, it only affects two tests that will timeout on close (since we don't mock an async commit response). So let me do it. But in general, I wonder if adding clean-up logic to the tests itself won't reduce readability/clarity of the actual test case. 
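On the first point above (an already-completed `lastPendingAsyncCommit`), a small standalone sketch may help. It copies only the shape of the diff, a fresh future completed from `whenComplete`; the class `WhenCompleteDemo` and the helper `awaitable` are hypothetical, and the wake-up trigger and timer are left out. The immediate execution on an already-completed future is how OpenJDK's `CompletableFuture` behaves for the non-async `whenComplete`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of AsyncKafkaConsumer.
class WhenCompleteDemo {
    /** Returns a fresh future that completes normally once `pending` finishes, however it finishes. */
    static CompletableFuture<Void> awaitable(CompletableFuture<Void> pending) {
        CompletableFuture<Void> futureToAwait = new CompletableFuture<>();
        // Errors in `pending` are deliberately ignored here; we only wait for it to finish.
        pending.whenComplete((v, t) -> futureToAwait.complete(null));
        return futureToAwait;
    }

    public static void main(String[] args) {
        // Case 1: the pending commit finished before we got here.
        CompletableFuture<Void> alreadyDone = CompletableFuture.completedFuture(null);
        System.out.println(awaitable(alreadyDone).isDone()); // prints true

        // Case 2: the pending commit failed; the awaited future still completes normally.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("commit failed"));
        CompletableFuture<Void> afterFailure = awaitable(failed);
        System.out.println(afterFailure.isDone() + " " + afterFailure.isCompletedExceptionally()); // prints true false

        // Case 3: the pending commit is still in flight.
        CompletableFuture<Void> inFlight = new CompletableFuture<>();
        CompletableFuture<Void> waiting = awaitable(inFlight);
        System.out.println(waiting.isDone()); // prints false
        inFlight.complete(null);
        System.out.println(waiting.isDone()); // prints true
    }
}
```

Wrapping the pending future this way means that whatever completes or cancels the awaited future cannot affect the original commit future, which is the property the code comment in the diff asks for.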
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) +assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + +// Complete async commit event +final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); Review Comment: Yes. I agree, it's a bit weird, but mockito is recording those invocations with copies (references) of all arguments. That's also why spying on lots of objects in busy event loops will accumulate lots of such "invocation objects" in memory. 
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); Review Comment: I guess I may be less concerned with code duplication in test setup than my reviewers :). Done. Added another helper method that contains the lines up to the incomplete async commit. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerT
Re: [PR] MINOR: Various cleanups in connect [kafka]
chia7712 merged PR #15734: URL: https://github.com/apache/kafka/pull/15734
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dajac commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1568737120 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -775,6 +777,126 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { +if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { +log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", +consumerGroup.groupId()); +return false; +} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { +log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() <= 1) { +log.info("Skip downgrading the consumer group {} to classic group because it's empty.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { +log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", +consumerGroup.groupId()); +} +return true; +} + +public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) { +consumerGroup.createGroupTombstoneRecords(records); +ClassicGroup classicGroup; Review Comment: nit: Let's add an empty line before this one. 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -775,6 +777,126 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { +if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { +log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", +consumerGroup.groupId()); +return false; +} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { +log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() <= 1) { +log.info("Skip downgrading the consumer group {} to classic group because it's empty.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { +log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", +consumerGroup.groupId()); +} +return true; +} + +public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) { Review Comment: Does it need to be public? Let's add some javadoc please. ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -9623,6 +10332,186 @@ public void testClassicGroupOnUnloadedCompletingRebalance() throws Exception { .setErrorCode(NOT_COORDINATOR.code()), pendingMemberSyncResult.syncFuture.get()); } +@ParameterizedTest +@ValueSource(booleans = {true, false}) +public void testLastClassicProtocolMemberLeavingConsumerGroup(boolean appendLogSuccessfully) { Review Comment: Should we also test the session expiration path and the rebalance expiration path? 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -998,4 +1094,232 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup( ); return describedGroup; } + +/** + * Create a new consumer group according to the given classic group. + * + * @param snapshotRegistry The SnapshotRegistry. + * @param metrics The GroupCoordinatorMetricsShard. + * @param classicGroup The converted classic group. + * @param topicsImage The TopicsImage for topic id and topic name conversion. + * @return The created ConsumerGruop. + */ +public static ConsumerGroup fromClassicGroup( +SnapshotRegistry snapshotRegistry, +GroupCoordinatorMetricsShard metrics, +ClassicGroup classicGroup, +TopicsImage topicsImage +) { +String groupId = classicGroup.groupId(); +ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); +consumerGrou
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
chia7712 merged PR #15684: URL: https://github.com/apache/kafka/pull/15684
Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]
mimaison commented on code in PR #15469: URL: https://github.com/apache/kafka/pull/15469#discussion_r1568692760 ## connect/api/src/main/java/org/apache/kafka/connect/data/Values.java: ## @@ -766,135 +852,23 @@ protected static boolean canParseSingleTokenLiteral(Parser parser, boolean embed protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException { Review Comment: Ah right, Parser is also part of the public API, I thought it was only a private inner class.
Re: [PR] MINOR: fix duplicated return and other streams docs typos [kafka]
AyoubOm commented on PR #15713: URL: https://github.com/apache/kafka/pull/15713#issuecomment-2060923661 @chia7712 please check this when you have time
Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]
mimaison commented on code in PR #15469: URL: https://github.com/apache/kafka/pull/15469#discussion_r1568541184 ## connect/api/src/main/java/org/apache/kafka/connect/data/Values.java: ## @@ -766,135 +852,23 @@ protected static boolean canParseSingleTokenLiteral(Parser parser, boolean embed protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException { Review Comment: I wonder if it would make sense to move all these `parse<>()` methods to the `Parser` class, and extract `Parser` to its own file. WDYT? I made a quick attempt in https://github.com/apache/kafka/commit/10f4910bfc5e0d47782d7a70f8ad22dee97efe12#diff-024f49f1f6adf07bcc1cab6aa8caa0d931ba2c6be887d96ab575ae032be4d051 ## connect/api/src/main/java/org/apache/kafka/connect/data/Values.java: ## @@ -177,7 +213,12 @@ public static Long convertToLong(Schema schema, Object value) throws DataExcepti * @throws DataException if the value could not be converted to a float */ public static Float convertToFloat(Schema schema, Object value) throws DataException { Review Comment: A few of these `convertTo<>()` methods are not covered by unit tests. It's ok not to address this in this PR if you'd prefer as it's already huge.
Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]
gharris1727 commented on code in PR #15469: URL: https://github.com/apache/kafka/pull/15469#discussion_r1568679258 ## connect/api/src/main/java/org/apache/kafka/connect/data/Values.java: ## @@ -766,135 +852,23 @@ protected static boolean canParseSingleTokenLiteral(Parser parser, boolean embed protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException { Review Comment: That's a cool idea, I think that makes a lot of sense when a bunch of these static methods take a Parser argument anyway. Since this is in the public API, I'll focus on moving some of the internal methods to instance methods of a protected/package local Parser, and leave the public static methods in Values.
Re: [PR] MINOR adds '-parameters' compiler option for :core tests [kafka]
chia7712 merged PR #15729: URL: https://github.com/apache/kafka/pull/15729
Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]
chia7712 merged PR #15733: URL: https://github.com/apache/kafka/pull/15733
[jira] [Assigned] (KAFKA-16298) Ensure user callbacks exceptions are propagated to the user on consumer poll
[ https://issues.apache.org/jira/browse/KAFKA-16298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16298: -- Assignee: Lucas Brutschy (was: Kirk True) > Ensure user callbacks exceptions are propagated to the user on consumer poll > > > Key: KAFKA-16298 > URL: https://issues.apache.org/jira/browse/KAFKA-16298 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer > Affects Versions: 3.7.0 > Reporter: Lianet Magrans > Assignee: Lucas Brutschy > Priority: Blocker > Labels: callback, kip-848-client-support > Fix For: 3.8.0 > > > When user-defined callbacks fail with an exception, the expectation is that > the error should be propagated to the user as a KafkaException and break the > poll loop (behaviour in the legacy coordinator). The new consumer executes > callbacks in the application thread, and sends an event to the background > with the callback result and error if any, [passing the error along with the > event > here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882] > to the background thread, but does not seem to propagate the exception to > the user.
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lucasbru commented on PR #15738: URL: https://github.com/apache/kafka/pull/15738#issuecomment-2060943337 @lianetm did you mean to close the PR?
Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
nizhikov commented on code in PR #15645: URL: https://github.com/apache/kafka/pull/15645#discussion_r1568506632 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.admin; + +import kafka.cluster.Broker; +import kafka.cluster.EndPoint; +import kafka.server.KafkaConfig; +import kafka.server.QuorumTestHarness; +import kafka.zk.AdminZkClient; +import kafka.zk.BrokerInfo; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.security.PasswordEncoder; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.config.ZooKeeperInternals; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters +public class ConfigCommandIntegrationTest extends QuorumTestHarness { Review Comment: Yes. I struggle to make test work both for kraft and zk modes and found the bug from #15729 :) > Is is possible to use ClusterTestExtensions to rewrite this test? Will try to do it. 
I stopped at the step "Is it possible to apply same test cases for kraft mode" :) ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin; + +import kafka.cluster.Broker; +import kafka.cluster.EndPoint; +import kafka.server.KafkaConfig; +import kafka.server.QuorumTestHarness; +import kafka.zk.AdminZkClient; +import kafka.zk.BrokerInfo; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.security.PasswordEncoder; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.config.ZooKeeperInternals; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import 
java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lucasbru commented on PR #15742: URL: https://github.com/apache/kafka/pull/15742#issuecomment-2060865659 @kirktrue @lianetm could you have a look?
[PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lucasbru opened a new pull request, #15742: URL: https://github.com/apache/kafka/pull/15742 When a user-defined rebalance listener fails with an exception, the expectation is that the error is propagated to the user as a KafkaException and breaks the poll loop (this is the behaviour of the legacy coordinator). The new consumer executes callbacks in the application thread and sends an event to the background thread with the callback result and the error, if any, [passing the error along with the event here](https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882), but it does not seem to propagate the exception to the user. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
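The behaviour described in this PR can be illustrated with a minimal sketch that has no Kafka dependency. Everything named here (`SketchKafkaException`, `SketchConsumer`, `backgroundEvents`) is a hypothetical stand-in and not the actual `AsyncKafkaConsumer` code; it only shows the idea of reporting the callback result to the background and also rethrowing the error from `poll()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for KafkaException, so the sketch needs no Kafka jars.
class SketchKafkaException extends RuntimeException {
    SketchKafkaException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical model of a consumer that runs rebalance callbacks on the application thread.
class SketchConsumer {
    // Callback results "sent" to the background; a plain list stands in for the event queue.
    final List<Optional<Throwable>> backgroundEvents = new ArrayList<>();
    private final Runnable rebalanceListener;

    SketchConsumer(Runnable rebalanceListener) {
        this.rebalanceListener = rebalanceListener;
    }

    void poll() {
        Optional<Throwable> error = Optional.empty();
        try {
            // The user callback runs in the caller's (application) thread.
            rebalanceListener.run();
        } catch (RuntimeException e) {
            error = Optional.of(e);
        }
        // Report the outcome, including any error, to the background.
        backgroundEvents.add(error);
        // Also surface the failure to the caller so that the poll loop breaks.
        if (error.isPresent()) {
            throw new SketchKafkaException("User rebalance callback threw an error", error.get());
        }
    }

    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer(() -> {
            throw new IllegalStateException("listener failed");
        });
        try {
            consumer.poll();
        } catch (SketchKafkaException e) {
            System.out.println("poll() surfaced: " + e.getCause().getMessage());
        }
    }
}
```

Wrapping the original exception as the cause keeps the listener's stack trace available to the caller, while the background still receives the same error with the result event.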
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
dajac commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1568482951 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 1) +@Measurement(iterations = 0) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new 
UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"true", "false"}) +private boolean simulateRebalanceTrigger; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +createAssignmentSpec(); + +partitionAssignor = assignorType.assignor(); + +if (simulateRebalanceTrigger) { +simulateIncrementalRebalance(topicMetadata); +} +} + +private Map createTopicMetadata() { +Map topicMetadata = new HashMa
Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]
vamossagar12 commented on PR #13283: URL: https://github.com/apache/kafka/pull/13283#issuecomment-2060631731 Hey @ableegoldman, I am interested in wrapping this up, but I haven't looked at the Streams codebase for some time and things seem to have changed a bit. I am not aware of the 3.8 timelines; I will have to check those.
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
dajac commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1568383960 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) 
+@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +@Param({"10", "50", "100"}) +private int partitionsPerTopicCount; + +@Param({"100"}) +private int topicCount; + +@Param({"500", "1000"}) +private int memberCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"true", "false"}) +private boolean isSubscriptionUniform; + +@Param({"true", "false"}) +private boolean isRangeAssignor; + +@Param({"true", "false"}) +private boolean isReassignment; + +private PartitionAssignor partitionAssignor; + +private final int numberOfRacks = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = new HashMap<>(); +Map> partitionRacks = isRackAware ? +mkMapOfPartitionRacks(partitionsPerTopicCount) : +Collections.emptyMap(); + +for (int i = 1; i <= topicCount; i++) { +Uuid topicUuid = Uuid.randomUuid(); +String topicName = "topic" + i; +topicMetadata.put(topicUuid, new TopicMetadata( +topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); +} + +addTopicSubscriptions(topicMetadata); +this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +if (isRangeAssignor) { +this.partitionAssignor = new RangeAssignor(); +} else { +this.partitionAssignor = new UniformAssignor(); +} + +if (isReassignment) { +GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); +Map members; + +members = initialAssignment.members(); + +// Update the AssignmentSpec with the results from the initial assignment. 
+Map updatedMembers = new HashMap<>(); + +members.forEach((memberId, memberAssignment) -> { +AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId); +updatedMembers.put(memberId, new AssignmentMemberSpec( +memberSpec.instanceId(), +memberSpec.rackId(), +memberSpec.subscribedTopicIds(), +memberAssignment.targetPartitions() +)); +}); + +// Add new member to trigger a reassignment. +Optional rackId = isRackAware ? Optional.of("rack" + (memberCount + 1) % numberOfRacks) : Optional.empty(); + +updatedMembers.put("newMember", new AssignmentMemberSpec( +Optional.empty(), +rackId, +topicMetadata.keySet(), +Collections.emptyMap() +)
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
jeqo commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1568385972 ## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/MultiFieldPaths.java: ## @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.field; + +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Schema.Type; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.transforms.util.SchemaUtil; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Multiple field paths to access data objects ({@code Struct} or {@code Map}) efficiently, + * instead of multiple individual {@link SingleFieldPath single-field paths}. + * + * If the SMT requires accessing a single field on the same data object, + * use {@link SingleFieldPath} instead. 
+ * + * @see https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures";>KIP-821 + * @see SingleFieldPath + * @see FieldSyntaxVersion + */ +public class MultiFieldPaths { +final Trie trie = new Trie(); + +MultiFieldPaths(Set paths) { +paths.forEach(trie::insert); +} + +public static MultiFieldPaths of(List fields, FieldSyntaxVersion syntaxVersion) { +return new MultiFieldPaths(fields.stream() +.map(f -> new SingleFieldPath(f, syntaxVersion)) +.collect(Collectors.toSet())); +} + +/** + * Find values at the field paths + * + * @param struct data value + * @return map of field paths and field/values + */ +public Map> fieldAndValuesFrom(Struct struct) { +if (trie.isEmpty()) return Collections.emptyMap(); +return findFieldAndValues(struct, trie.root, new HashMap<>()); +} + +private Map> findFieldAndValues( +Struct originalValue, +TrieNode trieAt, +Map> fieldAndValueMap +) { +for (Map.Entry step : trieAt.steps().entrySet()) { +Field field = originalValue.schema().field(step.getKey()); +if (step.getValue().isLeaf()) { +Map.Entry fieldAndValue = +field != null +? 
new AbstractMap.SimpleImmutableEntry<>(field, originalValue.get(field)) +: null; +fieldAndValueMap.put(step.getValue().path, fieldAndValue); +} else { +if (field.schema().type() == Type.STRUCT) { +findFieldAndValues( +originalValue.getStruct(field.name()), +step.getValue(), +fieldAndValueMap +); +} +} +} +return fieldAndValueMap; +} + +/** + * Find values at the field paths + * + * @param value data value + * @return map of field paths and field/values + */ +public Map> fieldAndValuesFrom(Map value) { +if (trie.isEmpty()) return Collections.emptyMap(); +return findFieldAndValues(value, trie.root, new HashMap<>()); +} + +@SuppressWarnings("unchecked") +private Map> findFieldAndValues( +Map value, +TrieNode trieAt, +Map> fieldAndValueMap +) { +for (Map.Entry step : trieAt.steps().entrySet()) { +Object fieldValue = value.get(step.getKey()); +if (step.getValue().isLeaf()) { +fieldAndValueMap.put( +step.getValue().path, +new AbstractMap.SimpleImmutableEntry<>(step.getKey(), fieldValue) +); +} else { +if (fieldValue instanceof Map) { +findFieldAndValues( +(Map) fieldValue, +step.getValue(), +fieldAndValueMap +
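The trie walk over a `Map` value in the diff above can be sketched without any Connect types. This is an illustration only: `PathTrieSketch`, its `Node` class, and the plain dot-separated path syntax are assumptions made for the example, and paths that are prefixes of other paths are not handled here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified trie of field paths; not the MultiFieldPaths implementation.
class PathTrieSketch {
    static class Node {
        final Map<String, Node> steps = new TreeMap<>();
        String path; // set only on the node that ends a path

        boolean isLeaf() {
            return steps.isEmpty();
        }
    }

    final Node root = new Node();

    // Assumption: paths use a plain dot separator, e.g. "address.city".
    PathTrieSketch(List<String> dottedPaths) {
        for (String p : dottedPaths) {
            Node current = root;
            for (String step : p.split("\\.")) {
                current = current.steps.computeIfAbsent(step, k -> new Node());
            }
            current.path = p;
        }
    }

    // Visits each level of the nested map once, regardless of how many paths share it.
    Map<String, Object> valuesFrom(Map<String, Object> value) {
        Map<String, Object> found = new HashMap<>();
        collect(value, root, found);
        return found;
    }

    @SuppressWarnings("unchecked")
    private void collect(Map<String, Object> value, Node at, Map<String, Object> found) {
        for (Map.Entry<String, Node> step : at.steps.entrySet()) {
            Object fieldValue = value.get(step.getKey());
            if (step.getValue().isLeaf()) {
                // A missing field is recorded with a null value.
                found.put(step.getValue().path, fieldValue);
            } else if (fieldValue instanceof Map) {
                collect((Map<String, Object>) fieldValue, step.getValue(), found);
            }
        }
    }
}
```

Because paths that share a prefix share trie nodes, the nested `address` map is looked up once even when several requested paths live under it, which is the efficiency argument made in the class Javadoc.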
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
jeqo commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1568385072 ## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/MultiFieldPaths.java: ## @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.field; + +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Schema.Type; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.transforms.util.SchemaUtil; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Multiple field paths to access data objects ({@code Struct} or {@code Map}) efficiently, + * instead of multiple individual {@link SingleFieldPath single-field paths}. + * + * If the SMT requires accessing a single field on the same data object, + * use {@link SingleFieldPath} instead. 
+ * + * @see https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures";>KIP-821 + * @see SingleFieldPath + * @see FieldSyntaxVersion + */ +public class MultiFieldPaths { +final Trie trie = new Trie(); + +MultiFieldPaths(Set paths) { +paths.forEach(trie::insert); +} + +public static MultiFieldPaths of(List fields, FieldSyntaxVersion syntaxVersion) { +return new MultiFieldPaths(fields.stream() +.map(f -> new SingleFieldPath(f, syntaxVersion)) +.collect(Collectors.toSet())); +} + +/** + * Find values at the field paths + * + * @param struct data value + * @return map of field paths and field/values + */ +public Map> fieldAndValuesFrom(Struct struct) { +if (trie.isEmpty()) return Collections.emptyMap(); +return findFieldAndValues(struct, trie.root, new HashMap<>()); +} + +private Map> findFieldAndValues( +Struct originalValue, +TrieNode trieAt, +Map> fieldAndValueMap +) { +for (Map.Entry step : trieAt.steps().entrySet()) { +Field field = originalValue.schema().field(step.getKey()); +if (step.getValue().isLeaf()) { +Map.Entry fieldAndValue = +field != null +? 
new AbstractMap.SimpleImmutableEntry<>(field, originalValue.get(field)) +: null; +fieldAndValueMap.put(step.getValue().path, fieldAndValue); +} else { +if (field.schema().type() == Type.STRUCT) { +findFieldAndValues( +originalValue.getStruct(field.name()), +step.getValue(), +fieldAndValueMap +); +} +} +} +return fieldAndValueMap; +} + +/** + * Find values at the field paths + * + * @param value data value + * @return map of field paths and field/values + */ +public Map> fieldAndValuesFrom(Map value) { +if (trie.isEmpty()) return Collections.emptyMap(); +return findFieldAndValues(value, trie.root, new HashMap<>()); +} + +@SuppressWarnings("unchecked") +private Map> findFieldAndValues( +Map value, +TrieNode trieAt, +Map> fieldAndValueMap +) { +for (Map.Entry step : trieAt.steps().entrySet()) { +Object fieldValue = value.get(step.getKey()); +if (step.getValue().isLeaf()) { +fieldAndValueMap.put( +step.getValue().path, +new AbstractMap.SimpleImmutableEntry<>(step.getKey(), fieldValue) +); +} else { +if (fieldValue instanceof Map) { +findFieldAndValues( +(Map) fieldValue, +step.getValue(), +fieldAndValueMap +
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
chia7712 commented on code in PR #15732: URL: https://github.com/apache/kafka/pull/15732#discussion_r1568327439 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -391,13 +391,16 @@ void enqueueMetadataChangeEvent( // Events handled by Migration Driver. abstract class MigrationEvent implements EventQueue.Event { +// Use no-op handler by default because the retryHandler will be overridden if needed +public void retryHandler() { } @SuppressWarnings("ThrowableNotThrown") @Override public void handleException(Throwable e) { if (e instanceof MigrationClientAuthException) { KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper authentication in " + this, e); } else if (e instanceof MigrationClientException) { log.info(String.format("Encountered ZooKeeper error during event %s. Will retry.", this), e.getCause()); +retryHandler(); Review Comment: > No, as you said above, the retryHandler for MigrationClientException won't be triggered in other migration states, because those are handled by other event handlers that are not related to pollEvent. And because the default retryHandler is a no-op, there will be no retry for other migration states. As long as pollEvent keeps polling, they can be retried later. You are right :)
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732: URL: https://github.com/apache/kafka/pull/15732#discussion_r1568325101 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -391,13 +391,16 @@ void enqueueMetadataChangeEvent( // Events handled by Migration Driver. abstract class MigrationEvent implements EventQueue.Event { +// Use no-op handler by default because the retryHandler will be overridden if needed +public void retryHandler() { } @SuppressWarnings("ThrowableNotThrown") @Override public void handleException(Throwable e) { if (e instanceof MigrationClientAuthException) { KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper authentication in " + this, e); } else if (e instanceof MigrationClientException) { log.info(String.format("Encountered ZooKeeper error during event %s. Will retry.", this), e.getCause()); +retryHandler(); Review Comment: > My question is "why we did not handle UNINITIALIZED by another event"? If we move recoverMigrationStateFromZK to another event, we don't need to add extra retryHandler. That's a good question, @chia7712! Let me think about it. > Also, the solution offered by this PR has a side effect that we will put 2 PollEvent if the exception MigrationClientException happens in other migrationState No, as you said above, the retryHandler for `MigrationClientException` won't be triggered in other migration states, because those are handled by other event handlers that are not related to pollEvent. And because the default retryHandler is a no-op, there will be no retry for other migration states. As long as `pollEvent` keeps polling, they can be retried later.
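The no-op default discussed in this thread can be sketched as follows. The names (`RetryHookSketch`, `SketchEvent`, `PollEvent`, `OtherEvent`, `RetriableException`) are hypothetical, and the assumption that the poll event's override simply re-enqueues another poll event is made for illustration; this is not the `KRaftMigrationDriver` code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of an event queue whose events may opt in to retries.
class RetryHookSketch {
    // Stand-in for a retriable error such as MigrationClientException.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    final Deque<SketchEvent> queue = new ArrayDeque<>();

    abstract class SketchEvent {
        abstract void run();

        // No-op by default: events that do not override this are not retried.
        void retryHandler() { }

        void handleException(Throwable e) {
            if (e instanceof RetriableException) {
                retryHandler();
            }
        }
    }

    // Assumption: the poll event retries by enqueueing another poll event.
    class PollEvent extends SketchEvent {
        private final int failuresLeft;

        PollEvent(int failuresLeft) {
            this.failuresLeft = failuresLeft;
        }

        @Override
        void run() {
            if (failuresLeft > 0) {
                throw new RetriableException("ZooKeeper unavailable");
            }
        }

        @Override
        void retryHandler() {
            queue.add(new PollEvent(failuresLeft - 1));
        }
    }

    // Always fails, but inherits the no-op retryHandler, so it is not re-enqueued.
    class OtherEvent extends SketchEvent {
        @Override
        void run() {
            throw new RetriableException("ZooKeeper unavailable");
        }
    }

    void submitPoll(int failuresLeft) {
        queue.add(new PollEvent(failuresLeft));
    }

    void submitOther() {
        queue.add(new OtherEvent());
    }

    // Processes events until the queue is empty and returns how many were run.
    int drain() {
        int processed = 0;
        while (!queue.isEmpty()) {
            SketchEvent event = queue.poll();
            processed++;
            try {
                event.run();
            } catch (RuntimeException e) {
                event.handleException(e);
            }
        }
        return processed;
    }
}
```

In this model only the event type that overrides the hook adds work to the queue after a retriable failure, so a failure in any other event cannot produce a second poll event.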