[GitHub] [kafka] gharris1727 commented on pull request #13467: KAFKA-14863: Hide plugins with invalid constructors during plugin discovery

2023-06-02 Thread via GitHub


gharris1727 commented on PR #13467:
URL: https://github.com/apache/kafka/pull/13467#issuecomment-1574679783

   @dajac Thanks for pointing this out. This PR is certainly the cause; it appears that some tests were not updated to account for the new behavior.
   
   I've opened a quick fix here: https://github.com/apache/kafka/pull/13805
   
   





[GitHub] [kafka] gharris1727 opened a new pull request, #13805: KAFKA-14863: Fix failing tests for invalid plugins that are no longer visible

2023-06-02 Thread via GitHub


gharris1727 opened a new pull request, #13805:
URL: https://github.com/apache/kafka/pull/13805

   1. DelegatingClassLoaderTest asserts that all of the "includeByDefault" plugins are visible, so update TestPlugins' AlwaysThrowException to not be included by default.
   2. ConnectorPluginsResourceTest referenced the abstract TimestampConverter instead of one of its concrete subclasses, TimestampConverter.Key or TimestampConverter.Value (see the sketch below).
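
   For context, a minimal sketch of the class shape involved (simplified, not copied from the Kafka codebase): the outer SMT is abstract, so only its nested Key and Value subclasses can be instantiated as plugins, which is why the test must reference one of them.

   ```java
   // Simplified sketch of the TimestampConverter shape: the outer class is
   // abstract; only the nested Key/Value subclasses are concrete plugin choices.
   public abstract class TimestampConverter<R> {
       public abstract R apply(R record);

       public static class Key<R> extends TimestampConverter<R> {
           @Override
           public R apply(R record) {
               return record; // would convert the timestamp in the record key
           }
       }

       public static class Value<R> extends TimestampConverter<R> {
           @Override
           public R apply(R record) {
               return record; // would convert the timestamp in the record value
           }
       }
   }
   ```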
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] dajac commented on pull request #13467: KAFKA-14863: Hide plugins with invalid constructors during plugin discovery

2023-06-02 Thread via GitHub


dajac commented on PR #13467:
URL: https://github.com/apache/kafka/pull/13467#issuecomment-1574652673

   @gharris1727 @C0urante Recent builds have failures related to plugin loading. Example: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13804/1/tests. Could they be related to this PR? It seems that the last build of this PR also has them.





[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-02 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1215084272


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -609,14 +686,15 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
     }
 
     public synchronized void transitionToUninitialized(RuntimeException exception) {
-        transitionTo(State.UNINITIALIZED);
+        transitionTo(State.UNINITIALIZED, exception, InvalidStateDetectionStrategy.BACKGROUND);
         if (pendingTransition != null) {
             pendingTransition.result.fail(exception);
         }
         lastError = null;
     }
 
-    public synchronized void maybeTransitionToErrorState(RuntimeException exception) {
+    public synchronized void maybeTransitionToErrorState(RuntimeException exception,
Review Comment:
   Correct. However, I went ahead and made the parameter explicit to avoid 
confusion.
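
   As a rough sketch of the pattern being discussed (the enum name comes from the diff above; the surrounding class is simplified and hypothetical, not the actual TransactionManager):

   ```java
   // Hypothetical sketch: the strategy is passed explicitly at every call site,
   // even where BACKGROUND is the common case, so the intent is visible in review.
   enum InvalidStateDetectionStrategy { FOREGROUND, BACKGROUND }

   class TransitionSketch {
       private RuntimeException lastError;

       synchronized void maybeTransitionToErrorState(RuntimeException exception,
                                                     InvalidStateDetectionStrategy strategy) {
           // ... state transition elided; the explicit parameter avoids relying
           // on a defaulted overload whose behavior the reader has to look up.
           lastError = exception;
       }
   }
   ```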






[jira] [Created] (KAFKA-15054) Add configs and logic to decide if rack aware assignment should be enabled

2023-06-02 Thread Hao Li (Jira)
Hao Li created KAFKA-15054:
--

 Summary: Add configs and logic to decide if rack aware assignment 
should be enabled
 Key: KAFKA-15054
 URL: https://issues.apache.org/jira/browse/KAFKA-15054
 Project: Kafka
  Issue Type: Sub-task
Reporter: Hao Li








[jira] [Updated] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0

2023-06-02 Thread Bo Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bo Gao updated KAFKA-15053:
---
Description: 
[This|https://issues.apache.org/jira/browse/KAFKA-13793] Jira issue introduced validations on multiple configs. As a consequence, the {{security.protocol}} config now only allows upper-case values such as PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. Before this change, lower-case values like sasl_ssl and ssl were also supported; there is even case-insensitive logic inside [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73] to handle lower-case values.

I think we should treat this as a regression bug, since lower-case values are no longer supported as of 3.3.0. On versions from 3.3.0 onward, we get an error like this when using the lower-case value sasl_ssl:

{{Invalid value sasl_ssl for configuration security.protocol: String must be one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}}

  was:
This Jira issue introduced validations on multiple configs. As a consequence, the {{security.protocol}} config now only allows upper-case values such as PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. Before this change, lower-case values like sasl_ssl and ssl were also supported; there is even case-insensitive logic inside [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73] to handle lower-case values.

I think we should treat this as a regression bug, since lower-case values are no longer supported as of 3.3.0. On versions from 3.3.0 onward, we get an error like this when using the lower-case value sasl_ssl:

{{Invalid value sasl_ssl for configuration security.protocol: String must be one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}}


> Regression for security.protocol validation starting from 3.3.0
> ---
>
> Key: KAFKA-15053
> URL: https://issues.apache.org/jira/browse/KAFKA-15053
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Bo Gao
>Priority: Major
>
> [This|https://issues.apache.org/jira/browse/KAFKA-13793] Jira issue introduced validations on multiple configs. As a consequence, the {{security.protocol}} config now only allows upper-case values such as PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. Before this change, lower-case values like sasl_ssl and ssl were also supported; there is even case-insensitive logic inside [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73] to handle lower-case values.
> I think we should treat this as a regression bug, since lower-case values are no longer supported as of 3.3.0. On versions from 3.3.0 onward, we get an error like this when using the lower-case value sasl_ssl:
> {{Invalid value sasl_ssl for configuration security.protocol: String must be one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}}





[jira] [Created] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0

2023-06-02 Thread Bo Gao (Jira)
Bo Gao created KAFKA-15053:
--

 Summary: Regression for security.protocol validation starting from 
3.3.0
 Key: KAFKA-15053
 URL: https://issues.apache.org/jira/browse/KAFKA-15053
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.3.0
Reporter: Bo Gao


This Jira issue introduced validations on multiple configs. As a consequence, the {{security.protocol}} config now only allows upper-case values such as PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. Before this change, lower-case values like sasl_ssl and ssl were also supported; there is even case-insensitive logic inside [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73] to handle lower-case values.

I think we should treat this as a regression bug, since lower-case values are no longer supported as of 3.3.0. On versions from 3.3.0 onward, we get an error like this when using the lower-case value sasl_ssl:

{{Invalid value sasl_ssl for configuration security.protocol: String must be one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}}
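
A short illustration of the behavior described above (the lookup mirrors the linked SecurityProtocol lines; the allow-list check is an assumption based on the error message, not the actual validator code):

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class SecurityProtocolSketch {
    enum SecurityProtocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

    // Case-insensitive lookup along the lines of SecurityProtocol#forName,
    // which is why lower-case values like "sasl_ssl" used to work.
    static SecurityProtocol forName(String name) {
        return SecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(forName("sasl_ssl")); // SASL_SSL

        // A case-sensitive allow-list, as the 3.3.0 error message suggests,
        // rejects the same value before the enum lookup is ever reached.
        List<String> allowed = Arrays.asList("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL");
        if (!allowed.contains("sasl_ssl")) {
            System.out.println("Invalid value sasl_ssl for configuration security.protocol");
        }
    }
}
{code}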





[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-02 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1214957871


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -0,0 +1,959 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.deferred.DeferredEvent;
+import org.apache.kafka.deferred.DeferredEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.HashSet;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
+ * or the transaction coordinator.
+ *
+ * The runtime framework maps each underlying partition (e.g. __consumer_offsets) that the broker is a
+ * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft
+ * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in
+ * timeline datastructures backed by a SnapshotRegistry. The runtime supports two types of operations
+ * on state machines: (1) Writes and (2) Reads.
+ *
+ * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from the state
+ * machine to handle the operation. A write operation typically generates a response and a list of
+ * records. The records are applied to the state machine and persisted to the partition. The response
+ * is parked until the records are committed and delivered when they are.
+ *
+ * (2) A read operation, aka a request, can only read the committed state from the state machine to handle
+ * the operation. A read operation typically generates a response that is immediately completed.
+ *
+ * The runtime framework expose an asynchronous, future based, API to the world. All the operations
+ * are executed by a CoordinatorEventProcessor. The processor guarantees that operations for a
+ * single partition or state machine are not processed concurrently.
+ *
+ * @param <S> The type of the state machine.
+ * @param <U> The type of the record.
+ */
+class CoordinatorRuntime<S extends Coordinator<U>, U> {
+
+    /**
+     * Builder to create a CoordinatorRuntime.
+     *
+     * @param <S> The type of the state machine.
+     * @param <U> The type of the record.
+     */
+    public static class Builder<S extends Coordinator<U>, U> {
+        private LogContext logContext;
+        private CoordinatorEventProcessor eventProcessor;
+        private PartitionWriter<U> partitionWriter;
+        private CoordinatorLoader<U> loader;
+        private CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier;
+
+        public Builder<S, U> withLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        public Builder<S, U> withEventProcessor(CoordinatorEventProcessor eventProcessor) {
+            this.eventProcessor = eventProcessor;
+            return this;
+        }
+
+        public Builder<S, U> withPartitionWriter(PartitionWriter<U> partitionWriter) {
+            this.partitionWriter = partitionWriter;
+            return this;
+        }
+
+        public Builder<S, U> withLoader(CoordinatorLoader<U> loader) {
+            this.loader = loader;
+            return this;
+        }
+
+        public Builder<S, U> withCoordinatorStateMachineSupplier(CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier) {
+            this.coordinatorBuilderSupplier = coordinatorBuilderSupplier;
+            return this;
+        }
+
+        public CoordinatorRuntime<S, U> build() {
+            if (logContext == null)
+                logContext = new LogContext();
+            if (eventProcessor == null)
+                throw new 
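
The quoted excerpt is truncated in the archive. As a rough illustration only, a runtime might be assembled from this builder as follows (the concrete type arguments, variable names, and the GroupCoordinatorShard/Record bindings here are hypothetical, not taken from the PR):

```java
// Hypothetical builder usage; assumes eventProcessor, partitionWriter, loader,
// and coordinatorBuilderSupplier are already constructed, and that S and U are
// bound to stand-in types GroupCoordinatorShard and Record.
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
    new CoordinatorRuntime.Builder<GroupCoordinatorShard, Record>()
        .withLogContext(new LogContext("[GroupCoordinator] "))
        .withEventProcessor(eventProcessor)
        .withPartitionWriter(partitionWriter)
        .withLoader(loader)
        .withCoordinatorStateMachineSupplier(coordinatorBuilderSupplier)
        .build();
```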

[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-02 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1214930728


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -0,0 +1,959 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.deferred.DeferredEvent;
+import org.apache.kafka.deferred.DeferredEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.HashSet;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
+ * or the transaction coordinator.
+ *
+ * The runtime framework maps each underlying partition (e.g. __consumer_offsets) that the broker is a
+ * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft
+ * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in
+ * timeline datastructures backed by a SnapshotRegistry. The runtime supports two types of operations
+ * on state machines: (1) Writes and (2) Reads.
+ *
+ * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from the state
+ * machine to handle the operation. A write operation typically generates a response and a list of
+ * records. The records are applied to the state machine and persisted to the partition. The response
+ * is parked until the records are committed and delivered when they are.
+ *
+ * (2) A read operation, aka a request, can only read the committed state from the state machine to handle
+ * the operation. A read operation typically generates a response that is immediately completed.
+ *
+ * The runtime framework expose an asynchronous, future based, API to the world. All the operations
+ * are executed by a CoordinatorEventProcessor. The processor guarantees that operations for a
+ * single partition or state machine are not processed concurrently.
+ *
+ * @param <S> The type of the state machine.
+ * @param <U> The type of the record.
+ */
+class CoordinatorRuntime<S extends Coordinator<U>, U> {
+
+    /**
+     * Builder to create a CoordinatorRuntime.
+     *
+     * @param <S> The type of the state machine.
+     * @param <U> The type of the record.
+     */
+    public static class Builder<S extends Coordinator<U>, U> {
+        private LogContext logContext;
+        private CoordinatorEventProcessor eventProcessor;
+        private PartitionWriter<U> partitionWriter;
+        private CoordinatorLoader<U> loader;
+        private CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier;
+
+        public Builder<S, U> withLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        public Builder<S, U> withEventProcessor(CoordinatorEventProcessor eventProcessor) {
+            this.eventProcessor = eventProcessor;
+            return this;
+        }
+
+        public Builder<S, U> withPartitionWriter(PartitionWriter<U> partitionWriter) {
+            this.partitionWriter = partitionWriter;
+            return this;
+        }
+
+        public Builder<S, U> withLoader(CoordinatorLoader<U> loader) {
+            this.loader = loader;
+            return this;
+        }
+
+        public Builder<S, U> withCoordinatorStateMachineSupplier(CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier) {
+            this.coordinatorBuilderSupplier = coordinatorBuilderSupplier;
+            return this;
+        }
+
+        public CoordinatorRuntime<S, U> build() {
+            if (logContext == null)
+                logContext = new LogContext();
+            if (eventProcessor == null)
+                throw new 

[GitHub] [kafka] dimitarndimitrov opened a new pull request, #13804: KAFKA-15052 Fix the flaky QuorumControllerTest.testBalancePartitionLeaders

2023-06-02 Thread via GitHub


dimitarndimitrov opened a new pull request, #13804:
URL: https://github.com/apache/kafka/pull/13804

   In this test, the broker session timeout is configured aggressively low (1 second) so that fencing can happen without much waiting. Then, in the final portion of the test, when brokers should not be fenced, heartbeats are sent roughly twice per session timeout window. However, the first time that is done, other code runs between sending the heartbeat and taking the timestamp, and in local tests that code can take up to 0.5 seconds (half the session timeout). That can then result in all brokers being fenced again, which would fail the test.
   
   This change sends a heartbeat at the moment the timestamp is taken, which in local tests reduced flaky failures from 4 out of 50 runs to 0 out of 50 (see the sketch below).
   - In local tests, increasing the session timeout from 1 second to 2 seconds also reduced the flaky failures to 0 out of 50, but it also consistently increased the test running time by 1 second (which seems expected).
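
   As an illustration of the race described above (a self-contained sketch with hypothetical names, not the test code):

   ```java
   // Sketch of the timing bug: recording the "last heartbeat" timestamp after
   // doing unrelated work shrinks the real window before the next heartbeat.
   public class HeartbeatRaceSketch {
       static final long SESSION_TIMEOUT_MS = 1_000;

       public static void main(String[] args) throws InterruptedException {
           // Before the fix: heartbeat sent at T, timestamp recorded at T + work.
           long heartbeatSentAt = System.currentTimeMillis();
           Thread.sleep(500); // stands in for the code that ran between the two
           long recordedAt = System.currentTimeMillis();

           // The next heartbeat is scheduled SESSION_TIMEOUT_MS / 2 after
           // recordedAt, so the gap between actual heartbeats can reach the full
           // session timeout and all brokers get fenced again.
           long worstCaseGap = (recordedAt - heartbeatSentAt) + SESSION_TIMEOUT_MS / 2;
           System.out.println("worst-case heartbeat gap: " + worstCaseGap + " ms");

           // After the fix: the timestamp is taken at the moment the heartbeat is
           // sent, so the roughly-twice-per-session cadence actually holds.
       }
   }
   ```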
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()

2023-06-02 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728878#comment-17728878
 ] 

Justine Olshan commented on KAFKA-15052:


Thanks for filing this. I've been seeing it often.

> Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()
> -
>
> Key: KAFKA-15052
> URL: https://issues.apache.org/jira/browse/KAFKA-15052
> Project: Kafka
>  Issue Type: Test
>Reporter: Dimitar Dimitrov
>Assignee: Dimitar Dimitrov
>Priority: Major
>
> Test failed at 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/]
>  as well as in various local runs.
> The test creates a topic, fences a broker, notes partition imbalance due to 
> another broker taking over the partition the fenced broker lost, re-registers 
> and unfences the fenced broker, sends {{AlterPartition}} for the lost 
> partition adding the now unfenced broker back to its ISR, then waits for the 
> partition imbalance to disappear.
> The local failures seem to happen when the brokers (including the ones that 
> never get fenced by the test) accidentally get fenced by losing their session 
> due to reaching the (aggressively low for test purposes) session timeout.
> The Cloudbees failure quoted above also seems to indicate that this happened:
> {code:java}
> ...[truncated 738209 chars]...
> 23. (org.apache.kafka.controller.QuorumController:768)
> [2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write 
> event for maybeBalancePartitionLeaders because scheduled (DEFERRED), 
> checkIntervalNs (OptionalLong[10]) and isImbalanced (true) 
> (org.apache.kafka.controller.QuorumController:1401)
> [2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 
> because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459)
> [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: 
> changing partition(s): foo-0, foo-1, foo-2 
> (org.apache.kafka.controller.ReplicationControlManager:1750)
> [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for 
> foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 
> -> 3, partitionEpoch: 2 -> 3 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
> foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: 
> 3 -> 4, partitionEpoch: 4 -> 5 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
> foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 
> -> 3, partitionEpoch: 2 -> 3 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, 
> appendTimestamp=240, 
> records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, 
> brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), 
> prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253)
> [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Creating in-memory 
> snapshot 27 (org.apache.kafka.timeline.SnapshotRegistry:197)
> [2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log 
> check. (org.apache.kafka.metalog.LocalLogManager:512)
> [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation 
> maybeFenceReplicas(451616131) will be completed when the log reaches offset 
> 27. (org.apache.kafka.controller.QuorumController:768)
> [2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 
> because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459)
> [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: 
> changing partition(s): foo-1 
> (org.apache.kafka.controller.ReplicationControlManager:1750)
> [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] partition change for 
> foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 3 -> -1, 

[jira] [Updated] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()

2023-06-02 Thread Dimitar Dimitrov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dimitar Dimitrov updated KAFKA-15052:
-
Description: 
Test failed at 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/]
 as well as in various local runs.

The test creates a topic, fences a broker, notes partition imbalance due to 
another broker taking over the partition the fenced broker lost, re-registers 
and unfences the fenced broker, sends {{AlterPartition}} for the lost partition 
adding the now unfenced broker back to its ISR, then waits for the partition 
imbalance to disappear.

The local failures seem to happen when the brokers (including the ones that 
never get fenced by the test) accidentally get fenced by losing their session 
due to reaching the (aggressively low for test purposes) session timeout.
The Cloudbees failure quoted above also seems to indicate that this happened:
{code:java}
...[truncated 738209 chars]...
23. (org.apache.kafka.controller.QuorumController:768)
[2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write event 
for maybeBalancePartitionLeaders because scheduled (DEFERRED), checkIntervalNs 
(OptionalLong[10]) and isImbalanced (true) 
(org.apache.kafka.controller.QuorumController:1401)
[2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 because 
its session has timed out. 
(org.apache.kafka.controller.ReplicationControlManager:1459)
[2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: 
changing partition(s): foo-0, foo-1, foo-2 
(org.apache.kafka.controller.ReplicationControlManager:1750)
[2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for 
foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 -> 
3, partitionEpoch: 2 -> 3 
(org.apache.kafka.controller.ReplicationControlManager:157)
[2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: 3 
-> 4, partitionEpoch: 4 -> 5 
(org.apache.kafka.controller.ReplicationControlManager:157)
[2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 -> 
3, partitionEpoch: 2 -> 3 
(org.apache.kafka.controller.ReplicationControlManager:157)
[2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, 
appendTimestamp=240, 
records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at version 
0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, 
topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, 
removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at version 
0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, 
topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at version 
0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, 
brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), 
prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253)
[2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Creating in-memory 
snapshot 27 (org.apache.kafka.timeline.SnapshotRegistry:197)
[2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log check. 
(org.apache.kafka.metalog.LocalLogManager:512)
[2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation 
maybeFenceReplicas(451616131) will be completed when the log reaches offset 27. 
(org.apache.kafka.controller.QuorumController:768)
[2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 because 
its session has timed out. 
(org.apache.kafka.controller.ReplicationControlManager:1459)
[2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: 
changing partition(s): foo-1 
(org.apache.kafka.controller.ReplicationControlManager:1750)
[2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] partition change for 
foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 3 -> -1, leaderEpoch: 4 -> 
5, partitionEpoch: 5 -> 6 
(org.apache.kafka.controller.ReplicationControlManager:157){code}

  was:
Test failed at 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/]
 as well as in various local runs.

The test creates a topic, fences a broker, notes partition imbalance due to 
another broker taking over the partition the fenced broker lost, re-registers 
and unfences the fenced broker, sends `AlterPartition` for the lost partition 
adding the now unfenced broker back to its ISR, then waits for the partition 
imbalance to disappear.

The local 

[jira] [Created] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()

2023-06-02 Thread Dimitar Dimitrov (Jira)
Dimitar Dimitrov created KAFKA-15052:


 Summary: Fix flaky test 
QuorumControllerTest.testBalancePartitionLeaders()
 Key: KAFKA-15052
 URL: https://issues.apache.org/jira/browse/KAFKA-15052
 Project: Kafka
  Issue Type: Test
Reporter: Dimitar Dimitrov
Assignee: Dimitar Dimitrov


Test failed at 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/]
 as well as in various local runs.

The test creates a topic, fences a broker, notes partition imbalance due to 
another broker taking over the partition the fenced broker lost, re-registers 
and unfences the fenced broker, sends `AlterPartition` for the lost partition 
adding the now unfenced broker back to its ISR, then waits for the partition 
imbalance to disappear.

The local failures seem to happen when the brokers (including the ones that 
never get fenced by the test) accidentally get fenced by losing their session 
due to reaching the (aggressively low for test purposes) session timeout.
The Cloudbees failure quoted above also seems to indicate that this happened:
{code:java}
...[truncated 738209 chars]...
23. (org.apache.kafka.controller.QuorumController:768)
[2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write event 
for maybeBalancePartitionLeaders because scheduled (DEFERRED), checkIntervalNs 
(OptionalLong[10]) and isImbalanced (true) 
(org.apache.kafka.controller.QuorumController:1401)
[2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 because 
its session has timed out. 
(org.apache.kafka.controller.ReplicationControlManager:1459)
[2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: 
changing partition(s): foo-0, foo-1, foo-2 
(org.apache.kafka.controller.ReplicationControlManager:1750)
[2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for 
foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 -> 
3, partitionEpoch: 2 -> 3 
(org.apache.kafka.controller.ReplicationControlManager:157)
[2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: 3 
-> 4, partitionEpoch: 4 -> 5 
(org.apache.kafka.controller.ReplicationControlManager:157)
[2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 -> 
3, partitionEpoch: 2 -> 3 
(org.apache.kafka.controller.ReplicationControlManager:157)
[2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, 
appendTimestamp=240, 
records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at version 
0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, 
topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, 
removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at version 
0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, 
topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at version 
0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, 
brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), 
prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253)
[2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Creating in-memory 
snapshot 27 (org.apache.kafka.timeline.SnapshotRegistry:197)
[2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log check. 
(org.apache.kafka.metalog.LocalLogManager:512)
[2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation 
maybeFenceReplicas(451616131) will be completed when the log reaches offset 27. 
(org.apache.kafka.controller.QuorumController:768)
[2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 because 
its session has timed out. 
(org.apache.kafka.controller.ReplicationControlManager:1459)
[2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: 
changing partition(s): foo-1 
(org.apache.kafka.controller.ReplicationControlManager:1750)
[2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] partition change for 
foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 3 -> -1, leaderEpoch: 4 -> 
5, partitionEpoch: 5 -> 6 
(org.apache.kafka.controller.ReplicationControlManager:157){code}





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-02 Thread via GitHub


jeffkbkim commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1214829515


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.cluster.PartitionListener
+import kafka.server.ReplicaManager
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.RecordTooLargeException
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
TimestampType}
+import org.apache.kafka.common.record.Record.EMPTY_HEADERS
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter
+import org.apache.kafka.storage.internals.log.AppendOrigin
+
+import java.nio.ByteBuffer
+import java.util
+import scala.collection.Map
+
+private[group] class ListenerAdaptor(
+  val listener: PartitionWriter.Listener
+) extends PartitionListener {
+  override def onHighWatermarkUpdated(
+tp: TopicPartition,
+offset: Long
+  ): Unit = {
+listener.onHighWatermarkUpdated(tp, offset)
+  }
+
+  override def equals(that: Any): Boolean = that match {
+case other: ListenerAdaptor => listener.equals(other.listener)
+case _ => false
+  }
+
+  override def hashCode(): Int = {
+listener.hashCode()
+  }
+
+  override def toString: String = {
+s"ListenerAdaptor(listener=$listener)"
+  }
+}
+
+class CoordinatorPartitionWriter[T](
+  replicaManager: ReplicaManager,
+  serializer: PartitionWriter.Serializer[T],
+  compressionType: CompressionType,
+  time: Time
+) extends PartitionWriter[T] {
+
+  /**
+   * Register a PartitionWriter.Listener.
+   *
+   * @param tp   The partition to register the listener to.
+   * @param listener The listener.
+   */
+  override def registerListener(
+tp: TopicPartition,
+listener: PartitionWriter.Listener
+  ): Unit = {
+replicaManager.maybeAddListener(tp, new ListenerAdaptor(listener))
+  }
+
+  /**
+   * Deregister a PartitionWriter.Listener.
+   *
+   * @param tp   The partition to deregister the listener from.
+   * @param listener The listener.
+   */
+  override def deregisterListener(
+tp: TopicPartition,
+listener: PartitionWriter.Listener
+  ): Unit = {
+replicaManager.removeListener(tp, new ListenerAdaptor(listener))
+  }
+
+  /**
+   * Write records to the partitions. Records are written in one batch so
+   * atomicity is guaranteed.
+   *
+   * @param tp  The partition to write records to.
+   * @param records The list of records. The records are written in a single 
batch.
+   * @return The log end offset right after the written records.
+   * @throws KafkaException Any KafkaException caught during the write 
operation.
+   */
+  override def append(
+tp: TopicPartition,
+records: util.List[T]
+  ): Long = {
+if (records.isEmpty) throw new IllegalStateException("records must be 
non-empty.")
+
+replicaManager.getLogConfig(tp) match {
+  case Some(logConfig) =>
+val magic = logConfig.recordVersion.value
+val maxBatchSize = logConfig.maxMessageSize
+val currentTimeMs = time.milliseconds()
+
+val recordsBuilder = MemoryRecords.builder(
+  ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+  magic,
+  compressionType,
+  TimestampType.CREATE_TIME,
+  0L,
+  maxBatchSize
+)
+
+records.forEach { record =>
+  val keyBytes = serializer.serializeKey(record)
+  val valueBytes = serializer.serializeValue(record)
+
+  if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, 
EMPTY_HEADERS)) recordsBuilder.append(
+currentTimeMs,
+keyBytes,
+valueBytes,
+EMPTY_HEADERS

Review Comment:
   do we not need anything in the header because these are only internal topics?
   
   what metadata do other record types contain in the header, client info?



##

[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-02 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1214835504


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -0,0 +1,959 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.deferred.DeferredEvent;
+import org.apache.kafka.deferred.DeferredEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.HashSet;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The CoordinatorRuntime provides a framework to implement coordinators such 
as the group coordinator
+ * or the transaction coordinator.
+ *
+ * The runtime framework maps each underlying partition (e.g. __consumer_offsets) that the broker is a
+ * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft
+ * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in
+ * timeline datastructures backed by a SnapshotRegistry. The runtime supports two types of operations
+ * on state machines: (1) Writes and (2) Reads.
+ *
+ * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from the state
+ * machine to handle the operation. A write operation typically generates a response and a list of
+ * records. The records are applied to the state machine and persisted to the partition. The response
+ * is parked until the records are committed and delivered when they are.
+ *
+ * (2) A read operation, aka a request, can only read the committed state from 
the state machine to handle
+ * the operation. A read operation typically generates a response that is 
immediately completed.
+ *
+ * The runtime framework expose an asynchronous, future based, API to the 
world. All the operations

Review Comment:
   nit: exposes






[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-02 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1214823173


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends WrappedStateStore<RocksDBTimeOrderedKeyValueSegmentedBytesStore, Object, Object> implements TimeOrderedKeyValueBuffer<K, V> {
+
+private final Duration gracePeriod;

Review Comment:
   ah yeah, that makes sense






[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-02 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1214821877


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedKeyValueBytesStoreSupplier.java:
##
@@ -38,13 +35,12 @@ public RocksDBTimeOrderedKeyValueSegmentedBytesStore get() {
 name,
 metricsScope(),
 retentionPeriod,
-Math.max(retentionPeriod / 2, 60_000L),
-withIndex
+Math.max(retentionPeriod / 2, 60_000L)
 );
 }
 
 public String metricsScope() {
-return "rocksdb-session";
+return "rocksdb-buffer";

Review Comment:
   Sure that is fine with me






[GitHub] [kafka] dajac commented on a diff in pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

2023-06-02 Thread via GitHub


dajac commented on code in PR #13765:
URL: https://github.com/apache/kafka/pull/13765#discussion_r1214750188


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -1091,8 +1091,10 @@ class Partition(val topicPartition: TopicPartition,
   // Note here we are using the "maximal", see explanation above
   val replicaState = replica.stateSnapshot
   if (replicaState.logEndOffsetMetadata.messageOffset < 
newHighWatermark.messageOffset &&
-(replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, 
currentTimeMs, replicaLagTimeMaxMs)
-  || partitionState.maximalIsr.contains(replica.brokerId))) {
+  ((replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, 
currentTimeMs, replicaLagTimeMaxMs) &&
+isReplicaIsrEligible(replica.brokerId)) ||

Review Comment:
   Is it worth extracting this condition into a helper method (e.g. isIsrEligibleAndCaughtUp)? That would simplify the condition.



##
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##
@@ -1456,6 +1456,105 @@ class PartitionTest extends AbstractPartitionTest {
 assertEquals(alterPartitionListener.failures.get, 1)
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("fenced", "shutdown", "unfenced"))
+  def testHWMIncreasesWithFencedOrShutdownFollower(brokerState: String): Unit 
= {

Review Comment:
   nit: s/HWM/HighWatermark?



##
core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala:
##
@@ -357,6 +363,51 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
 }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendToPartitionWithFollowerShutdown(quorum: String): Unit = {

Review Comment:
   nit: `*ShouldNotTimeout`? it would be great to capture the issue in the test 
name or to add a comment about it.



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -260,11 +265,17 @@ private void tryElection(PartitionChangeRecord record) {
  * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That 
takes care of
  * case 1. In this function, we check for cases 2 and 3, and handle them 
by manually
  * setting record.leader to the current leader.
+ *
+ * In MVs before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager
+ * that required the leader epoch to be bumped whenever the ISR shrank. In MV 3.6 this leader
+ * epoch bump is not required when the ISR shrinks.
  */
 void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {

Review Comment:
   For my understanding, do we bump the leader epoch when the ISR is expanded? 
My understanding is that we don't.






[GitHub] [kafka] C0urante commented on a diff in pull request #13433: KAFKA-12694, KAFKA-3910: Add cyclic schema support, fix default struct values

2023-06-02 Thread via GitHub


C0urante commented on code in PR #13433:
URL: https://github.com/apache/kafka/pull/13433#discussion_r1214775964


##
connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java:
##
@@ -36,6 +38,255 @@ public class SchemaBuilderTest {
 private static final String DOC = "doc";
private static final Map<String, String> NO_PARAMS = null;
 
+@Test
+public void testDefaultValueStructSchema() {
+SchemaBuilder builder = SchemaBuilder.struct()
+.field("f1", Schema.BOOLEAN_SCHEMA);
+
+Struct defaultValue = new Struct(builder.build()); // the Struct 
receives a schema, not a builder
+defaultValue.put("f1", true);
+
+builder.defaultValue(defaultValue)
+.build();
+}
+
+@Test
+public void testDefaultValueStructSchemaBuilder() {
+SchemaBuilder builder = SchemaBuilder.struct()
+.field("f1", Schema.BOOLEAN_SCHEMA);
+
+Struct defaultValue = new Struct(builder);
+defaultValue.put("f1", true);
+
+builder.defaultValue(defaultValue).build();

Review Comment:
   > I think we should stick to one way of setting up struct defaults, so the 
end result of building a schema can be consistent
   
   The more I think of it, the more I agree with this. The inconsistencies 
aren't great, and allowing a default value to be used even though its schema is 
not equal to the schema it's defined on seems like a great chance for a footgun 
that burns people later.
   
   I think we should revert the changes in this PR that cause the 
`SchemaBuilder::testDefaultValueStructSchema` test case to pass and actively 
assert that that test cases throws an exception instead.
   
   We can also add a note to the `SchemaBuilder::defaultValue` Javadocs on how 
to use `Struct` instances as default values.
   
   Thoughts?
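
   For reference, the usage such a Javadoc note might describe, mirroring the builder-based test above (a sketch assuming the connect-api Schema, SchemaBuilder, and Struct classes):

   ```java
   import org.apache.kafka.connect.data.Schema;
   import org.apache.kafka.connect.data.SchemaBuilder;
   import org.apache.kafka.connect.data.Struct;

   public class StructDefaultValueSketch {
       public static void main(String[] args) {
           // Build the default Struct against the builder itself, then attach it
           // as the default before the schema is finalized with build().
           SchemaBuilder builder = SchemaBuilder.struct().field("f1", Schema.BOOLEAN_SCHEMA);
           Struct defaultValue = new Struct(builder); // SchemaBuilder implements Schema
           defaultValue.put("f1", true);
           Schema schema = builder.defaultValue(defaultValue).build();
           System.out.println(schema.defaultValue());
       }
   }
   ```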






[GitHub] [kafka] cmccabe merged pull request #13799: KAFKA-15048: Improve handling of unexpected quorum controller errors

2023-06-02 Thread via GitHub


cmccabe merged PR #13799:
URL: https://github.com/apache/kafka/pull/13799





[GitHub] [kafka] C0urante commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-06-02 Thread via GitHub


C0urante commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1574216434

   @divijvaidya No worries about the delay--looks like we're both guilty on 
that front 
   
   I took a look at this, and the issue is that the `MirrorCheckpointConnector` and `MirrorSourceConnector` instances are generating empty lists of task configs, which makes sense, since the former never generates tasks during these tests and the latter only generates tasks once a to-be-mirrored topic is created on the upstream cluster, which currently takes place only after our startup check has finished.
   
   Correct me if I'm wrong: the idea here is to add granularity to test failure 
messages so that we can differentiate between MM2 startup issues (caused by 
lagginess or connector/task failures) and issues with the actual replication 
logic performed by MM2. If that's the case, do you think it might make sense to 
do the following?
   
   - Alter the "await startup" logic to fail immediately if it detects any failed connectors or tasks
   - Alter the "await startup" logic to operate on a per-connector-type basis, 
and selectively await the startup of different connector types at different 
points in the test (e.g., we could await the startup of the heartbeat connector 
in any place where we currently call `waitForMirrorMakersToStart`, but only 
await the startup of the source connector and its tasks once we've created a 
to-be-replicated topic on the upstream cluster)





[GitHub] [kafka] dajac merged pull request #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and CoordinatorPlayback interfaces

2023-06-02 Thread via GitHub


dajac merged PR #13794:
URL: https://github.com/apache/kafka/pull/13794





[GitHub] [kafka] C0urante commented on a diff in pull request #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult

2023-06-02 Thread via GitHub


C0urante commented on code in PR #13771:
URL: https://github.com/apache/kafka/pull/13771#discussion_r1214691525


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java:
##
@@ -490,6 +504,168 @@ public void testPluginUrlsWithRelativeSymlinkForwards() throws Exception {
 assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
 }
 
+@Test
+public void testNonCollidingAliases() {
+SortedSet> sinkConnectors = new TreeSet<>();
+sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
+SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new TreeSet<>();
+sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, 
MockSourceConnector.class.getClassLoader()));
+SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
+converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+PluginScanResult result = new PluginScanResult(
+sinkConnectors,
+sourceConnectors,
+converters,
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+Collections.emptySortedSet()
+);
+Map<String, String> aliases = PluginUtils.computeAliases(result);
+assertFalse(aliases.containsKey("Mock"));
+assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSinkConnector"));
+assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSink"));
+assertEquals(MockSourceConnector.class.getName(), 
aliases.get("MockSourceConnector"));
+assertEquals(MockSourceConnector.class.getName(), 
aliases.get("MockSource"));
+assertEquals(CollidingConverter.class.getName(), 
aliases.get("CollidingConverter"));
+assertEquals(CollidingConverter.class.getName(), 
aliases.get("Colliding"));
+}
+
+
+@Test
+public void testMultiVersionAlias() {
+SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
+// distinct versions don't cause an alias collision (the class name is 
the same)
+sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
+sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", 
MockSinkConnector.class.getClassLoader()));
+assertEquals(2, sinkConnectors.size());
+PluginScanResult result = new PluginScanResult(
+sinkConnectors,
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+Collections.emptySortedSet()
+);
+Map<String, String> aliases = PluginUtils.computeAliases(result);
+assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSinkConnector"));
+assertEquals(MockSinkConnector.class.getName(), 
aliases.get("MockSink"));
+}
+
+@Test
+public void testCollidingPrunedAlias() {
+SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
+converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+SortedSet<PluginDesc<HeaderConverter>> headerConverters = new TreeSet<>();
+headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, 
null, CollidingHeaderConverter.class.getClassLoader()));
+PluginScanResult result = new PluginScanResult(
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+converters,
+headerConverters,
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+Collections.emptySortedSet(),
+Collections.emptySortedSet()
+);
+Map<String, String> aliases = PluginUtils.computeAliases(result);
+assertEquals(CollidingConverter.class.getName(), 
aliases.get("CollidingConverter"));
+assertEquals(CollidingHeaderConverter.class.getName(), 
aliases.get("CollidingHeaderConverter"));
+assertFalse(aliases.containsKey("Colliding"));

Review Comment:
   Same thought RE comparing whole `Map` objects:
   
   ```suggestion
   Map<String, String> actualAliases = PluginUtils.computeAliases(result);
   
   Map<String, String> expectedAliases = new HashMap<>();
   expectedAliases.put("CollidingConverter", 
CollidingConverter.class.getName());
   expectedAliases.put("CollidingHeaderConverter", 
CollidingHeaderConverter.class.getName());
   assertEquals(expectedAliases, actualAliases);
   ```



##

[GitHub] [kafka] C0urante commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest

2023-06-02 Thread via GitHub


C0urante commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1214665544


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##
@@ -832,202 +776,165 @@ private CountDownLatch expectPolls(int minimum, final 
AtomicInteger count) throw
 // Note that we stub these to allow any number of calls because the 
thread will continue to
 // run. The count passed in + latch returned just makes sure we get 
*at least* that number of
 // calls
-EasyMock.expect(sourceTask.poll())
-.andStubAnswer(() -> {
-count.incrementAndGet();
-latch.countDown();
-Thread.sleep(10);
-return RECORDS;
-});
+doAnswer((Answer<List<SourceRecord>>) invocation -> {
+count.incrementAndGet();
+latch.countDown();
+Thread.sleep(10);
+return RECORDS;
+}).when(sourceTask).poll();
+
 // Fallout of the poll() call
-expectSendRecordAnyTimes();
+expectSendRecord();
 return latch;
 }
 
 private CountDownLatch expectPolls(int count) throws InterruptedException {
 return expectPolls(count, new AtomicInteger());
 }
 
-@SuppressWarnings("unchecked")
-private void expectSendRecordSyncFailure(Throwable error) {
-expectConvertHeadersAndKeyValue(false);
-expectApplyTransformationChain(false);
-
-EasyMock.expect(
-producer.send(EasyMock.anyObject(ProducerRecord.class),
-
EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class)))
-.andThrow(error);
+private void expectSendRecord() throws InterruptedException {
+expectSendRecordTaskCommitRecordSucceed();
 }
 
-private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() throws InterruptedException {
-return expectSendRecordTaskCommitRecordSucceed(true);
+//
+private void expectSendRecordProducerCallbackFail() throws 
InterruptedException {
+expectSendRecord(TOPIC, false, false, true, emptyHeaders());
 }
 
-private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce() throws InterruptedException {
-return expectSendRecordTaskCommitRecordSucceed(false);
+private void expectSendRecordTaskCommitRecordSucceed() throws 
InterruptedException {
+expectSendRecord(TOPIC, true, true, true, emptyHeaders());
 }
 
-private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException {
-return expectSendRecord(TOPIC, false, false, false, true, 
emptyHeaders());
-}
-
-private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes) throws InterruptedException {
-return expectSendRecord(TOPIC, anyTimes, true, true, true, 
emptyHeaders());
-}
-
-private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes) throws InterruptedException {
-return expectSendRecord(TOPIC, anyTimes, true, false, true, 
emptyHeaders());
-}
-
-private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-String topic,
-boolean anyTimes,
-boolean sendSuccess,
-boolean commitSuccess,
-boolean isMockedConverters,
-Headers headers
+private void expectSendRecord(
+String topic,
+boolean sendSuccess,
+boolean commitSuccess,
+boolean isMockedConverters,
+Headers headers
 ) throws InterruptedException {
 if (isMockedConverters) {
-expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+expectConvertHeadersAndKeyValue(topic, headers);
 }
 
-expectApplyTransformationChain(anyTimes);
-
-Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-// 1. Converted data passed to the producer, which will need callbacks 
invoked for flush to work
-IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-producer.send(EasyMock.capture(sent),
-EasyMock.capture(producerCallbacks)));
-IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-synchronized (producerCallbacks) {
-for (org.apache.kafka.clients.producer.Callback cb : 
producerCallbacks.getValues()) {
-if (sendSuccess) {
-cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0,
-0L, 0, 0), null);
-} else {
-cb.onCompletion(null, new 
TopicAuthorizationException("foo"));
-}
-}
-producerCallbacks.reset();
-}
-return sendFuture;
-};
-if (anyTimes)
-expect.andStubAnswer(expectResponse);
-else
-expect.andAnswer(expectResponse);
+expectApplyTransformationChain();
 
 if (sendSuccess) {
 // 2. As a result of a successful 

[GitHub] [kafka] cmccabe commented on a diff in pull request #13799: KAFKA-15048: Improve handling of unexpected quorum controller errors

2023-06-02 Thread via GitHub


cmccabe commented on code in PR #13799:
URL: https://github.com/apache/kafka/pull/13799#discussion_r1214678682


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2024,13 +2029,8 @@ public CompletableFuture<ElectLeadersResponseData> electLeaders(
 public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures(
 ControllerRequestContext context
 ) {
-// It's possible that we call ApiVersionRequest before consuming the 
log since ApiVersionRequest is sent when
-// initialize NetworkClient, we should not return an error since it 
would stop the NetworkClient from working correctly.
-if (lastCommittedOffset == -1) {

Review Comment:
   yes, this was previously a bug.






[GitHub] [kafka] cmccabe commented on a diff in pull request #13799: KAFKA-15048: Improve handling of unexpected quorum controller errors

2023-06-02 Thread via GitHub


cmccabe commented on code in PR #13799:
URL: https://github.com/apache/kafka/pull/13799#discussion_r1214677896


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -425,25 +430,24 @@ public void accept(ConfigResource configResource) {
 
 public static final String CONTROLLER_THREAD_SUFFIX = 
"QuorumControllerEventHandler";
 
-private static final String ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX =
-"The active controller appears to be node ";
-
-private NotControllerException newNotControllerException() {
-OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
-if (latestController.isPresent()) {
-return new 
NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX +
-latestController.getAsInt() + ".");
-} else {
-return new NotControllerException("No controller appears to be 
active.");
-}
+private OptionalInt latestController() {
+return raftClient.leaderAndEpoch().leaderId();
 }
 
-private NotControllerException newPreMigrationException() {
-OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
-if (latestController.isPresent()) {
-return new NotControllerException("The controller is in 
pre-migration mode.");
+/**
+ * @return  The offset that we should perform read operations at.
+ */
+private long currentReadOffset() {
+if (isActiveController()) {
+// The active controller keeps an in-memory snapshot at the last 
committed offset,
+// which we want to read from when performing read operations. 
This will avoid
+// reading uncommitted data.
+return lastCommittedOffset;
 } else {
-return new NotControllerException("No controller appears to be 
active.");
+// Standby controllers never have uncommitted data in memory. 
Therefore, we return
+// Long.MAX_VALUE, a special value which means "always read the 
latest from every
+// data structure."
+return Long.MAX_VALUE;

Review Comment:
   Good point. Fixed.






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-02 Thread via GitHub


vcrfxia commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1214669497


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {
+
+private final Duration gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRec;
+private Serde keySerde;
+private FullChangeSerde valueSerde;

Review Comment:
   > unfortunately with how the evictions work that just pushes the complexity 
into the in memory implementation
   
   Which in memory implementation was causing trouble? Currently 
`BufferValue.serialize()` is serializing three extra integers into the value 
which are all unnecessary if this store use case will never populate values for 
either `oldValue` or `priorValue`. Might not be a lot in some use cases but for 
small message sizes the overhead could be meaningful.
   
   That said, we can always address this as a follow-up PR but I am curious to 
know where the challenge lies, since I think it'd be nice to serialize the 
value directly instead of going through `Change` serializers if we can avoid it.
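
   To put rough numbers on that, a back-of-the-envelope sketch -- the field 
names and layout below are assumptions for illustration, not the actual 
`BufferValue` wire format: each length-prefixed field costs four bytes even 
when null (encoded as -1), which is where the fixed per-record overhead for 
small messages comes from:
   
   ```java
   import java.nio.ByteBuffer;
   
   class NullMarkerExample {
       // Hypothetical layout: an int length prefix (-1 for null) per field,
       // followed by the field's bytes when present.
       static ByteBuffer serializeWithNullMarkers(byte[] newValue) {
           ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 4 + newValue.length);
           buf.putInt(-1);               // priorValue == null, still 4 bytes
           buf.putInt(-1);               // oldValue == null, still 4 bytes
           buf.putInt(newValue.length);  // newValue length prefix
           buf.put(newValue);
           buf.flip();
           return buf;
       }
   }
   ```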






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-02 Thread via GitHub


vcrfxia commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r121442


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueSegmentedBytesStore.java:
##
@@ -34,10 +34,9 @@ public class RocksDBTimeOrderedKeyValueSegmentedBytesStore 
extends AbstractRocks
 RocksDBTimeOrderedKeyValueSegmentedBytesStore(final String name,
   final String metricsScope,
   final long retention,
-  final long segmentInterval,
-  final boolean withIndex) {
+  final long segmentInterval) {
 super(name, metricsScope, retention, segmentInterval, new 
TimeFirstWindowKeySchema(),
-Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() : 
null));
+Optional.ofNullable(null));

Review Comment:
   nit: `Optional.empty()`



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -78,18 +78,21 @@ public void init(final StateStoreContext context, final 
StateStore root) {
 
 @Override
 public void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback) {
-KeyValue<Bytes, byte[]> keyValue = null;
+KeyValue<Bytes, byte[]> keyValue;
 
 if (predicate.get()) {
 final KeyValueIterator<Bytes, byte[]> iterator = wrapped()
 .fetchAll(0, wrapped().observedStreamTime - 
gracePeriod.toMillis());
 if (iterator.hasNext()) {
 keyValue = iterator.next();
-}
-if (keyValue == null) {
-minTimestamp = Long.MAX_VALUE;
+} else {
+if (numRecords() == 0) {
+minTimestamp = Long.MAX_VALUE;
+}
+iterator.close();

Review Comment:
   Use try-with-resource block instead, so we don't need to remember to close 
the iterator in all possible places before returning? Also so that if an 
exception occurs, the iterator is still not leaked.
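
   A self-contained illustration of the point, using toy types rather than the 
Streams interfaces: with try-with-resources the iterator is closed on every 
exit path, including early returns and exceptions:
   
   ```java
   import java.io.Closeable;
   import java.util.Iterator;
   
   interface CloseableIterator<T> extends Iterator<T>, Closeable {
       @Override
       void close(); // narrowed to an unchecked signature for this example
   }
   
   class TryWithResourcesExample {
       static <T> T firstOrNull(CloseableIterator<T> it) {
           // close() runs automatically, even if hasNext()/next() throws.
           try (CloseableIterator<T> iterator = it) {
               return iterator.hasNext() ? iterator.next() : null;
           }
       }
   }
   ```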



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedKeyValueBytesStoreSupplier.java:
##
@@ -38,13 +35,12 @@ public RocksDBTimeOrderedKeyValueSegmentedBytesStore get() {
 name,
 metricsScope(),
 retentionPeriod,
-Math.max(retentionPeriod / 2, 60_000L),
-withIndex
+Math.max(retentionPeriod / 2, 60_000L)
 );
 }
 
 public String metricsScope() {
-return "rocksdb-session";
+return "rocksdb-buffer";

Review Comment:
   Do we want this store to specifically be used as a buffer, or should it 
support non-buffer use cases for RocksDB time-ordered stores as well? If we 
want it to be more extensible in the future, we can simply use `rocksdb` here 
(similar to `RocksDbKeyValueBytesStoreSupplier`). We'd also want to remove the 
`-buffer` suffix from the store name too.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import 

[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-02 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1214664368


##
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer-id.
+ * It contains a verificationState object that is used to uniquely identify 
the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This 
prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is 
used to prevent hanging transactions.
+ * We remove the verification state whenever we write data to the transaction 
or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer 
state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+private final long producerId;
+private long timestamp;
+private Object verificationState;

Review Comment:
   I've changed to verificationGuard
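
   As a hedged illustration of the guard mechanism the Javadoc above describes 
(toy code, not the actual ProducerStateManager API): the object handed out at 
verification time is identity-compared at append time, so an end marker that 
clears the guard invalidates any in-flight append:
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   class VerificationGuards {
       private final Map<Long, Object> guards = new ConcurrentHashMap<>();
   
       // Verification start: create or reuse the guard for this producer id.
       Object verify(long producerId) {
           return guards.computeIfAbsent(producerId, id -> new Object());
       }
   
       // Append time: identity comparison, not equals(), so a guard that was
       // cleared and recreated in between does not match.
       boolean stillValid(long producerId, Object guard) {
           return guards.get(producerId) == guard;
       }
   
       // Data append or end marker for the transaction: clear the guard.
       void clear(long producerId) {
           guards.remove(producerId);
       }
   }
   ```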






[GitHub] [kafka] C0urante commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest

2023-06-02 Thread via GitHub


C0urante commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1214654703


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##
@@ -391,37 +369,26 @@ public void testFailureInPoll() throws Exception {
 taskFuture.get();
 assertPollMetrics(0);
 
-PowerMock.verifyAll();
+verifyCleanStartup();
+verifyOffsetFlush(true);
+verify(statusListener).onFailure(taskId, exception);
+verify(sourceTask).stop();

Review Comment:
   Ah, never mind. This is covered by changes to `verifyOffsetFlush`.






[GitHub] [kafka] C0urante commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest

2023-06-02 Thread via GitHub


C0urante commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1214602836


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##
@@ -315,157 +322,130 @@ public void testPause() throws Exception {
 
 taskFuture.get();
 
-PowerMock.verifyAll();
+verifyCleanStartup();
+verifyTaskGetTopic(count.get());
+verifyOffsetFlush(true);
+verifyTopicCreation(TOPIC);
+verify(statusListener).onPause(taskId);
+verify(statusListener).onShutdown(taskId);
+verify(sourceTask).stop();
+verify(offsetWriter).offset(PARTITION, OFFSET);
+verifyClose();
 }
 
 @Test
 public void testPollsInBackground() throws Exception {
 createWorkerTask();
 
-expectCleanStartup();
-
 final CountDownLatch pollLatch = expectPolls(10);
-// In this test, we don't flush, so nothing goes any further than the 
offset writer
 
 expectTopicCreation(TOPIC);
-
-sourceTask.stop();
-EasyMock.expectLastCall();
-
-offsetWriter.offset(PARTITION, OFFSET);
-PowerMock.expectLastCall();
-expectOffsetFlush(true);
-
-statusListener.onShutdown(taskId);
-EasyMock.expectLastCall();
-
-expectClose();
-
-PowerMock.replayAll();
+expectOffsetFlush();
 
 workerTask.initialize(TASK_CONFIG);
 Future taskFuture = executor.submit(workerTask);
 
-assertTrue(awaitLatch(pollLatch));
+ConcurrencyUtils.awaitLatch(pollLatch, POLL_TIMEOUT_MSG);
 workerTask.stop();
 assertTrue(workerTask.awaitStop(1000));
 
 taskFuture.get();
 assertPollMetrics(10);
-
-PowerMock.verifyAll();
+verifyCleanStartup();
+verifyOffsetFlush(true);
+verify(offsetWriter).offset(PARTITION, OFFSET);
+verify(statusListener).onShutdown(taskId);
+verifyClose();
 }
 
 @Test
 public void testFailureInPoll() throws Exception {
 createWorkerTask();
 
-expectCleanStartup();
-
 final CountDownLatch pollLatch = new CountDownLatch(1);
 final RuntimeException exception = new RuntimeException();
-EasyMock.expect(sourceTask.poll()).andAnswer(() -> {
+when(sourceTask.poll()).thenAnswer(invocation -> {
 pollLatch.countDown();
 throw exception;
 });
 
-statusListener.onFailure(taskId, exception);
-EasyMock.expectLastCall();
-
-sourceTask.stop();
-EasyMock.expectLastCall();
 expectEmptyOffsetFlush();
 
-expectClose();
-
-PowerMock.replayAll();
-
 workerTask.initialize(TASK_CONFIG);
 Future taskFuture = executor.submit(workerTask);
 
-assertTrue(awaitLatch(pollLatch));
+ConcurrencyUtils.awaitLatch(pollLatch, POLL_TIMEOUT_MSG);
 //Failure in poll should trigger automatic stop of the task
 assertTrue(workerTask.awaitStop(1000));
-assertShouldSkipCommit();
 
 taskFuture.get();
 assertPollMetrics(0);
 
-PowerMock.verifyAll();
+verifyCleanStartup();
+verify(statusListener).onFailure(taskId, exception);
+verify(sourceTask).stop();
+assertShouldSkipCommit();
+verifyOffsetFlush(true);
+verifyClose();
 }
 
 @Test
 public void testFailureInPollAfterCancel() throws Exception {
 createWorkerTask();
 
-expectCleanStartup();
-
 final CountDownLatch pollLatch = new CountDownLatch(1);
 final CountDownLatch workerCancelLatch = new CountDownLatch(1);
 final RuntimeException exception = new RuntimeException();
-EasyMock.expect(sourceTask.poll()).andAnswer(() -> {
+when(sourceTask.poll()).thenAnswer(invocation -> {
 pollLatch.countDown();
-assertTrue(awaitLatch(workerCancelLatch));
+ConcurrencyUtils.awaitLatch(workerCancelLatch, POLL_TIMEOUT_MSG);
 throw exception;
 });
 
-offsetReader.close();
-PowerMock.expectLastCall();
-
-producer.close(Duration.ZERO);
-PowerMock.expectLastCall();
-
-sourceTask.stop();
-EasyMock.expectLastCall();
-
-expectClose();
-
-PowerMock.replayAll();
-
 workerTask.initialize(TASK_CONFIG);
 Future taskFuture = executor.submit(workerTask);
 
-assertTrue(awaitLatch(pollLatch));
+ConcurrencyUtils.awaitLatch(pollLatch, POLL_TIMEOUT_MSG);
 workerTask.cancel();
 workerCancelLatch.countDown();
 assertTrue(workerTask.awaitStop(1000));
 
 taskFuture.get();
 assertPollMetrics(0);
 
-PowerMock.verifyAll();
+verifyCleanStartup();
+verify(offsetReader, atLeastOnce()).close();
+verify(producer).close(Duration.ZERO);
+verify(sourceTask).stop();
+

[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-02 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1214639262


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
   val sTime = time.milliseconds
   
   val transactionalProducerIds = mutable.HashSet[Long]()
+  var verificationState: Object = null

Review Comment:
   I've fixed this and added a test.






[GitHub] [kafka] gharris1727 commented on pull request #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult

2023-06-02 Thread via GitHub


gharris1727 commented on PR #13771:
URL: https://github.com/apache/kafka/pull/13771#issuecomment-1574069538

   Here's an example of the new log message, pulled from the test run:
   ```
   [2023-06-02 10:15:57,932] WARN Ambiguous alias 'Colliding' refers to 
multiple distinct plugins 
[org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingHeaderConverter,
 
org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingConverter]: 
ignoring (org.apache.kafka.connect.runtime.isolation.PluginUtils:378)
   
   ```
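
   For context, a sketch of the rule that the message reflects -- illustrative 
only, not the exact `PluginUtils.computeAliases` code: an alias is bound only 
when it resolves to a single distinct plugin class, and is otherwise logged 
and dropped:
   
   ```java
   import java.util.Map;
   import java.util.Set;
   
   class AliasBinding {
       static void bindAliases(Map<String, Set<String>> candidates, Map<String, String> aliases) {
           candidates.forEach((alias, classNames) -> {
               if (classNames.size() == 1) {
                   aliases.put(alias, classNames.iterator().next());
               } else {
                   // Mirrors the WARN above: ambiguous aliases are ignored, not bound.
                   System.err.printf("Ambiguous alias '%s' refers to multiple distinct plugins %s: ignoring%n",
                           alias, classNames);
               }
           });
       }
   }
   ```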





[GitHub] [kafka] jeqo opened a new pull request, #13803: KAFKA-15051: add missing GET plugin/config endpoint

2023-06-02 Thread via GitHub


jeqo opened a new pull request, #13803:
URL: https://github.com/apache/kafka/pull/13803

   (no comment)





[jira] [Created] (KAFKA-15051) docs: add missing connector plugin endpoint to documentation

2023-06-02 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-15051:


 Summary: docs: add missing connector plugin endpoint to 
documentation
 Key: KAFKA-15051
 URL: https://issues.apache.org/jira/browse/KAFKA-15051
 Project: Kafka
  Issue Type: Task
  Components: docs, documentation
Reporter: Jorge Esteban Quilcate Otoya


GET /plugin/config endpoint added in 
[KIP-769|https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+connector+plugins+and+retrieve+their+configuration+definitions]
 is not included in the connect documentation page: 
https://kafka.apache.org/documentation/#connect_rest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords

2023-06-02 Thread via GitHub


divijvaidya commented on code in PR #13760:
URL: https://github.com/apache/kafka/pull/13760#discussion_r1214582937


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.admin.DeletedRecords;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import 
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.DeleteRecordsRequestData;
+import org.apache.kafka.common.message.DeleteRecordsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public final class DeleteRecordsHandler extends Batched<TopicPartition, DeletedRecords> {
+
+private final Map<TopicPartition, RecordsToDelete> recordsToDelete;
+private final Logger log;
+private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
+
+public DeleteRecordsHandler(
+Map<TopicPartition, RecordsToDelete> recordsToDelete,
+LogContext logContext
+) {
+this.recordsToDelete = recordsToDelete;
+this.log = logContext.logger(DeleteRecordsHandler.class);
+this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+}
+
+@Override
+public String apiName() {
+return "deleteRecords";
+}
+
+@Override
+public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
+return this.lookupStrategy;
+}
+
+public static SimpleAdminApiFuture<TopicPartition, DeletedRecords> newFuture(
+Collection<TopicPartition> topicPartitions
+) {
+return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
+}
+
+@Override
+public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
+Map<String, DeleteRecordsRequestData.DeleteRecordsTopic> deletionsForTopic = new HashMap<>();
+for (Map.Entry<TopicPartition, RecordsToDelete> entry: recordsToDelete.entrySet()) {
+TopicPartition topicPartition = entry.getKey();
+DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = 
deletionsForTopic.get(topicPartition.topic());
+if (deleteRecords == null) {
+deleteRecords = new 
DeleteRecordsRequestData.DeleteRecordsTopic()
+.setName(topicPartition.topic());
+deletionsForTopic.put(topicPartition.topic(), deleteRecords);
+}
+deleteRecords.partitions().add(new 
DeleteRecordsRequestData.DeleteRecordsPartition()
+.setPartitionIndex(topicPartition.partition())
+.setOffset(entry.getValue().beforeOffset()));
+
+System.out.println("Partitions: " + deleteRecords.partitions());
+}
+
+DeleteRecordsRequestData data = new DeleteRecordsRequestData()
+.setTopics(new ArrayList<>(deletionsForTopic.values()));
+return new DeleteRecordsRequest.Builder(data);
+}
+
+
+@Override
+public ApiResult<TopicPartition, DeletedRecords> handleResponse(
+Node broker,
+Set<TopicPartition> keys,
+AbstractResponse abstractResponse
+) {
+DeleteRecordsResponse response = (DeleteRecordsResponse) 
abstractResponse;
+Map<TopicPartition, DeletedRecords> completed = new HashMap<>();
+Map<TopicPartition, Throwable> failed = new HashMap<>();
+List<TopicPartition> unmapped = new ArrayList<>();
+Set<TopicPartition> retriable = new HashSet<>();
+
+for (DeleteRecordsResponseData.DeleteRecordsTopicResult topicResult: 
response.data().topics()) {
+for (DeleteRecordsResponseData.DeleteRecordsPartitionResult 
partitionResult : topicResult.partitions()) {
+Errors error = 

[GitHub] [kafka] C0urante commented on a diff in pull request #13750: MINOR: Handle the config topic read timeout edge case in DistributedHerder's stopConnector method

2023-06-02 Thread via GitHub


C0urante commented on code in PR #13750:
URL: https://github.com/apache/kafka/pull/13750#discussion_r1214589790


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1104,7 +1104,9 @@ public void stopConnector(final String connName, final Callback<Void> callback)
 writeTaskConfigs(connName, Collections.emptyList());
 configBackingStore.putTargetState(connName, 
TargetState.STOPPED);
 // Force a read of the new target state for the connector
-refreshConfigSnapshot(workerSyncTimeoutMs);
+if (!refreshConfigSnapshot(workerSyncTimeoutMs)) {
+throw new ConnectException("Failed to read to end of 
config topic");

Review Comment:
   Agreed, better to log a warning 
   
   I was also toying with the idea that we could shift the responsibility of 
refreshing the view of the config topic from operations that write to the 
topic, to operations that read from the topic. For example, if we need to check 
that a connector exists before pausing/resuming/restarting it, then that 
operation fails if we can't update our view of the topic.
   
   I don't think this is necessary to do in this PR and can be a follow-up. 
It's also questionable if we should even bother since there's still race 
conditions that can come from target state updates that originate from 
non-leader workers.
   
   But it's at least worth some food for thought.






[GitHub] [kafka] dajac commented on pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-02 Thread via GitHub


dajac commented on PR #13675:
URL: https://github.com/apache/kafka/pull/13675#issuecomment-1574019307

   Yeah, I have noticed all the failed tests. I will fix them.





[GitHub] [kafka] philipnee commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-06-02 Thread via GitHub


philipnee commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1574015844

   The failures seem unrelated
   ```
   Build / JDK 8 and Scala 2.12 / 
testDescribeReportOverriddenConfigs(String).quorum=kraft – 
kafka.admin.TopicCommandIntegrationTest
   10s
   Build / JDK 8 and Scala 2.12 / testDelayedConfigurationOperations() – 
org.apache.kafka.controller.QuorumControllerTest
   <1s
   Build / JDK 11 and Scala 2.13 / testSyncTopicConfigs() – 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
   1m 36s
   Build / JDK 11 and Scala 2.13 / testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   12s
   Build / JDK 11 and Scala 2.13 / testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   13s
   Fixed 68
   ```





[GitHub] [kafka] jolshan commented on pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-02 Thread via GitHub


jolshan commented on PR #13675:
URL: https://github.com/apache/kafka/pull/13675#issuecomment-1574008136

   Hey David -- whenever we update appendRecords we have to modify a ton of 
tests that mock the method (basically adding a ton of any(), since the mock 
doesn't pick up the Scala default arguments). There are 283 failing now.
   
   See here for an example of a PR that updates them: 
https://github.com/apache/kafka/pull/13391/files
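
   A toy illustration of the failure mode -- the `Writer` interface below is 
made up for this sketch, not the real `ReplicaManager` signature: Scala 
default arguments don't exist on the Java mock, so every stub and verification 
has to supply one matcher per parameter, hence the pile of any():
   
   ```java
   import static org.mockito.ArgumentMatchers.any;
   import static org.mockito.ArgumentMatchers.anyLong;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;
   
   interface Writer {
       // Imagine 'callback' was just added, with a default value on the Scala side.
       long append(long timeout, String payload, Runnable callback);
   }
   
   class WriterTest {
       void stubAppend() {
           Writer writer = mock(Writer.class);
           // The old two-argument stub no longer compiles; every existing
           // stub/verify must gain a matcher for the new parameter.
           when(writer.append(anyLong(), any(), any())).thenReturn(0L);
       }
   }
   ```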





[GitHub] [kafka] jolshan commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-02 Thread via GitHub


jolshan commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1214570874


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1019,13 +1029,14 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /**
-   * Append the messages to the local replica logs
+   * Append the messages to the local replica logs. 
ReplicaManager#appendRecords should usually be
+   * used instead of this method.

Review Comment:
   lgtm :) 






[GitHub] [kafka] jolshan commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-02 Thread via GitHub


jolshan commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1214569080


##
core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala:
##
@@ -131,26 +132,22 @@ class PartitionWriterImpl[T](
 s"in append to partition $tp which exceeds the maximum configured 
size of $maxBatchSize.")
 }
 
-val appendResults = replicaManager.appendToLocalLog(
+var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
+replicaManager.appendRecords(
+  timeout = 0L,
+  requiredAcks = 1,
   internalTopicsAllowed = true,
   origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> recordsBuilder.build()),
-  requiredAcks = 1,
-  requestLocal = RequestLocal.NoCaching
+  responseCallback = results => appendResults = results,
+  actionQueueAdd = item => item() // Immediately complete the action 
queue item.
 )
 
 val partitionResult = appendResults.getOrElse(tp,
-  throw new IllegalStateException("Append status %s should only have 
one partition %s"
-.format(appendResults, tp)))
-
-// Complete delayed operations.
-replicaManager.maybeCompletePurgatories(
-  tp,
-  partitionResult.info.leaderHwChange
-)
+  throw new IllegalStateException(s"Append status $appendResults 
should only have one partition $tp"))

Review Comment:
   nit: this error message confused me because I thought that it was saying 
there was more than one tp and that's why it failed. I think maybe the message 
should say something like s"Append status $appendResults should have partition 
$tp" since the fact that it doesn't would be the reason to throw the error.






[GitHub] [kafka] mumrah commented on a diff in pull request #13799: KAFKA-15048: Improve handling of unexpected quorum controller errors

2023-06-02 Thread via GitHub


mumrah commented on code in PR #13799:
URL: https://github.com/apache/kafka/pull/13799#discussion_r1214563481


##
metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java:
##
@@ -38,4 +47,104 @@ public static boolean isTimeoutException(Throwable 
exception) {
 if (!(exception instanceof TimeoutException)) return false;
 return true;
 }
+
+/**
+ * Check if an exception is a NotController exception.
+ *
+ * @param exception The exception to check.
+ * @return  True if the exception is a NotController exception.
+ */
+public static boolean isNotControllerException(Throwable exception) {
+if (exception == null) return false;
+if (exception instanceof ExecutionException) {
+exception = exception.getCause();
+if (exception == null) return false;
+}
+if (!(exception instanceof NotControllerException)) return false;
+return true;
+}
+
+/**
+ * Create a new exception indicating that the controller is in 
pre-migration mode, so the
+ * operation cannot be completed.
+ *
+ * @param latestController  The current controller.
+ * @return  The new NotControllerException.
+ */
+public static NotControllerException newPreMigrationException(OptionalInt 
latestController) {

Review Comment:
   I'm guessing the argument here is the controller ID? Can you rename it to 
reflect that? (same for other methods in this class)



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2024,13 +2029,8 @@ public CompletableFuture<ElectLeadersResponseData> electLeaders(
 public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures(
 ControllerRequestContext context
 ) {
-// It's possible that we call ApiVersionRequest before consuming the 
log since ApiVersionRequest is sent when
-// initialize NetworkClient, we should not return an error since it 
would stop the NetworkClient from working correctly.
-if (lastCommittedOffset == -1) {

Review Comment:
   Was this a bug previously? If another node sent an ApiVersionRequest prior 
to loading the log, we would have not returned any MetadataVersion (whereas 
with this patch, we would). 






[GitHub] [kafka] jsancio commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

2023-06-02 Thread via GitHub


jsancio commented on PR #13765:
URL: https://github.com/apache/kafka/pull/13765#issuecomment-1573978281

   > 1. We change leader epoch when a replica is removed and shrinks the 
ISR (without a leader re-election). Is that correct? If yes, then should we 
also remove the logic to increment the epoch in such situations to keep the 
definition of leader epoch consistent?
   
   Yes. That is what the change to `PartitionChangeBuilder` does. Can you 
point me to exactly what code you are referring to?
   
   > 2. Is a similar change required for Zk code path?
   
   We need to keep the old behavior in ZK deployments because the ZK controller 
doesn't implement KIP-841, which is required for this fix to work.





[GitHub] [kafka] jolshan commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-02 Thread via GitHub


jolshan commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1214564305


##
core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.cluster.PartitionListener
+import kafka.server.{ReplicaManager, RequestLocal}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.RecordTooLargeException
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
TimestampType}
+import org.apache.kafka.common.record.Record.EMPTY_HEADERS
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter
+import org.apache.kafka.storage.internals.log.AppendOrigin
+
+import java.nio.ByteBuffer
+import java.util
+import scala.collection.Map
+
+private[group] class ListenerAdaptor(
+  val listener: PartitionWriter.Listener
+) extends PartitionListener {
+  override def onHighWatermarkUpdated(
+tp: TopicPartition,
+offset: Long
+  ): Unit = {
+listener.onHighWatermarkUpdated(tp, offset)
+  }
+
+  override def equals(that: Any): Boolean = that match {
+case other: ListenerAdaptor => listener.equals(other.listener)
+case _ => false
+  }
+
+  override def hashCode(): Int = {
+listener.hashCode()
+  }
+
+  override def toString: String = {
+s"ListenerAdaptor(listener=$listener)"
+  }
+}
+
+class PartitionWriterImpl[T](
+  replicaManager: ReplicaManager,
+  serializer: PartitionWriter.Serializer[T],
+  compressionType: CompressionType,
+  time: Time
+) extends PartitionWriter[T] {
+
+  override def registerListener(
+tp: TopicPartition,
+listener: PartitionWriter.Listener
+  ): Unit = {
+replicaManager.maybeAddListener(tp, new ListenerAdaptor(listener))
+  }
+
+  override def deregisterListener(
+tp: TopicPartition,
+listener: PartitionWriter.Listener
+  ): Unit = {
+replicaManager.removeListener(tp, new ListenerAdaptor(listener))
+  }
+
+  override def append(
+tp: TopicPartition,
+records: util.List[T]
+  ): Long = {
+if (records.isEmpty) {
+  throw new IllegalStateException("records must be non-empty.")
+}
+
+replicaManager.getLogConfig(tp) match {
+  case Some(logConfig) =>
+val magic = logConfig.recordVersion.value
+val maxBatchSize = logConfig.maxMessageSize
+val currentTimeMs = time.milliseconds()
+
+val recordsBuilder = MemoryRecords.builder(
+  ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+  magic,
+  compressionType,
+  TimestampType.CREATE_TIME,
+  0L,
+  maxBatchSize
+)
+
+records.forEach { record =>
+  val keyBytes = serializer.serializeKey(record)
+  val valueBytes = serializer.serializeValue(record)
+
+  if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, 
EMPTY_HEADERS)) {
+recordsBuilder.append(
+  currentTimeMs,
+  keyBytes,
+  valueBytes,
+  EMPTY_HEADERS
+)
+  } else {
+throw new RecordTooLargeException(s"Message batch size is 
${recordsBuilder.estimatedSizeInBytes()} bytes " +
+  s"in append to partition $tp which exceeds the maximum 
configured size of $maxBatchSize.")
+  }
+}
+
+val appendResults = replicaManager.appendToLocalLog(

Review Comment:
   Sounds good. Thanks!






[GitHub] [kafka] C0urante commented on pull request #13530: KAFKA-14858: Handle exceptions thrown from Connector::taskConfigs in Connect's standalone mode

2023-06-02 Thread via GitHub


C0urante commented on PR #13530:
URL: https://github.com/apache/kafka/pull/13530#issuecomment-1573973942

   Hmm... so I just did some local testing with this change and the results 
seem a little inconsistent.
   
   Although every expected case reported the error successfully (via the REST 
API if the `Connector::taskConfigs` method was called synchronously, and via 
worker logs no matter what), we still don't mark the connector FAILED in the 
REST API.
   
   Do you think it's worth it to mark connectors FAILED in the REST API if 
we've encountered an error while interacting with them, and will not 
(automatically) retry that interaction?





[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-02 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1214561645


##
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer-id.
+ * It contains a verificationState object that is used to uniquely identify 
the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This 
prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is 
used to prevent hanging transactions.
+ * We remove the verification state whenever we write data to the transaction 
or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer 
state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+private final long producerId;

Review Comment:
   We can remove it
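
For readers skimming the thread, a minimal sketch of what the entry could look like once that field is dropped; the timestamp and the verificationState object are the pieces named in the javadoc above, while the accessor names are assumed:

```java
public class VerificationStateEntry {

    private final long timestamp;           // creation time, used to clean up stale entries
    private final Object verificationState; // unique token identifying the transaction being verified

    public VerificationStateEntry(long timestamp) {
        this.timestamp = timestamp;
        this.verificationState = new Object(); // a fresh object is unique per verification attempt
    }

    public long timestamp() {
        return timestamp;
    }

    public Object verificationState() {
        return verificationState;
    }
}
```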






[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-02 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1214561227


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -184,6 +186,27 @@ private void clearProducerIds() {
 producerIdCount = 0;
 }
 
+    /**
+     * Maybe create the VerificationStateEntry for a given producer ID. Return it if it exists, otherwise return null.
+     */
+    public VerificationStateEntry verificationStateEntry(long producerId, boolean createIfAbsent) {
+        return verificationStates.computeIfAbsent(producerId, pid -> {
+            if (createIfAbsent)
+                return new VerificationStateEntry(pid, time.milliseconds());
+            else {
+                log.warn("The given producer ID did not have an entry in the producer state manager, so its state will be returned as null");

Review Comment:
   Yes. But in this case, we will fail the verification.
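
Put differently, a rough sketch of the append-time check implied here; the names beyond those in the diff (the request-side token, the exception choice) are assumptions:

```java
// Hypothetical sketch: the entry captured at verification time must still be present
// (and be the same object) when the append happens, otherwise the append is rejected.
VerificationStateEntry entry = producerStateManager.verificationStateEntry(producerId, false);
if (entry == null || entry.verificationState() != verificationStateFromRequest) {
    // The transaction may have ended, or was never verified: fail the verification.
    throw new InvalidTxnStateException("Verification failed for producer " + producerId);
}
```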






[GitHub] [kafka] mumrah opened a new pull request, #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

2023-06-02 Thread via GitHub


mumrah opened a new pull request, #13802:
URL: https://github.com/apache/kafka/pull/13802

   This patch adds complete test coverage for the snapshot methods in 
KRaftMigrationZkWriter.





[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-02 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1214559856


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -683,6 +704,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   validateAndAssignOffsets = false,
   leaderEpoch = -1,
   requestLocal = None,
+  verificationState = null,

Review Comment:
   Nope. We don't verify followers.






[GitHub] [kafka] divijvaidya commented on pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-02 Thread via GitHub


divijvaidya commented on PR #13561:
URL: https://github.com/apache/kafka/pull/13561#issuecomment-1573968832

   Hey @satishd 
   Are you planning to address the open comments such as 
https://github.com/apache/kafka/pull/13561/files#r1166767890 before I do 
another pass of code review?





[GitHub] [kafka] divijvaidya commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

2023-06-02 Thread via GitHub


divijvaidya commented on PR #13765:
URL: https://github.com/apache/kafka/pull/13765#issuecomment-1573957094

   > the old code was increasing the leader epoch when the ISR shrinks but not when the ISR expands
   
   Thank you, I didn't realise that.
   
   Next,
   1. We change the leader epoch when a replica is removed and shrinks the ISR (without a leader re-election). Is that correct? If yes, should we also remove the logic that increments the epoch in such situations, to keep the definition of leader epoch consistent?
   
   2. Is a similar change required for the Zk code path?
   





[GitHub] [kafka] abbccdda closed pull request #9624: KAFKA-10655: wrap and catch exception for appendAsLeader failure

2023-06-02 Thread via GitHub


abbccdda closed pull request #9624: KAFKA-10655: wrap and catch exception for 
appendAsLeader failure
URL: https://github.com/apache/kafka/pull/9624





[jira] [Resolved] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field

2023-06-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15012.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> JsonConverter fails when there are leading Zeros in a field
> ---
>
> Key: KAFKA-15012
> URL: https://issues.apache.org/jira/browse/KAFKA-15012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Ranjan Rao
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0
>
> Attachments: 
> enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch
>
>
> When there are leading zeros in a field in the Kakfa Record, a sink connector 
> using JsonConverter fails with the below exception
>  
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] 
> to Kafka Connect data failed due to serialization error: 
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
>   at 
> org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
>   ... 13 more
> Caused by: org.apache.kafka.common.errors.SerializationException: 
> com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading 
> zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
> Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric 
> value: Leading zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
>   at 
> com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(ParserMinimalBase.java:551)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyNoLeadingZeroes(UTF8StreamJsonParser.java:1520)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1372)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:855)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4247)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734)
>   at 
> org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
>   at 
> org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>   at 
> 

[GitHub] [kafka] C0urante merged pull request #13800: KAFKA-15012: Allow leading zeros in numeric fields while deserializing JSON messages using the JsonConverter

2023-06-02 Thread via GitHub


C0urante merged PR #13800:
URL: https://github.com/apache/kafka/pull/13800
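
For reference, a minimal, self-contained sketch of the Jackson setting named in the attached patch (assuming Jackson 2.10+; this illustrates the general approach, not necessarily the exact merged change):

```java
import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;

public class LeadingZerosExample {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = JsonMapper.builder()
                .enable(JsonReadFeature.ALLOW_LEADING_ZEROS_FOR_NUMBERS)
                .build();
        // Without the feature enabled, this throws JsonParseException:
        // "Invalid numeric value: Leading zeroes not allowed"
        JsonNode node = mapper.readTree("00080153032837");
        System.out.println(node.longValue()); // 80153032837
    }
}
```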





[GitHub] [kafka] splett2 commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

2023-06-02 Thread via GitHub


splett2 commented on PR #13765:
URL: https://github.com/apache/kafka/pull/13765#issuecomment-1573900120

   @jsancio 
   That makes sense. Sounds good to me in that case.





[jira] [Resolved] (KAFKA-14863) Plugins which do not have a valid no-args constructor are visible in the REST API

2023-06-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14863.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> Plugins which do not have a valid no-args constructor are visible in the REST 
> API
> -
>
> Key: KAFKA-14863
> URL: https://issues.apache.org/jira/browse/KAFKA-14863
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
> Fix For: 3.6.0
>
>
> Currently, the Connect plugin discovery mechanisms only assert that a no-args 
> constructor is present when necessary. In particular, this assertion happens 
> for Connectors when the framework needs to evaluate the connector's version 
> method.
> It also happens for ConnectorConfigOverridePolicy, ConnectRestExtension, and 
> ConfigProvider plugins, which are loaded via the ServiceLoader. The 
> ServiceLoader constructs instances of plugins with their no-args constructor 
> during discovery, so these plugins are discovered even if they are not 
> Versioned.
> This has the effect that these unusable plugins, which are missing a default
> constructor, appear in the REST API but cannot be instantiated or used. To make
> the ServiceLoader and Reflections discovery mechanisms behave more similarly,
> this assertion should be applied to all plugins, and a log message emitted when
> plugins do not follow the constructor requirements.
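
A minimal, self-contained sketch of the kind of constructor assertion described above (a hypothetical helper, not the actual Connect code):

```java
public class PluginConstructorCheck {

    /** Returns true if the class declares a no-args constructor, as plugin discovery requires. */
    static boolean hasNoArgsConstructor(Class<?> pluginKlass) {
        try {
            pluginKlass.getDeclaredConstructor(); // throws if no zero-arg constructor exists
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasNoArgsConstructor(String.class)); // true
    }
}
```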





[GitHub] [kafka] C0urante merged pull request #13467: KAFKA-14863: Hide plugins with invalid constructors during plugin discovery

2023-06-02 Thread via GitHub


C0urante merged PR #13467:
URL: https://github.com/apache/kafka/pull/13467


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15019) Improve handling of broker heartbeat timeouts

2023-06-02 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-15019.

Fix Version/s: 3.5.0
   Resolution: Fixed

> Improve handling of broker heartbeat timeouts
> -
>
> Key: KAFKA-15019
> URL: https://issues.apache.org/jira/browse/KAFKA-15019
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.5.0
>
>
> Improve handling of overload situations in the KRaft controller





[jira] [Assigned] (KAFKA-15004) Topic config changes are not synced during zk to kraft migration (dual-write)

2023-06-02 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-15004:
--

Fix Version/s: 3.5.0
 Assignee: Akhilesh Chaganti
   Resolution: Fixed

> Topic config changes are not synced during zk to kraft migration (dual-write)
> -
>
> Key: KAFKA-15004
> URL: https://issues.apache.org/jira/browse/KAFKA-15004
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0
>Reporter: Akhilesh Chaganti
>Assignee: Akhilesh Chaganti
>Priority: Blocker
> Fix For: 3.5.0
>
>






[jira] [Commented] (KAFKA-15019) Improve handling of broker heartbeat timeouts

2023-06-02 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728783#comment-17728783
 ] 

Mickael Maison commented on KAFKA-15019:


Since the PR is merged I'm marking this as resolved in 3.5.0

If there is follow up work left to do, feel free to reopen (and adjust Fix 
Version/s) or create a new ticket.

> Improve handling of broker heartbeat timeouts
> -
>
> Key: KAFKA-15019
> URL: https://issues.apache.org/jira/browse/KAFKA-15019
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>
> Improve handling of overload situations in the KRaft controller





[jira] [Commented] (KAFKA-15004) Topic config changes are not synced during zk to kraft migration (dual-write)

2023-06-02 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728782#comment-17728782
 ] 

Mickael Maison commented on KAFKA-15004:


Since the PR is merged I'm marking this as resolved in 3.5.0

If there is follow up work left to do, feel free to reopen (and adjust Fix 
Version/s) or create a new ticket.

> Topic config changes are not synced during zk to kraft migration (dual-write)
> -
>
> Key: KAFKA-15004
> URL: https://issues.apache.org/jira/browse/KAFKA-15004
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0
>Reporter: Akhilesh Chaganti
>Priority: Blocker
>






[jira] [Commented] (KAFKA-15003) TopicIdReplicaAssignment is not updated in migration (dual-write) when partitions are changed for topic

2023-06-02 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728780#comment-17728780
 ] 

Mickael Maison commented on KAFKA-15003:


Since the PR is merged I'm marking this as resolved in 3.5.0

If there is follow up work left to do, feel free to reopen (and adjust Fix 
Version/s) or create a new ticket.

> TopicIdReplicaAssignment is not updated in migration (dual-write) when 
> partitions are changed for topic
> ---
>
> Key: KAFKA-15003
> URL: https://issues.apache.org/jira/browse/KAFKA-15003
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: Akhilesh Chaganti
>Assignee: Akhilesh Chaganti
>Priority: Blocker
>






[jira] [Resolved] (KAFKA-14996) The KRaft controller should properly handle overly large user operations

2023-06-02 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-14996.

Fix Version/s: 3.5.0
 Assignee: Colin McCabe  (was: Edoardo Comar)
   Resolution: Fixed

> The KRaft controller should properly handle overly large user operations
> 
>
> Key: KAFKA-14996
> URL: https://issues.apache.org/jira/browse/KAFKA-14996
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.5.0
>Reporter: Edoardo Comar
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.5.0
>
>
> If an attempt is made to create a topic with
> num partitions >= QuorumController.MAX_RECORDS_PER_BATCH (10000)
> the client receives an UnknownServerException - it could rather receive a 
> better error.
> The controller logs
> {{2023-05-12 19:25:10,018] WARN [QuorumController id=1] createTopics: failed 
> with unknown server exception IllegalStateException at epoch 2 in 21956 us.  
> Renouncing leadership and reverting to the last committed offset 174. 
> (org.apache.kafka.controller.QuorumController)}}
> {{java.lang.IllegalStateException: Attempted to atomically commit 10001 
> records, but maxRecordsPerBatch is 10000}}
> {{    at 
> org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:812)}}
> {{    at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:719)}}
> {{    at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)}}
> {{    at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)}}
> {{    at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)}}
> {{    at java.base/java.lang.Thread.run(Thread.java:829)}}
> {{[}}
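
A rough sketch of the kind of guard the fix implies; the exception type and message here are assumptions (the ticket only asks for something better than an UnknownServerException):

```java
// Hypothetical pre-check before atomically appending a batch of controller records:
if (records.size() > maxRecordsPerBatch) {
    throw new PolicyViolationException("Unable to perform the operation: it would require atomically committing " +
            records.size() + " records, which exceeds the limit of " + maxRecordsPerBatch);
}
```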





[jira] [Commented] (KAFKA-14996) The KRaft controller should properly handle overly large user operations

2023-06-02 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728779#comment-17728779
 ] 

Mickael Maison commented on KAFKA-14996:


Since the PR is merged I'm marking this as resolved in 3.5.0

If there is follow up work left to do, feel free to reopen (and adjust Fix 
Version/s) or create a new ticket.

> The KRaft controller should properly handle overly large user operations
> 
>
> Key: KAFKA-14996
> URL: https://issues.apache.org/jira/browse/KAFKA-14996
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.5.0
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Blocker
>
> If an attempt is made to create a topic with
> num partitions >= QuorumController.MAX_RECORDS_PER_BATCH (10000)
> the client receives an UnknownServerException - it could rather receive a 
> better error.
> The controller logs
> {{2023-05-12 19:25:10,018] WARN [QuorumController id=1] createTopics: failed 
> with unknown server exception IllegalStateException at epoch 2 in 21956 us.  
> Renouncing leadership and reverting to the last committed offset 174. 
> (org.apache.kafka.controller.QuorumController)}}
> {{java.lang.IllegalStateException: Attempted to atomically commit 10001 
> records, but maxRecordsPerBatch is 10000}}
> {{    at 
> org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:812)}}
> {{    at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:719)}}
> {{    at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)}}
> {{    at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)}}
> {{    at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)}}
> {{    at java.base/java.lang.Thread.run(Thread.java:829)}}
> {{[}}





[jira] [Commented] (KAFKA-15010) KRaft Controller doesn't reconcile with Zookeeper metadata upon becoming new controller while in dual write mode.

2023-06-02 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728773#comment-17728773
 ] 

Mickael Maison commented on KAFKA-15010:


Since the PR is merged I'm marking this as resolved.

If there is follow up work left to do, feel free to reopen (and adjust Fix 
Version/s) or create a new ticket.

> KRaft Controller doesn't reconcile with Zookeeper metadata upon becoming new 
> controller while in dual write mode.
> -
>
> Key: KAFKA-15010
> URL: https://issues.apache.org/jira/browse/KAFKA-15010
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: Akhilesh Chaganti
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.5.0
>
>
> When a KRaft controller fails over, the existing migration driver (in dual
> write mode) can fail in between Zookeeper writes and may leave Zookeeper with
> incomplete and inconsistent data. So when a new controller becomes active
> (and by extension a new migration driver becomes active), the first thing we
> should do is load the in-memory snapshot and use it to write metadata to
> Zookeeper to reach a steady state. We currently do not do this, and it may
> leave Zookeeper in an inconsistent state.





[jira] [Resolved] (KAFKA-15010) KRaft Controller doesn't reconcile with Zookeeper metadata upon becoming new controller while in dual write mode.

2023-06-02 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-15010.

Resolution: Fixed

> KRaft Controller doesn't reconcile with Zookeeper metadata upon becoming new 
> controller while in dual write mode.
> -
>
> Key: KAFKA-15010
> URL: https://issues.apache.org/jira/browse/KAFKA-15010
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: Akhilesh Chaganti
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.5.0
>
>
> When a KRaft controller fails over, the existing migration driver (in dual
> write mode) can fail in between Zookeeper writes and may leave Zookeeper with
> incomplete and inconsistent data. So when a new controller becomes active
> (and by extension a new migration driver becomes active), the first thing we
> should do is load the in-memory snapshot and use it to write metadata to
> Zookeeper to reach a steady state. We currently do not do this, and it may
> leave Zookeeper in an inconsistent state.





[GitHub] [kafka] C0urante commented on pull request #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult

2023-06-02 Thread via GitHub


C0urante commented on PR #13771:
URL: https://github.com/apache/kafka/pull/13771#issuecomment-1573852526

   Thanks Greg! I think the tradeoffs here are acceptable. Can we add some 
testing coverage for `PluginUtils::computeAliases`? Once that's done this 
should be good to merge.





[GitHub] [kafka] C0urante commented on a diff in pull request #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult

2023-06-02 Thread via GitHub


C0urante commented on code in PR #13771:
URL: https://github.com/apache/kafka/pull/13771#discussion_r1214470057


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##
@@ -356,25 +357,16 @@ public static String prunedName(PluginDesc plugin) {
      * Verify whether a given plugin's alias matches another alias in a collection of plugins.
      *
      * @param alias the plugin descriptor to test for alias matching.
-     * @param plugins the collection of plugins to test against.
+     * @param aliases the collection of plugins to test against.
      * @param  the plugin type.
      * @return false if a match was found in the collection, otherwise true.
      */
     public static  boolean isAliasUnique(
             PluginDesc alias,
-            Collection> plugins
+            Map aliases
     ) {
-        boolean matched = false;
-        for (PluginDesc plugin : plugins) {
-            if (simpleName(alias).equals(simpleName(plugin))
-                    || prunedName(alias).equals(prunedName(plugin))) {
-                if (matched) {
-                    return false;
-                }
-                matched = true;
-            }
-        }
-        return true;
+        // TODO: Mark alias collision and disable ambiguous aliases completely.

Review Comment:
   > I don't think that anyone should be depending on these cross-plugin 
ambiguous aliases, because if they were, they would be subject to spurious 
ClassNotFoundExceptions whenever the non-deterministic iteration order caused a 
plugin of the wrong type to be loaded.
   
   Plugin scanning and alias computation is currently deterministic (the 
various `Set>` objects that we use are all instances of the 
`TreeSet` type), so users are not subject to this risk while running a fixed 
version of Kafka Connect with a fixed plugin path (and fixed contents of that 
plugin path).
   
   That said, I think the changes you've made are sufficient.
   
   > An alternative is to separate these aliases into per-plugin-type 
namespaces, but I don't think that is necessary at this time.
   
   Yeah, this would be nice to have but it's not a blocker. One case where it 
could come in handy is with plugins that implement multiple plugin interfaces 
(which is actually fairly common with `Converter` and `HeaderConverter` 
instances); you may have multiple `JsonConverter` plugins on your worker, but 
if only one of them implements the `Converter` interface and another implements 
the `Converter` and `HeaderConverter` interface, it's hard to argue that you 
shouldn't be able to use an alias when using the latter as a header converter.






[GitHub] [kafka] C0urante commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-02 Thread via GitHub


C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1573784468

   @vamossagar12 Thanks for the PR. I'd like to wait until Yash has had a 
chance to review before taking a look, so I've removed myself as a reviewer. 
Feel free to add me back once he (or someone else familiar with Connect) has 
had a chance to review.





[GitHub] [kafka] jsancio commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

2023-06-02 Thread via GitHub


jsancio commented on PR #13765:
URL: https://github.com/apache/kafka/pull/13765#issuecomment-1573750489

   > Actually, one thing I was wondering about this change since I am not that 
familiar with what the metadata version gates - does the change in the PR allow 
leader epochs to go backwards?
   
   The important observation is that this PR doesn't change the semantics of replaying `PartitionRecord` and `PartitionChangeRecord` with respect to the leader epoch bump. When replaying `PartitionChangeRecord`, the state machines (controller and broker) will increase the leader epoch if the field `Leader` is set. This holds true before and after this PR. What this PR changes is when the controller sets the `Leader` field in `PartitionChangeRecord`.
   
   I should also point out that the MV check is not required for correctness. It is there for performance, so that the PRODUCE request doesn't time out and the Kafka producer doesn't have to retry the PRODUCE message.
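
To make the replay rule concrete, a minimal sketch (the `Leader` field is from `PartitionChangeRecord`; the sentinel constant name is an assumption):

```java
// Replaying a PartitionChangeRecord: the leader epoch is bumped only when the
// controller actually set the Leader field (i.e. it is not the no-change sentinel).
if (record.leader() != NO_LEADER_CHANGE) {
    partition.leaderEpoch += 1; // an epoch bump accompanies an explicit leader change
}
```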





[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-06-02 Thread via GitHub


erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1214314299


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -105,6 +105,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final boolean autoCommitEnabled;
     private final int autoCommitIntervalMs;
     private final ConsumerInterceptors interceptors;
+    // package private for testing
+    final AtomicInteger inFlightAsyncCommits;
     private final AtomicInteger pendingAsyncCommits;

Review Comment:
   @dajac I took the description of pendingAyncCommits from your comment 
https://github.com/apache/kafka/pull/13678#discussion_r1191927989, I hope I did 
that correctly.
   
   Changed in: 
https://github.com/apache/kafka/pull/13678/commits/2bfedf0ec27f9d386d9cddb2ba24d53e533ab51f









[jira] [Commented] (KAFKA-14497) LastStableOffset is advanced prematurely when a log is reopened.

2023-06-02 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728723#comment-17728723
 ] 

Daniel Urban commented on KAFKA-14497:
--

AFAIU, the information about the replicated state of a transaction is not 
stored in the snapshot at all. I think the data stored in the snapshot file 
needs to be extended with information about whether a completed transaction 
has been replicated.

By the time ProducerStateManager#completeTxn is called (which puts the 
transaction into ProducerStateManager.unreplicatedTxns), the producer entry is 
already cleared (ProducerAppendInfo#appendEndTxnMarker - currentTxnFirstOffset 
is empty, indicating that there is no pending transaction). If a snapshot is 
created at this point, and then the snapshot is loaded, there is no way to 
differentiate between replicated and unreplicated transactions.

Instead, ProducerAppendInfo#appendEndTxnMarker should also set a flag showing 
that while the transaction is complete, it might still be unreplicated. Then, 
when ProducerStateManager#removeUnreplicatedTransactions is called, the flag in 
the producer entry can be cleared.

This way the snapshot would contain the full data, and we could also recover 
the state of unreplicatedTxns.

[~hachikuji] wdyt about this approach? If it seems okay, I can take a look into 
this and submit a PR.
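
A rough sketch of the proposed flag, in Java for brevity (the field and method names here are assumptions, not existing Kafka code):

```java
// Hypothetical addition to the per-producer entry stored in the snapshot:
class ProducerEntrySketch {
    // Set by appendEndTxnMarker(): the last transaction completed, but its end
    // marker may not yet be replicated past the high watermark.
    boolean lastTxnMaybeUnreplicated = false;

    void onEndTxnMarker() {
        lastTxnMaybeUnreplicated = true;
    }

    void onUnreplicatedTransactionsRemoved() {
        lastTxnMaybeUnreplicated = false; // cleared once replication catches up
    }
}
```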

> LastStableOffset is advanced prematurely when a log is reopened.
> 
>
> Key: KAFKA-14497
> URL: https://issues.apache.org/jira/browse/KAFKA-14497
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vincent Jiang
>Priority: Major
>
> In below test case, last stable offset of log is advanced prematurely after 
> reopen:
>  # producer #1 appends transaction records to leader. offsets = [0, 1, 2, 3]
>  # producer #2 appends transactional records to leader. offsets =  [4, 5, 6, 
> 7]
>  # all records are replicated to followers and high watermark advanced to 8.
>  # at this point, lastStableOffset = 0. (first offset of an open transaction)
>  # producer #1 aborts the transaction by writing an abort marker at offset 8. 
>  ProducerStateManager.unreplicatedTxns contains the aborted transaction 
> (firstOffset=0, lastOffset=8)
>  # then the log is closed and reopened.
>  # after reopen, log.lastStableOffset is initialized to 4.  This is because 
> ProducerStateManager.unreplicatedTxns is empty after reopening log.
>  
> We should rebuild ProducerStateManager.unreplicatedTxns when reloading a log, 
> so that lastStableOffset remains unchanged before and after reopen.





[GitHub] [kafka] vamossagar12 commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-02 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1573587479

   > @vamossagar12 looks like there are checkstyle failures
   
   Oh, I didn't check that before tagging. Will check.





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-02 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1214258128


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,30 +280,47 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.entrySet()
+                .stream()
+                .anyMatch(offset -> offset.getValue() == null);
+
         return primaryStore.set(values, (primaryWriteError, ignored) -> {
             if (secondaryStore != null) {
                 if (primaryWriteError != null) {
                     log.trace("Skipping offsets write to secondary store because primary write has failed", primaryWriteError);
+                    try (LoggingContext context = loggingContext()) {
+                        callback.onCompletion(primaryWriteError, ignored);
+                    }
                 } else {
                     try {
                         // Invoke OffsetBackingStore::set but ignore the resulting future; we don't block on writes to this
-                        // backing store.
+                        // backing store. The only exception to this is when a batch consisting of tombstone records fails
+                        // to be written to the secondary store but has been successfully written to the primary store. In
+                        // that case an error is propagated back, since otherwise a deleted source partition would be
+                        // reported as present because the 2 stores are not in sync.
                         secondaryStore.set(values, (secondaryWriteError, ignored2) -> {
                             try (LoggingContext context = loggingContext()) {
-                                if (secondaryWriteError != null) {
+                                if (secondaryWriteError != null && containsTombstones) {
+                                    log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                                    callback.onCompletion(secondaryWriteError, ignored);
+                                    return;
+                                } else if (secondaryWriteError != null) {
                                     log.warn("Failed to write offsets to secondary backing store", secondaryWriteError);
                                 } else {
                                     log.debug("Successfully flushed offsets to secondary backing store");
                                 }
+                                // primaryWriteError is null at this point, and we don't care about secondaryWriteError
+                                callback.onCompletion(null, ignored);

Review Comment:
   Yeah. Good catch. That doesn’t need to happen in this case. Will remove it 






[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-02 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1214236252


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,30 +280,47 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.entrySet()
+                .stream()
+                .anyMatch(offset -> offset.getValue() == null);
+
         return primaryStore.set(values, (primaryWriteError, ignored) -> {
             if (secondaryStore != null) {
                 if (primaryWriteError != null) {
                     log.trace("Skipping offsets write to secondary store because primary write has failed", primaryWriteError);
+                    try (LoggingContext context = loggingContext()) {
+                        callback.onCompletion(primaryWriteError, ignored);
+                    }
                 } else {
                     try {
                         // Invoke OffsetBackingStore::set but ignore the resulting future; we don't block on writes to this
-                        // backing store.
+                        // backing store. The only exception to this is when a batch consisting of tombstone records fails
+                        // to be written to the secondary store but has been successfully written to the primary store. In
+                        // that case an error is propagated back, since otherwise a deleted source partition would be
+                        // reported as present because the 2 stores are not in sync.
                         secondaryStore.set(values, (secondaryWriteError, ignored2) -> {
                             try (LoggingContext context = loggingContext()) {
-                                if (secondaryWriteError != null) {
+                                if (secondaryWriteError != null && containsTombstones) {
+                                    log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                                    callback.onCompletion(secondaryWriteError, ignored);
+                                    return;
+                                } else if (secondaryWriteError != null) {
                                     log.warn("Failed to write offsets to secondary backing store", secondaryWriteError);
                                 } else {
                                     log.debug("Successfully flushed offsets to secondary backing store");
                                 }
+                                // primaryWriteError is null at this point, and we don't care about secondaryWriteError
+                                callback.onCompletion(null, ignored);

Review Comment:
   Isn't this going to cause us to always block on writes to the secondary 
store which is something we want to avoid?






[GitHub] [kafka] yashmayya commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-02 Thread via GitHub


yashmayya commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1573553370

   @vamossagar12 looks like there are checkstyle failures





[GitHub] [kafka] yashmayya commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

2023-06-02 Thread via GitHub


yashmayya commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214227660


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##
@@ -483,6 +494,37 @@ public void testFailureInPoll() throws Exception {
 assertPollMetrics(0);
 }
 
+    @Test
+    public void testRetriableExceptionInPoll() throws Exception {
+
+        final ErrorHandlingMetrics errorHandlingMetrics = mock(ErrorHandlingMetrics.class);
+        final RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(30, 15, ALL, SYSTEM, errorHandlingMetrics);

Review Comment:
   That's probably due to 
https://github.com/apache/kafka/pull/13726#discussion_r1214152577 (i.e. poll 
being called repeatedly in the task execution loop)? 






[GitHub] [kafka] yashmayya commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

2023-06-02 Thread via GitHub


yashmayya commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214227126


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##
@@ -458,13 +458,8 @@ boolean sendRecords() {
 }
 
     protected List poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   `total-record-failures` is defined as the number of "record processing 
failures" in a task - 
https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L366-L367.
 I don't think a failed call to `SourceTask::poll` counts as a record 
processing failure?
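
For context, a rough sketch of the retry pattern under discussion, based only on the calls visible in the diffs above (the constructor arguments mirror the test, and `errorHandlingMetrics` and `task` are assumed to come from the worker's context):

```java
RetryWithToleranceOperator retrier = new RetryWithToleranceOperator(
        30, 15, ToleranceType.ALL, Time.SYSTEM, errorHandlingMetrics);

retrier.reset(); // clear per-operation error state before each poll
List<SourceRecord> records = retrier.execute(task::poll, Stage.TASK_POLL, task.getClass());
```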






[GitHub] [kafka] vamossagar12 commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-02 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1573519452

   I will review the tests once they run. Still tagging the 2 reviewers.





[GitHub] [kafka] vamossagar12 opened a new pull request, #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-02 Thread via GitHub


vamossagar12 opened a new pull request, #13801:
URL: https://github.com/apache/kafka/pull/13801

   Description from the ticket:
   When exactly-once support is enabled for source connectors, source offsets 
can potentially be written to two different offsets topics: a topic specific to 
the connector, and the global offsets topic (which was used for all connectors 
prior to KIP-618 / version 3.3.0).
   
   Precedence is given to offsets in the per-connector offsets topic, but if 
none are found for a given partition, then the global offsets topic is used as 
a fallback.
   
   When committing offsets, a transaction is used to ensure that source records 
and source offsets are written atomically to the Kafka cluster targeted by the 
source connector. This transaction only includes the connector-specific offsets 
topic. Writes to the global offsets topic take place after writes to the 
connector-specific offsets topic have completed successfully, and if they fail, 
a warning message is logged, but no other action is taken.
   
   Normally, this ensures that, for offsets committed by exactly-once-supported 
source connectors, the per-connector offsets topic is at least as up-to-date as 
the global offsets topic, and sometimes even ahead.
   
   However, for tombstone offsets, we lose that guarantee. If a tombstone 
offset is successfully written to the per-connector offsets topic, but cannot 
be written to the global offsets topic, then the global offsets topic will 
still contain that source offset, but the per-connector topic will not. Due to 
the fallback-on-global logic used by the worker, if a task requests offsets for 
one of the tombstoned partitions, the worker will provide it with the offsets 
present in the global offsets topic, instead of indicating to the task that no 
offsets can be found.
   
   This PR explicitly fails the offset flush when the dual write fails on the secondary store. Note that one of the approaches proposed on the JIRA ticket was to explicitly write tombstone records to the secondary store first, before attempting to write to the primary store. This PR chooses not to do so for two reasons:
   
   1. Tombstone events are rare, as noted in the ticket, and the probability of them failing on the secondary store while succeeding on the primary is lower still.
   2. There is no guarantee that an explicit write of tombstone records to the secondary store would succeed, so we might still end up in the same situation as above.
   
   Considering these points, this PR errs on the side of correctness for these rare events.
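
The tombstone detection at the heart of the change fits in one helper; here is an equivalent formulation of the check shown in the review-thread diff (offsets are keyed and valued as serialized `ByteBuffer`s):

```java
import java.nio.ByteBuffer;
import java.util.Map;

// A batch "contains tombstones" if any offset value is null. Only such batches
// propagate a secondary-store write failure back to the offset-commit callback;
// all other secondary writes remain fire-and-forget.
static boolean containsTombstones(Map<ByteBuffer, ByteBuffer> values) {
    return values.values().stream().anyMatch(v -> v == null);
}
```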
   





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

2023-06-02 Thread via GitHub


vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214203205


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##
@@ -483,6 +494,37 @@ public void testFailureInPoll() throws Exception {
 assertPollMetrics(0);
 }
 
+    @Test
+    public void testRetriableExceptionInPoll() throws Exception {
+
+        final ErrorHandlingMetrics errorHandlingMetrics = mock(ErrorHandlingMetrics.class);
+        final RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(30, 15, ALL, SYSTEM, errorHandlingMetrics);

Review Comment:
   I am using the existing CountDownLatch-based mechanism to run the test. I could get the right number of invocations, but I will think about it.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

2023-06-02 Thread via GitHub


vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214199833


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##
@@ -458,13 +458,8 @@ boolean sendRecords() {
 }
 
     protected List poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Yeah, but I didn't think I really needed to handle that case; going by the converter example above, the existing methods should take care of it.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

2023-06-02 Thread via GitHub


vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214198459


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##
@@ -458,13 +458,8 @@ boolean sendRecords() {
 }
 
     protected List poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   That's a good point. I assumed it would be taken care of, because the transformations don't make any such checks either [here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L482)






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

2023-06-02 Thread via GitHub


vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214197359


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##
@@ -458,13 +458,8 @@ boolean sendRecords() {
 }
 
     protected List poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   I think only `total-records-skipped` is a problem; it is defined as `Total number of records skipped by this task.` By contrast, `total-record-failures` is defined as the total number of failures seen, not the total number of records that have failed. I would assume it should be easy to update the number of records in poll(); why do you say we need to make a number of updates?






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

2023-06-02 Thread via GitHub


vamossagar12 commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214192966


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##
@@ -458,13 +458,8 @@ boolean sendRecords() {
 }
 
 protected List poll() throws InterruptedException {
-try {
-return task.poll();
-} catch (RetriableException | 
org.apache.kafka.common.errors.RetriableException e) {
-log.warn("{} failed to poll records from SourceTask. Will retry 
operation.", this, e);
-// Do nothing. Let the framework poll whenever it's ready.
-return null;
-}
+retryWithToleranceOperator.reset();
+return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, 
this.getClass());

Review Comment:
   Oh, I just went by what the KIP says in the Proposed Approach section. I would assume this change would need an amendment to the KIP.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] oliveigah commented on a diff in pull request #13680: MINOR: Add "versions" tag to recently added ReplicaState field on Fetch Request

2023-06-02 Thread via GitHub


oliveigah commented on code in PR #13680:
URL: https://github.com/apache/kafka/pull/13680#discussion_r1214190810


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -61,7 +61,7 @@
   "about": "The clusterId if known. This is used to validate metadata 
fetches prior to broker registration." },
 { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": 
"-1", "entityType": "brokerId",
   "about": "The broker ID of the follower, of -1 if this request is from a 
consumer." },
-{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", 
"tag": 1, "fields": [
+{ "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", 
"taggedVersions":"15+", "tag": 1, "fields": [

Review Comment:
   Typo fixed! Thanks for the explanation @dajac.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-06-02 Thread via GitHub


dajac commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1214154053


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -105,6 +105,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 private final boolean autoCommitEnabled;
 private final int autoCommitIntervalMs;
 private final ConsumerInterceptors<?, ?> interceptors;
+// package private for testing
+final AtomicInteger inFlightAsyncCommits;
 private final AtomicInteger pendingAsyncCommits;

Review Comment:
   Could we add a comment for these two variables? I think we were all confused by them while looking at this PR, so adding clarifying comments would make sense here.
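
   A hedged sketch of what such comments could look like; the semantics below are one reading of this PR's discussion, not committed Kafka text:

    import java.util.concurrent.atomic.AtomicInteger;

    final class CoordinatorFieldsSketch {
        // Async commits whose OffsetCommit request has been sent to the group
        // coordinator but whose response has not yet been handled; commitSync()
        // waits for these to drain even when it has no offsets of its own.
        final AtomicInteger inFlightAsyncCommits = new AtomicInteger(0);

        // Async commits still waiting on a coordinator lookup to complete
        // before their OffsetCommit request can be sent at all.
        final AtomicInteger pendingAsyncCommits = new AtomicInteger(0);
    }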



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

2023-06-02 Thread via GitHub


yashmayya commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214147751


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##
@@ -483,6 +494,37 @@ public void testFailureInPoll() throws Exception {
 assertPollMetrics(0);
 }
 
+@Test
+public void testRetriableExceptionInPoll() throws Exception {
+
+final ErrorHandlingMetrics errorHandlingMetrics = mock(ErrorHandlingMetrics.class);
+final RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(30, 15, ALL, SYSTEM, errorHandlingMetrics);

Review Comment:
   Shouldn't we use `MockTime` here so that we can advance the time programmatically on each call to poll and ensure the retries occur as per the deadline?
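
   A sketch of that suggestion, reusing the names from the snippet above and assuming the same constructor argument order (with `MockTime` standing in for `SYSTEM`):

    import static org.mockito.Mockito.mock;

    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
    import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
    import org.apache.kafka.connect.runtime.errors.ToleranceType;

    class MockTimeSketch {
        void example() {
            ErrorHandlingMetrics errorHandlingMetrics = mock(ErrorHandlingMetrics.class);
            MockTime time = new MockTime();
            RetryWithToleranceOperator op =
                new RetryWithToleranceOperator(30, 15, ToleranceType.ALL, time, errorHandlingMetrics);
            // In the test body, advance the mock clock instead of sleeping:
            time.sleep(10); // deterministically moves past backoff delays
        }
    }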



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##
@@ -458,13 +458,8 @@ boolean sendRecords() {
 }
 
 protected List<SourceRecord> poll() throws InterruptedException {
-try {
-return task.poll();
-} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-// Do nothing. Let the framework poll whenever it's ready.
-return null;
-}
+retryWithToleranceOperator.reset();
+return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Looks like this currently only handles `org.apache.kafka.connect.errors.RetriableException`; should we also add `org.apache.kafka.common.errors.RetriableException`? Looks like there's some history here - https://github.com/apache/kafka/pull/6675
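
   One possible shape for covering both hierarchies, sketched under the assumption that normalizing to the Connect exception type before entering the retry path is acceptable; `pollOnce` is a hypothetical helper, not what this PR does:

    import java.util.List;
    import org.apache.kafka.connect.errors.RetriableException;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    class PollNormalizeSketch {
        // Wrap the common-errors retriable type in the Connect one so a single
        // catch/retry path sees both exception hierarchies.
        static List<SourceRecord> pollOnce(SourceTask task) throws InterruptedException {
            try {
                return task.poll();
            } catch (org.apache.kafka.common.errors.RetriableException e) {
                throw new RetriableException(e);
            }
        }
    }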



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##
@@ -458,13 +458,8 @@ boolean sendRecords() {
 }
 
 protected List<SourceRecord> poll() throws InterruptedException {
-try {
-return task.poll();
-} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-// Do nothing. Let the framework poll whenever it's ready.
-return null;
-}
+retryWithToleranceOperator.reset();
+return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   It looks like the `RetryWithToleranceOperator` currently expects to be used only for per-record operations (conversion, transformation, etc.) - see [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L217), [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L233) and [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L183) for instance. I think we'll need to make a number of updates there before it can be used in operations such as `SourceTask::poll` and `SinkTask::put`.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##
@@ -458,13 +458,8 @@ boolean sendRecords() {
 }
 
 protected List<SourceRecord> poll() throws InterruptedException {
-try {
-return task.poll();
-} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-// Do nothing. Let the framework poll whenever it's ready.
-return null;
-}
+retryWithToleranceOperator.reset();
+return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Should we be retrying on `RetriableException`s even if `errors.tolerance` is set to `none`? Looks like that's what is happening here.
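
   For context, a sketch of the KIP-298 connector settings in play here; the property names are the documented ones, the values are purely illustrative:

    import java.util.HashMap;
    import java.util.Map;

    class ErrorConfigSketch {
        static Map<String, String> config() {
            Map<String, String> props = new HashMap<>();
            props.put("errors.tolerance", "none");           // fail the task once retries are exhausted
            props.put("errors.retry.timeout", "30000");      // total retry budget in ms
            props.put("errors.retry.delay.max.ms", "15000"); // cap on the backoff delay
            return props;
        }
    }

   As I read KIP-298, retries are governed by `errors.retry.timeout`, while `errors.tolerance` decides what happens once retries are exhausted, which is the distinction this question is probing.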



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##
@@ -458,13 +458,8 @@ boolean sendRecords() {
 }
 
 protected List<SourceRecord> poll() throws InterruptedException {
-try {
-return task.poll();
-} catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-// Do nothing. Let the framework poll whenever it's ready.
-return null;
-}
+retryWithToleranceOperator.reset();
+return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

[GitHub] [kafka] dajac commented on a diff in pull request #13680: MINOR: Add "versions" tag to recently added ReplicaState field on Fetch Request

2023-06-02 Thread via GitHub


dajac commented on code in PR #13680:
URL: https://github.com/apache/kafka/pull/13680#discussion_r1214151118


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -61,7 +61,7 @@
   "about": "The clusterId if known. This is used to validate metadata 
fetches prior to broker registration." },
 { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": 
"-1", "entityType": "brokerId",
   "about": "The broker ID of the follower, of -1 if this request is from a 
consumer." },
-{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", 
"tag": 1, "fields": [
+{ "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", 
"taggedVersions":"15+", "tag": 1, "fields": [

Review Comment:
   While we are here, there is a small typo. A space is missing here: `"taggedVersions": "15+"`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and Replayable interfaces

2023-06-02 Thread via GitHub


dajac commented on code in PR #13794:
URL: https://github.com/apache/kafka/pull/13794#discussion_r1214139956


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Replayable.java:
##
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+/**
+ * The Replayable interface.
+ */
+public interface Replayable {

Review Comment:
   I have renamed it to `CoordinatorPlayback`. Does it sound good to you?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and Replayable interfaces

2023-06-02 Thread via GitHub


dajac commented on code in PR #13794:
URL: https://github.com/apache/kafka/pull/13794#discussion_r1214139496


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Replayable.java:
##
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+/**
+ * The Replayable interface.
+ */

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15049) Flaky test DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate

2023-06-02 Thread Josep Prat (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728637#comment-17728637 ]

Josep Prat commented on KAFKA-15049:


I've seen this error locally after verifying 3.4.1-rc3 (not on previous RCs), but only after a test run with some other flaky test failures. I rebooted the machine and the tests then passed for me, so I assume it was a failed test that had left a dangling process blocking the port this test needs.

> Flaky test DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate
> 
>
> Key: KAFKA-15049
> URL: https://issues.apache.org/jira/browse/KAFKA-15049
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tom Bentley
>Priority: Major
>
> While testing 3.4.1RC3 
> DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate failed 
> repeatedly on my machine always with the following stacktrace.
> {{org.opentest4j.AssertionFailedError: Unexpected exception type thrown, 
> expected:  but was: 
> 
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at app//org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:67)
>  at app//org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)
>  at app//org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3083)
>  at 
> app//kafka.server.DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate(DynamicBrokerReconfigurationTest.scala:1066)}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15050) Prompts in the quickstarts

2023-06-02 Thread Tom Bentley (Jira)
Tom Bentley created KAFKA-15050:
---

 Summary: Prompts in the quickstarts
 Key: KAFKA-15050
 URL: https://issues.apache.org/jira/browse/KAFKA-15050
 Project: Kafka
  Issue Type: Improvement
  Components: documentation
Reporter: Tom Bentley


In the quickstarts [Steps 1-5|https://kafka.apache.org/documentation/#quickstart] use {{$}} to indicate the command prompt. When we start to use Kafka Connect in [Step 6|https://kafka.apache.org/documentation/#quickstart_kafkaconnect] we switch to {{>}}. The [Kafka Streams quickstart|https://kafka.apache.org/documentation/streams/quickstart] also uses {{>}}. I don't think there's a reason for this, but if there is one (root vs. user account?) it should be explained.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-02 Thread via GitHub


dajac commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1214114184


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1019,13 +1029,14 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /**
-   * Append the messages to the local replica logs
+   * Append the messages to the local replica logs. ReplicaManager#appendRecords should usually be
+   * used instead of this method.

Review Comment:
   I have refactored the implementation to use `appendRecords`. I think it is better this way. Let me know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-02 Thread via GitHub


dajac commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1214113530


##
core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.cluster.PartitionListener
+import kafka.server.{ReplicaManager, RequestLocal}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.RecordTooLargeException
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, TimestampType}
+import org.apache.kafka.common.record.Record.EMPTY_HEADERS
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter
+import org.apache.kafka.storage.internals.log.AppendOrigin
+
+import java.nio.ByteBuffer
+import java.util
+import scala.collection.Map
+
+private[group] class ListenerAdaptor(
+  val listener: PartitionWriter.Listener
+) extends PartitionListener {
+  override def onHighWatermarkUpdated(
+tp: TopicPartition,
+offset: Long
+  ): Unit = {
+listener.onHighWatermarkUpdated(tp, offset)
+  }
+
+  override def equals(that: Any): Boolean = that match {
+case other: ListenerAdaptor => listener.equals(other.listener)
+case _ => false
+  }
+
+  override def hashCode(): Int = {
+listener.hashCode()
+  }
+
+  override def toString: String = {
+s"ListenerAdaptor(listener=$listener)"
+  }
+}
+
+class PartitionWriterImpl[T](
+  replicaManager: ReplicaManager,
+  serializer: PartitionWriter.Serializer[T],
+  compressionType: CompressionType,
+  time: Time
+) extends PartitionWriter[T] {
+
+  override def registerListener(
+tp: TopicPartition,
+listener: PartitionWriter.Listener
+  ): Unit = {
+replicaManager.maybeAddListener(tp, new ListenerAdaptor(listener))
+  }
+
+  override def deregisterListener(
+tp: TopicPartition,
+listener: PartitionWriter.Listener
+  ): Unit = {
+replicaManager.removeListener(tp, new ListenerAdaptor(listener))
+  }
+
+  override def append(
+tp: TopicPartition,
+records: util.List[T]
+  ): Long = {
+if (records.isEmpty) {
+  throw new IllegalStateException("records must be non-empty.")
+}
+
+replicaManager.getLogConfig(tp) match {
+  case Some(logConfig) =>
+val magic = logConfig.recordVersion.value
+val maxBatchSize = logConfig.maxMessageSize
+val currentTimeMs = time.milliseconds()
+
+val recordsBuilder = MemoryRecords.builder(
+  ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+  magic,
+  compressionType,
+  TimestampType.CREATE_TIME,
+  0L,
+  maxBatchSize
+)
+
+records.forEach { record =>
+  val keyBytes = serializer.serializeKey(record)
+  val valueBytes = serializer.serializeValue(record)
+
+  if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, EMPTY_HEADERS)) {
+recordsBuilder.append(
+  currentTimeMs,
+  keyBytes,
+  valueBytes,
+  EMPTY_HEADERS
+)
+  } else {
+throw new RecordTooLargeException(s"Message batch size is ${recordsBuilder.estimatedSizeInBytes()} bytes " +
+  s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.")
+  }
+}
+
+val appendResults = replicaManager.appendToLocalLog(

Review Comment:
   Let me rename the class to `CoordinatorPartitionWriter` to make it explicit 
that this is used for/by coordinators. Does this work for you?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-02 Thread via GitHub


dajac commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1214109717


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+
+/**
+ * Serializer which serializes {{@link Record}} to bytes.
+ */
+public class RecordSerializer implements PartitionWriter.Serializer<Record> {
+@Override
+public byte[] serializeKey(Record record) {
+// Record does not accept a null key.
+return MessageUtil.toVersionPrefixedBytes(

Review Comment:
   Yeah. I made it generic in case we want to reuse it for another coordinator in the future. This one is tied to the Record class.
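
   For readers unfamiliar with the version-prefixed format, a sketch of the framing as I understand it: a two-byte version followed by the serialized message body. The real path goes through `MessageUtil` and the generated message classes; this is illustration only:

    import java.nio.ByteBuffer;

    class VersionPrefixSketch {
        static byte[] versionPrefixed(short version, byte[] payload) {
            ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
            buf.putShort(version); // record/schema version prefix
            buf.put(payload);      // message body, serialized separately
            return buf.array();
        }
    }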



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…

2023-06-02 Thread via GitHub


urbandan commented on PR #13796:
URL: https://github.com/apache/kafka/pull/13796#issuecomment-1573381258

   @philipnee @ijuma @hachikuji @jolshan Could you please review this fix? Based on the git history, you have recently made changes in the related code.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


