[GitHub] [kafka] dengziming commented on a diff in pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse
dengziming commented on code in PR #13679: URL: https://github.com/apache/kafka/pull/13679#discussion_r1220994605 ## core/src/main/scala/kafka/server/ApiVersionManager.scala: ## @@ -112,4 +157,8 @@ class DefaultApiVersionManager( zkMigrationEnabled ) } + + override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeatureEpoch: Long): ApiVersionsResponse = { +throw new UnsupportedOperationException("This method is not supported in DefaultApiVersionManager, use apiVersionResponse(throttleTimeMs) instead") Review Comment: The `features` in `ApiVersionResponse` are retrieved directly from `MetadataCache` in BrokerServer/KafkaServer, but in ControllerServer they can only be obtained asynchronously, so the two paths can't be unified. That is why I added two different methods here. I'm still checking whether there is a better way to handle this. One option is to add a synchronous method to the controller; another is to make `ApiVersionManager.apiVersionResponse` asynchronous. Both will introduce new problems. The root cause is that `ApiVersionRequest` is treated as static, while finalized features change dynamically. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
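The second option mentioned above (making `apiVersionResponse` asynchronous) can be sketched roughly as follows. This is a hypothetical, simplified model rather than Kafka's actual `ApiVersionManager` API: every type name below is invented for illustration, and the real response carries much more than finalized features. The idea is that both managers expose one future-returning method, so neither needs to throw `UnsupportedOperationException`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical, simplified response: only the finalized-features fields
// discussed in the review are modelled (not Kafka's ApiVersionsResponse).
final class ApiVersionsResponseSketch {
    final int throttleTimeMs;
    final Map<String, Short> finalizedFeatures;
    final long finalizedFeaturesEpoch;

    ApiVersionsResponseSketch(int throttleTimeMs, Map<String, Short> finalizedFeatures, long finalizedFeaturesEpoch) {
        this.throttleTimeMs = throttleTimeMs;
        this.finalizedFeatures = finalizedFeatures;
        this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
    }
}

// Hypothetical snapshot of finalized features and their epoch.
final class FinalizedFeaturesSketch {
    final Map<String, Short> features;
    final long epoch;

    FinalizedFeaturesSketch(Map<String, Short> features, long epoch) {
        this.features = features;
        this.epoch = epoch;
    }
}

// A single method for every implementation, so none has to throw
// UnsupportedOperationException for a variant it cannot serve.
interface AsyncApiVersionManager {
    CompletableFuture<ApiVersionsResponseSketch> apiVersionResponse(int throttleTimeMs);
}

// Broker-style: features are read from a local cache, so the returned
// future is already complete.
final class CachedFeaturesApiVersionManager implements AsyncApiVersionManager {
    private final Supplier<FinalizedFeaturesSketch> cache;

    CachedFeaturesApiVersionManager(Supplier<FinalizedFeaturesSketch> cache) {
        this.cache = cache;
    }

    @Override
    public CompletableFuture<ApiVersionsResponseSketch> apiVersionResponse(int throttleTimeMs) {
        FinalizedFeaturesSketch f = cache.get();
        return CompletableFuture.completedFuture(
            new ApiVersionsResponseSketch(throttleTimeMs, f.features, f.epoch));
    }
}

// Controller-style: features arrive asynchronously, so the response is
// built only when that read completes.
final class AsyncFeaturesApiVersionManager implements AsyncApiVersionManager {
    private final Supplier<CompletableFuture<FinalizedFeaturesSketch>> readFeatures;

    AsyncFeaturesApiVersionManager(Supplier<CompletableFuture<FinalizedFeaturesSketch>> readFeatures) {
        this.readFeatures = readFeatures;
    }

    @Override
    public CompletableFuture<ApiVersionsResponseSketch> apiVersionResponse(int throttleTimeMs) {
        return readFeatures.get().thenApply(
            f -> new ApiVersionsResponseSketch(throttleTimeMs, f.features, f.epoch));
    }
}
```

The trade-off matches the concern raised in the comment. The broker path pays almost nothing, because its future is already complete. The request-handling layer, however, must now cope with a response that may arrive later.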
[GitHub] [kafka] urbandan commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…
urbandan commented on PR #13796: URL: https://github.com/apache/kafka/pull/13796#issuecomment-1580030486 @viktorsomogyi Since this part of the code is quite tricky, I would try to address the different issues in different PRs. I believe that the fix I'm proposing will solve the issue reported in KAFKA-14034 specifically.
[GitHub] [kafka] showuon commented on pull request #13662: MINOR: Upgrade Jackson dependencies to version 2.15.0
showuon commented on PR #13662: URL: https://github.com/apache/kafka/pull/13662#issuecomment-1580008259 @bmscomp , could you respond to @divijvaidya 's comments above?
[jira] [Created] (KAFKA-15064) Use KafkaTemplate to send message with below exception - IllegalMonitorStateException
Xuguang zhan created KAFKA-15064: Summary: Use KafkaTemplate to send message with below exception - IllegalMonitorStateException Key: KAFKA-15064 URL: https://issues.apache.org/jira/browse/KAFKA-15064 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 3.1.2 Reporter: Xuguang zhan
*Running env:*
1. openjdk-17.0.2-5.el7.x86_64
2. Spring-kafka: 2.8.11
3. Kafka client: 3.1.2
Special case: a single Tomcat instance hosts three web applications (contexts), and the Kafka client is placed in Tomcat's shared lib.
*Java Stack:*
java.lang.IllegalMonitorStateException: current thread is not owner
    at java.lang.Object.wait(Native Method) ~[?:?]
    at org.apache.kafka.common.utils.SystemTime.waitObject(SystemTime.java:55) ~[kafka-clients-3.1.2.jar:?]
    at org.apache.kafka.clients.producer.internals.ProducerMetadata.awaitUpdate(ProducerMetadata.java:119) ~[kafka-clients-3.1.2.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1088) ~[kafka-clients-3.1.2.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:935) ~[kafka-clients-3.1.2.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914) ~[kafka-clients-3.1.2.jar:?]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1087) ~[spring-kafka-2.8.11.jar:2.8.11]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655) ~[spring-kafka-2.8.11.jar:2.8.11]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:429) ~[spring-kafka-2.8.11.jar:2.8.11]
Below is the low-level code:
{code:java}
@Override
public void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs) throws InterruptedException {
    synchronized (obj) {
        while (true) {
            if (condition.get())
                return;

            long currentTimeMs = milliseconds();
            if (currentTimeMs >= deadlineMs)
                throw new TimeoutException("Condition not satisfied before deadline");

            obj.wait(deadlineMs - currentTimeMs);
        }
    }
}
{code}
{code:java}
/**
 * Wait for metadata update until the current version is larger than the last version we know of
 */
public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException {
    long currentTimeMs = time.milliseconds();
    long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs;
    time.waitObject(this, () -> {
        // Throw fatal exceptions, if there are any. Recoverable topic errors will be handled by the caller.
        maybeThrowFatalException();
        return updateVersion() > lastVersion || isClosed();
    }, deadlineMs);

    if (isClosed())
        throw new KafkaException("Requested metadata update after close");
}
{code}
I checked and found that the same issue has already been reported in Jira: https://issues.apache.org/jira/browse/KAFKA-10902 -- This message was sent by Atlassian Jira (v8.20.10#820010)
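For context on the exception itself: `Object.wait` throws `IllegalMonitorStateException` when the calling thread does not hold the object's monitor. The sketch below restates the quoted `waitObject` pattern as standalone code and adds a deliberately wrong call. It only illustrates that JVM rule and does not diagnose this report. In the quoted code, `awaitUpdate` is `synchronized` and `waitObject` synchronizes on the same object again, so the monitor should be held. `MonitorWaitSketch` is a hypothetical name. The sketch also substitutes `System.currentTimeMillis()` and `java.util.concurrent.TimeoutException` for Kafka's `Time` and `TimeoutException` classes.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class MonitorWaitSketch {
    // Same shape as the SystemTime.waitObject code quoted above, but uses
    // System.currentTimeMillis() and java.util.concurrent.TimeoutException
    // instead of Kafka's Time and TimeoutException classes.
    static void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs)
            throws InterruptedException, TimeoutException {
        synchronized (obj) {
            while (true) {
                if (condition.get())
                    return;
                long currentTimeMs = System.currentTimeMillis();
                if (currentTimeMs >= deadlineMs)
                    throw new TimeoutException("Condition not satisfied before deadline");
                // Legal: this thread owns obj's monitor inside the synchronized block.
                // The loop re-checks the condition after spurious wakeups.
                obj.wait(deadlineMs - currentTimeMs);
            }
        }
    }

    // Calling wait() without owning the monitor is what raises
    // IllegalMonitorStateException (the message in the report above).
    static boolean waitWithoutMonitorThrows() {
        Object lock = new Object();
        try {
            lock.wait(1);
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```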
[GitHub] [kafka] Vaibhav-Nazare commented on pull request #13817: KAFKA-15062: Adding ppc64le build stage
Vaibhav-Nazare commented on PR #13817: URL: https://github.com/apache/kafka/pull/13817#issuecomment-1579948195 @dajac Can you please check the PR?
[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary
sambhav-jain-16 commented on PR #13646: URL: https://github.com/apache/kafka/pull/13646#issuecomment-1579930237 Hi @C0urante, Did you get a chance to take a look at it? TIA
[jira] [Updated] (KAFKA-15062) Power(ppc64le) support for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-15062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vaibhav updated KAFKA-15062: Reviewer: David Jacot > Power(ppc64le) support for Kafka > > > Key: KAFKA-15062 > URL: https://issues.apache.org/jira/browse/KAFKA-15062 > Project: Kafka > Issue Type: Task > Components: build >Reporter: Vaibhav >Priority: Major > > Support for Power architecture (ppc64le) for apache kafka. > What is IBM Power architecture? > It is a RISC architecture and IBM has recently made its ISA (Instruction Set > Architecture) opensource and in doing so, they have significantly contributed > back to the opensource community at large. Many of the pioneers of banking > and HPC industries today run on ppc64le architecture. > As an ongoing effort to enable open-source projects where Power architecture > can add value, we are trying to enable kafka on Power.
[jira] [Comment Edited] (KAFKA-15000) High vulnerability PRISMA-2023-0067 reported in jackson-core
[ https://issues.apache.org/jira/browse/KAFKA-15000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729848#comment-17729848 ] Scott Rowley edited comment on KAFKA-15000 at 6/7/23 3:41 AM: -- [~showuon] Thank you for your time on this. The vulnerability description is: _com.fasterxml.jackson.core_jackson-core package versions before 2.15.0 are vulnerable to Denial of Service (DoS). The package does not properly restrict the size or amount of resources that are requested or influenced by an actor, which can be used to consume more resources than intended and leads to Uncontrolled Resource Consumption ('Resource Exhaustion')._ Severity: High, CVSS 7.5 For some background for others: my understanding is that the "PRISMA" identifier comes from a proprietary vulnerability database from Twistlock, now owned by Palo Alto Networks and used by its Prisma scanner. In my observation, they tend to flag items where a "security related" merge has been made in a GitHub project, as a mechanism for their customers to trigger version upgrades. This makes it hard for downstream projects such as Kafka to keep up, as there often isn't a public reference to assess the risk or otherwise act on. As an example, -there's no linked Jackson github request- I see, so it is not clear whether this may also have been addressed in the latest 2.14 patch release, Jackson 2.14.3, which was released after 2.15.0. Edit: Linked PR is [https://github.com/FasterXML/jackson-core/pull/827] I've been lurking for a while, but I'm not sure I've come across any dependency upgrade strategy or policy for Kafka (e.g. when to do minor version updates, when to do major ones). From the Jackson GitHub and wiki (some of the lifecycle information there seems out of date), the 2.15 and 2.14 versions are actively in release mode. 2.13 may still be open for selective fixes but appears to be next on the list to reach end of life.
So, independent of any vulnerability, getting Kafka off 2.13 is likely a good medium-term activity. The PR [https://github.com/apache/kafka/pull/13662] seems to be making progress on this, though with some technical hurdles still to overcome. Edit: Added link to the vulnerable PR reported by PRISMA: [https://github.com/FasterXML/jackson-core/pull/827/files] While some of these changes seem to have made it into 2.14 ([https://github.com/FasterXML/jackson-core/pull/1013]), it seems that not all of them did. > High vulnerability PRISMA-2023-0067 reported in jackson-core > > > Key: KAFKA-15000 > URL: https://issues.apache.org/jira/browse/KAFKA-15000 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.0, 3.3.2 >Reporter: Arushi Rai >Priority: Critical > > Kafka is using jackson-core version 2.13.4, which has a high vulnerability > reported: [PRISMA-2023-0067|https://github.com/FasterXML/jackson-core/pull/827] > This
[GitHub] [kafka] ijuma commented on a diff in pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse
ijuma commented on code in PR #13679: URL: https://github.com/apache/kafka/pull/13679#discussion_r1220723779 ## core/src/main/scala/kafka/server/ApiVersionManager.scala: ## @@ -112,4 +157,8 @@ class DefaultApiVersionManager( zkMigrationEnabled ) } + + override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeatureEpoch: Long): ApiVersionsResponse = { +throw new UnsupportedOperationException("This method is not supported in DefaultApiVersionManager, use apiVersionResponse(throttleTimeMs) instead") Review Comment: What's the thinking around this? It's extremely brittle to design interfaces like this.
[GitHub] [kafka] showuon closed pull request #13822: MINOR: update system test for 3.4.1
showuon closed pull request #13822: MINOR: update system test for 3.4.1 URL: https://github.com/apache/kafka/pull/13822
[GitHub] [kafka] showuon opened a new pull request, #13822: MINOR: update system test for 3.4.1
showuon opened a new pull request, #13822: URL: https://github.com/apache/kafka/pull/13822 update system test for 3.4.1 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] showuon commented on pull request #13807: KAFKA-15040: trigger onLeadershipChange under KRaft mode
showuon commented on PR #13807: URL: https://github.com/apache/kafka/pull/13807#issuecomment-1579729897 Retriggered: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13807/4/
[GitHub] [kafka] satishd commented on pull request #13807: KAFKA-15040: trigger onLeadershipChange under KRaft mode
satishd commented on PR #13807: URL: https://github.com/apache/kafka/pull/13807#issuecomment-1579719397 @showuon can you retrigger the build?
[GitHub] [kafka] hgeraldino commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest
hgeraldino commented on code in PR #13383: URL: https://github.com/apache/kafka/pull/13383#discussion_r1220625126 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ## @@ -706,95 +662,72 @@ public void testSourceTaskIgnoresProducerException() throws Exception { // and no ConnectException will be thrown SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record2 = new SourceRecord(PARTITION, offset2, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); -expectOffsetFlush(true); -expectSendRecordOnce(); -expectSendRecordProducerCallbackFail(); -sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull()); -//As of KAFKA-14079 all offsets should be committed, even for failed records (if ignored) -//Only the last offset will be passed to the method as everything up to that point is committed -//Before KAFKA-14079 offset 12 would have been passed and not 13 as it would have been unacked -offsetWriter.offset(PARTITION, offset2); -PowerMock.expectLastCall(); +expectOffsetFlush(); +expectPreliminaryCalls(); -PowerMock.replayAll(); +when(producer.send(any(ProducerRecord.class), any(Callback.class))) +.thenAnswer(producerSendAnswer(true)) +.thenAnswer(producerSendAnswer(false)); //Send records and then commit offsets and verify both were committed and no exception -Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); -Whitebox.invokeMethod(workerTask, "sendRecords"); -Whitebox.invokeMethod(workerTask, "updateCommittableOffsets"); +workerTask.toSend = Arrays.asList(record1, record2); +workerTask.sendRecords(); +workerTask.updateCommittableOffsets(); workerTask.commitOffsets(); -PowerMock.verifyAll(); +//As of KAFKA-14079 all offsets should be committed, even for failed records (if ignored) +//Only the last offset will be passed to the method as everything up to that point is committed +//Before KAFKA-14079 offset 12 
would have been passed and not 13 as it would have been unacked +verify(offsetWriter).offset(PARTITION, offset2); +verify(sourceTask).commitRecord(any(SourceRecord.class), isNull()); //Double check to make sure all submitted records were cleared -assertEquals(0, ((SubmittedRecords) Whitebox.getInternalState(workerTask, -"submittedRecords")).records.size()); +assertEquals(0, workerTask.submittedRecords.records.size()); } @Test public void testSlowTaskStart() throws Exception { final CountDownLatch startupLatch = new CountDownLatch(1); final CountDownLatch finishStartupLatch = new CountDownLatch(1); - createWorkerTask(); -offsetStore.start(); -EasyMock.expectLastCall(); -sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); -EasyMock.expectLastCall(); -sourceTask.start(TASK_PROPS); -EasyMock.expectLastCall().andAnswer(() -> { +doAnswer((Answer) invocation -> { startupLatch.countDown(); -assertTrue(awaitLatch(finishStartupLatch)); +ConcurrencyUtils.awaitLatch(finishStartupLatch, "Timeout waiting for task to stop"); Review Comment: Hmm... not really. I mean, the countDownLatch is updated after calling `workerTask.stop()` on L710, but this call only sets an internal flag on the task and doesn't really block, so technically no. I fail to understand the value of this synchronization, so I decided to remove the `finishStartupLatch` altogether.
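The reasoning about the latches can be illustrated with a small, hypothetical model (`FlagStoppedTask` is an invented name, not Kafka Connect's `WorkerSourceTask`). A startup latch is enough to know that `start()` has begun. Because `stop()` only sets a flag, counting down another latch right after `stop()` would not wait for anything. If a test needs to know that `start()` has actually returned, it has to wait on the task's thread, here via `join`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a task whose stop() only sets a flag, as described
// in the review comment; it is not Kafka Connect's WorkerSourceTask.
final class FlagStoppedTask {
    final CountDownLatch startupLatch = new CountDownLatch(1);
    private volatile boolean stopping = false;

    // A slow start(): signals that it has begun, then runs until it sees the flag.
    void start() throws InterruptedException {
        startupLatch.countDown();
        while (!stopping) {
            Thread.sleep(1);
        }
    }

    // Non-blocking: returns immediately without waiting for start() to exit.
    void stop() {
        stopping = true;
    }

    // Runs start() on another thread, waits until it has begun, stops the
    // task, and reports whether start() exited within the timeout.
    static boolean startThenStop(long timeoutMs) throws InterruptedException {
        FlagStoppedTask task = new FlagStoppedTask();
        Thread runner = new Thread(() -> {
            try {
                task.start();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        runner.start();
        boolean started = task.startupLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        task.stop();            // only flips the flag; nothing waits here
        runner.join(timeoutMs); // joining, not a second latch, waits for start() to finish
        return started && !runner.isAlive();
    }
}
```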
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1220560961 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -0,0 +1,1040 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.deferred.DeferredEvent; +import org.apache.kafka.deferred.DeferredEventQueue; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator + * or the transaction coordinator. 
+ * + * The runtime framework maps each underlying partitions (e.g. __consumer_offsets) that that broker is a + * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft + * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in + * timeline datastructures backed by a SnapshotRegistry. The runtime supports two type of operations + * on state machines: (1) Writes and (2) Reads. + * + * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from state + * machine to handle the operation. A write operation typically generates a response and a list of + * records. The records are applies to the state machine and persisted to the partition. The response + * is parked until the records are committed and delivered when they are. + * + * (2) A read operation, aka a request, can only read the committed state from the state machine to handle + * the operation. A read operation typically generates a response that is immediately completed. + * + * The runtime framework exposes an asynchronous, future based, API to the world. All the operations + * are executed by an CoordinatorEventProcessor. The processor guarantees that operations for a + * single partition or state machine are not processed concurrently. + * + * @param The type of the state machine. + * @param The type of the record. + */ +public class CoordinatorRuntime, U> { + +/** + * Builder to create a CoordinatorRuntime. + * + * @param The type of the state machine. + * @param The type of the record. 
+ */ +public static class Builder, U> { +private LogContext logContext; +private CoordinatorEventProcessor eventProcessor; +private PartitionWriter partitionWriter; +private CoordinatorLoader loader; +private CoordinatorBuilderSupplier coordinatorBuilderSupplier; + +public Builder withLogContext(LogContext logContext) { +this.logContext = logContext; +return this; +} + +public Builder withEventProcessor(CoordinatorEventProcessor eventProcessor) { +this.eventProcessor = eventProcessor; +return this; +} + +public Builder withPartitionWriter(PartitionWriter partitionWriter) { +this.partitionWriter = partitionWriter; +return this; +} + +public Builder withLoader(CoordinatorLoader loader) { +this.loader = loader; +return this; +} + +public Builder withCoordinatorBuilderSupplier(CoordinatorBuilderSupplier coordinatorBuilderSupplier) { +this.coordinatorBuilderSupplier = coordinatorBuilderSupplier; +return this; +} + +public CoordinatorRuntime build() { +if (logContext == null) +logContext = new LogContext(); +if (eventProcessor == null) +throw new IllegalArgume
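The write/read contract described in the javadoc above can be illustrated with a toy, single-partition model. This is a hypothetical sketch, not the PR's implementation. `ToyCoordinatorStateMachine` and its methods are invented names, and a counter stands in for groups and offsets. Two counters replace the timeline data structures and `SnapshotRegistry`, and an explicit `commitUpTo` call stands in for the partition's commit progress.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy model of the write/read semantics in the CoordinatorRuntime
// javadoc; the state is a single counter and records are deltas.
// Not thread-safe: it assumes, like the runtime's event processor, that
// operations on one state machine are never processed concurrently.
final class ToyCoordinatorStateMachine {
    private final List<Long> log = new ArrayList<>(); // records "persisted" to the partition
    private long uncommittedValue = 0;               // includes uncommitted records
    private long committedValue = 0;                 // only records below the high watermark
    private int highWatermark = 0;                   // offsets [0, highWatermark) are committed
    private final TreeMap<Integer, Runnable> parkedResponses = new TreeMap<>();

    // Write: reads the (possibly uncommitted) state, applies a record and
    // parks the response until that record is committed.
    CompletableFuture<Long> write(long delta) {
        uncommittedValue += delta;
        log.add(delta);
        int offset = log.size() - 1;
        long response = uncommittedValue;
        CompletableFuture<Long> future = new CompletableFuture<>();
        parkedResponses.put(offset, () -> future.complete(response));
        return future;
    }

    // Read: only sees committed state and completes immediately.
    CompletableFuture<Long> read() {
        return CompletableFuture.completedFuture(committedValue);
    }

    // Advances the high watermark and delivers every parked response whose
    // record is now committed.
    void commitUpTo(int newHighWatermark) {
        while (highWatermark < newHighWatermark && highWatermark < log.size()) {
            committedValue += log.get(highWatermark);
            highWatermark++;
        }
        SortedMap<Integer, Runnable> ready = parkedResponses.headMap(highWatermark);
        ready.values().forEach(Runnable::run);
        ready.clear();
    }
}
```

For example, after `write(5)` and `write(3)`, `read()` still returns 0. Calling `commitUpTo(1)` completes the first write with 5 and makes reads return 5. The second write completes with 8 only after `commitUpTo(2)`.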
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1220554821 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -0,0 +1,1009 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.deferred.DeferredEvent; +import org.apache.kafka.deferred.DeferredEventQueue; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator + * or the transaction coordinator. 
+ * + * The runtime framework maps each underlying partitions (e.g. __consumer_offsets) that that broker is a + * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft + * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in + * timeline datastructures backed by a SnapshotRegistry. The runtime supports two type of operations + * on state machines: (1) Writes and (2) Reads. + * + * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from state + * machine to handle the operation. A write operation typically generates a response and a list of + * records. The records are applies to the state machine and persisted to the partition. The response + * is parked until the records are committed and delivered when they are. + * + * (2) A read operation, aka a request, can only read the committed state from the state machine to handle + * the operation. A read operation typically generates a response that is immediately completed. + * + * The runtime framework exposes an asynchronous, future based, API to the world. All the operations + * are executed by an CoordinatorEventProcessor. The processor guarantees that operations for a + * single partition or state machine are not processed concurrently. + * + * @param The type of the state machine. + * @param The type of the record. + */ +public class CoordinatorRuntime, U> { + +/** + * Builder to create a CoordinatorRuntime. + * + * @param The type of the state machine. + * @param The type of the record. 
+ */ +public static class Builder, U> { +private LogContext logContext; +private CoordinatorEventProcessor eventProcessor; +private PartitionWriter partitionWriter; +private CoordinatorLoader loader; +private CoordinatorBuilderSupplier coordinatorBuilderSupplier; + +public Builder withLogContext(LogContext logContext) { +this.logContext = logContext; +return this; +} + +public Builder withEventProcessor(CoordinatorEventProcessor eventProcessor) { +this.eventProcessor = eventProcessor; +return this; +} + +public Builder withPartitionWriter(PartitionWriter partitionWriter) { +this.partitionWriter = partitionWriter; +return this; +} + +public Builder withLoader(CoordinatorLoader loader) { +this.loader = loader; +return this; +} + +public Builder withCoordinatorStateMachineSupplier(CoordinatorBuilderSupplier coordinatorBuilderSupplier) { +this.coordinatorBuilderSupplier = coordinatorBuilderSupplier; +return this; +} + +public CoordinatorRuntime build() { +if (logContext == null) +logContext = new LogContext(); +if (eventProcessor == null) +throw new IllegalA
[GitHub] [kafka] jolshan commented on a diff in pull request #13811: KAFKA-14278: Fix InvalidProducerEpochException and InvalidTxnStateException handling in producer clients
jolshan commented on code in PR #13811: URL: https://github.com/apache/kafka/pull/13811#discussion_r1220507827 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -1376,7 +1376,7 @@ public void handleResponse(AbstractResponse response) { fatalError(error.exception()); return; } else if (error == Errors.INVALID_TXN_STATE) { -fatalError(new KafkaException(error.exception())); Review Comment: As mentioned in the other instance, this probably won't be returned to the client, but if it were, could there be a client that expects the wrapped error? At this point it's probably not an issue; I'm just curious about the implications.
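To make the question concrete: wrapping mainly matters to a caller that inspects the exception's type. In Kafka, `InvalidTxnStateException` is (via `ApiException`) a subclass of `KafkaException`. A `catch (KafkaException e)` block would therefore handle both the wrapped and unwrapped forms the same way. Only code that tests for `InvalidTxnStateException` directly, or digs into `getCause()`, would behave differently. The classes below are hypothetical stand-ins that mirror that hierarchy, so the sketch runs without Kafka on the classpath.

```java
// Hypothetical stand-ins mirroring the relevant hierarchy: in Kafka,
// InvalidTxnStateException is (indirectly) a subclass of KafkaException.
class KafkaExceptionStub extends RuntimeException {
    KafkaExceptionStub(String message) { super(message); }
    KafkaExceptionStub(Throwable cause) { super(cause); }
}

class InvalidTxnStateExceptionStub extends KafkaExceptionStub {
    InvalidTxnStateExceptionStub(String message) { super(message); }
}

final class FatalErrorClassifier {
    // How a caller inspecting a fatal error's type would see the two variants.
    static String classify(RuntimeException fatal) {
        if (fatal instanceof InvalidTxnStateExceptionStub)
            return "invalid-txn-state";
        if (fatal.getCause() instanceof InvalidTxnStateExceptionStub)
            return "wrapped-invalid-txn-state";
        if (fatal instanceof KafkaExceptionStub)
            return "kafka-exception";
        return "other";
    }
}
```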
[GitHub] [kafka] jolshan commented on a diff in pull request #13811: KAFKA-14278: Fix InvalidProducerEpochException and InvalidTxnStateException handling in producer clients
jolshan commented on code in PR #13811: URL: https://github.com/apache/kafka/pull/13811#discussion_r1220504060 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -1599,6 +1599,8 @@ public void handleResponse(AbstractResponse response) { fatalError(Errors.PRODUCER_FENCED.exception()); } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { fatalError(error.exception()); +} else if (error == Errors.INVALID_TXN_STATE) { Review Comment: I went down a very long rabbit hole to find that we only returned INVALID_TXN_STATE from this request for a short time: https://github.com/apache/kafka/commit/1f2451d4e7e3766540d3650d177e304fcddf49b8 (here's the commit that removed this error). We never removed it from the list of errors returned, I suppose because some really old broker could still return it. Having this here doesn't hurt; I was just curious about the history behind it. 😅
[jira] [Commented] (KAFKA-15000) High vulnerability PRISMA-2023-0067 reported in jackson-core
[ https://issues.apache.org/jira/browse/KAFKA-15000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729848#comment-17729848 ] Scott Rowley commented on KAFKA-15000: --

[~showuon] Thank you for your time on this. The vulnerability description is:

_com.fasterxml.jackson.core_jackson-core package versions before 2.15.0 are vulnerable to Denial of Service (DoS). The package does not properly restrict the size or amount of resources that are requested or influenced by an actor, which can be used to consume more resources than intended and leads to Uncontrolled Resource Consumption ('Resource Exhaustion')._

Severity: High, CVSS 7.5

Some background for others: my understanding is that the "PRISMA" identifier comes from a proprietary vulnerability database from Twistlock (now owned by Palo Alto Networks), which is used by its Prisma scanner. In my observation, they tend to flag items where a "security-related" change has been merged in a GitHub project, as a way to prompt their customers to upgrade. This makes it hard for downstream projects such as Kafka to keep up, because there often isn't a public reference that would let them assess the risk or decide how to act. For example, I don't see a linked Jackson GitHub pull request, so it is not clear whether this may also have been addressed in 2.14.3, the latest Jackson 2.14 release, which came out after 2.15.0.

I've been lurking for a while, but I'm not sure I've come across any dependency upgrade strategy or policy for Kafka (e.g. when to do minor version updates and when to do major ones). Judging from the Jackson GitHub and wiki (where some of the lifecycle information seems out of date), the 2.15 and 2.14 versions are actively being released. 2.13 may still be open for selective fixes but appears to be next in line for end of life. So, independent of any vulnerability, moving Kafka off 2.13 is likely a good medium-term activity.
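As a rough illustration of how blunt a purely version-based advisory rule is, here is a sketch of a naive gate for "versions before 2.15.0". The class and method names are hypothetical. Because the gate only compares numbers, it would flag 2.14.3 even if that release carries the same fix, which is exactly the ambiguity described above.

```java
import java.util.Arrays;

// Hypothetical helper for illustration; not part of Kafka or Jackson.
class JacksonVersionGate {
    // Parses "major.minor.patch", ignoring any "-SUFFIX" such as "-SNAPSHOT".
    static int[] parse(String version) {
        String core = version.split("-", 2)[0];
        return Arrays.stream(core.split("\\.")).mapToInt(Integer::parseInt).toArray();
    }

    // Negative, zero, or positive, like Comparator#compare; missing parts count as 0.
    static int compare(String a, String b) {
        int[] x = parse(a);
        int[] y = parse(b);
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? x[i] : 0;
            int yi = i < y.length ? y[i] : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    // Naive rule taken from the advisory text: anything before 2.15.0 is flagged.
    static boolean flaggedByAdvisory(String version) {
        return compare(version, "2.15.0") < 0;
    }
}
```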
The PR [https://github.com/apache/kafka/pull/13662] seems to be making progress on this, though with some technical hurdles still to overcome.

> High vulnerability PRISMA-2023-0067 reported in jackson-core
>
>                 Key: KAFKA-15000
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15000
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.4.0, 3.3.2
>            Reporter: Arushi Rai
>            Priority: Critical
>
> Kafka is using jackson-core version 2.13.4, which has a high vulnerability
> reported: [PRISMA-2023-0067|https://github.com/FasterXML/jackson-core/pull/827].
> This vulnerability is fixed in jackson-core 2.15.0, and Kafka should upgrade to
> the same.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks
C0urante commented on PR #13819: URL: https://github.com/apache/kafka/pull/13819#issuecomment-1579470948 @mimaison @viktorsomogyi if either of you has a moment, would you mind taking a look? This change is deceptively small, but I've tried to explain in the description both why it's valuable in testing and non-testing environments and why it's safe. Happy to discuss further if it helps!
[GitHub] [kafka] C0urante commented on pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks
C0urante commented on PR #13819: URL: https://github.com/apache/kafka/pull/13819#issuecomment-1579468092 The next run also had [no failing integration tests](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13819/2/tests) for MirrorMaker 2. Marking ready for review, but kicking off another build just to play it safe.
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1220268552 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -0,0 +1,1009 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.deferred.DeferredEvent; +import org.apache.kafka.deferred.DeferredEventQueue; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator + * or the transaction coordinator. 
+ * + * The runtime framework maps each underlying partitions (e.g. __consumer_offsets) that that broker is a + * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft + * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in + * timeline datastructures backed by a SnapshotRegistry. The runtime supports two type of operations + * on state machines: (1) Writes and (2) Reads. + * + * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from state + * machine to handle the operation. A write operation typically generates a response and a list of + * records. The records are applies to the state machine and persisted to the partition. The response + * is parked until the records are committed and delivered when they are. + * + * (2) A read operation, aka a request, can only read the committed state from the state machine to handle + * the operation. A read operation typically generates a response that is immediately completed. + * + * The runtime framework exposes an asynchronous, future based, API to the world. All the operations + * are executed by an CoordinatorEventProcessor. The processor guarantees that operations for a + * single partition or state machine are not processed concurrently. + * + * @param The type of the state machine. + * @param The type of the record. + */ +public class CoordinatorRuntime, U> { + +/** + * Builder to create a CoordinatorRuntime. + * + * @param The type of the state machine. + * @param The type of the record. 
+ */ +public static class Builder, U> { +private LogContext logContext; +private CoordinatorEventProcessor eventProcessor; +private PartitionWriter partitionWriter; +private CoordinatorLoader loader; +private CoordinatorBuilderSupplier coordinatorBuilderSupplier; + +public Builder withLogContext(LogContext logContext) { +this.logContext = logContext; +return this; +} + +public Builder withEventProcessor(CoordinatorEventProcessor eventProcessor) { +this.eventProcessor = eventProcessor; +return this; +} + +public Builder withPartitionWriter(PartitionWriter partitionWriter) { +this.partitionWriter = partitionWriter; +return this; +} + +public Builder withLoader(CoordinatorLoader loader) { +this.loader = loader; +return this; +} + +public Builder withCoordinatorStateMachineSupplier(CoordinatorBuilderSupplier coordinatorBuilderSupplier) { +this.coordinatorBuilderSupplier = coordinatorBuilderSupplier; +return this; +} + +public CoordinatorRuntime build() { +if (logContext == null) +logContext = new LogContext(); +if (eventProcessor == null) +throw new IllegalA
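The write/read split described in the Javadoc above can be modelled roughly as follows. This is a hypothetical, heavily simplified sketch, not the `CoordinatorRuntime` API. Records are plain strings, hard and soft state are not distinguished, and per-partition serialization is approximated with one single-threaded executor per partition instead of a `CoordinatorEventProcessor`. Writes see uncommitted records and park their response until `onCommit` advances past them. Reads see only committed records and complete immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model for illustration; names do not match the real runtime.
class MiniCoordinatorRuntime {
    private static final class PartitionState {
        final List<String> records = new ArrayList<>(); // committed + uncommitted
        long committedOffset = 0;                        // records[0, committedOffset) are committed
        final TreeMap<Long, List<Runnable>> parked = new TreeMap<>(); // end offset -> responses
    }

    private final Map<String, PartitionState> states = new ConcurrentHashMap<>();
    private final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();

    // One single-threaded executor per partition: operations on the same
    // partition never run concurrently; different partitions may.
    private ExecutorService executorFor(String partition) {
        return executors.computeIfAbsent(partition, p -> Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "coordinator-" + p);
            t.setDaemon(true);
            return t;
        }));
    }

    private PartitionState stateFor(String partition) {
        return states.computeIfAbsent(partition, p -> new PartitionState());
    }

    // Write: appends a record (visible to later writes immediately) and parks
    // the response until the record's end offset is committed.
    CompletableFuture<String> scheduleWrite(String partition, String record) {
        CompletableFuture<String> response = new CompletableFuture<>();
        executorFor(partition).execute(() -> {
            PartitionState s = stateFor(partition);
            s.records.add(record);
            long endOffset = s.records.size();
            s.parked.computeIfAbsent(endOffset, k -> new ArrayList<>())
                    .add(() -> response.complete("written " + record));
        });
        return response;
    }

    // Read: sees only committed records and completes right away.
    CompletableFuture<List<String>> scheduleRead(String partition) {
        return CompletableFuture.supplyAsync(() -> {
            PartitionState s = stateFor(partition);
            return new ArrayList<>(s.records.subList(0, (int) s.committedOffset));
        }, executorFor(partition));
    }

    // Invoked when the partition's committed offset advances; releases parked responses.
    CompletableFuture<Void> onCommit(String partition, long offset) {
        return CompletableFuture.runAsync(() -> {
            PartitionState s = stateFor(partition);
            s.committedOffset = Math.max(s.committedOffset, Math.min(offset, s.records.size()));
            Map<Long, List<Runnable>> ready = s.parked.headMap(s.committedOffset, true);
            ready.values().forEach(list -> list.forEach(Runnable::run));
            ready.clear();
        }, executorFor(partition));
    }

    void shutdown() {
        executors.values().forEach(ExecutorService::shutdown);
    }
}
```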
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1220268287 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -0,0 +1,1009 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.deferred.DeferredEvent; +import org.apache.kafka.deferred.DeferredEventQueue; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator + * or the transaction coordinator. 
+ * + * The runtime framework maps each underlying partitions (e.g. __consumer_offsets) that that broker is a + * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft + * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in + * timeline datastructures backed by a SnapshotRegistry. The runtime supports two type of operations + * on state machines: (1) Writes and (2) Reads. + * + * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from state + * machine to handle the operation. A write operation typically generates a response and a list of + * records. The records are applies to the state machine and persisted to the partition. The response + * is parked until the records are committed and delivered when they are. + * + * (2) A read operation, aka a request, can only read the committed state from the state machine to handle + * the operation. A read operation typically generates a response that is immediately completed. + * + * The runtime framework exposes an asynchronous, future based, API to the world. All the operations + * are executed by an CoordinatorEventProcessor. The processor guarantees that operations for a + * single partition or state machine are not processed concurrently. + * + * @param The type of the state machine. + * @param The type of the record. + */ +public class CoordinatorRuntime, U> { + +/** + * Builder to create a CoordinatorRuntime. + * + * @param The type of the state machine. + * @param The type of the record. 
+ */ +public static class Builder, U> { +private LogContext logContext; +private CoordinatorEventProcessor eventProcessor; +private PartitionWriter partitionWriter; +private CoordinatorLoader loader; +private CoordinatorBuilderSupplier coordinatorBuilderSupplier; + +public Builder withLogContext(LogContext logContext) { +this.logContext = logContext; +return this; +} + +public Builder withEventProcessor(CoordinatorEventProcessor eventProcessor) { +this.eventProcessor = eventProcessor; +return this; +} + +public Builder withPartitionWriter(PartitionWriter partitionWriter) { +this.partitionWriter = partitionWriter; +return this; +} + +public Builder withLoader(CoordinatorLoader loader) { +this.loader = loader; +return this; +} + +public Builder withCoordinatorStateMachineSupplier(CoordinatorBuilderSupplier coordinatorBuilderSupplier) { +this.coordinatorBuilderSupplier = coordinatorBuilderSupplier; +return this; +} + +public CoordinatorRuntime build() { +if (logContext == null) +logContext = new LogContext(); +if (eventProcessor == null) +throw new IllegalA
[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
jolshan commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1220267344 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -0,0 +1,1009 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.deferred.DeferredEvent; +import org.apache.kafka.deferred.DeferredEventQueue; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator + * or the transaction coordinator. 
+ * + * The runtime framework maps each underlying partitions (e.g. __consumer_offsets) that that broker is a + * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft + * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in + * timeline datastructures backed by a SnapshotRegistry. The runtime supports two type of operations + * on state machines: (1) Writes and (2) Reads. + * + * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from state + * machine to handle the operation. A write operation typically generates a response and a list of + * records. The records are applies to the state machine and persisted to the partition. The response + * is parked until the records are committed and delivered when they are. + * + * (2) A read operation, aka a request, can only read the committed state from the state machine to handle + * the operation. A read operation typically generates a response that is immediately completed. + * + * The runtime framework exposes an asynchronous, future based, API to the world. All the operations + * are executed by an CoordinatorEventProcessor. The processor guarantees that operations for a + * single partition or state machine are not processed concurrently. + * + * @param The type of the state machine. + * @param The type of the record. + */ +public class CoordinatorRuntime, U> { + +/** + * Builder to create a CoordinatorRuntime. + * + * @param The type of the state machine. + * @param The type of the record. 
+ */ +public static class Builder, U> { +private LogContext logContext; +private CoordinatorEventProcessor eventProcessor; +private PartitionWriter partitionWriter; +private CoordinatorLoader loader; +private CoordinatorBuilderSupplier coordinatorBuilderSupplier; + +public Builder withLogContext(LogContext logContext) { +this.logContext = logContext; +return this; +} + +public Builder withEventProcessor(CoordinatorEventProcessor eventProcessor) { +this.eventProcessor = eventProcessor; +return this; +} + +public Builder withPartitionWriter(PartitionWriter partitionWriter) { +this.partitionWriter = partitionWriter; +return this; +} + +public Builder withLoader(CoordinatorLoader loader) { +this.loader = loader; +return this; +} + +public Builder withCoordinatorStateMachineSupplier(CoordinatorBuilderSupplier coordinatorBuilderSupplier) { +this.coordinatorBuilderSupplier = coordinatorBuilderSupplier; +return this; +} + +public CoordinatorRuntime build() { +if (logContext == null) +logContext = new LogContext(); +if (eventProcessor == null) +throw new IllegalA
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13816: MINOR: Optimize runtime of MM2 integration tests by batching transactions
gharris1727 commented on code in PR #13816: URL: https://github.com/apache/kafka/pull/13816#discussion_r1220234170 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java: ## @@ -34,14 +38,39 @@ static Map makeProps(String... keyValues) { } return props; } - -/* - * return records with different but predictable key and value + +/** + * Assemble a collection of records arbitrarily distributed across all partitions of the specified topic + * @param topicName Destination topic + * @param numRecords count of records to produce to the topic in total + * @return A batch of records that can be sent to a producer. */ -public static Map generateRecords(int numRecords) { -Map records = new HashMap<>(); +public static List> generateRecords(String topicName, int numRecords) { +List> records = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { -records.put("key-" + i, "message-" + i); +String key = "key-" + i; +String value = "message-" + i; +records.add(new ProducerRecord<>(topicName, null, key.getBytes(), value.getBytes())); +} +return records; +} + +/** + * Assemble a collection of records evenly distributed across some partitions of the specified topic Review Comment: Since the tests don't rely on this keying, and having the same key in multiple partitions is strange, I'll change this to make the keys unique.
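The second helper's signature is cut off above, so the following is only a sketch of "evenly distributed across some partitions" combined with the unique-key change proposed in the comment. `SimpleRecord` and the `numPartitions` parameter are hypothetical, chosen so the sketch runs without Kafka on the classpath. In the real test utility, each entry would presumably be built with `new ProducerRecord<>(topicName, partition, key, value)`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class EvenRecordsSketch {
    // Hypothetical stand-in for ProducerRecord<byte[], byte[]>.
    static final class SimpleRecord {
        final String topic;
        final int partition;
        final byte[] key;
        final byte[] value;

        SimpleRecord(String topic, int partition, byte[] key, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    // Assigns records round-robin over partitions [0, numPartitions), so counts
    // differ by at most one; every key is unique regardless of its partition.
    static List<SimpleRecord> generateRecords(String topicName, int numRecords, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        List<SimpleRecord> records = new ArrayList<>(numRecords);
        for (int i = 0; i < numRecords; i++) {
            byte[] key = ("key-" + i).getBytes(StandardCharsets.UTF_8);
            byte[] value = ("message-" + i).getBytes(StandardCharsets.UTF_8);
            records.add(new SimpleRecord(topicName, i % numPartitions, key, value));
        }
        return records;
    }
}
```

Using an explicit charset avoids depending on the platform default that a bare `getBytes()` call uses.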
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1220212149 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRec; +private Serde keySerde; +private FullChangeSerde valueSerde; +private String topic; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod; +minTimestamp = Long.MAX_VALUE; +numRec = 0; +bufferSize = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? 
FullChangeSerde.wrap((Serde) getter.valueSerde()) : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue = null; + +if (predicate.get()) { +final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod.toMillis()); +if (iterator.hasNext()) { +keyValue = iterator.next(); +} +if (keyValue == null) { +if (numRecords() == 0) { +minTimestamp = Long.MAX_VALUE; +} +return; +} +BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); + +Change value = valueSerde.deserializeParts( +topic, +new Change<>(bufferValue.newValue(), bufferValue.oldValue()) +); +while (keyValue != null && predicate.get() && wrapped().observedStreamTime - gracePeriod.toMillis() >= minTimestamp()) { +if (bufferValue.context().timestamp() != minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +}
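The eviction logic above relies on a time-first key layout, so a range scan from 0 to `observedStreamTime - gracePeriod` returns eligible records oldest first. The following is a hypothetical in-memory analogue, with a `TreeMap` standing in for the RocksDB segmented store. Because `minTimestamp()` is derived from the map itself, the invariant guarded by the `IllegalStateException` above holds by construction here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical in-memory sketch; not the RocksDB-backed buffer.
class TimeOrderedBufferSketch {
    private final long gracePeriodMs;
    private long observedStreamTime = Long.MIN_VALUE;
    // Time-first ordering: iteration yields the oldest timestamps first.
    private final TreeMap<Long, List<String>> byTimestamp = new TreeMap<>();
    private int numRecords = 0;

    TimeOrderedBufferSketch(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    void put(long timestamp, String value) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        numRecords++;
    }

    long minTimestamp() {
        return byTimestamp.isEmpty() ? Long.MAX_VALUE : byTimestamp.firstKey();
    }

    int numRecords() {
        return numRecords;
    }

    // Evicts records with timestamp <= observedStreamTime - grace, oldest first,
    // for as long as the predicate keeps returning true.
    void evictWhile(Supplier<Boolean> predicate, Consumer<String> callback) {
        if (byTimestamp.isEmpty()) {
            return;
        }
        long cutoff = observedStreamTime - gracePeriodMs;
        while (predicate.get() && !byTimestamp.isEmpty() && minTimestamp() <= cutoff) {
            Map.Entry<Long, List<String>> oldest = byTimestamp.firstEntry();
            List<String> values = oldest.getValue();
            String value = values.remove(0);
            if (values.isEmpty()) {
                byTimestamp.remove(oldest.getKey());
            }
            numRecords--;
            callback.accept(value);
        }
    }
}
```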
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1220161797 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private FullChangeSerde valueSerde; +private final String topic; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? 
FullChangeSerde.wrap((Serde) getter.valueSerde()) : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod); +try { Review Comment: I always forget that exists. Good call.
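The thread doesn't show exactly what was suggested, but at this point the code is managing a store iterator that must be closed on every exit path, and try-with-resources guarantees exactly that. The sketch below assumes a hypothetical `CloseableIterator` stand-in for `KeyValueIterator`, which likewise extends `Iterator` and `Closeable`. It shows the iterator being closed both when eviction stops early and when the callback throws.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a store iterator that holds resources until closed.
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close(); // narrowed: throws no checked exception
}

class ListBackedIterator<T> implements CloseableIterator<T> {
    private final Iterator<T> delegate;
    boolean closed = false;

    ListBackedIterator(List<T> items) {
        this.delegate = items.iterator();
    }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public T next() { return delegate.next(); }
    @Override public void close() { closed = true; }
}

class EvictionLoop {
    // try-with-resources closes the iterator on normal completion, on an early
    // stop when the predicate turns false, and when the callback throws.
    static <T> int evictWhile(CloseableIterator<T> iterator, Supplier<Boolean> predicate, Consumer<T> callback) {
        int evicted = 0;
        try (CloseableIterator<T> it = iterator) {
            while (predicate.get() && it.hasNext()) {
                callback.accept(it.next());
                evicted++;
            }
        }
        return evicted;
    }
}
```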
[GitHub] [kafka] wcarlson5 commented on pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on PR #13756: URL: https://github.com/apache/kafka/pull/13756#issuecomment-1579290388 @vcrfxia Once you're good with this, could you take a look at [https://github.com/wcarlson5/kafka/pull/1]? It's the second part and has the new joining logic in it. After that, I'll add the recovery logic in a third PR. I have it targeted at this feature branch until this one gets merged.
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
vcrfxia commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1220108194 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRec; +private Serde keySerde; +private FullChangeSerde valueSerde; +private String topic; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod; +minTimestamp = Long.MAX_VALUE; +numRec = 0; +bufferSize = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? 
FullChangeSerde.wrap((Serde) getter.valueSerde()) : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue = null; + +if (predicate.get()) { +final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod.toMillis()); +if (iterator.hasNext()) { +keyValue = iterator.next(); +} +if (keyValue == null) { +if (numRecords() == 0) { +minTimestamp = Long.MAX_VALUE; +} +return; +} +BufferValue bufferValue = BufferValue.deserialize(ByteBuffer.wrap(keyValue.value)); +K key = keySerde.deserializer().deserialize(topic, + PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get())); + +Change value = valueSerde.deserializeParts( +topic, +new Change<>(bufferValue.newValue(), bufferValue.oldValue()) +); +while (keyValue != null && predicate.get() && wrapped().observedStreamTime - gracePeriod.toMillis() >= minTimestamp()) { +if (bufferValue.context().timestamp() != minTimestamp) { +throw new IllegalStateException( +"minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + +bufferValue.context().timestamp() + "]" +); +} +
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1220078561 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -575,8 +575,12 @@ class Partition(val topicPartition: TopicPartition, } } - def hasOngoingTransaction(producerId: Long): Boolean = { -leaderLogIfLocal.exists(leaderLog => leaderLog.hasOngoingTransaction(producerId)) + // Returns a verificationGuard object if we need to verify. This starts or continues the verification process. Otherwise return null. Review Comment: I can swap all these to not use this term.
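The new comment in the diff describes a method that returns a verification guard object when verification is needed, starting or continuing verification, and returns null otherwise. Below is a minimal sketch of that sentinel-object pattern. It assumes a per-producer map of guards and assumes that a producer whose transaction is already ongoing needs no further verification. `VerificationGuardSketch`, `maybeStartVerification`, `canAppend`, and `clear` are hypothetical names, not the methods in the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the verification-guard idea (not the PR's Scala code).
class VerificationGuardSketch {
    // producerId -> guard object for a verification that is in progress
    private final Map<Long, Object> guards = new HashMap<>();

    // Returns a guard if verification is needed, starting a new verification or continuing
    // an existing one (same guard). Returns null if no verification is needed.
    synchronized Object maybeStartVerification(final long producerId, final boolean hasOngoingTransaction) {
        if (hasOngoingTransaction) {
            return null;
        }
        return guards.computeIfAbsent(producerId, id -> new Object());
    }

    // An append passes only if it presents the guard that is still registered, so a guard
    // that was cleared in the meantime (identity check) is rejected.
    synchronized boolean canAppend(final long producerId, final Object guard) {
        return guard != null && guards.get(producerId) == guard;
    }

    // Hypothetical hook: invalidate the guard, e.g. when the transaction state changes.
    synchronized void clear(final long producerId) {
        guards.remove(producerId);
    }
}
```

Comparing guards by identity, rather than by a boolean flag, means a stale guard from an earlier verification can never authorize a later append.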
[GitHub] [kafka] gharris1727 opened a new pull request, #13821: MINOR: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner
gharris1727 opened a new pull request, #13821: URL: https://github.com/apache/kafka/pull/13821 In order to support multiple scanning modes, we should refactor the existing scanning mechanism out of the DelegatingClassLoader. KIP-898 will require more functionality that relies on the results of scanning, and it is not appropriate to add that to the DCL itself. Scanning (and the PluginScanResult) depends on the ClassLoader instances that are used to load the plugins, so the DelegatingClassLoader is still responsible for processing the plugin path and instantiating the PluginClassLoaders, which it emits to the external scanner via PluginSource objects. In addition to pulling the existing reflection-based scanning out into a ReflectionScanner and its superclass PluginScanner, this PR adds a ServiceLoaderScanner (currently unused), which shares some functionality with the ReflectionScanner. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
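The structure described in this PR can be sketched as follows: the `DelegatingClassLoader` hands each plugin's class loader to an external scanner via `PluginSource` objects, and a shared superclass iterates over those sources. This is a hypothetical outline, not the PR's code. `SourceSketch`, `ScannerSketch`, and `ServiceLoaderScannerSketch` are illustrative names, and the service-loader variant relies only on the standard `java.util.ServiceLoader` API (Java 9+ for `stream()`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for a PluginSource: a plugin location plus the loader that owns it.
final class SourceSketch {
    final String location;
    final ClassLoader loader;

    SourceSketch(final String location, final ClassLoader loader) {
        this.location = location;
        this.loader = loader;
    }
}

// Hypothetical superclass: shared iteration over sources; subclasses decide how to scan one source.
abstract class ScannerSketch {
    final SortedSet<String> discover(final List<SourceSketch> sources, final Class<?> pluginType) {
        final SortedSet<String> found = new TreeSet<>();
        for (final SourceSketch source : sources) {
            found.addAll(scan(source, pluginType));
        }
        return found;
    }

    protected abstract List<String> scan(SourceSketch source, Class<?> pluginType);
}

// Finds implementations declared in META-INF/services/<pluginType> files visible to the source's loader.
final class ServiceLoaderScannerSketch extends ScannerSketch {
    @Override
    protected List<String> scan(final SourceSketch source, final Class<?> pluginType) {
        final List<String> names = new ArrayList<>();
        // stream() + Provider.type() inspects declared providers without instantiating them.
        ServiceLoader.load(pluginType, source.loader).stream()
                .forEach(provider -> names.add(provider.type().getName()));
        return names;
    }
}
```

Keeping the per-source iteration in the superclass is one way a reflection-based scanner and a service-loader scanner could share functionality while each implements only `scan`.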
[GitHub] [kafka] C0urante commented on pull request #12290: MINOR: Stop leaking threads in BlockingConnectorTest
C0urante commented on PR #12290: URL: https://github.com/apache/kafka/pull/12290#issuecomment-1579174340 @tombentley @viktorsomogyi if you have a moment, would you mind taking a look? Thanks!
[GitHub] [kafka] C0urante commented on pull request #12444: KAFKA-14101: Improve documentation for consuming from embedded Kafka cluster topics in Connect integration testing framework
C0urante commented on PR #12444: URL: https://github.com/apache/kafka/pull/12444#issuecomment-1579167201 Downgrading to a draft until I can revisit and fix the merge conflicts.
[GitHub] [kafka] C0urante commented on pull request #12307: KAFKA-14006: Parameterize WorkerConnectorTest suite
C0urante commented on PR #12307: URL: https://github.com/apache/kafka/pull/12307#issuecomment-1579164297 @mimaison @showuon if you have a moment, would you mind taking a look? Thanks!
[GitHub] [kafka] C0urante commented on pull request #11608: KAFKA-13533: Clean up resources on failed connector and task startup
C0urante commented on PR #11608: URL: https://github.com/apache/kafka/pull/11608#issuecomment-1579162986 Downgrading to a draft until I can revisit and fix the merge conflicts.
[GitHub] [kafka] C0urante commented on pull request #11369: KAFKA-13327, KAFKA-13328, KAFKA-13329: Clean up preflight connector validation
C0urante commented on PR #11369: URL: https://github.com/apache/kafka/pull/11369#issuecomment-1579162605 Downgrading to a draft until I can revisit and fix the merge conflicts. I may also split this into several PRs to make review easier.
[GitHub] [kafka] C0urante closed pull request #11986: KAFKA-7509: Clean up incorrect warnings logged by Connect
C0urante closed pull request #11986: KAFKA-7509: Clean up incorrect warnings logged by Connect URL: https://github.com/apache/kafka/pull/11986
[GitHub] [kafka] C0urante commented on pull request #11986: KAFKA-7509: Clean up incorrect warnings logged by Connect
C0urante commented on PR #11986: URL: https://github.com/apache/kafka/pull/11986#issuecomment-1579157272 Closing due to lack of review. We can revisit this in the future, although the downgrade of these log messages from `WARN` to `INFO` level in https://github.com/apache/kafka/pull/13225 likely addresses the underlying concern here: that the `WARN` level in Kafka Connect is heavily polluted with messages about unused config properties.
[GitHub] [kafka] C0urante closed pull request #12666: KAFKA-14244: Add guard against accidental calls to halt JVM during testing
C0urante closed pull request #12666: KAFKA-14244: Add guard against accidental calls to halt JVM during testing URL: https://github.com/apache/kafka/pull/12666
[GitHub] [kafka] C0urante commented on pull request #12666: KAFKA-14244: Add guard against accidental calls to halt JVM during testing
C0urante commented on PR #12666: URL: https://github.com/apache/kafka/pull/12666#issuecomment-1579153067 Closing due to lack of interest; we can revisit this if necessary.
[GitHub] [kafka] C0urante commented on a diff in pull request #13816: MINOR: Optimize runtime of MM2 integration tests by batching transactions
C0urante commented on code in PR #13816: URL: https://github.com/apache/kafka/pull/13816#discussion_r1219842657 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java: ## @@ -34,14 +38,39 @@ static Map makeProps(String... keyValues) { } return props; } - -/* - * return records with different but predictable key and value + +/** + * Assemble a collection of records arbitrarily distributed across all partitions of the specified topic + * @param topicName Destination topic + * @param numRecords count of records to produce to the topic in total + * @return A batch of records that can be sent to a producer. */ -public static Map generateRecords(int numRecords) { -Map records = new HashMap<>(); +public static List> generateRecords(String topicName, int numRecords) { +List> records = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { -records.put("key-" + i, "message-" + i); +String key = "key-" + i; +String value = "message-" + i; +records.add(new ProducerRecord<>(topicName, null, key.getBytes(), value.getBytes())); +} +return records; +} + +/** + * Assemble a collection of records evenly distributed across some partitions of the specified topic Review Comment: We may want to add a note here that, if `numPartitions` is greater than one, this will cause records with the same key to be written to different partitions. ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -234,6 +245,8 @@ public void startClusters(Map additionalMM2Config) throws Except @AfterEach public void shutdownClusters() throws Exception { try { +primaryProducer.close(); +backupProducer.close(); Review Comment: We probably still want to try to gracefully shut down the cluster even if we encounter an error with stopping our producers: ```suggestion Utils.closeQuietly(primaryProducer); Utils.closeQuietly(backupProducer); ```
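The two review points above can be illustrated with a small, dependency-free sketch. The first helper spreads records evenly over `numPartitions` by assigning partitions round-robin. This is one plausible reading of "evenly distributed across some partitions"; the PR's actual helper may differ. Because the partition is set explicitly, the key no longer determines placement, which is why records sharing a key could land in different partitions. The second helper shows how a quiet close lets shutdown continue when closing a producer fails. `RecordSketch`, `Mm2TestHelpersSketch`, `generateEvenly`, and `closeQuietly` are hypothetical stand-ins. In the real test the records would be `ProducerRecord`s built with an explicit partition, and Kafka's own `Utils` class already provides quiet-close helpers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a ProducerRecord with an explicit partition.
final class RecordSketch {
    final String topic;
    final int partition;
    final String key;
    final String value;

    RecordSketch(final String topic, final int partition, final String key, final String value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }
}

final class Mm2TestHelpersSketch {
    // Round-robin: record i goes to partition i % numPartitions, so per-partition counts differ by at most one.
    // The key plays no part in choosing the partition.
    static List<RecordSketch> generateEvenly(final String topic, final int numPartitions, final int numRecords) {
        final List<RecordSketch> records = new ArrayList<>();
        for (int i = 0; i < numRecords; i++) {
            records.add(new RecordSketch(topic, i % numPartitions, "key-" + i, "message-" + i));
        }
        return records;
    }

    // Close a resource and report, rather than propagate, any failure so later cleanup still runs.
    static void closeQuietly(final AutoCloseable closeable, final String name) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception e) {
            System.err.println("Failed to close " + name + ": " + e);
        }
    }
}
```

In a teardown method, the idea is to call `closeQuietly` for each producer first and then stop the clusters. A failing producer then no longer skips cluster shutdown.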
[GitHub] [kafka] clolov commented on pull request #13712: KAFKA-14133: Migrate Admin mock in TaskManagerTest to Mockito
clolov commented on PR #13712: URL: https://github.com/apache/kafka/pull/13712#issuecomment-1579134299 Heya @cadonna! I hope I have addressed your comment, rebased and updated the overview!
[GitHub] [kafka] C0urante commented on pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks
C0urante commented on PR #13819: URL: https://github.com/apache/kafka/pull/13819#issuecomment-1579102183 The first CI run has finished with [no failing integration tests](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13819/1/tests/) for MirrorMaker 2 🎉 Waiting for the [next run](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13819/2/pipeline) to conclude before marking ready for review.
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1219946644 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRec; +private Serde keySerde; +private FullChangeSerde valueSerde; Review Comment: I agree for the most part. The issue comes with `InMemoryTimeOrderedKeyValueBuffer` (https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java): I was running into problems fixing it, and the PR kept ballooning. I might just have been doing something wrong. When we come back to fix the serialization error, I would rather break it out into a second PR, as it was getting messy.
[jira] [Updated] (KAFKA-15021) KRaft controller increases leader epoch when shrinking ISR
[ https://issues.apache.org/jira/browse/KAFKA-15021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-15021: --- Fix Version/s: 3.6.0 > KRaft controller increases leader epoch when shrinking ISR > -- > > Key: KAFKA-15021 > URL: https://issues.apache.org/jira/browse/KAFKA-15021 > Project: Kafka > Issue Type: Bug > Components: controller, kraft >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.6.0 > > > When the KRaft controller shrinks the ISR, it also forces the leader epoch to > increase. This is unnecessary and causes all of the follower replica fetches > to get invalidated. > Here is an example trace of this behavior after replica 8 was shut down: > {code:java} > kafka-dump-log --cluster-metadata-decoder --files > __cluster_metadata-0/38589501.log | grep Pd7wMb4lSkKI00--SrWNXw > ... > | offset: 38655592 CreateTime: 1683849857362 keySize: -1 valueSize: 41 > sequence: -1 headerKeys: [] payload: > {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[3,1],"leader":1}} > | offset: 38655593 CreateTime: 1683849857362 keySize: -1 valueSize: 41 > sequence: -1 headerKeys: [] payload: > {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":5,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,4],"leader":4}} > | offset: 38655594 CreateTime: 1683849857362 keySize: -1 valueSize: 41 > sequence: -1 headerKeys: [] payload: > {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":6,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,1],"leader":0}} > | offset: 38656159 CreateTime: 1683849974945 keySize: -1 valueSize: 39 > sequence: -1 headerKeys: [] payload: > {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[3,1,8]}} > | offset: 38656256 CreateTime: 1683849994297 keySize: -1 valueSize: 39 > sequence: -1 headerKeys: [] payload: >
{"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":5,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,4,8]}} > | offset: 38656299 CreateTime: 1683849997139 keySize: -1 valueSize: 39 > sequence: -1 headerKeys: [] payload: > {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":6,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,1,8]}} > | offset: 38657003 CreateTime: 1683850157379 keySize: -1 valueSize: 30 > sequence: -1 headerKeys: [] payload: > {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","leader":8}} > {code} > Also, notice how the leader epoch was not increased when the ISR was expanded. -- This message was sent by Atlassian Jira (v8.20.10#820010)
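The report comes down to which partition changes should bump the leader epoch. Followers send the current leader epoch with each fetch, so an unnecessary bump makes their in-flight fetches fail. Below is a simplified sketch of the behavior the report asks for. As a deliberate simplification of the real controller logic, it assumes that only a change of leader bumps the epoch, while ISR shrinks and expansions alone do not. `PartitionStateSketch` and `apply` are hypothetical names, not the controller's API.

```java
import java.util.List;

// Hypothetical, simplified partition state (not the KRaft controller's classes).
final class PartitionStateSketch {
    final int leader;
    final int leaderEpoch;
    final List<Integer> isr;

    PartitionStateSketch(final int leader, final int leaderEpoch, final List<Integer> isr) {
        this.leader = leader;
        this.leaderEpoch = leaderEpoch;
        this.isr = List.copyOf(isr);
    }

    // Bump the epoch only when the leader changes; an ISR-only change keeps the epoch,
    // so followers' fetches carrying the current epoch remain valid.
    PartitionStateSketch apply(final int newLeader, final List<Integer> newIsr) {
        final int newEpoch = newLeader != leader ? leaderEpoch + 1 : leaderEpoch;
        return new PartitionStateSketch(newLeader, newEpoch, newIsr);
    }
}
```

Under this rule, shrinking the ISR from `[3,1,8]` to `[3,1]` with the same leader leaves the epoch unchanged, just as the trace above already shows for ISR expansion.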
[jira] [Reopened] (KAFKA-15021) KRaft controller increases leader epoch when shrinking ISR
[ https://issues.apache.org/jira/browse/KAFKA-15021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio reopened KAFKA-15021.
[jira] [Resolved] (KAFKA-15021) KRaft controller increases leader epoch when shrinking ISR
[ https://issues.apache.org/jira/browse/KAFKA-15021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio resolved KAFKA-15021. Resolution: Fixed
[GitHub] [kafka] vvcephei commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…
vvcephei commented on PR #13796: URL: https://github.com/apache/kafka/pull/13796#issuecomment-1578989020 Just checking to see if the GitHub/Jenkins trigger still works. It looks like it does not.
[GitHub] [kafka] vvcephei commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…
vvcephei commented on PR #13796: URL: https://github.com/apache/kafka/pull/13796#issuecomment-1578988387 retest this please
[GitHub] [kafka] jsancio merged pull request #13772: MINOR: Add helper util `Snapshots.lastContainedLogTimestamp`
jsancio merged PR #13772: URL: https://github.com/apache/kafka/pull/13772
[GitHub] [kafka] vvcephei commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint
vvcephei commented on PR #13803: URL: https://github.com/apache/kafka/pull/13803#issuecomment-1578986116 Yeah, sorry for the weird drive-bys. I was indeed trying to see if those commands still work. Apparently, they do not. I think this is the plugin documentation (https://plugins.jenkins.io/ghprb/), but it also looks like the plugin was associated with security flaws, so maybe Infra disabled it. I'll raise a ticket with them. At the least, if it doesn't work anymore, they should remove the docs for that role in asf.yaml.
[GitHub] [kafka] C0urante commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint
C0urante commented on PR #13803: URL: https://github.com/apache/kafka/pull/13803#issuecomment-1578983184 @vvcephei I kicked off another Jenkins build. I haven't seen GitHub comments work for re-triggering builds in a while. Are you trying to test that out?
[GitHub] [kafka] vvcephei commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint
vvcephei commented on PR #13803: URL: https://github.com/apache/kafka/pull/13803#issuecomment-157898 test this please
[GitHub] [kafka] vvcephei commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint
vvcephei commented on PR #13803: URL: https://github.com/apache/kafka/pull/13803#issuecomment-1578980743 re-test this
[GitHub] [kafka] vvcephei commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint
vvcephei commented on PR #13803: URL: https://github.com/apache/kafka/pull/13803#issuecomment-1578980602 re-test this, please
[GitHub] [kafka] vvcephei commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint
vvcephei commented on PR #13803: URL: https://github.com/apache/kafka/pull/13803#issuecomment-1578980388 Retest this
[GitHub] [kafka] vvcephei commented on pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage
vvcephei commented on PR #13802: URL: https://github.com/apache/kafka/pull/13802#issuecomment-1578974789 retest this, please
[GitHub] [kafka] vvcephei commented on pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage
vvcephei commented on PR #13802: URL: https://github.com/apache/kafka/pull/13802#issuecomment-1578973821 retest this
[GitHub] [kafka] vvcephei commented on pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage
vvcephei commented on PR #13802: URL: https://github.com/apache/kafka/pull/13802#issuecomment-1578973157 I'm going to try something out for triggering builds...
[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage
mumrah commented on code in PR #13802: URL: https://github.com/apache/kafka/pull/13802#discussion_r1219834583 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java: ## @@ -223,16 +223,16 @@ public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistrat ); ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName); operationConsumer.accept( -UPDATE_TOPIC_CONFIG, -"Updating Configs for Topic " + topicName + ", ID " + topicId, +DELETE_TOPIC_CONFIG, Review Comment: Kind of. The opType is only used in logging, so the bug only affected the log output.
[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage
mumrah commented on code in PR #13802: URL: https://github.com/apache/kafka/pull/13802#discussion_r1219833727 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java: ## @@ -267,6 +267,9 @@ void handleTopicsDelta( ) { topicsDelta.deletedTopicIds().forEach(topicId -> { String name = deletedTopicNameResolver.apply(topicId); +if (name == null) { Review Comment: I ran into an NPE here in my test code when I passed an empty map. In the production code, the function passed in here is the getter of the `topicsById` map in TopicsImage. I think it's impossible for there to be something in `deletedTopicIds` that's not also in `topicsById`. This check isn't really necessary, but the custom RuntimeException is better than an NPE.
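A minimal Java sketch of the kind of check mumrah describes, assuming the resolver is a `Function` from topic ID to topic name. `DeletedTopicNames`, `resolveDeletedTopicName` and the choice of `IllegalStateException` are illustrative, not the code in PR #13802, and topic IDs are plain `String`s here instead of Kafka's `Uuid`.

```java
import java.util.function.Function;

// Hypothetical sketch of the null check discussed for handleTopicsDelta (PR #13802).
// Topic IDs are modeled as Strings to keep the example self-contained.
final class DeletedTopicNames {
    private DeletedTopicNames() { }

    // Resolves the name of a deleted topic, failing with a descriptive exception
    // instead of letting a null name surface later as a bare NullPointerException.
    static String resolveDeletedTopicName(String topicId, Function<String, String> deletedTopicNameResolver) {
        String name = deletedTopicNameResolver.apply(topicId);
        if (name == null) {
            throw new IllegalStateException("Could not find the name of deleted topic ID " + topicId);
        }
        return name;
    }
}
```

Passing `Map.of("id-1", "foo")::get` as the resolver returns `"foo"` for `"id-1"`, while an empty map's `get` reproduces the test-only situation and fails with the descriptive exception rather than an NPE.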
[jira] [Updated] (KAFKA-15063) Throttle number of active PIDs
[ https://issues.apache.org/jira/browse/KAFKA-15063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim updated KAFKA-15063: -- Description: Ticket to track KIP-936. Since KIP-679 idempotent producers became the default in Kafka as a result of this all producer instances will be assigned PID. The increase of the number of PIDs stored in Kafka brokers by {{ProducerStateManager}} exposes the broker to OOM errors if it has a high number of producers, a rogue or misconfigured client(s). The broker is still exposed to OOM even after KIP-854 introduced a separate config to expire PID from transaction IDs if there is a high number of PID before {{producer.id.expiration.ms}} is exceeded. As a result of this, the broker will keep experiencing OOM and become offline. The only way to recover from this is to increase the heap. KIP-936 is proposing throttling the number of PIDs per KafkaPrincipal. See the KIP-936 details here [https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs] was: Ticket to track KIP-936. Since KIP-679 idempotent producers became the default in Kafka as a result of this all producer instances will be assigned PID. The increase of the number of PIDs stored in Kafka brokers by {{ProducerStateManager}} exposes the broker to OOM errors if it has a high number of producers, a rogue or misconfigured client(s). The broker is still exposed to OOM even after KIP-854 introduced a separate config to expire PID from transaction IDs if there is a high number of PID before {{producer.id.expiration.ms}} is exceeded. As a result of this, the broker will keep experiencing OOM and become offline. The only way to recover from this is to increase the heap.
KIP-936 is proposing throttling the number of PIDs per KafkaPrincipal. See the KIP details here [https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs] > Throttle number of active PIDs > -- > > Key: KAFKA-15063 > URL: https://issues.apache.org/jira/browse/KAFKA-15063 > Project: Kafka > Issue Type: New Feature > Components: core, producer >Affects Versions: 2.8.0, 3.1.0, 3.0.0, 3.2.0, 3.3, 3.4.0 >Reporter: Omnia Ibrahim >Priority: Major > > Ticket to track KIP-936. Since KIP-679 idempotent > producers became the default in Kafka as a result of this all > producer instances will be assigned PID. The increase of the number of PIDs > stored in Kafka brokers by {{ProducerStateManager}} > exposes the broker to OOM errors if it has a high number of producers, a > rogue or misconfigured client(s). > The broker is still exposed to OOM even after KIP-854 > introduced a separate config to expire PID from transaction IDs if there is a > high number of PID before {{producer.id.expiration.ms}} is exceeded. > As a result of this, the broker will keep experiencing OOM and become > offline. The only way to recover from this is to increase the heap. > > KIP-936 is proposing throttling the number of PIDs per > KafkaPrincipal. > See the KIP-936 details here > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs] -- This message was sent by Atlassian Jira (v8.20.10#820010)
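The ticket only links to KIP-936, so the following is not its design. It is a rough illustration of what "throttling the number of PIDs per KafkaPrincipal" could mean, under stated assumptions: `PidLimiter`, `tryRegister`, `expire` and `activeCount` are hypothetical names, principals are plain strings such as `User:alice`, PIDs are tracked exactly in a per-principal set, and a `false` result means the caller should throttle that producer.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical per-principal cap on active producer IDs.
// NOT the KIP-936 design; it only illustrates a per-principal limit.
final class PidLimiter {
    private final int maxPidsPerPrincipal;
    private final Map<String, Set<Long>> activePids = new HashMap<>();

    PidLimiter(int maxPidsPerPrincipal) {
        if (maxPidsPerPrincipal <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.maxPidsPerPrincipal = maxPidsPerPrincipal;
    }

    // Returns true if the PID is already tracked or was admitted,
    // false if the principal is at its limit (the caller would throttle).
    synchronized boolean tryRegister(String principal, long producerId) {
        Set<Long> pids = activePids.computeIfAbsent(principal, p -> new HashSet<>());
        if (pids.contains(producerId)) {
            return true;
        }
        if (pids.size() >= maxPidsPerPrincipal) {
            return false;
        }
        pids.add(producerId);
        return true;
    }

    // Drops a PID, for example once it has expired, freeing a slot for the principal.
    synchronized void expire(String principal, long producerId) {
        Set<Long> pids = activePids.get(principal);
        if (pids != null) {
            pids.remove(producerId);
            if (pids.isEmpty()) {
                activePids.remove(principal);
            }
        }
    }

    synchronized int activeCount(String principal) {
        Set<Long> pids = activePids.get(principal);
        return pids == null ? 0 : pids.size();
    }
}
```

Exact tracking keeps the sketch easy to follow; with a very large number of producers the tracking structure itself consumes memory, which is the very problem the ticket describes, so a real design would need to bound that cost.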
[jira] [Updated] (KAFKA-15063) Throttle number of active PIDs
[ https://issues.apache.org/jira/browse/KAFKA-15063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim updated KAFKA-15063: -- Description: Ticket to track KIP-936. Since KIP-679 idempotent producers became the default in Kafka as a result of this all producer instances will be assigned PID. The increase of the number of PIDs stored in Kafka brokers by {{ProducerStateManager}} exposes the broker to OOM errors if it has a high number of producers, a rogue or misconfigured client(s). The broker is still exposed to OOM even after KIP-854 introduced a separate config to expire PID from transaction IDs if there is a high number of PID before {{producer.id.expiration.ms}} is exceeded. As a result of this, the broker will keep experiencing OOM and become offline. The only way to recover from this is to increase the heap. KIP-936 is proposing throttling the number of PIDs per KafkaPrincipal. See the KIP details here [https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs] was: Ticket to track to track KIP-936. Since KIP-679 idempotent producers became the default in Kafka as a result of this all producer instances will be assigned PID. The increase of number of PIDs stored in Kafka brokers by {{ProducerStateManager}} exposes the broker to OOM errors if it has a high number of producers, rogue or misconfigured client(s). The broker is still exposed to OOM even after KIP-854 introduced a separated config to expire PID from transaction IDs if there is high number of PID before {{producer.id.expiration.ms}} is exceeded.
As a result of this the broker will keep experincing OOM and become offline. The only way to recover from this is to increase the heap. KIP-936 is proposing throttling number of PIDs per KafkaPrincipal. See [https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs] > Throttle number of active PIDs > -- > > Key: KAFKA-15063 > URL: https://issues.apache.org/jira/browse/KAFKA-15063 > Project: Kafka > Issue Type: New Feature > Components: core, producer >Affects Versions: 2.8.0, 3.1.0, 3.0.0, 3.2.0, 3.3, 3.4.0 >Reporter: Omnia Ibrahim >Priority: Major > > Ticket to track KIP-936. Since KIP-679 idempotent > producers became the default in Kafka as a result of this all > producer instances will be assigned PID. The increase of the number of PIDs > stored in Kafka brokers by {{ProducerStateManager}} > exposes the broker to OOM errors if it has a high number of producers, a > rogue or misconfigured client(s). > The broker is still exposed to OOM even after KIP-854 > introduced a separate config to expire PID from transaction IDs if there is a > high number of PID before {{producer.id.expiration.ms}} is exceeded. > As a result of this, the broker will keep experiencing OOM and become > offline. The only way to recover from this is to increase the heap. > > KIP-936 is proposing throttling the number of PIDs per > KafkaPrincipal. > See the KIP details here > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs]
[jira] [Resolved] (KAFKA-14791) Create a builder class for PartitionRegistration
[ https://issues.apache.org/jira/browse/KAFKA-14791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant resolved KAFKA-14791. -- Resolution: Fixed > Create a builder class for PartitionRegistration > > > Key: KAFKA-14791 > URL: https://issues.apache.org/jira/browse/KAFKA-14791 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Minor >
[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
dajac commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1219799829 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -579,9 +579,30 @@ class UnifiedLog(@volatile var logStartOffset: Long, result } + /** + * Maybe create and return the verificationGuard object for the given producer ID if the transaction is not yet ongoing. Review Comment: ditto. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -980,6 +1005,26 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } + + // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state. + // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on + // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append. + // There are two phases -- the first append to the log and subsequent appends. + // + // 1. First append: Verification starts with creating a verificationGuard, sending a verification request to the transaction coordinator, and Review Comment: nit: `verification guard` for consistency. There are a few other cases in this comment. 
## core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala: ## @@ -1087,6 +1087,46 @@ class ProducerStateManagerTest { assertTrue(!manager.latestSnapshotOffset.isPresent) } + @Test + def testEntryForVerification(): Unit = { +val originalEntry = stateManager.verificationStateEntry(producerId, true) +val originalEntryVerificationGuard = originalEntry.verificationGuard() + +def verifyEntry(producerId: Long, newEntry: VerificationStateEntry): Unit = { + val entry = stateManager.verificationStateEntry(producerId, false) + assertEquals(originalEntryVerificationGuard, entry.verificationGuard) + assertEquals(entry.verificationGuard, newEntry.verificationGuard) +} + +// If we already have an entry, reuse it. +val updatedEntry = stateManager.verificationStateEntry(producerId, true) +verifyEntry(producerId, updatedEntry) + +// Before we add transactional data, we can't remove the entry. +stateManager.clearVerificationStateEntry(producerId) +verifyEntry(producerId, updatedEntry) + +// Add the transactional data and clear the entry +append(stateManager, producerId, 0, 0, offset = 0, isTransactional = true) +stateManager.clearVerificationStateEntry(producerId) +assertEquals(null, stateManager.verificationStateEntry(producerId, false)) + Review Comment: nit: Empty line could be removed. ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -575,8 +575,12 @@ class Partition(val topicPartition: TopicPartition, } } - def hasOngoingTransaction(producerId: Long): Boolean = { -leaderLogIfLocal.exists(leaderLog => leaderLog.hasOngoingTransaction(producerId)) + // Returns a verificationGuard object if we need to verify. This starts or continues the verification process. Otherwise return null. Review Comment: nit: `verification guard object`? I find `verificationGuard` confusing because we actually return an `Object`. 
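The behavior pinned down by `testEntryForVerification` can be modeled in a few lines. This is a hypothetical Java sketch, not the `ProducerStateManager` implementation: `VerificationStates`, `Entry` and `appendTransactional` are illustrative names, and the rule that clearing is a no-op until transactional data has been appended is taken from the test's own comments. Because the guard is a plain `Object` compared by identity, an entry created after a clear carries a different guard than the old one; per the `UnifiedLog` comment in this review, that uniqueness is what guards against a race between the coordinator's response and an abort marker.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the verification-entry behavior exercised by testEntryForVerification.
final class VerificationStates {
    // Each entry wraps a unique guard; only its identity matters.
    static final class Entry {
        final Object verificationGuard = new Object();
    }

    private final Map<Long, Entry> entries = new HashMap<>();
    private final Set<Long> producersWithTransactionalData = new HashSet<>();

    // Reuses an existing entry; creates one only when createIfAbsent is true, otherwise may return null.
    Entry verificationStateEntry(long producerId, boolean createIfAbsent) {
        if (createIfAbsent) {
            return entries.computeIfAbsent(producerId, id -> new Entry());
        }
        return entries.get(producerId);
    }

    // Records that transactional data was appended for this producer.
    void appendTransactional(long producerId) {
        producersWithTransactionalData.add(producerId);
    }

    // Mirrors the test: the entry can only be removed after transactional data was appended.
    void clearVerificationStateEntry(long producerId) {
        if (producersWithTransactionalData.contains(producerId)) {
            entries.remove(producerId);
        }
    }
}
```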
## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -980,6 +1005,26 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } + + // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state. + // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on + // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append. + // There are two phases -- the first append to the log and subsequent appends. + // + // 1. First append: Verification starts with creating a verificationGuard, sending a verification request to the transaction coordinator, and + // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction + // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could + // have a sequence of events where we start a transaction verification, have the tran
[GitHub] [kafka] jsancio merged pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration
jsancio merged PR #13788: URL: https://github.com/apache/kafka/pull/13788
[GitHub] [kafka] jsancio commented on pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration
jsancio commented on PR #13788: URL: https://github.com/apache/kafka/pull/13788#issuecomment-1578924954 Merging. Unrelated test failures: ``` testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest ```
[jira] [Created] (KAFKA-15063) Throttle number of active PIDs
Omnia Ibrahim created KAFKA-15063: - Summary: Throttle number of active PIDs Key: KAFKA-15063 URL: https://issues.apache.org/jira/browse/KAFKA-15063 Project: Kafka Issue Type: New Feature Components: core, producer Affects Versions: 3.4.0, 3.2.0, 3.0.0, 3.1.0, 2.8.0, 3.3 Reporter: Omnia Ibrahim Ticket to track to track KIP-936. Since KIP-679 idempotent producers became the default in Kafka as a result of this all producer instances will be assigned PID. The increase of number of PIDs stored in Kafka brokers by {{ProducerStateManager}} exposes the broker to OOM errors if it has a high number of producers, rogue or misconfigured client(s). The broker is still exposed to OOM even after KIP-854 introduced a separated config to expire PID from transaction IDs if there is high number of PID before {{producer.id.expiration.ms}} is exceeded. As a result of this the broker will keep experincing OOM and become offline. The only way to recover from this is to increase the heap. KIP-936 is proposing throttling number of PIDs per KafkaPrincipal. See [https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs]
[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage
mumrah commented on code in PR #13802: URL: https://github.com/apache/kafka/pull/13802#discussion_r1219791584 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java: ## @@ -475,12 +478,13 @@ void handleProducerIdSnapshot(ProducerIdsImage image, KRaftMigrationOperationCon void handleConfigsDelta(ConfigurationsImage configsImage, ConfigurationsDelta configsDelta, KRaftMigrationOperationConsumer operationConsumer) { Set updatedResources = configsDelta.changes().keySet(); updatedResources.forEach(configResource -> { +String opType = brokerOrTopicOpType(configResource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG); Map props = configsImage.configMapForResource(configResource); if (props.isEmpty()) { -operationConsumer.accept("DeleteConfig", "Delete configs for " + configResource, migrationState -> +operationConsumer.accept(opType, "Delete configs for " + configResource, migrationState -> Review Comment: Yup, good catch
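The hunk above derives the operation type, which mumrah notes is only used in logging, from the resource instead of hard-coding `"DeleteConfig"`. A hypothetical Java sketch of such a helper follows. `ResourceType` stands in for `ConfigResource.Type`, the constant string values are placeholders, and the real `brokerOrTopicOpType` takes the `ConfigResource` itself and may treat other resource types differently.

```java
// Hypothetical stand-in for ConfigResource.Type, reduced to the two values this sketch needs.
enum ResourceType { BROKER, TOPIC }

final class OpTypes {
    // Placeholder values; the actual constants in KRaftMigrationZkWriter may differ.
    static final String UPDATE_BROKER_CONFIG = "UpdateBrokerConfig";
    static final String UPDATE_TOPIC_CONFIG = "UpdateTopicConfig";

    private OpTypes() { }

    // Picks the broker or topic variant of an operation type (used only for log messages).
    static String brokerOrTopicOpType(ResourceType type, String brokerOp, String topicOp) {
        switch (type) {
            case BROKER:
                return brokerOp;
            case TOPIC:
                return topicOp;
            default:
                throw new IllegalArgumentException("Unsupported resource type: " + type);
        }
    }
}
```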
[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
hudeqi commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1578890756 > @hudeqi I added myself as a reviewer, I may not have time to review this today but will get to it this week. OK, thanks for your time.
[GitHub] [kafka] dajac merged pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
dajac merged PR #13675: URL: https://github.com/apache/kafka/pull/13675
[GitHub] [kafka] viktorsomogyi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
viktorsomogyi commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1578862889 @hudeqi I added myself as a reviewer, I may not have time to review this today but will get to it this week.
[GitHub] [kafka] dajac opened a new pull request, #13820: MINOR: Move Timer/TimingWheel to server-common
dajac opened a new pull request, #13820: URL: https://github.com/apache/kafka/pull/13820 WIP ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ijuma commented on a diff in pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime
ijuma commented on code in PR #13313: URL: https://github.com/apache/kafka/pull/13313#discussion_r1219692414 ## tests/kafkatest/directory_layout/kafka_path.py: ## @@ -49,6 +49,11 @@ CORE_DEPENDANT_TEST_LIBS_JAR_NAME: "core/build/dependant-testlibs/*.jar", TOOLS_JAR_NAME: "tools/build/libs/kafka-tools*.jar", TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "tools/build/dependant-libs*/*.jar" +}, +# TODO remove with KAFKA-14762 Review Comment: Can we explain why we do this in this comment?
[GitHub] [kafka] ijuma commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime
ijuma commented on PR #13313: URL: https://github.com/apache/kafka/pull/13313#issuecomment-1578803245 @gharris1727 do the system tests pass with this change?
[GitHub] [kafka] mimaison closed pull request #3955: KAFKA-5969: Use correct error message when the JSON file is invalid
mimaison closed pull request #3955: KAFKA-5969: Use correct error message when the JSON file is invalid URL: https://github.com/apache/kafka/pull/3955
[GitHub] [kafka] mimaison commented on pull request #3955: KAFKA-5969: Use correct error message when the JSON file is invalid
mimaison commented on PR #3955: URL: https://github.com/apache/kafka/pull/3955#issuecomment-1578794742 `PreferredReplicaLeaderElectionCommand` does not exist anymore and `LeaderElectionCommand` does not have this issue, so I'm closing this PR.
[GitHub] [kafka] C0urante merged pull request #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult
C0urante merged PR #13771: URL: https://github.com/apache/kafka/pull/13771
[GitHub] [kafka] C0urante commented on pull request #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult
C0urante commented on PR #13771: URL: https://github.com/apache/kafka/pull/13771#issuecomment-1578788384 Test failures appear unrelated (there is a known bug in the reflections library that we use that caused a test failure which may appear related, but is not). Merging...
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
jeffkbkim commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1219669199 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java: ## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; + +/** + * A simple interface to write records to Partitions/Logs. It contains the minimum + * required for coordinators. + * + * @param The record type. + */ +public interface PartitionWriter { + +/** + * Serializer to translate T to bytes. + * + * @param The record type. + */ +interface Serializer { +/** + * Serializes the key of the record. + */ +byte[] serializeKey(T record); + +/** + * Serializes the value of the record. + */ +byte[] serializeValue(T record); +} + +/** + * Listener allowing to listen to high watermark changes. This is meant + * to be used in conjunction with {{@link PartitionWriter#append(TopicPartition, List)}}. Review Comment: ah found HighWatermarkListener. thanks!
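For readers unfamiliar with the interface quoted above, here is a hypothetical implementation of a serializer with the same two methods. The archived diff appears to have lost the generic type parameter (the Javadoc still documents "The record type"), so the sketch assumes `Serializer<T>`; `KeyValue` and `Utf8KeyValueSerializer` are illustrative names, and passing a `null` value through unchanged is a choice of this sketch rather than documented behavior.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring the shape of PartitionWriter.Serializer quoted above;
// the type parameter T is assumed.
interface Serializer<T> {
    byte[] serializeKey(T record);

    byte[] serializeValue(T record);
}

// Hypothetical record type used only for illustration.
final class KeyValue {
    final String key;
    final String value;

    KeyValue(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Encodes keys and values as UTF-8 bytes.
final class Utf8KeyValueSerializer implements Serializer<KeyValue> {
    @Override
    public byte[] serializeKey(KeyValue record) {
        return record.key.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(KeyValue record) {
        // A null value is passed through as null (a choice of this sketch).
        return record.value == null ? null : record.value.getBytes(StandardCharsets.UTF_8);
    }
}
```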
[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
yashmayya commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1219648522 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java: ## @@ -192,6 +197,220 @@ public void testCancelAfterAwaitFlush() throws Exception { flushFuture.get(1000, TimeUnit.MILLISECONDS); } +@Test Review Comment: All these new tests seem to be specific to the `ConnectorOffsetBackingStore` class - should we move them to a new `ConnectorOffsetBackingStoreTest` class?
[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
yashmayya commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1219596953 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,10 +280,33 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } Review Comment: This `ConnectorOffsetBackingStore::set` method's Javadoc also needs to be updated to mention the special case handling for batches with `null` offsets since it currently states the following: ``` * If configured to use a connector-specific offset store, the returned {@link Future} corresponds to a * write to that store, and the passed-in {@link Callback} is invoked once that write completes. If a worker-global * store is provided, a secondary write is made to that store if the write to the connector-specific store * succeeds. Errors with this secondary write are not reflected in the returned {@link Future} or the passed-in * {@link Callback}; they are only logged as a warning to users. 
``` ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,10 +280,33 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.entrySet() +.stream() +.anyMatch(offset -> offset.getValue() == null); Review Comment: nit: can be simplified ```suggestion boolean containsTombstones = values.containsValue(null); ``` ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,10 +280,33 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.entrySet() +.stream() +.anyMatch(offset -> offset.getValue() == null); + +AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>(); + +// If there are tombstone offsets, then the failure to write to secondary store will +// not be ignored. Also, for tombstone records, we first write to secondary store and +// then to primary stores. +if (secondaryStore != null && containsTombstones) { +secondaryStore.set(values, (secondaryWriteError, ignored) -> { +try (LoggingContext context = loggingContext()) { +if (secondaryWriteError != null) { +log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError); +secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError); +} else { +log.debug("Successfully flushed tombstone offsets to secondary backing store"); +} +} +}); +} + return primaryStore.set(values, (primaryWriteError, ignored) -> { -if (secondaryStore != null) { +// Secondary store writes have already happened for tombstone records Review Comment: How do we know this if we aren't blocking on the write to the secondary store above? I believe we should do a synchronous write to the secondary store in this tombstone offset case. 
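To make the synchronous-write suggestion concrete, here is a minimal sketch under simplifying assumptions. `SimpleOffsetStore` is a hypothetical stand-in, not Connect's `OffsetBackingStore`. Offsets are modeled as `Map<String, String>` rather than the serialized maps the real store uses, and callbacks and logging context are omitted. The sketch uses the `values.containsValue(null)` simplification and blocks on the secondary write before touching the primary store, so a failed tombstone write fails the whole flush.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TombstoneWriteOrderSketch {

    // Hypothetical stand-in for an offset store; NOT Kafka Connect's OffsetBackingStore API
    interface SimpleOffsetStore {
        CompletableFuture<Void> set(Map<String, String> values);
    }

    static CompletableFuture<Void> set(SimpleOffsetStore primary, SimpleOffsetStore secondary,
                                       Map<String, String> values, long timeoutMs) {
        // A null value marks a tombstone (an offset to be deleted)
        boolean containsTombstones = values.containsValue(null);

        if (secondary != null && containsTombstones) {
            try {
                // Block until the secondary write finishes so its outcome is actually known
                secondary.set(values).get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return CompletableFuture.failedFuture(e);
            } catch (ExecutionException | TimeoutException e) {
                // Surface the failure instead of only logging it; the primary store is not written
                return CompletableFuture.failedFuture(e);
            }
        }

        CompletableFuture<Void> primaryWrite = primary.set(values);
        if (secondary != null && !containsTombstones) {
            // Regular batches: best-effort secondary write after the primary write succeeds
            primaryWrite.thenRun(() -> secondary.set(values));
        }
        return primaryWrite;
    }
}
```

Blocking trades some flush latency for certainty about the secondary write's outcome. A non-blocking alternative would chain the primary write onto the secondary future with `thenCompose`. Note also that `containsValue(null)` needs a `Map` implementation that permits null values, such as `HashMap`. The immutable maps from `Map.of` throw a `NullPointerException` on that call.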
## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -302,7 +326,12 @@ public Future set(Map values, Callback callb } } try (LoggingContext context = loggingContext()) { -callback.onCompletion(primaryWriteError, ignored); +Throwable secondaryWriteError = secondaryStoreTombstoneWriteError.get(); +if (secondaryStore != null && containsTombstones && secondaryWriteError != null) { Review Comment: Same as above - we aren't blocking on the write to the secondary store, so we can't be sure that it has completed at this point.
[GitHub] [kafka] showuon commented on a diff in pull request #13807: KAFKA-15040: trigger onLeadershipChange under KRaft mode
showuon commented on code in PR #13807: URL: https://github.com/apache/kafka/pull/13807#discussion_r1219562194 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -3701,6 +3721,11 @@ class ReplicaManagerTest { assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) + if (enableRemoteStorage) { Review Comment: Good catch! I thought I'd covered every KRaft test case. Updated.
[GitHub] [kafka] clolov commented on pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito
clolov commented on PR #13711: URL: https://github.com/apache/kafka/pull/13711#issuecomment-1578656781
Heya @cadonna, I hope I have addressed all of your comments:
* Updated the PR's overview
* Rebased
* Comments on the code itself

Interestingly enough, the test `shouldInitializeNewStandbyTasks` fails locally when I run all tests, but succeeds when it is run in isolation. If the build passes, I will blame it on something in my environment, but if it fails, I would be glad for another pair of eyes on what might be causing it.
[GitHub] [kafka] viktorsomogyi commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…
viktorsomogyi commented on PR #13796: URL: https://github.com/apache/kafka/pull/13796#issuecomment-1578650158 I had a short chat with @urbandan yesterday to understand the scope of this fix. During the conversation, we came to the conclusion that bumping the epoch without a safety check is generally unsafe. There might be requests that have timed out on the client side (after the request timeout) yet were successfully appended on the broker, and these may still be waiting in the producer's queue for a retry while their epoch is being bumped. While this PR fixes one case, it would be good to review all usages where the epoch is bumped. I don't insist on redoing all the other cases, so let me know, Daniel, whether you want to expand the scope of this PR or do it in a follow-up PR.
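One way to picture the "safety check" discussed above is to track in-flight batches per partition and refuse to bump the epoch while any remain. This is a hypothetical sketch: the class and method names are invented for illustration, and partitions are plain strings. It is not the producer's actual `TransactionManager` logic.

```java
import java.util.HashMap;
import java.util.Map;

public class EpochBumpGuardSketch {

    // Number of batches sent but not yet answered, per partition (hypothetical bookkeeping)
    private final Map<String, Integer> inFlightByPartition = new HashMap<>();

    void onSend(String partition) {
        inFlightByPartition.merge(partition, 1, Integer::sum);
    }

    void onResponse(String partition) {
        // Decrement the count; returning null removes the entry once nothing is in flight
        inFlightByPartition.computeIfPresent(partition, (p, n) -> n > 1 ? n - 1 : null);
    }

    // Allow a bump only when no earlier batch for the partition is still outstanding, so a batch
    // that may already be appended on the broker is not retried under a new epoch
    boolean canBumpEpoch(String partition) {
        return !inFlightByPartition.containsKey(partition);
    }
}
```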
[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API
yashmayya commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1219524567 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map> adminFutures = new ArrayList<>(); - -Map offsetsToAlter = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() != null) -.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue(; - -if (!offsetsToAlter.isEmpty()) { -log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", -connName, offsetsToAlter); -AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( +Map offsetsToWrite; +if (isReset) { +offsetsToWrite = new HashMap<>(); +ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, -alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); +try { +admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) +.partitionsToOffsetAndMetadata() + .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) +.forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + +log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", +connName, groupId, offsetsToWrite.keySet()); +} catch (Exception e) { +Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); +log.error("Failed to list offsets prior to resetting sink connector offsets", e); +cb.onCompletion(new ConnectException("Failed to list offsets prior to 
resetting sink connector offsets", e), null); +return; +} +} else { +offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } -Set partitionsToReset = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() == null) -.map(Map.Entry::getKey) -.collect(Collectors.toSet()); - -if (!partitionsToReset.isEmpty()) { -log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", -connName, partitionsToReset); -DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( -(int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, -deleteConsumerGroupOffsetsOptions); +boolean alterOffsetsResult; +try { +alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); +} catch (UnsupportedOperationException e) { +throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + +"modification of offsets", e); +} - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); +// This should only occur for an offset reset request when: +// 1. There was a prior attempt to reset offsets +// OR +// 2. No offsets have been committed yet +if (offsetsToWrite.isEmpty()) { Review Comment: I'm wondering whether we should go ah
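The removed code above splits a parsed offsets map into entries to alter (non-null values) and partitions to delete (null values). The new reset path builds a map in which every listed partition maps to `null`. Below is a minimal, dependency-free sketch of those two steps. It assumes plain `String` keys in place of `TopicPartition`, uses `Long` offsets in place of `OffsetAndMetadata`, and makes no Admin client calls.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class OffsetRequestSplitSketch {

    // Reset request: every known partition maps to a null offset
    static <K> Map<K, Long> resetAll(Set<K> partitions) {
        Map<K, Long> result = new HashMap<>();
        for (K partition : partitions) {
            result.put(partition, null);
        }
        return result;
    }

    // Entries with a non-null offset are committed (altered)
    static <K> Map<K, Long> offsetsToAlter(Map<K, Long> offsets) {
        return offsets.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // Entries with a null offset have their committed offsets deleted
    static <K> Set<K> partitionsToDelete(Map<K, Long> offsets) {
        return offsets.entrySet().stream()
                .filter(e -> e.getValue() == null)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }
}
```

`Collectors.toMap` throws a `NullPointerException` on null values. That is why the reset map is filled with `put` rather than collected, and why `offsetsToAlter` filters out nulls first.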
[jira] [Commented] (KAFKA-15059) Exactly-once source tasks fail to start during pending rebalances
[ https://issues.apache.org/jira/browse/KAFKA-15059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729709#comment-17729709 ] Chris Egerton commented on KAFKA-15059: --- On second thought, it may be unnecessary to check for a pending rebalance at all. If the worker that we forward the zombie fencing request to is a zombie leader (i.e., a worker that believes it is the leader but in reality is not), it will fail to finish the round of zombie fencing because it won't be able to write to the config topic with a transactional producer. If the connector has just been deleted, we'll still fail the request since we force a read-to-end of the config topic and refresh our snapshot of its contents before checking to see if the connector exists. And regardless, the worker that owns the task will still do a read-to-end of the config topic and verify that (1) no new task configs have been generated for the connector and (2) the worker is still assigned the connector, before allowing the task to process any data. > Exactly-once source tasks fail to start during pending rebalances > - > > Key: KAFKA-15059 > URL: https://issues.apache.org/jira/browse/KAFKA-15059 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, mirrormaker >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > When asked to perform a round of zombie fencing, the distributed herder will > [reject the > request|https://github.com/apache/kafka/blob/17fd30e6b457f097f6a524b516eca1a6a74a9144/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1249-L1250] > if a rebalance is pending, which can happen if (among other things) a config > for a new connector or a new set of task configs has been recently read from > the config topic. > Normally this can be alleviated with a simple task restart, which isn't great > but isn't terrible. 
> However, when running MirrorMaker 2 in dedicated mode, there is no API to > restart failed tasks, and it can be more common to see this kind of failure > on a fresh cluster because three connector configurations are written in > rapid succession to the config topic. > > In order to provide a better experience for users of both vanilla Kafka > Connect and dedicated MirrorMaker 2 clusters, we can retry (likely with the > same exponential backoff introduced with KAFKA-14732) zombie fencing attempts > that fail due to a pending rebalance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
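The proposed fix retries zombie fencing attempts that fail because of a pending rebalance, with exponential backoff. Below is a minimal sketch of such a retry loop. It assumes a hypothetical `PendingRebalanceException` and a doubling backoff capped at a maximum, with no jitter. It does not reuse whatever backoff mechanism KAFKA-14732 introduced.

```java
import java.util.concurrent.Callable;

public class FencingRetrySketch {

    // Hypothetical signal for "request rejected because a rebalance is pending"
    static class PendingRebalanceException extends RuntimeException {
        PendingRebalanceException(String message) {
            super(message);
        }
    }

    // Delay before retrying after the given 0-based attempt: initialMs * 2^attempt, capped at maxMs
    static long backoffMs(int attempt, long initialMs, long maxMs) {
        long delay = initialMs;
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    // Runs the fencing attempt, retrying only on PendingRebalanceException, up to maxAttempts times
    static <T> T retryOnPendingRebalance(Callable<T> attempt, int maxAttempts,
                                         long initialMs, long maxMs) throws Exception {
        for (int i = 0; ; i++) {
            try {
                return attempt.call();
            } catch (PendingRebalanceException e) {
                if (i + 1 >= maxAttempts) {
                    throw e; // give up and surface the last failure
                }
                Thread.sleep(backoffMs(i, initialMs, maxMs));
            }
        }
    }
}
```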
[GitHub] [kafka] yashmayya commented on pull request #13818: KAFKA-14784: Connect offset reset REST API
yashmayya commented on PR #13818: URL: https://github.com/apache/kafka/pull/13818#issuecomment-1578630274
Sink connector offsets alter requests:
```
ConnectorsResource::alterConnectorOffsets
    -> AbstractHerder::alterConnectorOffsets
    -> (Distributed|Standalone)Herder::modifyConnectorOffsets
    -> Worker::alterConnectorOffsets
    -> Worker::modifySinkConnectorOffsets
    -> Worker::alterSinkConnectorOffset
```
Source connector offsets alter requests:
```
ConnectorsResource::alterConnectorOffsets
    -> AbstractHerder::alterConnectorOffsets
    -> (Distributed|Standalone)Herder::modifyConnectorOffsets
    -> Worker::alterConnectorOffsets
    -> Worker::modifySourceConnectorOffsets
```
Sink connector offsets reset requests:
```
ConnectorsResource::resetConnectorOffsets
    -> AbstractHerder::resetConnectorOffsets
    -> (Distributed|Standalone)Herder::modifyConnectorOffsets
    -> Worker::resetConnectorOffsets
    -> Worker::modifySinkConnectorOffsets
    -> Worker::resetSinkConnectorOffsets
```
Source connector offsets reset requests:
```
ConnectorsResource::resetConnectorOffsets
    -> AbstractHerder::resetConnectorOffsets
    -> (Distributed|Standalone)Herder::modifyConnectorOffsets
    -> Worker::resetConnectorOffsets
    -> Worker::modifySourceConnectorOffsets
```
The current flows for altering and resetting offsets, along with the use of `null` offsets in multiple places to distinguish between alter and reset offsets requests, might seem a little clunky (especially for sink connectors), but I've tried to optimize for code re-use in both the herder implementations and the worker.
[GitHub] [kafka] yashmayya opened a new pull request, #13818: KAFKA-14784: Connect offset reset REST API
yashmayya opened a new pull request, #13818: URL: https://github.com/apache/kafka/pull/13818
- https://issues.apache.org/jira/browse/KAFKA-14784
- [KIP-875: First-class offsets support in Kafka Connect](https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect)
- Implements the new `DELETE /connectors/{connector}/offsets` REST API

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
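For reference, a request against the new endpoint might be built as below using the JDK's `java.net.http` client (Java 11+). The worker URL (`http://localhost:8083`, Connect's default REST port) and the connector names are assumptions for illustration. KIP-875 expects the connector to be in the `STOPPED` state before its offsets are reset.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

public class ResetOffsetsRequestSketch {

    // Builds DELETE {workerUrl}/connectors/{connector}/offsets for the given connector name
    static HttpRequest buildResetRequest(String workerUrl, String connector) {
        // URLEncoder uses form encoding ('+' for spaces), so convert to a path-safe %20
        String encoded = URLEncoder.encode(connector, StandardCharsets.UTF_8).replace("+", "%20");
        return HttpRequest.newBuilder(URI.create(workerUrl + "/connectors/" + encoded + "/offsets"))
                .DELETE()
                .build();
    }
}
```

To send the request, call `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` and inspect the status code and body.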
[jira] [Assigned] (KAFKA-14445) Producer doesn't request metadata update on REQUEST_TIMED_OUT
[ https://issues.apache.org/jira/browse/KAFKA-14445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haruki Okada reassigned KAFKA-14445: Assignee: Haruki Okada
> Producer doesn't request metadata update on REQUEST_TIMED_OUT
>
> Key: KAFKA-14445
> URL: https://issues.apache.org/jira/browse/KAFKA-14445
> Project: Kafka
> Issue Type: Improvement
> Reporter: Haruki Okada
> Assignee: Haruki Okada
> Priority: Major
>
> Produce requests may fail with a timeout (`request.timeout.ms`) in the following two cases:
> * No produce response is received within `request.timeout.ms`
> * A produce response is received, but it ends with `REQUEST_TIMED_OUT` on the broker
>
> The former case usually happens when a broker machine fails or there is a network glitch.
> In this case, the connection is closed and a metadata update is requested to discover the new leader:
> [https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556]
>
> The problem is the latter case (REQUEST_TIMED_OUT on the broker).
> In this case, the produce request ends with a TimeoutException, which doesn't inherit from InvalidMetadataException, so it doesn't trigger a metadata update.
>
> The typical cause of REQUEST_TIMED_OUT is replication delay due to a follower-side problem, in which case a metadata update indeed doesn't make much sense.
>
> However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT could cause produce requests to retry unnecessarily, which may end in batch expiration due to the delivery timeout.
> Below is the scenario we experienced:
> * Environment:
> ** Partition tp-0 has 3 replicas: 1, 2, 3. The leader is 1.
> ** min.insync.replicas=2
> ** acks=all
> * Scenario:
> ** Broker 1 "partially" failed
> *** It lost its ZooKeeper connection and was kicked out of the cluster. There was a controller log like:
> {code:java}
> [2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , deleted brokers: 1, bounced brokers: {code}
> *** However, somehow the broker was still able to receive produce requests. We're still investigating how this is possible. Indeed, according to server.log, broker 1 was somewhat "alive" and kept working.
> *** In other words, broker 1 became a "zombie"
> ** Broker 2 was elected as the new leader
> *** Broker 3 became a follower of broker 2
> *** However, since broker 1 was still out of the cluster, it didn't receive LeaderAndIsr, so it kept thinking it was the leader of tp-0
> ** Meanwhile, the producer kept sending produce requests to broker 1, and those requests failed with REQUEST_TIMED_OUT because no brokers were replicating from broker 1
> *** REQUEST_TIMED_OUT doesn't trigger a metadata update, so the producer didn't have a chance to update its stale metadata
>
> So I suggest requesting a metadata update even on a REQUEST_TIMED_OUT exception, to address the case where the old leader has become a "zombie".
[jira] [Commented] (KAFKA-14445) Producer doesn't request metadata update on REQUEST_TIMED_OUT
[ https://issues.apache.org/jira/browse/KAFKA-14445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729695#comment-17729695 ] Haruki Okada commented on KAFKA-14445: -- [~kirktrue] Thanks for your patch for https://issues.apache.org/jira/browse/KAFKA-14317. However, if I read the patch correctly, our original issue should be addressed separately. Our issue was the case where the producer receives a REQUEST_TIMED_OUT response (i.e. the request timed out inside the purgatory while waiting for replication), rather than a NetworkClient-level timeout. So I think the || clause here ([https://github.com/apache/kafka/pull/12813#discussion_r1048223644]) was necessary, contrary to the conclusion of that discussion. Though this is an extreme edge case, I would like to solve it anyway, as it caused batch expiration in our producer. I'll submit a follow-up patch.
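The change being argued for amounts to one extra condition on when the producer requests a metadata refresh. Below is a hypothetical sketch of that decision. The enum is an invented subset of errors, not Kafka's `Errors` class, and `isInvalidMetadata` stands in for the "exception is an `InvalidMetadataException`" check.

```java
public class MetadataRefreshSketch {

    // Hypothetical subset of produce-response errors; NOT Kafka's Errors enum
    enum ProduceError { NONE, NOT_LEADER_OR_FOLLOWER, UNKNOWN_TOPIC_OR_PARTITION, NOT_ENOUGH_REPLICAS, REQUEST_TIMED_OUT }

    // Errors that already imply stale metadata (in Kafka, their exceptions extend InvalidMetadataException)
    static boolean isInvalidMetadata(ProduceError error) {
        return error == ProduceError.NOT_LEADER_OR_FOLLOWER
                || error == ProduceError.UNKNOWN_TOPIC_OR_PARTITION;
    }

    // Proposed rule: also refresh metadata when the broker answered REQUEST_TIMED_OUT,
    // so a producer talking to a "zombie" leader eventually discovers the new one
    static boolean shouldRequestMetadataUpdate(ProduceError error) {
        return isInvalidMetadata(error) || error == ProduceError.REQUEST_TIMED_OUT;
    }
}
```

The trade-off is an occasional unnecessary metadata request when the timeout was really caused by slow followers, in exchange for recovering from the stale-leader case described in the issue.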
[GitHub] [kafka] urbandan commented on a diff in pull request #13813: KAFKA-13756: Connect validate endpoint should return proper validatio…
urbandan commented on code in PR #13813: URL: https://github.com/apache/kafka/pull/13813#discussion_r1219366831 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -464,10 +465,22 @@ ConfigInfos validateConnectorConfig(Map connectorProps, boolean connectorProps = worker.configTransformer().transform(connectorProps); } String connType = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); -if (connType == null) -throw new BadRequestException("Connector config " + connectorProps + " contains no connector type"); +if (connType == null) { +return createConnectorClassError("Config contains no connector type"); +} + +Connector connector; +try { +connector = getConnector(connType); +} catch (ConnectException e) { +return createConnectorClassError(e.getMessage()); Review Comment: I agree. I was tempted to just remove the list of connectors as a whole, but there are other call sites where the exception message is logged. In case it can be useful for diagnostics, I added a parameter instead.
[GitHub] [kafka] Vaibhav-Nazare opened a new pull request, #13817: KAFKA-15062: Adding ppc64le build stage
Vaibhav-Nazare opened a new pull request, #13817: URL: https://github.com/apache/kafka/pull/13817
*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-15062) Power(ppc64le) support for Kafka
Vaibhav created KAFKA-15062:
Summary: Power(ppc64le) support for Kafka
Key: KAFKA-15062
URL: https://issues.apache.org/jira/browse/KAFKA-15062
Project: Kafka
Issue Type: Task
Components: build
Reporter: Vaibhav

Support for the Power architecture (ppc64le) in Apache Kafka.

What is the IBM Power architecture? It is a RISC architecture. IBM has recently made its ISA (Instruction Set Architecture) open source, and in doing so has contributed significantly to the open-source community at large. Many of the pioneers of the banking and HPC industries today run on the ppc64le architecture. As part of an ongoing effort to enable open-source projects where the Power architecture can add value, we are trying to enable Kafka on Power.
[GitHub] [kafka] lucasbru commented on a diff in pull request #13811: KAFKA-14278: Fix InvalidProducerEpochException and InvalidTxnStateException handling in producer clients
lucasbru commented on code in PR #13811: URL: https://github.com/apache/kafka/pull/13811#discussion_r1219320732 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -1661,6 +1663,10 @@ public void handleResponse(AbstractResponse response) { || error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { // If the topic is unknown or the coordinator is loading, retry with the current coordinator continue; +} else if (error == Errors.INVALID_PRODUCER_EPOCH Review Comment: Hmm, not sure. There shouldn't be any functional difference, so I moved it back down.
[GitHub] [kafka] lucasbru commented on pull request #13811: KAFKA-14278: Fix InvalidProducerEpochException and InvalidTxnStateException handling in producer clients
lucasbru commented on PR #13811: URL: https://github.com/apache/kafka/pull/13811#issuecomment-1578303825
> Hey Lucas -- thanks for the PR. Just wanted to confirm -- these changes are in line with what is proposed as part of KIP-691? It looks to me that is the case, but wanted to confirm.

Yes, exactly. This is one of the changes to clean up exceptions in preparation for KIP-691.
[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
hudeqi commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1578277690 Hello, are you free to help review this PR? @mimaison
[GitHub] [kafka] satishd commented on a diff in pull request #13807: KAFKA-15040: trigger onLeadershipChange under KRaft mode
satishd commented on code in PR #13807: URL: https://github.com/apache/kafka/pull/13807#discussion_r1219258912 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -3701,6 +3721,11 @@ class ReplicaManagerTest { assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) + if (enableRemoteStorage) { Review Comment: Do you also want to add similar coverage for both of the tests below?
```
testFetcherAreNotRestartedIfLeaderEpochIsNotBumpedWithKRaftPath
testReplicasAreStoppedWhileInControlledShutdownWithKRaft
```
## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -3677,20 +3682,35 @@ class ReplicaManagerTest { assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID)) } - @Test - def testDeltaFromLeaderToFollower(): Unit = { + private def verifyRLMonLeadershipChange(leaderPartitions: util.Set[Partition], followerPartitions: util.Set[Partition]): Unit = { Review Comment: nit: `verifyRLMonLeadershipChange` to `verifyRLMOnLeadershipChange`
[GitHub] [kafka] mimaison merged pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down
mimaison merged PR #13473: URL: https://github.com/apache/kafka/pull/13473