Re: [PR] MINOR: Ensure that DisplayName is set in all parameterized tests [kafka]
dajac merged PR #14850: URL: https://github.com/apache/kafka/pull/14850 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15061) CoordinatorPartitionWriter should reuse buffer
[ https://issues.apache.org/jira/browse/KAFKA-15061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-15061. - Fix Version/s: 3.7.0 Resolution: Fixed > CoordinatorPartitionWriter should reuse buffer > -- > > Key: KAFKA-15061 > URL: https://issues.apache.org/jira/browse/KAFKA-15061 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Labels: kip-848-preview > Fix For: 3.7.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]
dajac merged PR #14885: URL: https://github.com/apache/kafka/pull/14885
Re: [PR] MINOR: Add @Timeout annotation to consumer integration tests [kafka]
dajac merged PR #14896: URL: https://github.com/apache/kafka/pull/14896
Re: [PR] MINOR: Enable auto-tick in AsyncKafkaConsumerTest [kafka]
dajac merged PR #14915: URL: https://github.com/apache/kafka/pull/14915
Re: [PR] KAFKA-15972: Add support to exclude labels for telemetry metrics (KIP-714) [kafka]
apoorvmittal10 commented on PR #14924: URL: https://github.com/apache/kafka/pull/14924#issuecomment-1840118739 @AndrewJSchofield @mjsax @wcarlson5 @philipnee - Minor PR to exclude the `client_id` label from telemetry metrics.
[PR] KAFKA-15972: Add support to exclude labels for telemetry metrics (KIP-714) [kafka]
apoorvmittal10 opened a new pull request, #14924: URL: https://github.com/apache/kafka/pull/14924 This PR adds support for excluding the `client_id` label when sending telemetry metrics. Some of the labels/tags present in a metric should be skipped when collecting telemetry, because the broker may already know that data and we should minimize data transfer. One such label is `client_id`, which is already present in the `RequestContext`, so the broker can append that label before emitting metrics to the telemetry backend. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
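The label exclusion described in this PR amounts to dropping certain keys from a metric's label map before it is sent. The following is a minimal sketch under stated assumptions: labels are modeled as a plain `Map<String, String>`, and `TelemetryLabelFilter` and `EXCLUDED_LABELS` are hypothetical names, not the KIP-714 client code.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper illustrating label exclusion; not the actual KIP-714 client code.
final class TelemetryLabelFilter {
    // Labels the broker is assumed to re-attach itself (client_id is known from the RequestContext).
    static final Set<String> EXCLUDED_LABELS = Set.of("client_id");

    private TelemetryLabelFilter() {}

    // Returns a new, unmodifiable map without the excluded labels; the input map is left untouched.
    static Map<String, String> filter(Map<String, String> labels) {
        Map<String, String> kept = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : labels.entrySet()) {
            if (!EXCLUDED_LABELS.contains(entry.getKey())) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return Collections.unmodifiableMap(kept);
    }
}
```

Returning a copy rather than mutating the input keeps the client's own view of the metric intact; only the outgoing payload loses the label.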
[jira] [Created] (KAFKA-15972) Add support to exclude labels in client telemetry
Apoorv Mittal created KAFKA-15972: - Summary: Add support to exclude labels in client telemetry Key: KAFKA-15972 URL: https://issues.apache.org/jira/browse/KAFKA-15972 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal Some of the labels/tags present in a metric should be skipped when collecting telemetry, because the broker may already know that data and we should minimize data transfer. One such label is `client_id`, which is already present in the `RequestContext`, so the broker can append that label before emitting metrics to the telemetry backend.
Re: [PR] MINOR: fix bug where we weren't registering SnapshotEmitterMetrics [kafka]
cmccabe merged PR #14918: URL: https://github.com/apache/kafka/pull/14918
Re: [PR] KAFKA-9545: Fix Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted` [kafka]
ashwinpankaj commented on PR #14910: URL: https://github.com/apache/kafka/pull/14910#issuecomment-1840012911 > LGTM Thanks @wcarlson5! Can you please merge this commit?
Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]
pprovenzano commented on code in PR #14863: URL: https://github.com/apache/kafka/pull/14863#discussion_r1414875644 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -2016,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } +ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) { Review Comment: We should check whether the metadata version supports this request and return an error if not.
[PR] KAFKA-15866:Refactor OffsetFetchRequestState Error handling [kafka]
DL1231 opened a new pull request, #14923: URL: https://github.com/apache/kafka/pull/14923 This PR resolves [KAFKA-15866](https://issues.apache.org/jira/browse/KAFKA-15866). The current OffsetFetchRequestState error handling uses nested if-else, which is stylistically quite different from OffsetCommitRequestState, which uses a switch statement. The latter is more readable, so we should refactor the error handling to use the same style.
Re: [PR] MINOR: Explain need to escape backslashes in rules [kafka]
github-actions[bot] commented on PR #14333: URL: https://github.com/apache/kafka/pull/14333#issuecomment-1839944197 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[jira] [Assigned] (KAFKA-15866) Refactor OffsetFetchRequestState Error handling to be more consistent with OffsetCommitRequestState
[ https://issues.apache.org/jira/browse/KAFKA-15866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lan Ding reassigned KAFKA-15866: Assignee: Lan Ding > Refactor OffsetFetchRequestState Error handling to be more consistent with > OffsetCommitRequestState > --- > > Key: KAFKA-15866 > URL: https://issues.apache.org/jira/browse/KAFKA-15866 > Project: Kafka > Issue Type: Sub-task >Reporter: Philip Nee >Assignee: Lan Ding >Priority: Minor > > The current OffsetFetchRequestState error handling uses nested if-else, which > is quite different, stylistically, from the OffsetCommitRequestState using a > switch statement. The latter is a bit more readable, so we should refactor the > error handling using the same style to improve readability. > > A minor point: Some of the error handling seems inconsistent with the commit. > The logic was from the current implementation, so we should also review all > the error handling. For example: somehow the current logic doesn't mark the > coordinator unavailable when receiving COORDINATOR_NOT_AVAILABLE.
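To illustrate the refactor the issue asks for, here is a minimal sketch of switch-style error handling. It assumes a hypothetical `FetchError` enum (a small subset of names borrowed from Kafka's `Errors`) and a hypothetical `FetchAction` result. The mapping is illustrative only, though it follows the issue's note that COORDINATOR_NOT_AVAILABLE should mark the coordinator unknown.

```java
// Hypothetical subset of error codes; the real client uses org.apache.kafka.common.protocol.Errors.
enum FetchError { NONE, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR, COORDINATOR_LOAD_IN_PROGRESS, GROUP_AUTHORIZATION_FAILED }

// Hypothetical outcomes of handling an OffsetFetch response error.
enum FetchAction { COMPLETE, MARK_COORDINATOR_UNKNOWN_AND_RETRY, RETRY, FAIL }

final class OffsetFetchErrorHandler {
    private OffsetFetchErrorHandler() {}

    // One flat switch instead of nested if-else: every error maps to exactly one action.
    static FetchAction handle(FetchError error) {
        switch (error) {
            case NONE:
                return FetchAction.COMPLETE;
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // Both suggest the cached coordinator is stale, so rediscover it before retrying.
                return FetchAction.MARK_COORDINATOR_UNKNOWN_AND_RETRY;
            case COORDINATOR_LOAD_IN_PROGRESS:
                // Coordinator is known but still loading; retry without rediscovery.
                return FetchAction.RETRY;
            default:
                // Anything else (e.g. authorization failures) is treated as fatal in this sketch.
                return FetchAction.FAIL;
        }
    }
}
```

Grouping fall-through cases makes inconsistencies like the one the issue mentions easy to spot, since each error appears in exactly one place.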
Re: [PR] KAFKA-15883: Implement RemoteCopyLagBytes [kafka]
showuon commented on code in PR #14832: URL: https://github.com/apache/kafka/pull/14832#discussion_r1414765630 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -283,8 +283,38 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf meter() } + case class GaugeWrapper(metricType: String, brokerTopicAggregatedMetric: BrokerTopicAggregatedMetric) { +@volatile private var lazyGauge: Gauge[Long] = _ +private val gaugeLock = new Object + +def gauge(): Gauge[Long] = { + var gauge = lazyGauge + if (gauge == null) { +gaugeLock synchronized { + gauge = lazyGauge + if (gauge == null) { +gauge = metricsGroup.newGauge(metricType, () => brokerTopicAggregatedMetric.value()) +lazyGauge = gauge + } +} + } + gauge +} + +def close(): Unit = gaugeLock synchronized { + if (lazyGauge != null) { +metricsGroup.removeMetric(metricType) +brokerTopicAggregatedMetric.close() +lazyGauge = null + } +} + +gauge() // greedily initialize the general topic metrics Review Comment: nit: I don't think we need a comment here. ## core/src/test/scala/integration/kafka/api/MetricsTest.scala: ## @@ -320,16 +320,28 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { assertTrue(metrics.isEmpty, s"$errorMessage: ${metrics.keys}") } + private def fromNameToBrokerTopicStatsMBean(name: String): String = { +s"kafka.server:type=BrokerTopicMetrics,name=$name" + } + private def verifyRemoteStorageMetrics(shouldContainMetrics: Boolean): Unit = { val metrics = RemoteStorageMetrics.allMetrics().asScala.filter(name => KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.find(metric => { metric._1.getMBeanName().equals(name.getMBeanName) }).isDefined ).toList +val aggregatedBrokerTopicStats = Set(BrokerTopicStats.RemoteCopyLagBytes) Review Comment: Why use a set here? I guess we plan to add more metrics to this set in the future, is that right?
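The `GaugeWrapper` in the diff above uses double-checked locking: a lock-free read of a `@volatile` field, then a second check under `gaugeLock` before registering. The following Java sketch shows the same pattern under stated assumptions. `SimpleGauge` and `LazyGaugeWrapper` are hypothetical names, and an `AtomicInteger` stands in for the metrics registry, counting live registrations in place of `newGauge`/`removeMetric`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

// Minimal gauge abstraction for this sketch (the Scala code above uses Gauge[Long]).
interface SimpleGauge {
    long value();
}

// Java sketch of lazy, double-checked gauge registration; the AtomicInteger is a stand-in registry.
final class LazyGaugeWrapper {
    private final LongSupplier source;
    private final AtomicInteger registry;
    private final Object gaugeLock = new Object();
    // volatile: a thread that reads a non-null reference also sees the fully constructed gauge.
    private volatile SimpleGauge lazyGauge;

    LazyGaugeWrapper(LongSupplier source, AtomicInteger registry) {
        this.source = source;
        this.registry = registry;
    }

    SimpleGauge gauge() {
        SimpleGauge gauge = lazyGauge;          // first check without the lock (fast path)
        if (gauge == null) {
            synchronized (gaugeLock) {
                gauge = lazyGauge;              // second check: another thread may have won the race
                if (gauge == null) {
                    gauge = source::getAsLong;  // "register" a new gauge exactly once
                    registry.incrementAndGet();
                    lazyGauge = gauge;
                }
            }
        }
        return gauge;
    }

    void close() {
        synchronized (gaugeLock) {
            if (lazyGauge != null) {
                registry.decrementAndGet();     // stand-in for removing the metric
                lazyGauge = null;
            }
        }
    }
}
```

Reading the volatile field into a local variable means the common path performs a single volatile read and never takes the lock once the gauge exists.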
Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]
apoorvmittal10 commented on PR #14843: URL: https://github.com/apache/kafka/pull/14843#issuecomment-1839895007 @mjsax Build passed with known flaky tests, not related to PR changes.
[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax opened a new pull request, #14922: URL: https://github.com/apache/kafka/pull/14922 This PR builds on top of https://github.com/apache/kafka/pull/14908. Only the second commit needs to be reviewed.
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
kirktrue commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1414752935 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -183,6 +183,10 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request)); } +public MembershipManager membershipManager() { +return membershipManager; +} Review Comment: It's for access by the `ApplicationEventProcessor` to process its events that relate to the state machine. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java: ## @@ -66,15 +66,15 @@ Exception invokePartitionsAssigned(final SortedSet assignedParti throw e; } catch (Exception e) { log.error("User provided listener {} failed on invocation of onPartitionsAssigned for partitions {}", -listener.getClass().getName(), assignedPartitions, e); +listener.get().getClass().getName(), assignedPartitions, e); Review Comment: Will do.
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
kirktrue commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1414752641 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1376,6 +1425,66 @@ private void subscribeInternal(Collection topics, Optional + * Process background events, if any + * Briefly wait for {@link CompletableApplicationEvent an event} to complete + * + * + * + * + * Each iteration gives the application thread an opportunity to process background events, which may be + * necessary to complete the overall processing. + * + * + * + * As an example, take {@link #unsubscribe()}. To start unsubscribing, the application thread enqueues an + * {@link UnsubscribeApplicationEvent} on the application event queue. That event will eventually trigger the + * rebalancing logic in the background thread. Critically, as part of this rebalancing work, the + * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} callback needs to be invoked. However, + * this callback must be executed on the application thread. To achieve this, the background thread enqueues a + * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background event queue. That event queue is + * periodically queried by the application thread to see if there's work to be done. When the application thread + * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is processed, and then a + * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then enqueued by the application thread on the + * background event queue. Moments later, the background thread will see that event, process it, and continue + * execution of the rebalancing logic. The rebalancing logic cannot complete until the + * {@link ConsumerRebalanceListener} callback is performed. 
+ * + * @param event Event that contains a {@link CompletableFuture}; it is on this future that the application thread + * will wait for completion + * @param timer Overall timer that bounds how long the application thread will wait for the event to complete + * @return {@code true} if the event completed within the timeout, {@code false} otherwise + */ +private boolean processBackgroundEvents(CompletableApplicationEvent event, Timer timer) { +log.trace("Enqueuing event {} for processing; will wait up to {} ms to complete", event, timer.remainingMs()); + +do { +backgroundEventProcessor.process(); Review Comment: The `process()` method on `BackgroundEventProcessor` (and `ApplicationEventProcessor`) processes _all_ the events in the queue.
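The Javadoc above describes a loop that alternates between draining every queued background event on the application thread and briefly waiting on the event's future, bounded by an overall timer. The following is a minimal sketch of that loop under stated assumptions. `BackgroundEventLoop`, `enqueue`, and `awaitWhileProcessing` are hypothetical names, events are modeled as plain `Runnable`s, and a nanosecond deadline replaces Kafka's `Timer`. This is not the `AsyncKafkaConsumer` implementation.

```java
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of the "process background events, then briefly wait" loop.
final class BackgroundEventLoop {
    // Work the background thread needs run on the application thread (e.g. rebalance callbacks).
    private final Queue<Runnable> backgroundEvents = new ConcurrentLinkedQueue<>();

    // Called by the background thread.
    void enqueue(Runnable event) {
        backgroundEvents.add(event);
    }

    // Drains *all* queued events on the calling (application) thread.
    private void processAll() {
        Runnable event;
        while ((event = backgroundEvents.poll()) != null) {
            event.run();
        }
    }

    // Returns true if the future completed (normally or not) before the timeout, false otherwise.
    boolean awaitWhileProcessing(CompletableFuture<?> future, long timeoutMs, long pollMs)
            throws InterruptedException {
        final long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            processAll();
            if (future.isDone()) {
                return true;
            }
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNs - System.nanoTime());
            if (remainingMs <= 0) {
                return false;
            }
            try {
                // Wait briefly, then loop so newly enqueued background events get processed.
                future.get(Math.min(pollMs, remainingMs), TimeUnit.MILLISECONDS);
            } catch (TimeoutException | ExecutionException | CancellationException e) {
                // Not done yet, or done exceptionally; the isDone() check decides on the next pass.
            }
        }
    }
}
```

In the unsubscribe example from the Javadoc, the enqueued event would play the role of `ConsumerRebalanceListenerCallbackNeededEvent`: the future cannot complete until the application thread has run it.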
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1839871090 @cmccabe thanks for having a look. I've made the following changes: * Replaced the use of `prepend` with `append` in `propagateDirectoryFailure` * Moved `communicationInFlight = true` after `_channelManager.sendRequest()` * Always set `nextSchedulingShouldBeImmediate = false` in `scheduleNextCommunicationAfterFailure` * Moved checking if `communicationInFlight = true` to `CommunicationEvent.run()` @junrao @cmccabe: please take another look
Re: [PR] improve TopicCommandIntegrationTest to be less flaky [kafka]
Owen-CH-Leung commented on PR #14891: URL: https://github.com/apache/kafka/pull/14891#issuecomment-1839853774 > Hm.. given this is the TopicCommandIntegrationTest I think we should keep the parts testing the topic command tool. Thanks for your feedback! Do you mean we should keep the existing API calls like `createAndWaitTopic(buildTopicCommandOptionsWithBootstrap( "--create", "--partitions", "2", "--replication-factor", "1", "--topic", testTopicName));` ?
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414728861 ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -197,11 +197,14 @@ class BrokerLifecycleManagerTest { result } - def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = { -while (!future.isDone || context.mockClient.hasInFlightRequests) { - context.poll() + def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = { +while (ctx.mockChannelManager.unsentQueue.isEmpty) { + if (manager.eventQueue.isEmpty) Review Comment: You're right, this is incorrect. We must only advance the time if the eventQueue has an event scheduled at a future time, unless the next event is a timeout event.
Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]
lihaosky commented on code in PR #14714: URL: https://github.com/apache/kafka/pull/14714#discussion_r1414713123 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java: ## @@ -394,6 +394,106 @@ public void testDeterministic() { } } +@Test +public void testMaxFlowOnlySourceAndSink() { +final Graph graph1 = new Graph<>(); Review Comment: `graph` was used in setup ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java: ## @@ -88,55 +93,20 @@ public Graph constructTaskGraph( } } -// TODO: validate tasks in tasksForTopicGroup and taskIdList -final SortedMap> sortedTasksForTopicGroup = new TreeMap<>(tasksForTopicGroup); Review Comment: It's put in `validateTasks`
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
jolshan commented on code in PR #14774: URL: https://github.com/apache/kafka/pull/14774#discussion_r1414713239 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -349,146 +468,68 @@ class GroupMetadataManager(brokerId: Int, consumerId: String, offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, - transactionalId: String = null, producerId: Long = RecordBatch.NO_PRODUCER_ID, producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH, requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { -// first filter out partitions with offset metadata size exceeding limit -val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) => - validateOffsetMetadataLength(offsetAndMetadata.metadata) -} - group.inLock { if (!group.hasReceivedConsistentOffsetCommits) warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " + s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " + s"should be avoided.") } -val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID -// construct the message set to append +val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) => + validateOffsetMetadataLength(offsetAndMetadata.metadata) +} if (filteredOffsetMetadata.isEmpty) { // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (k, _) => k -> Errors.OFFSET_METADATA_TOO_LARGE } responseCallback(commitStatus) -} else { - getMagic(partitionFor(group.groupId)) match { -case Some(magicValue) => - // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. 
- val timestampType = TimestampType.CREATE_TIME - val timestamp = time.milliseconds() - - val records = filteredOffsetMetadata.map { case (topicIdPartition, offsetAndMetadata) => -val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicIdPartition.topicPartition) -val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion) -new SimpleRecord(timestamp, key, value) - } - val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId)) - val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava)) - - if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2) -throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue) - - val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(), -producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH) - - records.foreach(builder.append) - val entries = Map(offsetTopicPartition -> builder.build()) - - // set the callback function to insert offsets into cache after log append completed - def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { -// the append response should only contain the topics partition -if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition)) - throw new IllegalStateException("Append status %s should only have one partition %s" -.format(responseStatus, offsetTopicPartition)) - -// construct the commit response status and insert -// the offset and metadata to cache if the append status has no error -val status = responseStatus(offsetTopicPartition) - -val responseError = group.inLock { - if (status.error == Errors.NONE) { -if (!group.is(Dead)) { - filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) => -if (isTxnOffsetCommit) 
- group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) -else - group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) - } -} - -// Record the number of offsets committed to the log -offsetCommitsSensor.record(records.size) - -
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1414713000 ## metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java: ## @@ -347,7 +347,7 @@ private void maybePublishMetadata(MetadataDelta delta, MetadataImage image, Load } metrics.updateLastAppliedImageProvenance(image.provenance()); metrics.setCurrentMetadataVersion(image.features().metadataVersion()); -if (uninitializedPublishers.isEmpty()) { +if (!uninitializedPublishers.isEmpty()) { Review Comment: Sigh. Looks like this was introduced in KAFKA-14538, which I reviewed. It really only bites in scenarios where you're running a pre-NO_OP record MV, which is rare these days. Thanks for the fix.
Re: [PR] KAFKA-15760: Disable flaky test testTaskRequestWithOldStartMsGetsUpdated [kafka]
gharris1727 commented on PR #14917: URL: https://github.com/apache/kafka/pull/14917#issuecomment-1839818405 I tested out adding a MiniTrogdorCluster start/stop in a `@BeforeAll` method in the test, and I'm getting reliable test passes at about 20% CPU, where before it failed consistently. It appears that the first start of MiniTrogdorCluster takes ~110 seconds, while subsequent starts take ~2 seconds. That would explain the trend I was seeing in Gradle Enterprise, where one test has >30s runtime, while every other test has ~2s runtime. I would probably recommend increasing the timeout in this test to 4 minutes, or adding the `@BeforeAll` warm-up method I mentioned, in lieu of disabling this test.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414711941 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -366,8 +379,27 @@ class BrokerLifecycleManager( new BrokerRegistrationResponseHandler()) } + // the response handler is not invoked from the event handler thread, + // so it is not safe to update state here, instead, schedule an event + // to continue handling the response on the event handler thread private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { + eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false)) +} + +override def onTimeout(): Unit = { + info(s"Unable to register the broker because the RPC got timed out before it could be sent.") + eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true)) +} + } + + private class BrokerRegistrationResponseEvent(response: ClientResponse, timedOut: Boolean) extends EventQueue.Event { +override def run(): Unit = { + communicationInFlight = false Review Comment: OK. I wasn't sure that `communicationInFlight` is used for both requests. Since `NodeToControllerChannelManagerImpl` uses 1 for `maxInFlightRequestsPerConnection`, another option is to add a `canSendRequest` API in `NodeToControllerChannelManager`, instead of maintaining `communicationInFlight` here. The `canSendRequest` API can be implemented through `networkClient.ready(node, now)`.
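The comment in the diff above states the threading rule: the response handler runs off the event-handler thread, so it must only schedule an event, and `communicationInFlight` is updated on the event thread. The following is a minimal sketch of that pattern under stated assumptions. A single-threaded `ExecutorService` stands in for Kafka's event queue, and `SerializedSender`, `trySend`, and `onResponse` are hypothetical names. The `canSendRequest` alternative junrao proposes is not shown.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch (not BrokerLifecycleManager): all state lives on one event thread,
// and callbacks arriving from other threads only schedule events onto it.
final class SerializedSender {
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor();
    // Touched only from eventThread, so no extra synchronization is needed.
    private boolean communicationInFlight = false;

    // Event: send a request only if none is outstanding (at most one in flight).
    Future<Boolean> trySend() {
        return eventThread.submit(() -> {
            if (communicationInFlight) {
                return false;
            }
            communicationInFlight = true;
            return true;
        });
    }

    // Invoked from a network thread: do not mutate state here, enqueue an event instead.
    Future<?> onResponse() {
        return eventThread.submit(() -> {
            communicationInFlight = false;
        });
    }

    void close() {
        eventThread.shutdown();
    }
}
```

Because every read and write of the flag happens on the same thread, the flag needs neither `volatile` nor locking; the executor's queue provides the ordering.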
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1414710963 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -354,6 +356,25 @@ public ControllerResult registerBroker( throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " + "brokers until the metadata migration is complete."); } + +if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) { +Set set = new HashSet<>(request.logDirs()); +if (set.stream().anyMatch(DirectoryId::reserved)) { +throw new DuplicateBrokerRegistrationException("Reserved directory ID in request"); Review Comment: Hmm. `DuplicateBrokerRegistrationException` is certainly the wrong thing, but I don't think `BrokerIdNotRegisteredException` is quite right either. That exception was intended for when you made an RPC that required broker N to be registered (like heartbeat) but broker N was not in fact registered. It wasn't intended for an error returned from the registration RPC itself. After all, it provides no information. OK the broker is not registered, but why? I can see that there are some lines above which are using `BrokerIdNotRegisteredException` as a generic "didn't register" error, but that doesn't seem quite right... ``` if (request.isMigratingZkBroker() && !zkRegistrationAllowed()) { throw new BrokerIdNotRegisteredException("Controller does not support registering ZK brokers."); } if (!request.isMigratingZkBroker() && featureControl.inPreMigrationMode()) { throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " + "brokers until the metadata migration is complete."); } ``` The "pre-migration" thing is just a straightforward logic error that should never happen. Maybe `RuntimeException` (and consequent controller failover) is OK for that. 
The `request.isMigratingZkBroker()` check probably deserves its own error code since it's a weird scenario that should have been given its own error code by the migration KIP. Not sure if it's worth doing now, but... probably.
Your thing is more like INVALID_REQUEST, since someone is sending obvious nonsense which superficially resembles the schema.
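A rough sketch of the classification suggested here: malformed registration input, such as a reserved directory ID, is rejected with an INVALID_REQUEST-style error instead of a broker-state error. The stand-in exception, the `LogDirValidator` class and the exact set of reserved UUIDs are assumptions for illustration; in Kafka the corresponding exception would presumably be `org.apache.kafka.common.errors.InvalidRequestException`.

```java
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Local stand-in for org.apache.kafka.common.errors.InvalidRequestException,
// the exception type Kafka maps to the INVALID_REQUEST error code.
final class InvalidRequestStandIn extends RuntimeException {
    InvalidRequestStandIn(String message) {
        super(message);
    }
}

final class LogDirValidator {
    // Hypothetical reserved IDs, loosely modeled on DirectoryId sentinels such as
    // UNASSIGNED; the real set of reserved values may differ.
    static final Set<UUID> RESERVED = Set.of(new UUID(0L, 0L), new UUID(0L, 1L), new UUID(0L, 2L));

    // A request that lists a reserved ID is malformed input, so it is rejected
    // with an INVALID_REQUEST-style error rather than a broker-state error
    // such as DuplicateBrokerRegistrationException.
    static void validate(List<UUID> logDirs) {
        for (UUID dir : logDirs) {
            if (RESERVED.contains(dir)) {
                throw new InvalidRequestStandIn("Reserved directory ID in request: " + dir);
            }
        }
    }
}
```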
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1414710963 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -354,6 +356,25 @@ public ControllerResult registerBroker( throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " + "brokers until the metadata migration is complete."); } + +if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) { +Set set = new HashSet<>(request.logDirs()); +if (set.stream().anyMatch(DirectoryId::reserved)) { +throw new DuplicateBrokerRegistrationException("Reserved directory ID in request"); Review Comment: Hmm. `DuplicateBrokerRegistrationException` is certainly the wrong thing, but I don't think `BrokerIdNotRegisteredException` is quite right either. That exception was intended for when you made an RPC that required broker N to be registered (like heartbeat) but broker N was not in fact registered. It wasn't intended for an error returned from the registration RPC itself. After all, it provides no information. OK the broker is not registered, but why? I can see that there are some lines above which are using `BrokerIdNotRegisteredException` as a generic "didn't register" error, but that doesn't seem quite right... ``` if (request.isMigratingZkBroker() && !zkRegistrationAllowed()) { throw new BrokerIdNotRegisteredException("Controller does not support registering ZK brokers."); } if (!request.isMigratingZkBroker() && featureControl.inPreMigrationMode()) { throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " + "brokers until the metadata migration is complete."); } ``` The "pre-migration" thing is just a straightforward logic error that should never happen. The `request.isMigratingZkBroker()` check probably deserves its own error code since it's a weird scenario that should have been spelled out in the KIP. 
Your thing is more like INVALID_REQUEST, since someone is sending obvious nonsense which superficially resembles the schema.
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
jolshan commented on code in PR #14774: URL: https://github.com/apache/kafka/pull/14774#discussion_r1414710936 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -941,39 +857,129 @@ class ReplicaManager(val config: KafkaConfig, } } - private def partitionEntriesForVerification(verificationGuards: mutable.Map[TopicPartition, VerificationGuard], - entriesPerPartition: Map[TopicPartition, MemoryRecords], - verifiedEntries: mutable.Map[TopicPartition, MemoryRecords], - unverifiedEntries: mutable.Map[TopicPartition, MemoryRecords], - errorEntries: mutable.Map[TopicPartition, Errors]): Unit= { + private def sendInvalidRequiredAcksResponse(entries: Map[TopicPartition, MemoryRecords], + responseCallback: Map[TopicPartition, PartitionResponse] => Unit): Unit = { +// If required.acks is outside accepted range, something is wrong with the client +// Just return an error and don't handle the request at all +val responseStatus = entries.map { case (topicPartition, _) => + topicPartition -> new PartitionResponse( +Errors.INVALID_REQUIRED_ACKS, +LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset, +RecordBatch.NO_TIMESTAMP, +LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset + ) +} +responseCallback(responseStatus) + } + + /** + * Apply the postVerificationCallback asynchronously only after verifying the partitions have been added to the transaction. + * The postVerificationCallback takes the arguments of the requestLocal for the thread that will be doing the append as + * well as a mapping of topic partitions to LogAppendResult for the partitions that saw errors when verifying. + * + * This method will start the verification process for all the topicPartitions in entriesPerPartition and supply the + * postVerificationCallback to be run on a request handler thread when the response is received. 
+ * + * @param entriesPerPartitionthe records per partition to be appended and therefore need verification + * @param transactionVerificationEntries the object that will store the entries to verify, the errors, and the verification guards + * @param transactionalIdthe id for the transaction + * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the + * thread calling this method + * @param postVerificationCallback the method to be called when verification completes and the verification errors + * and the thread's RequestLocal are supplied + */ + def appendRecordsWithTransactionVerification(entriesPerPartition: Map[TopicPartition, MemoryRecords], + transactionVerificationEntries: TransactionVerificationEntries, + transactionalId: String, + requestLocal: RequestLocal, + postVerificationCallback: RequestLocal => Map[TopicPartition, LogAppendResult] => Unit): Unit = { +if (transactionalId != null && config.transactionPartitionVerificationEnable && addPartitionsToTxnManager.isDefined) + partitionEntriesForVerification(transactionVerificationEntries, entriesPerPartition) + +val onVerificationComplete: (RequestLocal, Map[TopicPartition, Errors]) => Unit = + executePostVerificationCallback( +transactionVerificationEntries, +postVerificationCallback, + ) + +if (transactionVerificationEntries.unverified.isEmpty) { + onVerificationComplete(requestLocal, transactionVerificationEntries.errors.toMap) +} else { + // For unverified entries, send a request to verify. When verified, the append process will proceed via the callback. + // We verify above that all partitions use the same producer ID. 
+ val batchInfo = transactionVerificationEntries.unverified.head._2.firstBatch() + addPartitionsToTxnManager.foreach(_.verifyTransaction( +transactionalId = transactionalId, +producerId = batchInfo.producerId, +producerEpoch = batchInfo.producerEpoch, +topicPartitions = transactionVerificationEntries.unverified.keySet.toSeq, +callback = KafkaRequestHandler.wrapAsyncCallback(onVerificationComplete, requestLocal) + )) +} + } + + /** + * A helper method to compile the results from the transaction verification and call the postVerificationCallback. + * + * @param transactionVerificationEntries the object that will store the entries to verify, the errors, and the verification guards
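A simplified Java model of the flow that the doc comment in this diff describes: if no partitions need verification, the post-verification callback runs immediately; otherwise it runs once the asynchronous verification completes, with both sets of errors merged. Partitions and errors are plain strings and all names are hypothetical. The sketch also omits re-dispatching the callback onto a request handler thread, which the Scala code does via `KafkaRequestHandler.wrapAsyncCallback`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified model of the pre-verify / post-verify split:
// partitions and errors are plain strings instead of TopicPartition/Errors.
final class VerificationFlow {
    static void appendWithVerification(
            Set<String> unverified,
            Map<String, String> knownErrors,
            Function<Set<String>, CompletableFuture<Map<String, String>>> verifier,
            Consumer<Map<String, String>> postVerificationCallback) {
        if (unverified.isEmpty()) {
            // Nothing to verify: run the callback right away with the errors gathered so far.
            postVerificationCallback.accept(knownErrors);
            return;
        }
        // Otherwise verify asynchronously; the append resumes only when the response arrives.
        verifier.apply(unverified).thenAccept(verificationErrors -> {
            Map<String, String> all = new HashMap<>(knownErrors);
            all.putAll(verificationErrors);
            postVerificationCallback.accept(all);
        });
    }
}
```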
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
jolshan commented on code in PR #14774: URL: https://github.com/apache/kafka/pull/14774#discussion_r1414708109 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -708,23 +708,40 @@ class KafkaApis(val requestChannel: RequestChannel, } } -if (authorizedRequestInfo.isEmpty) - sendResponseCallback(Map.empty) -else { - val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID +val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID +val transactionVerificationEntries = new ReplicaManager.TransactionVerificationEntries - // call the replica manager to append messages to the replicas +def postVerificationCallback(newRequestLocal: RequestLocal) Review Comment: The problem I had was that I needed to pass the errors and the verification guards into the postVerificationCallback, but wanted to minimize methods with a ton of arguments that do similar things. The naming starts to get confusing as well. An alternative I can think of is to create another method in ReplicaManager for the transaction produce path, but it will have all the append arguments and just call this same callback under the hood. I'm not sure I fully agree that the code was good before. I think it is better to keep append records clean and not incorporate transaction verification. That's why the transaction verification has the pre-verify (add to the txn manager) and post verification (tidy the results and put them in a form for the post verification callback). If we want to move this code out of KafkaApis, I think it only makes sense to make a separate method. The other stages need to remain generic for the group coordinator usage.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414708239 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -366,8 +379,27 @@ class BrokerLifecycleManager( new BrokerRegistrationResponseHandler()) } + // the response handler is not invoked from the event handler thread, + // so it is not safe to update state here, instead, schedule an event + // to continue handling the response on the event handler thread private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { + eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false)) +} + +override def onTimeout(): Unit = { + info(s"Unable to register the broker because the RPC got timed out before it could be sent.") + eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true)) +} + } + + private class BrokerRegistrationResponseEvent(response: ClientResponse, timedOut: Boolean) extends EventQueue.Event { +override def run(): Unit = { + communicationInFlight = false Review Comment: Since `NodeToControllerChannelManagerImpl` uses 1 for `maxInFlightRequestsPerConnection`, we could also add a `sendRequest` api in `NodeToControllerChannelManager`, instead networkClient.ready(node, now)
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
jolshan commented on code in PR #14774: URL: https://github.com/apache/kafka/pull/14774#discussion_r1414708109 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -708,23 +708,40 @@ class KafkaApis(val requestChannel: RequestChannel, } } -if (authorizedRequestInfo.isEmpty) - sendResponseCallback(Map.empty) -else { - val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID +val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID +val transactionVerificationEntries = new ReplicaManager.TransactionVerificationEntries - // call the replica manager to append messages to the replicas +def postVerificationCallback(newRequestLocal: RequestLocal) Review Comment: The problem I had was that I needed to pass the errors and the verification guards into the postVerificationCallback. The alternative is to create yet another method in ReplicaManager for the transaction produce path. I was hoping to avoid many methods with 7+ arguments that look similar but if we really want to move out of Kafka Apis, we can do that. I'm not sure I fully agree that the code was good before. I think it is better to keep append records clean and not incorporate transaction verification. That's why the transaction verification has the pre-verify (add to the txn manager) and post verification (tidy the results and put them in a form for the post verification callback). If we want to move this code out of KafkaApis, I think it only makes sense to make a separate method. The other stages need to remain generic for the group coordinator usage.
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1414705968 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -284,7 +286,7 @@ ReplicaPlacer replicaPlacer() { public void activate() { heartbeatManager = new BrokerHeartbeatManager(logContext, time, sessionTimeoutNs); for (BrokerRegistration registration : brokerRegistrations.values()) { -heartbeatManager.touch(registration.id(), registration.fenced(), -1); +heartbeatManager.register(registration.id(), registration.fenced(), registration.directories()); Review Comment: I don't see why we need to handle directories in `HeartbeatManager`. I made this comment in the other PR too. HBM is about timing out brokers. It's not about JBOD stuff really. Directories are hard state so they should be kept in `ClusterControlManager`. And we need to issue a new `BrokerRegistrationRecord` when they change.
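To illustrate the hard-state/soft-state split argued for here, a hypothetical sketch: the heartbeat tracker keeps only last-contact times, while directories live in a separate registry that emits a new record whenever a broker's directory set changes. None of these classes are Kafka's; `DirectoriesRecord` merely stands in for a `BrokerRegistrationRecord`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for a BrokerRegistrationRecord that carries directories.
final class DirectoriesRecord {
    final int brokerId;
    final List<UUID> directories;

    DirectoriesRecord(int brokerId, List<UUID> directories) {
        this.brokerId = brokerId;
        this.directories = List.copyOf(directories);
    }
}

// Soft state: only the last contact time per broker, used to time out sessions.
final class HeartbeatTimes {
    private final Map<Integer, Long> lastContactNs = new HashMap<>();

    void touch(int brokerId, long nowNs) {
        lastContactNs.put(brokerId, nowNs);
    }

    boolean expired(int brokerId, long nowNs, long sessionTimeoutNs) {
        Long last = lastContactNs.get(brokerId);
        return last == null || nowNs - last > sessionTimeoutNs;
    }
}

// Hard state: directories belong with the cluster metadata, and every change
// has to be persisted by emitting a new registration-style record.
final class DirectoryRegistry {
    private final Map<Integer, List<UUID>> directories = new HashMap<>();

    Optional<DirectoriesRecord> update(int brokerId, List<UUID> newDirs) {
        List<UUID> sorted = new ArrayList<>(newDirs);
        sorted.sort(null); // natural UUID ordering
        if (sorted.equals(directories.get(brokerId))) {
            return Optional.empty(); // unchanged, so nothing needs to be written
        }
        // Simplification: applied immediately here; a controller would normally
        // update this state when the emitted record is replayed.
        directories.put(brokerId, List.copyOf(sorted));
        return Optional.of(new DirectoriesRecord(brokerId, sorted));
    }
}
```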
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1414704931 ## server-common/src/main/java/org/apache/kafka/common/DirectoryId.java: ## @@ -121,8 +122,38 @@ public static Map createAssignmentMap(int[] replicas, Uuid[] dire * Create an array with the specified number of entries set to {@link #UNASSIGNED}. */ public static Uuid[] unassignedArray(int length) { +return array(length, UNASSIGNED); +} + +/** + * Create an array with the specified number of entries set to {@link #MIGRATING}. + */ +public static Uuid[] migratingArray(int length) { +return array(length, MIGRATING); +} + +/** + * Create an array with the specified number of entries set to the specified value. + */ +private static Uuid[] array(int length, Uuid value) { Uuid[] array = new Uuid[length]; -Arrays.fill(array, UNASSIGNED); +Arrays.fill(array, value); return array; } + +/** + * Check if a directory is online, given a sorted list of online directories. + * @param dir The directory to check + * @param sortedOnlineDirs The sorted list of online directories + * @return true if the directory is considered online, false otherwise + */ +public static boolean isOnline(Uuid dir, List sortedOnlineDirs) { +if (UNASSIGNED.equals(dir) || MIGRATING.equals(dir)) { Review Comment: > If you mean zk->kraft migration, then I don't think so. Metadata records may have been persisted in an older version, without directory assignment
Yes, agreed. @pprovenzano, `MIGRATING` state can happen outside ZK migration. Honestly, I think `MIGRATING` isn't a great name. Someone who sticks on metadata version 15 won't ever "migrate" to having directory IDs. Maybe calling this `UNKNOWN` would be more descriptive? But that's another discussion I guess :)
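A sketch of how an `isOnline` check over a sorted list might look, using `java.util.UUID` in place of Kafka's `Uuid`. Treating the `UNASSIGNED` and `MIGRATING` sentinels as online is an assumption inferred from the truncated diff, and the sentinel values themselves are hypothetical. The sentinel branch matters because, as discussed above, `MIGRATING` can show up for records written before directory assignment existed, not only during ZK migration.

```java
import java.util.Collections;
import java.util.List;
import java.util.UUID;

final class DirectoryStatus {
    // Hypothetical sentinels, modeled on DirectoryId.UNASSIGNED and DirectoryId.MIGRATING.
    static final UUID UNASSIGNED = new UUID(0L, 0L);
    static final UUID MIGRATING = new UUID(0L, 2L);

    // Assumption: a sentinel means "no specific directory is known", so the
    // replica is not considered offline on account of its directory.
    static boolean isOnline(UUID dir, List<UUID> sortedOnlineDirs) {
        if (UNASSIGNED.equals(dir) || MIGRATING.equals(dir)) {
            return true;
        }
        // binarySearch is only correct if the list is sorted in UUID natural order.
        return Collections.binarySearch(sortedOnlineDirs, dir) >= 0;
    }
}
```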
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414704903 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -166,6 +166,19 @@ class BrokerLifecycleManager( */ private var registered = false + /** + * True if a request has been sent and a response or timeout has not yet been processed. Review Comment: Hmm, if that's the case, should we set communicationInFlight to true when sending the very first Registration request?
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845: URL: https://github.com/apache/kafka/pull/14845#discussion_r1414703024 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -180,7 +185,78 @@ public OffsetMetadataManager build() { /** * The offsets keyed by group id, topic name and partition id. */ -private final TimelineHashMap>> offsetsByGroup; +private final Offsets offsets; + +/** + * The offsets keyed by producer id, group id, topic name and partition id. This Review Comment: Sorry for so many comments here -- but is it correct we don't have the code to add/remove them on commit/abort yet?
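For context on the question, a hypothetical sketch of what commit/abort handling for pending transactional offsets could look like: offsets are staged per producer ID and only become visible on commit, or are dropped on abort. The class name and the flat `group/topic/partition` string key are assumptions; the doc comment in the diff describes offsets keyed by producer ID, group ID, topic name and partition ID.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: offsets are keyed by a "group/topic/partition" string for brevity.
final class PendingTxnOffsets {
    private final Map<String, Long> committed = new HashMap<>();
    private final Map<Long, Map<String, Long>> pendingByProducer = new HashMap<>();

    // A transactional offset commit only stages the offset under its producer ID.
    void stage(long producerId, String key, long offset) {
        pendingByProducer.computeIfAbsent(producerId, id -> new HashMap<>()).put(key, offset);
    }

    // Commit marker: the producer's pending offsets become visible committed offsets.
    void commit(long producerId) {
        Map<String, Long> pending = pendingByProducer.remove(producerId);
        if (pending != null) {
            committed.putAll(pending);
        }
    }

    // Abort marker: the producer's pending offsets are discarded.
    void abort(long producerId) {
        pendingByProducer.remove(producerId);
    }

    Long committedOffset(String key) {
        return committed.get(key);
    }

    boolean hasPending(long producerId) {
        return pendingByProducer.containsKey(producerId);
    }
}
```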
Re: [PR] KAFKA-15364: Handle log directory failure in the Controller [kafka]
cmccabe commented on PR #14902: URL: https://github.com/apache/kafka/pull/14902#issuecomment-1839805443 I don't see why `BrokerHeartbeatManager` needs to know about directories at all. Its function is to identify brokers that haven't heartbeated in a while, so they can be fenced. But that doesn't intersect with what we care about here (directory lists). Directory lists should be handled by `ClusterControlManager`. They are hard state, after all, not soft state.
Re: [PR] KAFKA-15364: Handle log directory failure in the Controller [kafka]
cmccabe commented on code in PR #14902: URL: https://github.com/apache/kafka/pull/14902#discussion_r1414698623 ## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java: ## @@ -91,8 +102,15 @@ static class BrokerHeartbeatState { */ private BrokerHeartbeatState next; -BrokerHeartbeatState(int id) { +BrokerHeartbeatState(int id, List directories) { Review Comment: I think we should just require people to pass a list here. It's easy enough to pass `Collections.emptyList` if needed.
Re: [PR] KAFKA-15364: Handle log directory failure in the Controller [kafka]
cmccabe commented on code in PR #14902: URL: https://github.com/apache/kafka/pull/14902#discussion_r1414699020 ## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java: ## @@ -107,6 +125,13 @@ int id() { return id; } +/** + * Returns the sorted directories which the broker currently has available + */ +List directories() { Review Comment: Seems a bit awkward to have the field named differently than its accessor. Can we just call them both `directories` ?
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845: URL: https://github.com/apache/kafka/pull/14845#discussion_r1414697270 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -187,8 +190,6 @@ public CoordinatorResult commitOffset( short version, OffsetCommitRequestData request ) { -snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset); Review Comment: What's the context behind removing this?
Re: [PR] KAFKA-15364: Handle log directory failure in the Controller [kafka]
cmccabe commented on code in PR #14902: URL: https://github.com/apache/kafka/pull/14902#discussion_r1414696645 ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -1070,11 +1070,27 @@ class ControllerApis( } def handleAssignReplicasToDirs(request: RequestChannel.Request): CompletableFuture[Unit] = { +authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) val assignReplicasToDirsRequest = request.body[AssignReplicasToDirsRequest] - -// TODO KAFKA-15426 -requestHelper.sendMaybeThrottle(request, - assignReplicasToDirsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) -CompletableFuture.completedFuture[Unit](()) +val context = new ControllerRequestContext(request.context.header.data, request.context.principal, + OptionalLong.empty()) +controller.assignReplicasToDirs(context, assignReplicasToDirsRequest.data).handle[Unit] { (reply, e) => Review Comment: It's not necessary to use `handle` here. You can use `thenApply`. If it returns a future which was completed exceptionally, it will be handled in `ControllerApis.handle`
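A small Java sketch of the `handle` versus `thenApply` point: with `thenApply` the mapping function runs only on success, and an exceptional completion propagates to the returned future (wrapped in a `CompletionException`), where an outer handler can deal with it. With `handle((reply, e) -> ...)` the function would have to inspect `e` itself. `ThenApplyDemo` and `respond` are hypothetical names, and the sketch uses plain Java rather than the Scala code under review.

```java
import java.util.concurrent.CompletableFuture;

final class ThenApplyDemo {
    // The mapping function runs only when `reply` completes normally. If `reply`
    // fails, the returned future fails too, carrying the original exception as
    // its cause, so an outer handler can build the error response.
    static CompletableFuture<String> respond(CompletableFuture<Integer> reply) {
        return reply.thenApply(n -> "assigned " + n + " replicas");
    }
}
```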
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845: URL: https://github.com/apache/kafka/pull/14845#discussion_r1414694924 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ## @@ -675,25 +675,25 @@ public void testValidateOffsetCommit() { // Simulate a call from the admin client without member id and member epoch. // This should pass only if the group is empty. -group.validateOffsetCommit("", "", -1); +group.validateOffsetCommit("", "", -1, false); Review Comment: Do we want to have a test where this argument is true?
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845: URL: https://github.com/apache/kafka/pull/14845#discussion_r1414694054 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ## @@ -293,7 +326,11 @@ public void testReplayOffsetCommit() { new ApiMessageAndVersion(value, (short) 0) )); -verify(offsetMetadataManager, times(2)).replay(key, value); +verify(offsetMetadataManager, times(2)).replay( +RecordBatch.NO_PRODUCER_ID, Review Comment: should we verify something with a producer ID here? ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ## @@ -293,7 +326,11 @@ public void testReplayOffsetCommit() { new ApiMessageAndVersion(value, (short) 0) )); -verify(offsetMetadataManager, times(2)).replay(key, value); +verify(offsetMetadataManager, times(2)).replay( +RecordBatch.NO_PRODUCER_ID, Review Comment: should we verify something with a producer ID here in this file?
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1414691022 ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.TopicPartition; + +public interface StandbyUpdateListener { + +enum SuspendReason { +MIGRATED, +PROMOTED +} + +/** + * A callback that will be invoked after registering the changelogs for each state store in a standby + * task. It is guaranteed to always be invoked before any records are loaded into the standby store. + * + * @param topicPartition the changelog TopicPartition for this standby task + * @param storeName the name of the store being loaded + * @param startingOffset the offset from which the standby task begins consuming from the changelog + */ +void onUpdateStart(final TopicPartition topicPartition, + final String storeName, + final long startingOffset); + +/** + * Method called after loading a batch of records. In this case the maximum size of the batch is whatever + * the value of the MAX_POLL_RECORDS is set to. 
+ * + * This method is called after loading each batch and it is advised to keep processing to a minimum. + * Any heavy processing will block the state updater thread and slow down the rate of standby task + * loading. Therefore, if you need to do any extended processing or connect to an external service, + * consider doing so asynchronously. + * + * @param topicPartition the TopicPartition containing the values to restore + * @param storeName the name of the store undergoing restoration + * @param batchEndOffset batchEndOffset the changelog end offset (inclusive) of the batch that was just loaded + * @param batchSize the total number of records in the batch that was just loaded + * @param currentEndOffset the current end offset of the changelog topic partition. + */ +void onBatchLoaded(final TopicPartition topicPartition, + final String storeName, + final TaskId taskId, + final long batchEndOffset, + final long batchSize, + final long currentEndOffset); + +/** + * This method is called when the corresponding standby task stops updating, for the provided reason. + * + * If the task was {@code MIGRATED} to another instance, this callback will be invoked after this + * state store (and the task itself) are closed (in which case the data will be cleaned up after + * state.cleanup.delay.ms). + * If the task was {@code PROMOTED} to an active task, the state store will not be closed, and the + * callback will be invoked after unregistering it as a standby task but before re-registering it as an active task + * and beginning restoration. In other words, this will always called before the corresponding + * {@link StateRestoreListener#onRestoreStart} call is made. 
+ * + * @param topicPartition the TopicPartition containing the values to restore + * @param storeName the name of the store undergoing restoration Review Comment: ```suggestion * @param topicPartition the changelog TopicPartition for this standby task * @param storeName the name of the store being loaded ``` ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable
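The doc comment in this diff advises keeping `onBatchLoaded` cheap and doing extended processing asynchronously. A minimal sketch of that pattern, assuming a single background executor, in which the callback only enqueues work. `OffloadingBatchListener` is hypothetical and does not implement the real `StandbyUpdateListener` interface; its method merely mirrors some of `onBatchLoaded`'s parameters without the Kafka types.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical listener-style class: the callback only hands work to another
// thread, so the thread that invokes it (the state updater) is not blocked.
final class OffloadingBatchListener implements AutoCloseable {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final AtomicLong recordsSeen = new AtomicLong();

    // Mirrors part of the shape of StandbyUpdateListener#onBatchLoaded.
    void onBatchLoaded(String storeName, long batchEndOffset, long batchSize, long currentEndOffset) {
        // Slow work (metrics export, external calls) would go inside this task.
        worker.execute(() -> recordsSeen.addAndGet(batchSize));
    }

    long recordsSeen() {
        return recordsSeen.get();
    }

    // Lets already-submitted tasks finish, waiting up to 10 seconds.
    @Override
    public void close() {
        worker.shutdown();
        try {
            worker.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```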
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845: URL: https://github.com/apache/kafka/pull/14845#discussion_r1414690648 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -1615,4 +1618,127 @@ public void testDeleteGroupsWhenNotStarted() throws ExecutionException, Interrup future.get() ); } + +@Test +public void testCommitTransactionalOffsetsWhenNotStarted() throws ExecutionException, InterruptedException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime, +new GroupCoordinatorMetrics() +); + +TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData() +.setGroupId("foo") +.setTransactionalId("transactional-id") +.setMemberId("member-id") +.setGenerationId(10) +.setTopics(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() +.setName("topic") +.setPartitions(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() +.setPartitionIndex(0) +.setCommittedOffset(100))))); + +CompletableFuture<TxnOffsetCommitResponseData> future = service.commitTransactionalOffsets( +requestContext(ApiKeys.TXN_OFFSET_COMMIT), +request, +BufferSupplier.NO_CACHING +); + +assertEquals( +new TxnOffsetCommitResponseData() +.setTopics(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() +.setName("topic") +.setPartitions(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() +.setPartitionIndex(0) + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())))))), +future.get() +); +} + +@ParameterizedTest +@NullSource Review Comment: Does this mean we test null string and empty string?
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845: URL: https://github.com/apache/kafka/pull/14845#discussion_r1414690195 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -833,13 +835,15 @@ public void validateOffsetCommit( if (generationId != this.generationId) { throw Errors.ILLEGAL_GENERATION.exception(); } -} else if (!isInState(EMPTY)) { +} else if (!isTransactional && !isInState(EMPTY)) { // If the request does not contain the member id and the generation id (version 0), // offset commits are only accepted when the group is empty. +// This does not apply to transactional offset commits, since the older versions +// of this protocol do not require member id and generation id. throw Errors.UNKNOWN_MEMBER_ID.exception(); } -if (isInState(COMPLETING_REBALANCE)) { +if (!isTransactional && isInState(COMPLETING_REBALANCE)) { Review Comment: Does the comment above also apply to this case with respect to transactional offset commits?
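For reference when reading the two new `isTransactional` guards, here is a simplified model of the decision order in the hunk. It is a sketch under assumptions: `GroupState`, `OffsetCommitValidator`, the boolean `hasMemberAndGeneration` flag and the string error names are hypothetical stand-ins, and the error returned for `COMPLETING_REBALANCE` is assumed because the hunk cuts off before that `throw`.

```java
// Simplified stand-ins; the real GenericGroup has more states and checks.
enum GroupState { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

class OffsetCommitValidator {
    // Returns null when the commit is accepted, otherwise the name of the error.
    static String validate(GroupState state, int groupGeneration,
                           boolean hasMemberAndGeneration, int requestGeneration,
                           boolean isTransactional) {
        if (hasMemberAndGeneration) {
            if (requestGeneration != groupGeneration) {
                return "ILLEGAL_GENERATION";
            }
        } else if (!isTransactional && state != GroupState.EMPTY) {
            // Non-transactional commits without member id/generation (version 0)
            // are only accepted when the group is empty.
            return "UNKNOWN_MEMBER_ID";
        }
        if (!isTransactional && state == GroupState.COMPLETING_REBALANCE) {
            // Assumed error name: the hunk does not show what is thrown here.
            return "REBALANCE_IN_PROGRESS";
        }
        return null;
    }
}
```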
Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]
mjsax commented on code in PR #14714: URL: https://github.com/apache/kafka/pull/14714#discussion_r1414683411 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java: ## @@ -97,6 +98,17 @@ private int getCost(final TaskId taskId, final UUID processId, final boolean inC return 1; } +@Test +public void testSubtopicShouldContainAllTasks() { Review Comment: `Subtopology` instead of `Subtopic` ? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java: ## @@ -88,55 +93,20 @@ public Graph constructTaskGraph( } } -// TODO: validate tasks in tasksForTopicGroup and taskIdList -final SortedMap> sortedTasksForTopicGroup = new TreeMap<>(tasksForTopicGroup); Review Comment: Was this code only moved, or altered? ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java: ## @@ -676,18 +676,28 @@ static SortedMap> getTaskTopicPartitionMap(final int return taskTopicPartitionMap; } -static Map configProps(final boolean enableRackAwareAssignor) { -return configProps(enableRackAwareAssignor, 0); +static Map> getTasksForTopicGroup(final int tpSize, final int partitionSize) { +final Map> tasksForTopicGroup = new HashMap<>(); +for (int i = 0; i < tpSize; i++) { +for (int j = 0; j < partitionSize; j++) { +final Subtopology subtopology = new Subtopology(i, null); +tasksForTopicGroup.computeIfAbsent(subtopology, k -> new HashSet<>()).add(new TaskId(i, j)); +} +} +return tasksForTopicGroup; +} + +static Map configProps(final String rackAwareConfig) { +return configProps(rackAwareConfig, 0); } -static Map configProps(final boolean enableRackAwareAssignor, final int replicaNum) { +static Map configProps(final String rackAwareConfig, final int replicaNum) { final Map configurationMap = new HashMap<>(); configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); 
configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT); configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, replicaNum); -if (enableRackAwareAssignor) { - configurationMap.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC); -} +// configurationMap.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC); Review Comment: Needs some cleanup ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java: ## @@ -394,6 +394,106 @@ public void testDeterministic() { } } +@Test +public void testMaxFlowOnlySourceAndSink() { +final Graph graph1 = new Graph<>(); Review Comment: Nit: Why `graph1` but not `graph`? (similar below) ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java: ## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptySet; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomClientState; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMap; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTasksForTopicGroup; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.ArrayList; +import
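The new `getTasksForTopicGroup` test helper groups every task under its subtopology. The archive stripped the generic type parameters from the diff, so the sketch below uses assumed stand-in types: `Integer` in place of `Subtopology` and `"subtopology_partition"` strings in place of `TaskId` (`TasksForTopicGroupSketch` is a hypothetical name).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class TasksForTopicGroupSketch {
    // Same shape as the test helper in the diff, with assumed stand-in types:
    // Integer for Subtopology, "subtopology_partition" strings for TaskId.
    static Map<Integer, Set<String>> tasksForTopicGroup(int tpSize, int partitionSize) {
        Map<Integer, Set<String>> tasksForTopicGroup = new HashMap<>();
        for (int i = 0; i < tpSize; i++) {
            for (int j = 0; j < partitionSize; j++) {
                // computeIfAbsent creates the set the first time subtopology i is seen.
                tasksForTopicGroup.computeIfAbsent(i, k -> new HashSet<>()).add(i + "_" + j);
            }
        }
        return tasksForTopicGroup;
    }
}
```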
Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]
junrao commented on PR #14811: URL: https://github.com/apache/kafka/pull/14811#issuecomment-1839774186 Just merged a related PR https://github.com/apache/kafka/pull/14767. Re-triggering the tests to make sure there are no new issues.
Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]
junrao closed pull request #14811: KAFKA-15831: KIP-1000 protocol and admin client URL: https://github.com/apache/kafka/pull/14811
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
artemlivshits commented on code in PR #14774: URL: https://github.com/apache/kafka/pull/14774#discussion_r141457 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -708,23 +708,40 @@ class KafkaApis(val requestChannel: RequestChannel, } } -if (authorizedRequestInfo.isEmpty) - sendResponseCallback(Map.empty) -else { - val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID +val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID +val transactionVerificationEntries = new ReplicaManager.TransactionVerificationEntries - // call the replica manager to append messages to the replicas +def postVerificationCallback(newRequestLocal: RequestLocal) Review Comment: Do we need to change this file? I think we could preserve the abstraction and keep the verification guts encapsulated in the async call (as it is now), and only let the group coordinator use the explicit stages. I.e. replicaManager.appendRecords would do what it does now (under the covers call the stages), then we don't have to bring the complexity into the code that works well with the abstraction. 
## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -349,146 +468,68 @@ class GroupMetadataManager(brokerId: Int, consumerId: String, offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, - transactionalId: String = null, producerId: Long = RecordBatch.NO_PRODUCER_ID, producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH, requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { -// first filter out partitions with offset metadata size exceeding limit -val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) => - validateOffsetMetadataLength(offsetAndMetadata.metadata) -} - group.inLock { if (!group.hasReceivedConsistentOffsetCommits) warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " + s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " + s"should be avoided.") } -val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID -// construct the message set to append +val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) => + validateOffsetMetadataLength(offsetAndMetadata.metadata) +} if (filteredOffsetMetadata.isEmpty) { // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (k, _) => k -> Errors.OFFSET_METADATA_TOO_LARGE } responseCallback(commitStatus) -} else { - getMagic(partitionFor(group.groupId)) match { -case Some(magicValue) => - // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. 
- val timestampType = TimestampType.CREATE_TIME - val timestamp = time.milliseconds() - - val records = filteredOffsetMetadata.map { case (topicIdPartition, offsetAndMetadata) => -val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicIdPartition.topicPartition) -val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion) -new SimpleRecord(timestamp, key, value) - } - val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId)) - val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava)) - - if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2) -throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue) - - val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(), -producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH) - - records.foreach(builder.append) - val entries = Map(offsetTopicPartition -> builder.build()) - - // set the callback function to insert offsets into cache after log append completed - def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { -// the append response should only contain the topics partition -if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition)) - throw new IllegalStateException("Append status %s
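The rewritten block above first drops entries whose metadata exceeds the size limit and short-circuits when nothing is left to append. A Java sketch of just that step, assuming a hypothetical fixed limit and plain strings for partitions and error names (`OffsetMetadataPrecheck` is a made-up name); what happens to rejected entries in the mixed case is outside the visible hunk, so the sketch stops at the append-or-short-circuit decision.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

class OffsetMetadataPrecheck {
    // Hypothetical limit; the broker reads the real one from its configuration.
    static final int MAX_METADATA_SIZE = 4096;

    static boolean validateOffsetMetadataLength(String metadata) {
        return metadata == null || metadata.length() <= MAX_METADATA_SIZE;
    }

    // Returns an early response when nothing is left to append; an empty Optional
    // means the filtered offsets should go on to be written to the log.
    static Optional<Map<String, String>> precheck(Map<String, String> metadataByPartition) {
        Map<String, String> filtered = new LinkedHashMap<>();
        metadataByPartition.forEach((tp, md) -> {
            if (validateOffsetMetadataLength(md)) {
                filtered.put(tp, md);
            }
        });
        if (!filtered.isEmpty()) {
            return Optional.empty();
        }
        // Every entry was rejected: answer all partitions with the same error.
        Map<String, String> status = new LinkedHashMap<>();
        metadataByPartition.keySet().forEach(tp -> status.put(tp, "OFFSET_METADATA_TOO_LARGE"));
        return Optional.of(status);
    }
}
```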
Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]
junrao merged PR #14767: URL: https://github.com/apache/kafka/pull/14767
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
cmccabe commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1839768073 > Instead, CommunicationEvent.run() should gracefully handle running when communicationInFlight = true Probably by doing nothing.
This also implies that `scheduleNextCommunicationAfterSuccess` should be checking `nextSchedulingShouldBeImmediate` (not sure why it doesn't already). Probably `nextSchedulingShouldBeImmediate` should be renamed to something like "dirty", since it doesn't actually mean the next scheduling should always be immediate. For example, if there has been a communication error (`scheduleNextCommunicationAfterFailure`), we do not want to reschedule immediately, no matter what.
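One reading of the proposed `dirty` semantics, as a minimal sketch: after a successful response the next request goes out immediately only if something changed, while after a failure the manager always backs off. `HeartbeatScheduleSketch`, the interval values and the capped exponential backoff are hypothetical choices for illustration, not necessarily what `BrokerLifecycleManager` does.

```java
class HeartbeatScheduleSketch {
    // Hypothetical timings, in milliseconds.
    static final long HEARTBEAT_INTERVAL_MS = 2_000L;
    static final long INITIAL_BACKOFF_MS = 500L;
    static final long MAX_BACKOFF_MS = 10_000L;

    // "Dirty": something changed that the controller should hear about soon.
    // Cleared only when the next request is actually sent (not modeled here).
    boolean dirty = false;
    int failedAttempts = 0;

    long delayAfterSuccess() {
        failedAttempts = 0;
        // Only a pending change justifies sending again right away.
        return dirty ? 0L : HEARTBEAT_INTERVAL_MS;
    }

    long delayAfterFailure() {
        failedAttempts++;
        // Ignore `dirty` here: after an error we always back off.
        long backoff = INITIAL_BACKOFF_MS << Math.min(failedAttempts - 1, 5);
        return Math.min(backoff, MAX_BACKOFF_MS);
    }
}
```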
Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]
junrao commented on PR #14767: URL: https://github.com/apache/kafka/pull/14767#issuecomment-1839767052 @apoorvmittal10 : Thanks for triaging the tests. Merging the PR.
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
mjsax commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1414568376 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.VersionedRecordIterator; +import org.rocksdb.Snapshot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; + +public class LogicalSegmentIterator implements VersionedRecordIterator { +protected final ListIterator segmentIterator; +private final Bytes key; +private final Long fromTime; +private final Long toTime; +private final ResultOrder order; +private ListIterator> iterator; +private volatile boolean open = true; + +// defined for creating/releasing the snapshot. 
+private LogicalKeyValueSegment snapshotOwner = null; +private Snapshot snapshot = null; + + + +public LogicalSegmentIterator(final ListIterator segmentIterator, + final Bytes key, + final Long fromTime, + final Long toTime, + final ResultOrder order) { + +this.segmentIterator = segmentIterator; +this.key = key; +this.fromTime = fromTime; +this.toTime = toTime; +this.iterator = Collections.emptyListIterator(); +this.order = order; +} + +@Override +public void close() { +open = false; +// user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet! +releaseSnapshot(); +} + +@Override +public boolean hasNext() { +if (!open) { +throw new IllegalStateException("The iterator is out of scope."); +} +final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? iterator.hasPrevious() : iterator.hasNext(); Review Comment: We might want to add a comment about why `ASCENDING` uses `hasPrevious()`: it's not intuitive and only makes sense if one knows that data is stored in descending order inside a segment. ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -263,6 +268,31 @@ public VersionedRecord get(final Bytes key, final long asOfTimestamp) { return null; } +@SuppressWarnings("unchecked") +VersionedRecordIterator get(final Bytes key, final long fromTimestamp, final long toTimestamp, final ResultOrder order) { +validateStoreOpen(); + +if (toTimestamp < observedStreamTime - historyRetention) { +// history retention exceeded. we still check the latest value store in case the +// latest record version satisfies the timestamp bound, in which case it should +// still be returned (i.e., the latest record version per key never expires).
+return new LogicalSegmentIterator(Collections.singletonList(latestValueStore).listIterator(), key, fromTimestamp, toTimestamp, order); +} else { +final List segments = new ArrayList<>(); +// add segment stores +// consider the search lower bound as -INF (LONG.MIN_VALUE) to find the record that has been inserted before the {@code fromTimestamp} +// but is still valid in query specified time interval. +if (order.equals(ResultOrder.ASCENDING)) { +segments.addAll(segmentStores.segments(Long.MIN_VALUE, toTimestamp, true)); Review Comment: Given we pass in `MIN_VALUE` twice, could we omit the first parameter? ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ## @@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer key,
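To illustrate the point about `hasPrevious()`: if versions inside a segment are stored newest-first, an ascending scan has to walk the list backwards. A self-contained sketch, with a hypothetical in-memory list standing in for one segment (`DescendingSegmentSketch` is a made-up name, not the store's code):

```java
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;

class DescendingSegmentSketch {
    // Versions inside a hypothetical segment, stored newest-first
    // (descending timestamp order).
    static final List<Long> VERSIONS_DESC = Arrays.asList(30L, 20L, 10L);

    static String iterate(boolean ascending) {
        // For ascending order, start past the last element and walk backwards:
        // that is why an ASCENDING query ends up calling hasPrevious().
        ListIterator<Long> it = ascending
                ? VERSIONS_DESC.listIterator(VERSIONS_DESC.size())
                : VERSIONS_DESC.listIterator();
        StringBuilder sb = new StringBuilder();
        while (ascending ? it.hasPrevious() : it.hasNext()) {
            long timestamp = ascending ? it.previous() : it.next();
            sb.append(timestamp).append(' ');
        }
        return sb.toString().trim();
    }
}
```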
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
cmccabe commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1839747217 Thanks for looking at this, @soarez. I don't think `nextSchedulingShouldBeImmediate` should be reset to false until the `CommunicationEvent` is run. I also think we should be a bit more careful about setting `communicationInFlight`: it should only be set after `_channelManager.sendRequest` has been called. And even that call should probably be wrapped in a `try...catch`. If `sendRequest` throws, we don't want to be stuck never sending another request. (I don't think `sendRequest` is supposed to throw, but we should be careful.) I don't think `prepend` is needed anywhere. To be honest, the presence of `prepend` usually indicates a bug... :) Instead, `CommunicationEvent.run()` should gracefully handle running when `communicationInFlight = true`, probably by doing nothing.
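The guard described above (set `communicationInFlight` only after `sendRequest` returns, survive a throwing `sendRequest`, and make the event a no-op while a request is outstanding) could look roughly like this. `CommunicationGuardSketch` and its `Channel` interface are hypothetical stand-ins for the manager and the controller channel manager, and the response callback is assumed to be re-dispatched onto the event thread rather than run inline by `sendRequest`.

```java
class CommunicationGuardSketch {
    // Hypothetical stand-in for the controller channel manager.
    interface Channel {
        void sendRequest(String request, Runnable onResponse);
    }

    private final Channel channel;
    // Both fields are only touched on the single event-handler thread.
    boolean communicationInFlight = false;
    boolean dirty = true;

    CommunicationGuardSketch(Channel channel) {
        this.channel = channel;
    }

    // Body of the communication event.
    void runCommunicationEvent() {
        if (communicationInFlight) {
            // A request is outstanding; its response will schedule the next one.
            return;
        }
        try {
            channel.sendRequest("heartbeat", this::onResponseEvent);
            // Only mark the request in flight once sendRequest returned normally,
            // and only then treat the pending change as sent.
            communicationInFlight = true;
            dirty = false;
        } catch (RuntimeException e) {
            // Not in flight, so a later event can retry instead of stalling forever.
        }
    }

    // Must itself run on the event thread (the real code enqueues an event for it).
    void onResponseEvent() {
        communicationInFlight = false;
    }
}
```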
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414647339 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -166,6 +166,19 @@ class BrokerLifecycleManager( */ private var registered = false + /** + * True if a request has been sent and a response or timeout has not yet been processed. Review Comment: It applies to both Heartbeat and Registration requests, which are the only requests this manager sends. I think we're also fixing a bug where a second registration request could have been scheduled after a call to `propagateDirectoryFailure` and before the registration response is received.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414633539 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -453,79 +490,73 @@ class BrokerLifecycleManager( val message = response.responseBody().asInstanceOf[BrokerHeartbeatResponse] val errorCode = Errors.forCode(message.data().errorCode()) if (errorCode == Errors.NONE) { - // this response handler is not invoked from the event handler thread, - // and processing a successful heartbeat response requires updating - // state, so to continue we need to schedule an event - eventQueue.prepend(new BrokerHeartbeatResponseEvent(message.data())) + val responseData = message.data() + failedAttempts = 0 + _state match { +case BrokerState.STARTING => + if (responseData.isCaughtUp) { +info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.") +_state = BrokerState.RECOVERY +initialCatchUpFuture.complete(null) + } else { +debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.") + } + // Schedule the heartbeat after only 10 ms so that in the case where + // there is no recovery work to be done, we start up a bit quicker. + scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS)) +case BrokerState.RECOVERY => + if (!responseData.isFenced) { +info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.") +initialUnfenceFuture.complete(null) +_state = BrokerState.RUNNING + } else { +info(s"The broker is in RECOVERY.") + } + scheduleNextCommunicationAfterSuccess() +case BrokerState.RUNNING => + debug(s"The broker is RUNNING. 
Processing heartbeat response.") + scheduleNextCommunicationAfterSuccess() +case BrokerState.PENDING_CONTROLLED_SHUTDOWN => + if (!responseData.shouldShutDown()) { +info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still waiting " + + "for the active controller.") +if (!gotControlledShutdownResponse) { + // If this is the first pending controlled shutdown response we got, + // schedule our next heartbeat a little bit sooner than we usually would. + // In the case where controlled shutdown completes quickly, this will + // speed things up a little bit. + scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS)) +} else { + scheduleNextCommunicationAfterSuccess() +} + } else { +info(s"The controller has asked us to exit controlled shutdown.") +beginShutdown() + } + gotControlledShutdownResponse = true +case BrokerState.SHUTTING_DOWN => + info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat response.") +case _ => + error(s"Unexpected broker state ${_state}") + scheduleNextCommunicationAfterSuccess() + } } else { warn(s"Broker $nodeId sent a heartbeat request but received error $errorCode.") scheduleNextCommunicationAfterFailure() } } } - -override def onTimeout(): Unit = { - info("Unable to send a heartbeat because the RPC got timed out before it could be sent.") - scheduleNextCommunicationAfterFailure() -} } - private class BrokerHeartbeatResponseEvent(response: BrokerHeartbeatResponseData) extends EventQueue.Event { -override def run(): Unit = { - failedAttempts = 0 - _state match { -case BrokerState.STARTING => - if (response.isCaughtUp) { -info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.") -_state = BrokerState.RECOVERY -initialCatchUpFuture.complete(null) - } else { -debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.") - } - // Schedule the heartbeat after only 10 ms so that in the case where - // there is no recovery work to be done, we start up a bit quicker. 
- scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS)) -case BrokerState.RECOVERY => - if (!response.isFenced) { -info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.") -initialUnfenceFuture.complete(null) -_state = BrokerState.RUNNING - } else { -info(s"The broker is in RECOVERY.") - }
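The successful-heartbeat branch in this hunk is essentially a small state machine. A stripped-down Java sketch of the transitions it performs, with scheduling and future completion omitted and a local `BrokerState` enum (`HeartbeatStateSketch` is a hypothetical name); modeling `beginShutdown()` as a move to `SHUTTING_DOWN` is an assumption, since its body is not part of the hunk.

```java
// Local enum mirroring the states matched in the hunk.
enum BrokerState { STARTING, RECOVERY, RUNNING, PENDING_CONTROLLED_SHUTDOWN, SHUTTING_DOWN }

class HeartbeatStateSketch {
    BrokerState state = BrokerState.STARTING;

    // Applies one successful heartbeat response, minus scheduling and futures.
    void onHeartbeatResponse(boolean isCaughtUp, boolean isFenced, boolean shouldShutDown) {
        switch (state) {
            case STARTING:
                // Caught up with cluster metadata: STARTING -> RECOVERY.
                if (isCaughtUp) state = BrokerState.RECOVERY;
                break;
            case RECOVERY:
                // Unfenced by the controller: RECOVERY -> RUNNING.
                if (!isFenced) state = BrokerState.RUNNING;
                break;
            case PENDING_CONTROLLED_SHUTDOWN:
                // Assumption: beginShutdown() is modeled as a move to SHUTTING_DOWN.
                if (shouldShutDown) state = BrokerState.SHUTTING_DOWN;
                break;
            default:
                // RUNNING and SHUTTING_DOWN: no transition on a heartbeat response.
                break;
        }
    }
}
```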
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845: URL: https://github.com/apache/kafka/pull/14845#discussion_r1414627867 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -180,7 +185,78 @@ public OffsetMetadataManager build() { /** * The offsets keyed by group id, topic name and partition id. */ -private final TimelineHashMap>> offsetsByGroup; +private final Offsets offsets; + +/** + * The offsets keyed by producer id, group id, topic name and partition id. This Review Comment: This comment also confused me, because it seems we also use the same three nested data structures to store the committed offsets. So we have: (1) `offsetsByGroup` -> all committed offsets, where we don't care about the producer ID; (2) `pendingTransactionalOffsets` -> a mapping of producer id -> offsetsByGroup (for only that producer), holding all uncommitted txn offsets. On abort we remove the producer id's entries, and on commit we add them to `offsetsByGroup`. Is this correct?
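The layout described in this comment (which the comment itself poses as a question) can be modeled as follows. This is a sketch of that reading, not the confirmed implementation: `TxnOffsetsSketch` is a hypothetical name, plain `HashMap`s replace `TimelineHashMap`, and topic and partition are collapsed into one string key.

```java
import java.util.HashMap;
import java.util.Map;

class TxnOffsetsSketch {
    // group -> (topic-partition -> committed offset).
    final Map<String, Map<String, Long>> offsetsByGroup = new HashMap<>();
    // producerId -> same nested structure, holding only uncommitted offsets.
    final Map<Long, Map<String, Map<String, Long>>> pendingTransactionalOffsets = new HashMap<>();

    // A transactional offset commit only lands in the pending map.
    void commitTransactionalOffset(long producerId, String group, String tp, long offset) {
        pendingTransactionalOffsets
            .computeIfAbsent(producerId, k -> new HashMap<>())
            .computeIfAbsent(group, k -> new HashMap<>())
            .put(tp, offset);
    }

    // On the transaction marker: commit moves the producer's offsets, abort drops them.
    void completeTransaction(long producerId, boolean committed) {
        Map<String, Map<String, Long>> pending = pendingTransactionalOffsets.remove(producerId);
        if (pending == null || !committed) {
            return;
        }
        pending.forEach((group, offsets) ->
            offsetsByGroup.computeIfAbsent(group, k -> new HashMap<>()).putAll(offsets));
    }

    Long committedOffset(String group, String tp) {
        Map<String, Long> offsets = offsetsByGroup.get(group);
        return offsets == null ? null : offsets.get(tp);
    }
}
```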
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
kirktrue commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1414627626 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1376,6 +1425,66 @@ private void subscribeInternal(Collection topics, Optional + * Process background events, if any + * Briefly wait for {@link CompletableApplicationEvent an event} to complete + * + * + * + * + * Each iteration gives the application thread an opportunity to process background events, which may be + * necessary to complete the overall processing. + * + * + * + * As an example, take {@link #unsubscribe()}. To start unsubscribing, the application thread enqueues an + * {@link UnsubscribeApplicationEvent} on the application event queue. That event will eventually trigger the + * rebalancing logic in the background thread. Critically, as part of this rebalancing work, the + * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} callback needs to be invoked. However, + * this callback must be executed on the application thread. To achieve this, the background thread enqueues a + * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background event queue. That event queue is + * periodically queried by the application thread to see if there's work to be done. When the application thread + * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is processed, and a + * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then enqueued by the application thread on the + * application event queue. Moments later, the background thread will see that event, process it, and continue + * execution of the rebalancing logic. The rebalancing logic cannot complete until the + * {@link ConsumerRebalanceListener} callback is performed.
+ * + * @param event Event that contains a {@link CompletableFuture}; it is on this future that the application thread + * will wait for completion + * @param timer Overall timer that bounds how long the application thread will wait for the event to complete + * @return {@code true} if the event completed within the timeout, {@code false} otherwise + */ +private boolean processBackgroundEvents(CompletableApplicationEvent event, Timer timer) { +log.trace("Enqueuing event {} for processing; will wait up to {} ms to complete", event, timer.remainingMs()); + +do { +backgroundEventProcessor.process(); + +try { +Timer pollInterval = time.timer(100L); +log.trace("Waiting {} ms for event {} to complete", pollInterval.remainingMs(), event); +ConsumerUtils.getResult(event.future(), pollInterval); +log.trace("Event {} completed successfully", event); +return true; +} catch (TimeoutException e) { +// Ignore this as we will retry the event until the timeout expires. +} finally { +timer.update(time.milliseconds()); Review Comment: Done.
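The loop in `processBackgroundEvents` alternates between processing background events and briefly waiting on the event's future. A self-contained sketch of the same pattern, assuming a plain `CompletableFuture` and `System.nanoTime()` in place of Kafka's `Timer` and `ConsumerUtils.getResult` (`BackgroundEventWaitSketch` is a hypothetical name):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BackgroundEventWaitSketch {
    // Alternates between background-event processing and a short wait on the
    // future, until the future completes or the overall timeout elapses.
    static boolean waitWhileProcessing(CompletableFuture<?> future,
                                       Runnable processBackgroundEvents,
                                       long timeoutMs, long pollIntervalMs) {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        do {
            // Gives the application thread a chance to run work (e.g. rebalance
            // callbacks) that the background thread is waiting on.
            processBackgroundEvents.run();
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNs - System.nanoTime());
            try {
                future.get(Math.max(0L, Math.min(pollIntervalMs, remainingMs)), TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                // Not complete yet; loop again while time remains.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        } while (System.nanoTime() - deadlineNs < 0);
        return false;
    }
}
```

Polling in short slices, rather than blocking on the future for the whole timeout, is what prevents the deadlock where the background thread waits for a callback that only the blocked application thread can run.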
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414626881 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -366,8 +379,27 @@ class BrokerLifecycleManager( new BrokerRegistrationResponseHandler()) } + // the response handler is not invoked from the event handler thread, + // so it is not safe to update state here, instead, schedule an event + // to continue handling the response on the event handler thread private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { + eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false)) +} + +override def onTimeout(): Unit = { + info(s"Unable to register the broker because the RPC got timed out before it could be sent.") + eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true)) +} + } + + private class BrokerRegistrationResponseEvent(response: ClientResponse, timedOut: Boolean) extends EventQueue.Event { +override def run(): Unit = { + communicationInFlight = false Review Comment: I don't think so. We set `communicationInFlight = true` in `CommunicationEvent.run`, which sends both heartbeat and registration requests. Maybe I'm missing something?
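The comment added in this hunk (the response handler runs off the event thread, so it must hand the response back as an event) is the core of the change. A minimal sketch of that hand-off, assuming a single-thread `ExecutorService` stands in for the manager's `EventQueue` (`EventThreadHandoffSketch` is a hypothetical name); tasks run in submission order, so state is only ever mutated on that one thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EventThreadHandoffSketch {
    // One thread owns all mutable state, standing in for the manager's event queue.
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor();
    // Only read or written from eventThread.
    private boolean communicationInFlight = false;
    private int failedAttempts = 0;

    void sendRequest() {
        eventThread.execute(() -> communicationInFlight = true);
    }

    // Called from a network thread: don't touch state, enqueue an event instead.
    void onComplete() {
        eventThread.execute(() -> handleResponse(false));
    }

    void onTimeout() {
        eventThread.execute(() -> handleResponse(true));
    }

    // Runs on eventThread, so it may safely update state.
    private void handleResponse(boolean timedOut) {
        communicationInFlight = false;
        failedAttempts = timedOut ? failedAttempts + 1 : 0;
    }

    boolean inFlight() {
        return onEventThread(() -> communicationInFlight);
    }

    int failedAttempts() {
        return onEventThread(() -> failedAttempts);
    }

    // Reads state by running a query on the owning thread and waiting for it.
    private <T> T onEventThread(Callable<T> query) {
        try {
            return eventThread.submit(query).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void shutdown() {
        eventThread.shutdown();
    }
}
```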
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
kirktrue commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1414626549 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerMethodName.java: ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; + +/** + * This class just provides a static name for the methods in the {@link ConsumerRebalanceListener} interface + * for a bit more compile time assurance. + */ +public enum ConsumerRebalanceListenerMethodName { + +onPartitionsRevoked, onPartitionsAssigned, onPartitionsLost; Review Comment: Done.
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
kirktrue commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1414625883 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -650,7 +693,7 @@ boolean reconcile() { "\tCurrent owned partitions: {}\n" + "\tAdded partitions (assigned - owned): {}\n" + "\tRevoked partitions (owned - assigned): {}\n", -assignedTopicIdPartitions, +assignedTopicPartitions, Review Comment: I've reverted the change to use `assignedTopicPartitions` instead of `assignedTopicIdPartitions`. We can take that up later.
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845: URL: https://github.com/apache/kafka/pull/14845#discussion_r1414621182 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -180,7 +185,78 @@ public OffsetMetadataManager build() { /** * The offsets keyed by group id, topic name and partition id. */ -private final TimelineHashMap>> offsetsByGroup; +private final Offsets offsets; + +/** + * The offsets keyed by producer id, group id, topic name and partition id. This + * structure holds all the transactional offsets that are part of ongoing transactions. + * When the transaction is committed, they are transferred to the offsetsByGroup; when + * the transaction is aborted, they are removed. + */ +private final TimelineHashMap pendingTransactionalOffsets; + +private class Offsets { +private final TimelineHashMap>> offsetsByGroup; + +private Offsets() { +this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); +} + +private OffsetAndMetadata get( +String groupId, +String topic, +int partition +) { +TimelineHashMap> topicOffsets = offsetsByGroup.get(groupId); +if (topicOffsets == null) { +return null; +} else { +TimelineHashMap partitionOffsets = topicOffsets.get(topic); +if (partitionOffsets == null) { +return null; +} else { +return partitionOffsets.get(partition); +} +} +} + +private OffsetAndMetadata put( +String groupId, +String topic, +int partition, +OffsetAndMetadata offsetAndMetadata +) { +TimelineHashMap> topicOffsets = offsetsByGroup +.computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 0)); +TimelineHashMap partitionOffsets = topicOffsets +.computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 0)); +return partitionOffsets.put(partition, offsetAndMetadata); +} + +private OffsetAndMetadata remove( +String groupId, +String topic, +int partition +) { +TimelineHashMap> topicOffsets = offsetsByGroup.get(groupId); +if (topicOffsets == null) +return null; + +TimelineHashMap 
partitionOffsets = topicOffsets.get(topic); +if (partitionOffsets == null) +return null; + +OffsetAndMetadata removedValue = partitionOffsets.remove(partition); + +if (partitionOffsets.isEmpty()) Review Comment: Was about to ask if we remove the nested maps, but now I see we do so here.
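The `Offsets` helper in the diff keeps a three-level index (group id, then topic, then partition) and prunes an inner map once it becomes empty. A minimal sketch of that structure follows, assuming plain `HashMap` instead of `TimelineHashMap` (so no snapshot support) and a `Long` offset in place of `OffsetAndMetadata`; `NestedOffsets` is a hypothetical name, and pruning the group level as well is an assumption because the diff is cut off right after the `isEmpty()` check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: group id -> topic -> partition -> offset.
// HashMap stands in for TimelineHashMap and Long for OffsetAndMetadata.
final class NestedOffsets {
    private final Map<String, Map<String, Map<Integer, Long>>> offsetsByGroup = new HashMap<>();

    Long get(String groupId, String topic, int partition) {
        Map<String, Map<Integer, Long>> topicOffsets = offsetsByGroup.get(groupId);
        if (topicOffsets == null) return null;
        Map<Integer, Long> partitionOffsets = topicOffsets.get(topic);
        return partitionOffsets == null ? null : partitionOffsets.get(partition);
    }

    // Creates intermediate maps on demand; returns the previous offset, if any.
    Long put(String groupId, String topic, int partition, long offset) {
        return offsetsByGroup
            .computeIfAbsent(groupId, k -> new HashMap<>())
            .computeIfAbsent(topic, k -> new HashMap<>())
            .put(partition, offset);
    }

    Long remove(String groupId, String topic, int partition) {
        Map<String, Map<Integer, Long>> topicOffsets = offsetsByGroup.get(groupId);
        if (topicOffsets == null) return null;
        Map<Integer, Long> partitionOffsets = topicOffsets.get(topic);
        if (partitionOffsets == null) return null;
        Long removed = partitionOffsets.remove(partition);
        // Prune empty inner maps so no empty topic or group entries linger.
        if (partitionOffsets.isEmpty()) topicOffsets.remove(topic);
        if (topicOffsets.isEmpty()) offsetsByGroup.remove(groupId);
        return removed;
    }

    int groupCount() {
        return offsetsByGroup.size();
    }
}
```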
Re: [PR] KAFKA-6968: Adds calls to listener on rebalance of MockConsumer [kafka]
efgpinto closed pull request #7539: KAFKA-6968: Adds calls to listener on rebalance of MockConsumer URL: https://github.com/apache/kafka/pull/7539
[jira] [Assigned] (KAFKA-6968) Call RebalanceListener in MockConsumer
[ https://issues.apache.org/jira/browse/KAFKA-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Pinto reassigned KAFKA-6968: Assignee: (was: Eduardo Pinto) > Call RebalanceListener in MockConsumer > -- > > Key: KAFKA-6968 > URL: https://issues.apache.org/jira/browse/KAFKA-6968 > Project: Kafka > Issue Type: Improvement > Components: consumer >Affects Versions: 1.1.0 >Reporter: Andras Beni >Priority: Minor > > {{org.apache.kafka.clients.consumer.MockConsumer}} simulates rebalance with > method {{public synchronized void rebalance(Collection<TopicPartition> > newAssignment)}}. This method does not call {{ConsumerRebalanceListener}} > methods. Calls to {{onPartitionsRevoked(...)}} and > {{onPartitionsAssigned(...)}} should be added in appropriate order.
[jira] [Commented] (KAFKA-6968) Call RebalanceListener in MockConsumer
[ https://issues.apache.org/jira/browse/KAFKA-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793049#comment-17793049 ] Eduardo Pinto commented on KAFKA-6968: -- Unassigning myself due to lack of bandwidth to conclude this. Not sure if this is still relevant anyway. > Call RebalanceListener in MockConsumer > -- > > Key: KAFKA-6968 > URL: https://issues.apache.org/jira/browse/KAFKA-6968 > Project: Kafka > Issue Type: Improvement > Components: consumer >Affects Versions: 1.1.0 >Reporter: Andras Beni >Priority: Minor > > {{org.apache.kafka.clients.consumer.MockConsumer}} simulates rebalance with > method {{public synchronized void rebalance(Collection<TopicPartition> > newAssignment)}}. This method does not call {{ConsumerRebalanceListener}} > methods. Calls to {{onPartitionsRevoked(...)}} and > {{onPartitionsAssigned(...)}} should be added in appropriate order.
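The issue asks that `MockConsumer.rebalance` invoke the rebalance listener, revoking the old assignment before announcing the new one. A self-contained sketch of that ordering follows; `RebalanceListenerSketch` mirrors the two `ConsumerRebalanceListener` methods but takes `String` partitions instead of `TopicPartition`, and `MockRebalanceSketch` is hypothetical, not the patch from PR #7539.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for ConsumerRebalanceListener (Strings instead of TopicPartition).
interface RebalanceListenerSketch {
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsAssigned(Collection<String> partitions);
}

// Hypothetical mock that calls the listener during a simulated rebalance:
// first revoke everything currently owned, then assign the new set (eager-style).
final class MockRebalanceSketch {
    private final Set<String> assignment = new HashSet<>();
    private RebalanceListenerSketch listener;

    synchronized void subscribe(RebalanceListenerSketch listener) {
        this.listener = listener;
    }

    synchronized void rebalance(Collection<String> newAssignment) {
        List<String> revoked = new ArrayList<>(assignment);
        if (listener != null) listener.onPartitionsRevoked(revoked);
        assignment.clear();
        assignment.addAll(newAssignment);
        if (listener != null) listener.onPartitionsAssigned(new ArrayList<>(newAssignment));
    }
}
```

A cooperative-style mock would instead revoke only the partitions missing from the new assignment; which ordering is "appropriate" for `MockConsumer` is left open by the issue.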
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845: URL: https://github.com/apache/kafka/pull/14845#discussion_r1414610034 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -180,7 +185,78 @@ public OffsetMetadataManager build() { /** * The offsets keyed by group id, topic name and partition id. */ -private final TimelineHashMap>> offsetsByGroup; +private final Offsets offsets; + +/** + * The offsets keyed by producer id, group id, topic name and partition id. This Review Comment: This comment about the keys is a little confusing. Looking at the code, we have 4 maps that are nested within each other. Maybe the comment could mention that each of these is a separate map? (Or we could include a non-Javadoc comment in offsets to make it clearer.)
Re: [PR] KAFKA-9631: Fix support for optional fields in MockAdminClient [kafka]
efgpinto closed pull request #8430: KAFKA-9631: Fix support for optional fields in MockAdminClient URL: https://github.com/apache/kafka/pull/8430
Re: [PR] KAFKA-9631: Fix support for optional fields in MockAdminClient [kafka]
efgpinto commented on PR #8430: URL: https://github.com/apache/kafka/pull/8430#issuecomment-1839682362 No bandwidth to revisit this. Closing.
[jira] [Commented] (KAFKA-9631) MockAdminClient doesn't handle CreateTopics optional fields
[ https://issues.apache.org/jira/browse/KAFKA-9631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793048#comment-17793048 ] Eduardo Pinto commented on KAFKA-9631: -- I don't have the capacity to work on this and don't even know if this is still relevant, so I've unassigned myself and will close the PR. > MockAdminClient doesn't handle CreateTopics optional fields > --- > > Key: KAFKA-9631 > URL: https://issues.apache.org/jira/browse/KAFKA-9631 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Priority: Major > > AdminClient's {{createTopics()}} method has a variant with two optional > fields. So I'd expect the following code to work correctly: > {{admin.createTopics(Collections.singletonList(new NewTopic(TOPIC_NAME, > Optional.empty(), Optional.empty())))}} > Indeed it works great, as long as we are using the real KafkaAdminClient. > MockKafkaAdminClient tries to get number of replicas without checking that > the values make sense, and therefore it fails with: > {{java.lang.IllegalArgumentException: Illegal Capacity: -1}}{{at > java.base/java.util.ArrayList.<init>(ArrayList.java:158)}} > {{ at > org.apache.kafka.clients.admin.MockAdminClient.createTopics(MockAdminClient.java:183)}} > {{ at org.apache.kafka.clients.admin.Admin.createTopics(Admin.java:125)}} > Making a mockery of the mock.
[jira] [Assigned] (KAFKA-9631) MockAdminClient doesn't handle CreateTopics optional fields
[ https://issues.apache.org/jira/browse/KAFKA-9631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduardo Pinto reassigned KAFKA-9631: Assignee: (was: Eduardo Pinto) > MockAdminClient doesn't handle CreateTopics optional fields > --- > > Key: KAFKA-9631 > URL: https://issues.apache.org/jira/browse/KAFKA-9631 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Priority: Major > > AdminClient's {{createTopics()}} method has a variant with two optional > fields. So I'd expect the following code to work correctly: > {{admin.createTopics(Collections.singletonList(new NewTopic(TOPIC_NAME, > Optional.empty(), Optional.empty())))}} > Indeed it works great, as long as we are using the real KafkaAdminClient. > MockKafkaAdminClient tries to get number of replicas without checking that > the values make sense, and therefore it fails with: > {{java.lang.IllegalArgumentException: Illegal Capacity: -1}}{{at > java.base/java.util.ArrayList.<init>(ArrayList.java:158)}} > {{ at > org.apache.kafka.clients.admin.MockAdminClient.createTopics(MockAdminClient.java:183)}} > {{ at org.apache.kafka.clients.admin.Admin.createTopics(Admin.java:125)}} > Making a mockery of the mock.
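The reported stack trace comes from the `ArrayList` constructor, which throws `IllegalArgumentException: Illegal Capacity: -1` for a negative capacity; presumably the empty `Optional` replication factor surfaces as a -1 sentinel before it reaches that call. One way a mock could avoid this is to resolve the optional fields to defaults first. The sketch below is hypothetical: `OptionalTopicFields` and its default values are assumptions, not `MockAdminClient`'s actual fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: resolve NewTopic-style optional fields before using them.
// The DEFAULT_* values are assumptions standing in for broker-side defaults.
final class OptionalTopicFields {
    static final int DEFAULT_PARTITIONS = 1;
    static final short DEFAULT_REPLICATION_FACTOR = 1;

    static int resolvePartitions(Optional<Integer> numPartitions) {
        return numPartitions.orElse(DEFAULT_PARTITIONS);
    }

    static short resolveReplicationFactor(Optional<Short> replicationFactor) {
        return replicationFactor.orElse(DEFAULT_REPLICATION_FACTOR);
    }

    // One slot per replica; an unresolved -1 here would throw
    // IllegalArgumentException("Illegal Capacity: -1") from the ArrayList constructor.
    static List<Integer> replicaSlots(int replicationFactor) {
        return new ArrayList<>(replicationFactor);
    }
}
```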
Re: [PR] KAFKA-15372: Reconfigure dedicated MM2 connectors after leadership change [kafka]
gharris1727 commented on code in PR #14293: URL: https://github.com/apache/kafka/pull/14293#discussion_r1414605900 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java: ## @@ -314,6 +356,23 @@ private void awaitConnectorTasksStart(final MirrorMa }, MM_START_UP_TIMEOUT_MS, "Tasks for connector " + clazz.getSimpleName() + " for MirrorMaker instances did not transition to running in time"); } +private void awaitConnectorConfiguration(MirrorMaker mm, Class clazz, SourceAndTarget sourceAndTarget, Predicate> predicate) throws InterruptedException { +String connName = clazz.getSimpleName(); +waitForCondition(() -> { +try { +FutureCallback>> cb = new FutureCallback<>(); +herder(mm, sourceAndTarget).tasksConfig(connName, cb); Review Comment: I think I was doing a sort of end-to-end test that verified both that the new configuration got applied to the connector and that internal forwarding took place for the task configurations. I realize now that the method name doesn't match this implementation, so I renamed it to match.
Re: [PR] MINOR: fix bug where we weren't registering SnapshotEmitterMetrics [kafka]
cmccabe commented on PR #14918: URL: https://github.com/apache/kafka/pull/14918#issuecomment-1839616178 > Thanks @cmccabe. Is the impact of this bug that we weren't reporting the two metrics LatestSnapshotGeneratedBytes and LatestSnapshotGeneratedAgeMs? Unfortunately, yes, that was the impact.
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
gharris1727 commented on code in PR #14562: URL: https://github.com/apache/kafka/pull/14562#discussion_r1414514216 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -394,9 +399,16 @@ public void validateConnectorConfig(Map connectorProps, Callback @Override public void validateConnectorConfig(Map connectorProps, Callback callback, boolean doLog) { +callback.recordStage(new Stage( +"waiting for a new thread to become available for connector validation", +time.milliseconds() +)); connectorExecutor.submit(() -> { +callback.recordStage(null); Review Comment: Could this instead be a call to Stage::complete, or a use for TemporaryStage? ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -771,6 +778,104 @@ private Map defaultSinkConnectorProps(String topics) { return props; } +@Test +public void testRequestTimeouts() throws Exception { +final String configTopic = "test-request-timeout-configs"; +workerProps.put(CONFIG_TOPIC_CONFIG, configTopic); +// Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to +// be spuriously triggered after the group coordinator for a Connect cluster is bounced +// Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause +// connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay +// is set to 0 +workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1"); +connect = connectBuilder +.numBrokers(1) +.numWorkers(1) +.build(); +connect.start(); +connect.assertions().assertAtLeastNumWorkersAreUp(1, +"Worker did not start in time"); + +Map connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME); +Map connectorConfig2 = new HashMap<>(connectorConfig1); +connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1)); + +// Create a connector to ensure that the worker has completed startup +log.info("Creating initial connector"); 
+connect.configureConnector(CONNECTOR_NAME, connectorConfig1); +connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( +CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time" +); + +// Bring down Kafka, which should cause some REST requests to fail +log.info("Stopping Kafka cluster"); +connect.kafka().stopOnlyKafka(); +// Allow for the workers to discover that the coordinator is unavailable, wait is +// heartbeat timeout * 2 + 4sec +Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + +connect.requestTimeout(5_000); +// Try to reconfigure the connector, which should fail with a timeout error +log.info("Trying to reconfigure connector while Kafka cluster is down"); +ConnectRestException e = assertThrows( +ConnectRestException.class, +() -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2) +); +assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode()); +assertNotNull(e.getMessage()); +assertTrue( +"Message '" + e.getMessage() + "' does not match expected format", +e.getMessage().contains("Request timed out. The worker is currently flushing updates to the status topic") Review Comment: Is this the only error message possible when shutting down kafka, or the most common? I would expect that an error about ensuring membership in the cluster could appear. ## connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.util.concurrent.atomic.AtomicLong; + +public class Stage { +private final String description; +private final long started; +private final
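The reviewer asks whether `callback.recordStage(null)` could instead be a call to `Stage::complete` or a use of `TemporaryStage`. Since the real `Stage` class is truncated above, the sketch below is entirely hypothetical (`StageSketch`, `StageRecorder`, `TemporaryStageSketch` and the message wording are assumptions); it only illustrates how a try-with-resources stage could record itself on entry and mark itself complete on exit, so a timeout message can name what the worker was doing.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Hypothetical stage record: a description, a start time, and an optional completion time.
final class StageSketch {
    private final String description;
    private final long started;
    private final AtomicLong completed = new AtomicLong(-1L);

    StageSketch(String description, long started) {
        this.description = description;
        this.started = started;
    }

    // Only the first call takes effect.
    void complete(long time) {
        completed.compareAndSet(-1L, time);
    }

    String summarize() {
        long done = completed.get();
        return done < 0
            ? "The worker is currently " + description + " (started at " + started + " ms)"
            : "The last stage, " + description + ", completed at " + done + " ms";
    }
}

// Hypothetical callback that remembers the most recent stage for timeout messages.
final class StageRecorder {
    private final AtomicReference<StageSketch> current = new AtomicReference<>();

    void recordStage(StageSketch stage) {
        current.set(stage);
    }

    String timeoutMessage() {
        StageSketch stage = current.get();
        return stage == null ? "Request timed out." : "Request timed out. " + stage.summarize();
    }
}

// Hypothetical try-with-resources helper: records a stage on entry, completes it on exit.
final class TemporaryStageSketch implements AutoCloseable {
    private final StageSketch stage;
    private final LongSupplier clock;

    TemporaryStageSketch(String description, StageRecorder recorder, LongSupplier clock) {
        this.clock = clock;
        this.stage = new StageSketch(description, clock.getAsLong());
        recorder.recordStage(stage);
    }

    @Override
    public void close() {
        stage.complete(clock.getAsLong());
    }
}
```

Completing the stage rather than clearing it with `null` keeps the last known activity available for the error message.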
Re: [PR] KAFKA-15372: Reconfigure dedicated MM2 connectors after leadership change [kafka]
C0urante commented on code in PR #14293: URL: https://github.com/apache/kafka/pull/14293#discussion_r1414541177 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHerder.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.distributed.DistributedHerder; +import org.apache.kafka.connect.runtime.distributed.NotLeaderException; +import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.storage.ConfigBackingStore; +import org.apache.kafka.connect.storage.StatusBackingStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.mirror.MirrorMaker.CONNECTOR_CLASSES; + +public class MirrorHerder extends DistributedHerder { + +private static final Logger log = LoggerFactory.getLogger(MirrorHerder.class); + +private final MirrorMakerConfig config; +private final SourceAndTarget sourceAndTarget; +private boolean wasLeader; + +public MirrorHerder(MirrorMakerConfig mirrorConfig, SourceAndTarget sourceAndTarget, DistributedConfig config, Time time, Worker worker, String kafkaClusterId, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String restUrl, RestClient restClient, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, List restNamespace, AutoCloseable... uponShutdown) { +super(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, restUrl, restClient, connectorClientConfigOverridePolicy, restNamespace, uponShutdown); +this.config = mirrorConfig; +this.sourceAndTarget = sourceAndTarget; +} + +@Override +protected boolean handleRebalanceCompleted() { +if (!super.handleRebalanceCompleted()) { +return false; +} +if (isLeader()) { +if (!wasLeader) { +log.info("This node {} is now a leader for {}. 
Configuring connectors...", this, sourceAndTarget); +configureConnectors(); +} +wasLeader = true; +} else { +wasLeader = false; +} +return true; +} + +private void configureConnectors() { +CONNECTOR_CLASSES.forEach(this::maybeConfigureConnector); +} + +private void maybeConfigureConnector(Class connectorClass) { +Map connectorProps = config.connectorBaseConfig(sourceAndTarget, connectorClass); +connectorConfig(connectorClass.getSimpleName(), (e, existingConfig) -> { Review Comment: Do we need/want this to execute asynchronously? At first glance it seems like it'd be cleaner to read the connector config from the [superclass's configState field](https://github.com/apache/kafka/blob/a83bc2d977d2af85d4edfc8096854137481001e9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L193), which is already `protected` (though for somewhat dubious reasons...). ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHerder.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the
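`handleRebalanceCompleted()` in the diff uses the `wasLeader` flag so that connectors are configured only on the follower-to-leader transition, not after every rebalance. A minimal sketch of that edge detection follows; `LeadershipTransitionSketch` is hypothetical, and `needsUpdate` is an assumed skip-if-unchanged check, since the truncated diff does not show how `maybeConfigureConnector` compares the existing and desired configs.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of leadership edge detection: act only when this node
// transitions from follower to leader.
final class LeadershipTransitionSketch {
    private boolean wasLeader = false;
    private int reconfigurations = 0;

    void onRebalanceCompleted(boolean isLeader) {
        if (isLeader && !wasLeader) {
            reconfigurations++; // stand-in for configureConnectors()
        }
        wasLeader = isLeader;
    }

    int reconfigurations() {
        return reconfigurations;
    }

    // Assumed helper: only write a connector config when it is missing or different.
    static boolean needsUpdate(Map<String, String> existing, Map<String, String> desired) {
        return !Objects.equals(existing, desired);
    }
}
```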
[PR] KAFKA-15022: tests for HA assignor [kafka]
lihaosky opened a new pull request, #14921: URL: https://github.com/apache/kafka/pull/14921 Tests for HAAssignor and StickyAssignor. This PR is on top of https://github.com/apache/kafka/pull/14714
Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]
dajac commented on code in PR #14885: URL: https://github.com/apache/kafka/pull/14885#discussion_r1414553902 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -126,57 +129,63 @@ class CoordinatorPartitionWriter[T]( val magic = logConfig.recordVersion.value val maxBatchSize = logConfig.maxMessageSize val currentTimeMs = time.milliseconds() - -val recordsBuilder = MemoryRecords.builder( - ByteBuffer.allocate(math.min(16384, maxBatchSize)), - magic, - compressionType, - TimestampType.CREATE_TIME, - 0L, - time.milliseconds(), - producerId, - producerEpoch, - 0, - producerId != RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PARTITION_LEADER_EPOCH -) - -records.forEach { record => - val keyBytes = serializer.serializeKey(record) - val valueBytes = serializer.serializeValue(record) - - if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, EMPTY_HEADERS)) recordsBuilder.append( -currentTimeMs, -keyBytes, -valueBytes, -EMPTY_HEADERS - ) else throw new RecordTooLargeException(s"Message batch size is ${recordsBuilder.estimatedSizeInBytes()} bytes " + -s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.") +val bufferSupplier = threadLocalBufferSupplier.get() +val buffer = bufferSupplier.get(math.min(16384, maxBatchSize)) Review Comment: That’s right. 
I actually took it from here: https://github.com/apache/kafka/blob/a83bc2d977d2af85d4edfc8096854137481001e9/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L177
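The change replaces a fresh `ByteBuffer.allocate(math.min(16384, maxBatchSize))` on every write with a buffer obtained from a thread-local `GrowableBufferSupplier`. A rough Java sketch of the idea follows; `ReusableBuffer` and `PerThreadBuffers` are hypothetical and only approximate the supplier's behavior.

```java
import java.nio.ByteBuffer;

// Hypothetical single-buffer cache in the spirit of BufferSupplier.GrowableBufferSupplier;
// not Kafka's implementation. Not thread-safe, hence one instance per thread.
final class ReusableBuffer {
    private ByteBuffer cached;

    ByteBuffer get(int minCapacity) {
        ByteBuffer buffer = cached;
        if (buffer != null && buffer.capacity() >= minCapacity) {
            cached = null; // hand the buffer out exclusively until it is released
            buffer.clear();
            return buffer;
        }
        return ByteBuffer.allocate(minCapacity);
    }

    void release(ByteBuffer buffer) {
        // Keep the largest buffer seen so that later requests can reuse it.
        if (cached == null || buffer.capacity() > cached.capacity()) {
            cached = buffer;
        }
    }
}

final class PerThreadBuffers {
    private static final ThreadLocal<ReusableBuffer> SUPPLIER =
        ThreadLocal.withInitial(ReusableBuffer::new);

    // Mirrors bufferSupplier.get(math.min(16384, maxBatchSize)) from the diff.
    static ByteBuffer acquire(int maxBatchSize) {
        return SUPPLIER.get().get(Math.min(16384, maxBatchSize));
    }

    static void release(ByteBuffer buffer) {
        SUPPLIER.get().release(buffer);
    }
}
```

Because each writer thread gets its own supplier, no locking is needed, but a buffer should be released on the same thread that acquired it.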
[PR] KAFKA-00000 Handle consumer close [kafka]
philipnee opened a new pull request, #14920: URL: https://github.com/apache/kafka/pull/14920 When closing the consumer, we need to perform the following tasks:
1. If auto-commit is enabled, send an auto-commit and block until it completes.
2. Invoke all offsetCommitCallbacks and ensure all in-flight commits are sent.
3. Invoke the onPartitionsRevoked callbacks.
4. Unsubscribe from all partitions.
5. Leave the group (LeaveGroup).
[jira] [Updated] (KAFKA-13988) Mirrormaker 2 auto.offset.reset=latest not working
[ https://issues.apache.org/jira/browse/KAFKA-13988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-13988: -- Fix Version/s: 3.6.2 > Mirrormaker 2 auto.offset.reset=latest not working > -- > > Key: KAFKA-13988 > URL: https://issues.apache.org/jira/browse/KAFKA-13988 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.2.0 > Environment: Source Kafka cluster running on Ubuntu 20 > Source Kafka cluster Kafka v0.10 > Target Kafka cluster running in AWS MSK > Target Kafka cluster Kafka v2.6.2 > Mirrormaker version 3.2.0 running on Ubuntu 20. >Reporter: Daniel Florek >Assignee: Chris Egerton >Priority: Major > Fix For: 3.7.0, 3.6.2 > > > Hi. > I have problem setting up mirroring with MM2 from latest offset between 2 > clusters. In logs I can see that Consumer that is consuming topics has > auto.offset.reset property set to latest. But still topics are read from > offset 0. I am using following configuration: > > {code:java} > clusters = A, B > A.bootstrap.servers = broker-01A:9092 > B.bootstrap.servers = broker-01B:9092,broker-02B:9092,broker-03B:9092 > replication.policy.class = > org.apache.kafka.connect.mirror.IdentityReplicationPolicy > #Enable replication between clusters and define topics which should be > replicated > A->B.enabled = true > A->B.topics = .* > A->B.replication.factor=3 > A->B.emit.heartbeats.enabled = true > A->B.emit.checkpoints.enabled = true > auto.offset.reset=latest > consumer.auto.offset.reset=latest > A.consumer.auto.offset.reset=latest > B.consumer.auto.offset.reset=latest > refresh.topics.enabled=true > heartbeats.topic.replication.factor=1 > checkpoints.topic.replication.factor=1 > offset-syncs.topic.replication.factor=1 > config.storage.replication.factor = 1 > offset.storage.replication.factor = 1 > status.storage.replication.factor = 1 {code} > I am using Kafka 3.2.0 for Mirrormaker 2. 
Source kafka cluster is 1 broker > running on EC2 instance in AWS (quite an old version I think 0.10). Target > kafka cluster contains 3 brokers running in AWS MSK (version 2.6.2). > Could you point me what I am doing wrong? Or is this possibly a bug? >
Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]
dajac commented on code in PR #14885: URL: https://github.com/apache/kafka/pull/14885#discussion_r1414550451 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -66,6 +65,10 @@ class CoordinatorPartitionWriter[T]( compressionType: CompressionType, time: Time ) extends PartitionWriter[T] { + private val threadLocalBufferSupplier = ThreadLocal.withInitial( +() => new BufferSupplier.GrowableBufferSupplier() + ) Review Comment: That’s right.
[PR] KAFKA-15968 Handle two specific ApiExceptions in EventHandlerExceptionInfo [kafka]
mumrah opened a new pull request, #14919: URL: https://github.com/apache/kafka/pull/14919 This patch adds two cases to EventHandlerExceptionInfo for exceptions thrown by the log layer. Prior to this change, the log could throw a CorruptRecordException and the QuorumController would not treat it as a fatal exception. E.g., ``` INFO [ControllerServer id=9990] handleCommit[baseOffset=192554233]: event failed with CorruptRecordException in 234 microseconds. ```
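The idea of classifying some log-layer exceptions as fatal can be sketched as a type check against a fixed list. This is a hypothetical illustration, not the code in EventHandlerExceptionInfo: the classifier name is invented, CorruptRecordException is redeclared locally as a stand-in so the sketch compiles without Kafka, and only that one exception is listed because it is the only one the PR description names.

```java
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.errors.CorruptRecordException,
// declared locally only so this sketch compiles without Kafka on the classpath.
class CorruptRecordException extends RuntimeException {
    CorruptRecordException(String message) {
        super(message);
    }
}

// Hypothetical classifier: decides whether an exception raised while handling a
// controller event is fatal, or can simply be logged and the event counted as failed.
final class EventFailureClassifier {
    // Exception types that signal damage in the log layer. Only CorruptRecordException
    // is named in the PR description; any further entries would be assumptions.
    private static final List<Class<? extends Throwable>> FATAL_TYPES =
        List.of(CorruptRecordException.class);

    private EventFailureClassifier() {
    }

    static boolean isFatal(Throwable t) {
        for (Class<? extends Throwable> type : FATAL_TYPES) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }
}
```

With this shape, the log line quoted above would correspond to `isFatal` returning `true` instead of the event being recorded as an ordinary failure.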
Re: [PR] KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 [kafka]
C0urante merged PR #14567: URL: https://github.com/apache/kafka/pull/14567
Re: [PR] KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 [kafka]
C0urante commented on PR #14567: URL: https://github.com/apache/kafka/pull/14567#issuecomment-1839519217 Thanks Greg!
Re: [PR] MINOR: fix bug where we weren't registering SnapshotEmitterMetrics [kafka]
mumrah commented on PR #14918: URL: https://github.com/apache/kafka/pull/14918#issuecomment-1839514024 Thanks @cmccabe. Is the impact of this bug that we weren't reporting the two metrics LatestSnapshotGeneratedBytes and LatestSnapshotGeneratedAgeMs?
Re: [PR] KAFKA-15280: Implement client support for KIP-848 server-side assignors [kafka]
cadonna commented on code in PR #14878: URL: https://github.com/apache/kafka/pull/14878#discussion_r1414503110 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java: ## @@ -157,6 +157,7 @@ protected RequestManagers create() { CommitRequestManager commit = null; if (groupRebalanceConfig != null && groupRebalanceConfig.groupId != null) { +Optional<String> serverAssignor = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); Review Comment: If the protocol is not `consumer` but a group remote assignor is set, do we not need to ignore that config as we ignore the auto commit interval when a group ID is set? See https://github.com/apache/kafka/blob/ee024966d5283ff95935310e4df039069ae7e695/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L475 If yes, we also need to test that like here: https://github.com/apache/kafka/blob/ee024966d5283ff95935310e4df039069ae7e695/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java#L920 and here: https://github.com/apache/kafka/blob/ee024966d5283ff95935310e4df039069ae7e695/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java#L904
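The behavior the review asks about, honoring the remote assignor only under the `consumer` group protocol, might look like the following. This is a hypothetical sketch, not the code in RequestManagers: the enum is a local stand-in for the two group protocol values, and the resolver class and method names are invented.

```java
import java.util.Optional;

// Local stand-in for the two values of the consumer's group protocol setting.
enum GroupProtocol { CLASSIC, CONSUMER }

// Hypothetical helper: returns the server-side (remote) assignor name only when the
// "consumer" protocol is in use; with the classic protocol the setting is ignored.
final class RemoteAssignorResolver {
    private RemoteAssignorResolver() {
    }

    static Optional<String> effectiveRemoteAssignor(GroupProtocol protocol, String configuredAssignor) {
        if (protocol != GroupProtocol.CONSUMER) {
            return Optional.empty();
        }
        // The config may be unset (null), in which case no assignor is requested.
        return Optional.ofNullable(configuredAssignor);
    }
}
```

A matching test would then assert that a configured assignor yields an empty result under `CLASSIC`, in the spirit of the AsyncKafkaConsumerTest cases linked in the comment.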
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
philipnee commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1414525826 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1376,6 +1425,66 @@ private void subscribeInternal(Collection topics, Optional + * Process background events, if any + * Briefly wait for {@link CompletableApplicationEvent an event} to complete + * + * + * + * + * Each iteration gives the application thread an opportunity to process background events, which may be + * necessary to complete the overall processing. + * + * + * + * As an example, take {@link #unsubscribe()}. To start unsubscribing, the application thread enqueues an + * {@link UnsubscribeApplicationEvent} on the application event queue. That event will eventually trigger the + * rebalancing logic in the background thread. Critically, as part of this rebalancing work, the + * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} callback needs to be invoked. However, + * this callback must be executed on the application thread. To achieve this, the background thread enqueues a + * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background event queue. That event queue is + * periodically queried by the application thread to see if there's work to be done. When the application thread + * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is processed, and then a + * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then enqueued by the application thread on the + * background event queue. Moments later, the background thread will see that event, process it, and continue + * execution of the rebalancing logic. The rebalancing logic cannot complete until the + * {@link ConsumerRebalanceListener} callback is performed. 
+ * + * @param event Event that contains a {@link CompletableFuture}; it is on this future that the application thread + * will wait for completion + * @param timer Overall timer that bounds how long the application thread will wait for the event to complete + * @return {@code true} if the event completed within the timeout, {@code false} otherwise + */ +private boolean processBackgroundEvents(CompletableApplicationEvent event, Timer timer) { +log.trace("Enqueuing event {} for processing; will wait up to {} ms to complete", event, timer.remainingMs()); + +do { +backgroundEventProcessor.process(); + +try { +Timer pollInterval = time.timer(100L); +log.trace("Waiting {} ms for event {} to complete", event, pollInterval.remainingMs()); +ConsumerUtils.getResult(event.future(), pollInterval); +log.trace("Event {} completed successfully", event); +return true; +} catch (TimeoutException e) { +// Ignore this as we will retry the event until the timeout expires. +} finally { +timer.update(time.milliseconds()); Review Comment: timer.update() is sufficient
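The loop described in the Javadoc (process background events, wait briefly on the event's future, repeat until the overall timeout) can be sketched with plain JDK types. This is a hypothetical simplification, not AsyncKafkaConsumer code: `EventWaiter` and its parameters are invented, a `Runnable` stands in for the background event processor, and `System.nanoTime()` replaces Kafka's `Timer`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: alternate between processing background events on the calling
// (application) thread and briefly waiting on the event's future, until the future
// completes or the overall timeout expires.
final class EventWaiter {
    private EventWaiter() {
    }

    static boolean awaitWhileProcessing(CompletableFuture<?> future,
                                        Runnable processBackgroundEvents,
                                        long timeoutMs,
                                        long pollIntervalMs) {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        do {
            // E.g. run rebalance-listener callbacks that the background thread requested.
            processBackgroundEvents.run();
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNs - System.nanoTime());
            long waitMs = Math.max(0L, Math.min(pollIntervalMs, remainingMs));
            try {
                future.get(waitMs, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                // Not done yet: loop, process any new background events, and wait again.
            } catch (ExecutionException e) {
                throw new IllegalStateException("event failed", e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        } while (System.nanoTime() < deadlineNs);
        return false;
    }
}
```

The short poll interval is what gives the application thread repeated chances to run callbacks; a single long blocking wait would deadlock whenever the future can only complete after such a callback runs.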
Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]
jolshan commented on code in PR #14885: URL: https://github.com/apache/kafka/pull/14885#discussion_r1414519712 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -66,6 +65,10 @@ class CoordinatorPartitionWriter[T]( compressionType: CompressionType, time: Time ) extends PartitionWriter[T] { + private val threadLocalBufferSupplier = ThreadLocal.withInitial( +() => new BufferSupplier.GrowableBufferSupplier() + ) Review Comment: Just for my understanding though -- in the past we relied on RequestLocal for this sort of thing. Is the idea to move to ThreadLocal here so it is safer and we don't run into things like https://issues.apache.org/jira/browse/KAFKA-15653 ?
Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]
apoorvmittal10 commented on PR #14843: URL: https://github.com/apache/kafka/pull/14843#issuecomment-1839503610 @mjsax I have resolved the conflicts and a build has been triggered. Please let me know if something else needs to be handled or if the PR can be merged once the build succeeds. cc: @philipnee @AndrewJSchofield
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1414512071 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -292,6 +297,90 @@ public void testValidateConsumerGroupHeartbeatRequest(final short version) { assertNull(heartbeatRequest.data().subscribedTopicRegex()); } +@Test +public void testConsumerGroupMetadataFirstUpdate() { +resetWithZeroHeartbeatInterval(Optional.empty()); +mockStableMember(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); + +NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + +assertEquals(1, result.unsentRequests.size()); +NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0); +ClientResponse response = createHeartbeatResponse(request, Errors.NONE); +result.unsentRequests.get(0).handler().onComplete(response); +assertEquals(1, backgroundEventQueue.size()); +final BackgroundEvent event = backgroundEventQueue.poll(); +assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type()); +final GroupMetadataUpdateEvent groupMetadataUpdateEvent = (GroupMetadataUpdateEvent) event; +final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent( +memberEpoch, +memberId +); +assertEquals(expectedGroupMetadataUpdateEvent, groupMetadataUpdateEvent); +} + +@Test +public void testConsumerGroupMetadataUpdateWithSameUpdate() { +resetWithZeroHeartbeatInterval(Optional.empty()); +mockStableMember(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(1, result.unsentRequests.size()); +NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0); +ClientResponse firstResponse = createHeartbeatResponse(request, Errors.NONE); 
+request.handler().onComplete(firstResponse); +assertEquals(1, backgroundEventQueue.size()); +final BackgroundEvent firstEvent = backgroundEventQueue.poll(); +assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, firstEvent.type()); + +time.sleep(2000); Review Comment: @kirktrue Although I call `resetWithZeroHeartbeatInterval()` at the beginning of this test method, I need to sleep at least 1000 ms to get a second heartbeat response. I thought `resetWithZeroHeartbeatInterval()` sets the heartbeat interval to zero, but during debugging I learnt that the heartbeat interval is still 1000 ms. Is this intended?
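Why a test can need to advance mock time before a second request appears: if heartbeats are gated on an interval timer, nothing new is sent until the clock has moved forward by at least that interval. The sketch below is a hypothetical simplification, not Kafka's HeartbeatRequestManager. The class and method names are invented. Letting a response overwrite the starting interval is included as an illustrative assumption (KIP-848 heartbeat responses do carry a heartbeat interval); it is not a diagnosis of the test above.

```java
// Hypothetical, simplified heartbeat gate: a new heartbeat is sent only once
// intervalMs has elapsed since the previous one. Time is passed in by the caller,
// so a test can advance a mock clock deterministically.
final class HeartbeatGate {
    private long intervalMs;
    private long lastSentMs;
    private boolean sentBefore;

    HeartbeatGate(long initialIntervalMs) {
        this.intervalMs = initialIntervalMs;
    }

    // Returns true, and records the send time, if a heartbeat is due at nowMs.
    boolean maybeSend(long nowMs) {
        if (sentBefore && nowMs - lastSentMs < intervalMs) {
            return false;
        }
        sentBefore = true;
        lastSentMs = nowMs;
        return true;
    }

    // Assumption for illustration: an interval received in a heartbeat response
    // replaces the locally configured one.
    void onHeartbeatResponse(long intervalFromResponseMs) {
        this.intervalMs = intervalFromResponseMs;
    }
}
```

Under this model, starting from a zero interval only affects the first send; after a response carrying 1000 ms, the next send requires the mock clock to advance by at least 1000 ms.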
[PR] MINOR: fix bug where we weren't registering SnapshotEmitterMetrics [kafka]
cmccabe opened a new pull request, #14918: URL: https://github.com/apache/kafka/pull/14918 Fix a bug where we weren't properly exposing SnapshotEmitterMetrics. Add a test.
Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]
jeffkbkim commented on code in PR #14885: URL: https://github.com/apache/kafka/pull/14885#discussion_r1414507880 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -66,6 +65,10 @@ class CoordinatorPartitionWriter[T]( compressionType: CompressionType, time: Time ) extends PartitionWriter[T] { + private val threadLocalBufferSupplier = ThreadLocal.withInitial( +() => new BufferSupplier.GrowableBufferSupplier() + ) Review Comment: makes sense. thanks!
[jira] [Updated] (KAFKA-15645) Move ReplicationQuotasTestRig to tools
[ https://issues.apache.org/jira/browse/KAFKA-15645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-15645: --- Parent: KAFKA-14525 Issue Type: Sub-task (was: Task) > Move ReplicationQuotasTestRig to tools > -- > > Key: KAFKA-15645 > URL: https://issues.apache.org/jira/browse/KAFKA-15645 > Project: Kafka > Issue Type: Sub-task >Reporter: Nikolay Izhikov >Assignee: Nikolay Izhikov >Priority: Minor > > The ReplicationQuotasTestRig class is used for measuring performance. > It contains dependencies on the `ReassignPartitionCommand` API. > To move all commands to tools, ReplicationQuotasTestRig must also be moved to tools.
Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]
jolshan commented on code in PR #14885: URL: https://github.com/apache/kafka/pull/14885#discussion_r1414500105 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -66,6 +65,10 @@ class CoordinatorPartitionWriter[T]( compressionType: CompressionType, time: Time ) extends PartitionWriter[T] { + private val threadLocalBufferSupplier = ThreadLocal.withInitial( +() => new BufferSupplier.GrowableBufferSupplier() + ) Review Comment: For more context on 4. There are assumptions in the code that a request is handled on the same thread, so the ByteBuffers are not thread-safe. There was a bug in KIP-890 (see [here](https://issues.apache.org/jira/browse/KAFKA-15653)) where we used the wrong byte buffer and it caused NPEs and other exceptions.
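The per-thread buffer pattern from the diff can be sketched in plain Java. `GrowableBuffers` is a hypothetical, simplified analogue of a growable buffer supplier, not Kafka's `BufferSupplier`: it caches one buffer and allocates only when a caller needs more capacity than the cached one has. Because it is not thread-safe, each thread gets its own instance via `ThreadLocal.withInitial`, so no buffer is ever shared between threads.

```java
import java.nio.ByteBuffer;

// Hypothetical growable supplier: reuses one cached buffer when it is large enough,
// otherwise allocates a new one. Not thread-safe by design.
final class GrowableBuffers {
    private ByteBuffer cached;

    ByteBuffer get(int minCapacity) {
        if (cached != null && cached.capacity() >= minCapacity) {
            ByteBuffer reused = cached;
            cached = null; // the caller owns it until release()
            reused.clear();
            return reused;
        }
        return ByteBuffer.allocate(minCapacity);
    }

    void release(ByteBuffer buffer) {
        buffer.clear();
        cached = buffer;
    }
}

// One supplier per thread, mirroring ThreadLocal.withInitial(...) in the Scala diff.
final class PerThreadBuffers {
    static final ThreadLocal<GrowableBuffers> SUPPLIER = ThreadLocal.withInitial(GrowableBuffers::new);

    private PerThreadBuffers() {
    }
}
```

Scoping reuse to the thread instead of the request means a buffer can only ever be touched by the thread that obtained it.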
Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]
jolshan commented on code in PR #14885: URL: https://github.com/apache/kafka/pull/14885#discussion_r1414508061 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -126,57 +129,63 @@ class CoordinatorPartitionWriter[T]( val magic = logConfig.recordVersion.value val maxBatchSize = logConfig.maxMessageSize val currentTimeMs = time.milliseconds() - -val recordsBuilder = MemoryRecords.builder( - ByteBuffer.allocate(math.min(16384, maxBatchSize)), - magic, - compressionType, - TimestampType.CREATE_TIME, - 0L, - time.milliseconds(), - producerId, - producerEpoch, - 0, - producerId != RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PARTITION_LEADER_EPOCH -) - -records.forEach { record => - val keyBytes = serializer.serializeKey(record) - val valueBytes = serializer.serializeValue(record) - - if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, EMPTY_HEADERS)) recordsBuilder.append( -currentTimeMs, -keyBytes, -valueBytes, -EMPTY_HEADERS - ) else throw new RecordTooLargeException(s"Message batch size is ${recordsBuilder.estimatedSizeInBytes()} bytes " + -s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.") +val bufferSupplier = threadLocalBufferSupplier.get() +val buffer = bufferSupplier.get(math.min(16384, maxBatchSize)) Review Comment: Is this 16384 just the default batch size?
Re: [PR] MINOR: Various fixes in the docs [kafka]
mimaison merged PR #14914: URL: https://github.com/apache/kafka/pull/14914
Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]
jolshan commented on code in PR #14885: URL: https://github.com/apache/kafka/pull/14885#discussion_r1414500105 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -66,6 +65,10 @@ class CoordinatorPartitionWriter[T]( compressionType: CompressionType, time: Time ) extends PartitionWriter[T] { + private val threadLocalBufferSupplier = ThreadLocal.withInitial( +() => new BufferSupplier.GrowableBufferSupplier() + ) Review Comment: For more context on 4. There are assumptions in the code that a request is handled on the same thread. There was a bug in KIP-890 (see [here](https://issues.apache.org/jira/browse/KAFKA-15653)) where we used the wrong byte buffer and it caused NPEs and other exceptions.
Re: [PR] MINOR: Various fixes in the docs [kafka]
mimaison commented on code in PR #14914: URL: https://github.com/apache/kafka/pull/14914#discussion_r1414498206 ## docs/ops.html: ## @@ -3570,7 +3572,7 @@ 6.9 ZooKeeper Stable version - The current stable branch is 3.5. Kafka is regularly updated to include the latest release in the 3.5 series. + The current stable branch is 3.8. Kafka is regularly updated to include the latest release in the 3.8 series. Review Comment: This is the ZooKeeper version. We don't have a variable to track that. At this point, it's likely it won't change anymore before we remove ZooKeeper, hence why I decided to just update the value.