[GitHub] [kafka] dengziming commented on a diff in pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse

2023-06-06 Thread via GitHub


dengziming commented on code in PR #13679:
URL: https://github.com/apache/kafka/pull/13679#discussion_r1220994605


##
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##
@@ -112,4 +157,8 @@ class DefaultApiVersionManager(
   zkMigrationEnabled
 )
   }
+
+  override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeatureEpoch: Long): ApiVersionsResponse = {
+    throw new UnsupportedOperationException("This method is not supported in DefaultApiVersionManager, use apiVersionResponse(throttleTimeMs) instead")

Review Comment:
   The `features` in `ApiVersionsResponse` are retrieved directly from `MetadataCache` in BrokerServer/KafkaServer, but in ControllerServer they can only be obtained asynchronously, so the two code paths can't be unified; that's why I added 2 different methods here.
   I'm still checking whether there is a better way to handle this. One option is to add a synchronous method to the controller; another is to make `ApiVersionManager.apiVersionResponse` asynchronous. Both introduce new problems. The root cause is that `ApiVersionsRequest` is treated as static, while the finalized features change dynamically.
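
   For illustration, a minimal Java sketch of the two-method split described above. The names below are placeholders and not the actual Kafka API; ApiVersionsResponseStub stands in for the real ApiVersionsResponse type.

import java.util.Map;

// Placeholder for the real ApiVersionsResponse type.
final class ApiVersionsResponseStub { }

interface ApiVersionManagerSketch {
    // Broker path: finalized features are read synchronously from MetadataCache.
    ApiVersionsResponseStub apiVersionResponse(int throttleTimeMs);

    // Controller path: the caller supplies the finalized features it fetched
    // asynchronously, together with the epoch they were read at.
    ApiVersionsResponseStub apiVersionResponse(int throttleTimeMs,
                                               Map<String, Short> finalizedFeatures,
                                               long finalizedFeatureEpoch);
}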






[GitHub] [kafka] urbandan commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…

2023-06-06 Thread via GitHub


urbandan commented on PR #13796:
URL: https://github.com/apache/kafka/pull/13796#issuecomment-1580030486

   @viktorsomogyi Since this part of the code is quite tricky, I would try to 
address the different issues in different PRs. I believe that the fix I'm 
proposing will solve the issue reported in KAFKA-14034 specifically. 





[GitHub] [kafka] showuon commented on pull request #13662: MINOR: Upgrade Jackson dependencies to version 2.15.0

2023-06-06 Thread via GitHub


showuon commented on PR #13662:
URL: https://github.com/apache/kafka/pull/13662#issuecomment-1580008259

   @bmscomp, could you respond to @divijvaidya's comments above?





[jira] [Created] (KAFKA-15064) Use KafkaTemplate to send message with below exception - IllegalMonitorStateException

2023-06-06 Thread Xuguang zhan (Jira)
Xuguang zhan created KAFKA-15064:


 Summary: Use KafkaTemplate to send message with below exception - 
IllegalMonitorStateException
 Key: KAFKA-15064
 URL: https://issues.apache.org/jira/browse/KAFKA-15064
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.1.2
Reporter: Xuguang zhan


*Running env:*
1. openjdk-17.0.2-5.el7.x86_64
2. Spring-kafka: 2.8.11
3. Kafka client: 3.1.2



The special case here: one Tomcat instance hosts three web applications (contexts), and the Kafka client jar is placed in Tomcat's shared lib.

 

*Java Stack:*

java.lang.IllegalMonitorStateException: current thread is not owner
at java.lang.Object.wait(Native Method) ~[?:?]
at org.apache.kafka.common.utils.SystemTime.waitObject(SystemTime.java:55) ~[kafka-clients-3.1.2.jar:?]
at org.apache.kafka.clients.producer.internals.ProducerMetadata.awaitUpdate(ProducerMetadata.java:119) ~[kafka-clients-3.1.2.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1088) ~[kafka-clients-3.1.2.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:935) ~[kafka-clients-3.1.2.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914) ~[kafka-clients-3.1.2.jar:?]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1087) ~[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655) ~[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:429) ~[spring-kafka-2.8.11.jar:2.8.11]

 

 

Below is the low-level code involved:

 
{code:java}
@Override
public void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs) throws InterruptedException {
    synchronized (obj) {
        while (true) {
            if (condition.get())
                return;

            long currentTimeMs = milliseconds();
            if (currentTimeMs >= deadlineMs)
                throw new TimeoutException("Condition not satisfied before deadline");

            obj.wait(deadlineMs - currentTimeMs);
        }
    }
}
{code}
 

 
{code:java}
/**
 * Wait for metadata update until the current version is larger than the last version we know of
 */
public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException {
    long currentTimeMs = time.milliseconds();
    long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs;
    time.waitObject(this, () -> {
        // Throw fatal exceptions, if there are any. Recoverable topic errors will be handled by the caller.
        maybeThrowFatalException();
        return updateVersion() > lastVersion || isClosed();
    }, deadlineMs);

    if (isClosed())
        throw new KafkaException("Requested metadata update after close");
}
{code}
 

I found the same issue already reported in https://issues.apache.org/jira/browse/KAFKA-10902
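
For reference, the JVM throws this exception whenever Object.wait() is called by a thread that does not hold the object's monitor. A minimal standalone demonstration of that rule (not the Kafka code path):

{code:java}
public class MonitorDemo {
    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();

        // Correct: the current thread owns the monitor before calling wait().
        synchronized (lock) {
            lock.wait(10);
        }

        // Incorrect: waiting without holding the monitor throws
        // java.lang.IllegalMonitorStateException: current thread is not owner
        lock.wait(10);
    }
}
{code}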





[GitHub] [kafka] Vaibhav-Nazare commented on pull request #13817: KAFKA-15062: Adding ppc64le build stage

2023-06-06 Thread via GitHub


Vaibhav-Nazare commented on PR #13817:
URL: https://github.com/apache/kafka/pull/13817#issuecomment-1579948195

   @dajac Can you please check the PR?





[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-06-06 Thread via GitHub


sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1579930237

   Hi @C0urante, did you get a chance to take a look at it? TIA





[jira] [Updated] (KAFKA-15062) Power(ppc64le) support for Kafka

2023-06-06 Thread Vaibhav (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vaibhav updated KAFKA-15062:

Reviewer: David Jacot

> Power(ppc64le) support for Kafka
> 
>
> Key: KAFKA-15062
> URL: https://issues.apache.org/jira/browse/KAFKA-15062
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Vaibhav
>Priority: Major
>
> Support for the Power architecture (ppc64le) in Apache Kafka.
> What is the IBM Power architecture?
> It is a RISC architecture, and IBM has recently made its ISA (Instruction Set
> Architecture) open source, significantly contributing back to the open-source
> community at large. Many pioneers of the banking and HPC industries today run
> on the ppc64le architecture.
> As an ongoing effort to enable open-source projects where the Power
> architecture can add value, we are trying to enable Kafka on Power.





[jira] [Comment Edited] (KAFKA-15000) High vulnerability PRISMA-2023-0067 reported in jackson-core

2023-06-06 Thread Scott Rowley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729848#comment-17729848
 ] 

Scott Rowley edited comment on KAFKA-15000 at 6/7/23 3:41 AM:
--

[~showuon] Thank you for your time on this. The vulnerability description is:

_com.fasterxml.jackson.core_jackson-core package versions before 2.15.0 are vulnerable to Denial of Service (DoS). The package does not properly restrict the size or amount of resources that are requested or influenced by an actor, which can be used to consume more resources than intended and leads to Uncontrolled Resource Consumption ('Resource Exhaustion')._

Severity: High, CVSS 7.5

For some background for others, my understanding is that the "PRISMA" identifier comes from a proprietary vulnerability database from Twistlock, now owned by Palo Alto's Prisma scanner. In my observation, they tend to flag items where a "security-related" merge has been made in a GitHub project, as a mechanism for their customers to trigger version upgrades. This makes it hard for downstream projects such as Kafka to keep up, as there often isn't a public reference against which to assess risk or take action. As an example, -there's no linked Jackson GitHub pull request- I can see, so it is not clear whether this may also have been addressed in jackson 2.14.3, the latest 2.14 patch release, which came out after 2.15.0. Edit: the linked PR is [https://github.com/FasterXML/jackson-core/pull/827]

I've been lurking for a while, but I'm not sure I've come across any dependency upgrade strategy or policy for Kafka (e.g. when to do minor version updates, when to do major). From looking at the Jackson GitHub and wiki, where some of the lifecycle information seems out of date, the 2.15 and 2.14 versions are actively in release mode. 2.13 may still be open for selective fixes but appears to be next on the list to reach end of life. So independent of any vulnerability, getting Kafka off 2.13 is likely a good medium-term activity. The PR [https://github.com/apache/kafka/pull/13662] seems to be making progress on this, though with some technical hurdles still to overcome.

Edit: Added link to the PR reported as vulnerable by PRISMA: [https://github.com/FasterXML/jackson-core/pull/827/files]. While it seems some of the changes made it into 2.14 ([https://github.com/FasterXML/jackson-core/pull/1013]), it seems not everything did.



> High vulnerability PRISMA-2023-0067 reported in jackson-core
> 
>
> Key: KAFKA-15000
> URL: https://issues.apache.org/jira/browse/KAFKA-15000
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Arushi Rai
>Priority: Critical
>
> Kafka is using jackson-core version 2.13.4, which has a high vulnerability
> reported: [PRISMA-2023-0067|https://github.com/FasterXML/jackson-core/pull/827]
> This

[GitHub] [kafka] ijuma commented on a diff in pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse

2023-06-06 Thread via GitHub


ijuma commented on code in PR #13679:
URL: https://github.com/apache/kafka/pull/13679#discussion_r1220723779


##
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##
@@ -112,4 +157,8 @@ class DefaultApiVersionManager(
   zkMigrationEnabled
 )
   }
+
+  override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeatureEpoch: Long): ApiVersionsResponse = {
+    throw new UnsupportedOperationException("This method is not supported in DefaultApiVersionManager, use apiVersionResponse(throttleTimeMs) instead")

Review Comment:
   What's the thinking around this? It's extremely brittle to design interfaces 
like this.






[GitHub] [kafka] showuon closed pull request #13822: MINOR: update system test for 3.4.1

2023-06-06 Thread via GitHub


showuon closed pull request #13822: MINOR: update system test for 3.4.1
URL: https://github.com/apache/kafka/pull/13822





[GitHub] [kafka] showuon opened a new pull request, #13822: MINOR: update system test for 3.4.1

2023-06-06 Thread via GitHub


showuon opened a new pull request, #13822:
URL: https://github.com/apache/kafka/pull/13822

   update system test for 3.4.1
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] showuon commented on pull request #13807: KAFKA-15040: trigger onLeadershipChange under KRaft mode

2023-06-06 Thread via GitHub


showuon commented on PR #13807:
URL: https://github.com/apache/kafka/pull/13807#issuecomment-1579729897

   Retriggered: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13807/4/





[GitHub] [kafka] satishd commented on pull request #13807: KAFKA-15040: trigger onLeadershipChange under KRaft mode

2023-06-06 Thread via GitHub


satishd commented on PR #13807:
URL: https://github.com/apache/kafka/pull/13807#issuecomment-1579719397

   @showuon can you retrigger the build?
   





[GitHub] [kafka] hgeraldino commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest

2023-06-06 Thread via GitHub


hgeraldino commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1220625126


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##
@@ -706,95 +662,72 @@ public void testSourceTaskIgnoresProducerException() throws Exception {
         // and no ConnectException will be thrown
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, offset2, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-        expectOffsetFlush(true);
-        expectSendRecordOnce();
-        expectSendRecordProducerCallbackFail();
-        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull());

-        //As of KAFKA-14079 all offsets should be committed, even for failed records (if ignored)
-        //Only the last offset will be passed to the method as everything up to that point is committed
-        //Before KAFKA-14079 offset 12 would have been passed and not 13 as it would have been unacked
-        offsetWriter.offset(PARTITION, offset2);
-        PowerMock.expectLastCall();
+        expectOffsetFlush();
+        expectPreliminaryCalls();

-        PowerMock.replayAll();
+        when(producer.send(any(ProducerRecord.class), any(Callback.class)))
+                .thenAnswer(producerSendAnswer(true))
+                .thenAnswer(producerSendAnswer(false));

         //Send records and then commit offsets and verify both were committed and no exception
-        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
-        Whitebox.invokeMethod(workerTask, "sendRecords");
-        Whitebox.invokeMethod(workerTask, "updateCommittableOffsets");
+        workerTask.toSend = Arrays.asList(record1, record2);
+        workerTask.sendRecords();
+        workerTask.updateCommittableOffsets();
         workerTask.commitOffsets();

-        PowerMock.verifyAll();
+        //As of KAFKA-14079 all offsets should be committed, even for failed records (if ignored)
+        //Only the last offset will be passed to the method as everything up to that point is committed
+        //Before KAFKA-14079 offset 12 would have been passed and not 13 as it would have been unacked
+        verify(offsetWriter).offset(PARTITION, offset2);
+        verify(sourceTask).commitRecord(any(SourceRecord.class), isNull());

         //Double check to make sure all submitted records were cleared
-        assertEquals(0, ((SubmittedRecords) Whitebox.getInternalState(workerTask,
-                "submittedRecords")).records.size());
+        assertEquals(0, workerTask.submittedRecords.records.size());
     }

     @Test
     public void testSlowTaskStart() throws Exception {
         final CountDownLatch startupLatch = new CountDownLatch(1);
         final CountDownLatch finishStartupLatch = new CountDownLatch(1);
-
         createWorkerTask();

-        offsetStore.start();
-        EasyMock.expectLastCall();
-        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
-        EasyMock.expectLastCall();
-        sourceTask.start(TASK_PROPS);
-        EasyMock.expectLastCall().andAnswer(() -> {
+        doAnswer((Answer) invocation -> {
             startupLatch.countDown();
-            assertTrue(awaitLatch(finishStartupLatch));
+            ConcurrencyUtils.awaitLatch(finishStartupLatch, "Timeout waiting for task to stop");

Review Comment:
   Hmm... not really. I mean, the countDownLatch is updated after calling 
`workerTask.stop()` on L710, but this call only sets an internal flag on the 
task and doesn't really block, so technically no.
   
   I fail to understand the value of this synchronization, so decided to remove 
the `finishStartupLatch` altogether






[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-06 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1220560961


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -0,0 +1,1040 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.deferred.DeferredEvent;
+import org.apache.kafka.deferred.DeferredEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.HashSet;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
+ * or the transaction coordinator.
+ *
+ * The runtime framework maps each underlying partition (e.g. __consumer_offsets) that the broker is a
+ * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft
+ * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in
+ * timeline data structures backed by a SnapshotRegistry. The runtime supports two types of operations
+ * on state machines: (1) Writes and (2) Reads.
+ *
+ * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from the state
+ * machine to handle the operation. A write operation typically generates a response and a list of
+ * records. The records are applied to the state machine and persisted to the partition. The response
+ * is parked until the records are committed, and delivered when they are.
+ *
+ * (2) A read operation, aka a request, can only read the committed state from the state machine to handle
+ * the operation. A read operation typically generates a response that is immediately completed.
+ *
+ * The runtime framework exposes an asynchronous, future-based API to the world. All the operations
+ * are executed by a CoordinatorEventProcessor. The processor guarantees that operations for a
+ * single partition or state machine are not processed concurrently.
+ *
+ * @param <S> The type of the state machine.
+ * @param <U> The type of the record.
+ */
+public class CoordinatorRuntime<S extends Coordinator<U>, U> {
+
+    /**
+     * Builder to create a CoordinatorRuntime.
+     *
+     * @param <S> The type of the state machine.
+     * @param <U> The type of the record.
+     */
+    public static class Builder<S extends Coordinator<U>, U> {
+        private LogContext logContext;
+        private CoordinatorEventProcessor eventProcessor;
+        private PartitionWriter<U> partitionWriter;
+        private CoordinatorLoader<U> loader;
+        private CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier;
+
+        public Builder<S, U> withLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        public Builder<S, U> withEventProcessor(CoordinatorEventProcessor eventProcessor) {
+            this.eventProcessor = eventProcessor;
+            return this;
+        }
+
+        public Builder<S, U> withPartitionWriter(PartitionWriter<U> partitionWriter) {
+            this.partitionWriter = partitionWriter;
+            return this;
+        }
+
+        public Builder<S, U> withLoader(CoordinatorLoader<U> loader) {
+            this.loader = loader;
+            return this;
+        }
+
+        public Builder<S, U> withCoordinatorBuilderSupplier(CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier) {
+            this.coordinatorBuilderSupplier = coordinatorBuilderSupplier;
+            return this;
+        }
+
+        public CoordinatorRuntime<S, U> build() {
+            if (logContext == null)
+                logContext = new LogContext();
+            if (eventProcessor == null)
+                throw new IllegalArgume
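
   The Javadoc above describes the key contract of the write path: a write produces records plus a response whose future completes only once the records are committed. A toy model of that parked-response idea (an illustration only, not code from the PR):

import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model of a write operation: the response future is "parked" and
// completes only when the runtime reports that the records are committed.
final class ParkedWrite<T> {
    final List<String> records;                        // records to persist
    final CompletableFuture<T> response = new CompletableFuture<>();

    ParkedWrite(List<String> records) {
        this.records = records;
    }

    // Invoked by the runtime after the partition commits the records.
    void complete(T result) {
        response.complete(result);
    }
}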

[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-06 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1220554821



[GitHub] [kafka] jolshan commented on a diff in pull request #13811: KAFKA-14278: Fix InvalidProducerEpochException and InvalidTxnStateException handling in producer clients

2023-06-06 Thread via GitHub


jolshan commented on code in PR #13811:
URL: https://github.com/apache/kafka/pull/13811#discussion_r1220507827


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1376,7 +1376,7 @@ public void handleResponse(AbstractResponse response) {
                 fatalError(error.exception());
                 return;
             } else if (error == Errors.INVALID_TXN_STATE) {
-                fatalError(new KafkaException(error.exception()));

Review Comment:
   As mentioned in the other instance, this probably won't be returned to the client, but in the case that it was: could there be a client that expects the wrapped error? At this point it's probably not an issue; I'm just curious about the implication.
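
   For illustration, a minimal sketch of the observable difference (a hypothetical demo, not the PR's code): before the change the client saw a generic KafkaException wrapping the cause; after it, the InvalidTxnStateException surfaces directly.

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InvalidTxnStateException;

public class ErrorWrappingDemo {
    public static void main(String[] args) {
        InvalidTxnStateException cause = new InvalidTxnStateException("invalid transition");

        Throwable before = new KafkaException(cause); // old behavior: wrapped
        Throwable after = cause;                      // new behavior: unwrapped

        System.out.println(before instanceof InvalidTxnStateException); // false
        System.out.println(after instanceof InvalidTxnStateException);  // true
    }
}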






[GitHub] [kafka] jolshan commented on a diff in pull request #13811: KAFKA-14278: Fix InvalidProducerEpochException and InvalidTxnStateException handling in producer clients

2023-06-06 Thread via GitHub


jolshan commented on code in PR #13811:
URL: https://github.com/apache/kafka/pull/13811#discussion_r1220504060


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1599,6 +1599,8 @@ public void handleResponse(AbstractResponse response) {
                 fatalError(Errors.PRODUCER_FENCED.exception());
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                 fatalError(error.exception());
+            } else if (error == Errors.INVALID_TXN_STATE) {

Review Comment:
   I went down a very long rabbit hole to find that we only returned INVALID_TXN_STATE for a short time from this request.

   https://github.com/apache/kafka/commit/1f2451d4e7e3766540d3650d177e304fcddf49b8 (here's the commit that removed this error.)

   We never removed it from the errors returned, I suppose because some really old broker could return it.

   Having this here doesn't hurt; I was just curious about the history behind it. 😅








[jira] [Commented] (KAFKA-15000) High vulnerability PRISMA-2023-0067 reported in jackson-core

2023-06-06 Thread Scott Rowley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729848#comment-17729848
 ] 

Scott Rowley commented on KAFKA-15000:
--

[~showuon] Thank you for your time on this. The vulnerability description is:

_com.fasterxml.jackson.core_jackson-core package versions before 2.15.0 are vulnerable to Denial of Service (DoS). The package does not properly restrict the size or amount of resources that are requested or influenced by an actor, which can be used to consume more resources than intended and leads to Uncontrolled Resource Consumption ('Resource Exhaustion')._

Severity: High, CVSS 7.5

For some background for others, my understanding is that the "PRISMA" identifier comes from a proprietary vulnerability database from Twistlock, now owned by Palo Alto's Prisma scanner. In my observation, they tend to flag items where a "security-related" merge has been made in a GitHub project, as a mechanism for their customers to trigger version upgrades. This makes it hard for downstream projects such as Kafka to keep up, as there often isn't a public reference against which to assess risk or take action. As an example, there's no linked Jackson GitHub pull request I can see, so it is not clear whether this may also have been addressed in jackson 2.14.3, the latest 2.14 patch release, which came out after 2.15.0.

I've been lurking for a while, but I'm not sure I've come across any dependency upgrade strategy or policy for Kafka (e.g. when to do minor version updates, when to do major). From looking at the Jackson GitHub and wiki, where some of the lifecycle information seems out of date, the 2.15 and 2.14 versions are actively in release mode. 2.13 may still be open for selective fixes but appears to be next on the list to reach end of life. So independent of any vulnerability, getting Kafka off 2.13 is likely a good medium-term activity. The PR [https://github.com/apache/kafka/pull/13662] seems to be making progress on this, though with some technical hurdles still to overcome.

> High vulnerability PRISMA-2023-0067 reported in jackson-core
> 
>
> Key: KAFKA-15000
> URL: https://issues.apache.org/jira/browse/KAFKA-15000
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Arushi Rai
>Priority: Critical
>
> Kafka is using jackson-core version 2.13.4, which has a high vulnerability
> reported: [PRISMA-2023-0067|https://github.com/FasterXML/jackson-core/pull/827]
> This vulnerability is fixed in jackson-core 2.15.0, and Kafka should upgrade to
> the same.





[GitHub] [kafka] C0urante commented on pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks

2023-06-06 Thread via GitHub


C0urante commented on PR #13819:
URL: https://github.com/apache/kafka/pull/13819#issuecomment-1579470948

   @mimaison @viktorsomogyi if either of you has a moment, would you mind 
taking a look? This change is deceptively small but I've tried to highlight in 
the description both why it's valuable for testing and non-testing 
environments, and why it's safe. Happy to discuss further if it helps!





[GitHub] [kafka] C0urante commented on pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks

2023-06-06 Thread via GitHub


C0urante commented on PR #13819:
URL: https://github.com/apache/kafka/pull/13819#issuecomment-1579468092

   The next run also had [no failing integration tests](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13819/2/tests) for MirrorMaker 2.
   
   Marking ready for review, but kicking off another build just to play it safe.





[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-06 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1220268552



[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-06 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1220268287



[GitHub] [kafka] jolshan commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-06 Thread via GitHub


jolshan commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1220267344



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13816: MINOR: Optimize runtime of MM2 integration tests by batching transactions

2023-06-06 Thread via GitHub


gharris1727 commented on code in PR #13816:
URL: https://github.com/apache/kafka/pull/13816#discussion_r1220234170


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java:
##
@@ -34,14 +38,39 @@ static Map<String, String> makeProps(String... keyValues) {
         }
         return props;
     }
-
-    /*
-     * return records with different but predictable key and value
+
+    /**
+     * Assemble a collection of records arbitrarily distributed across all partitions of the specified topic
+     * @param topicName Destination topic
+     * @param numRecords count of records to produce to the topic in total
+     * @return A batch of records that can be sent to a producer.
      */
-    public static Map<String, String> generateRecords(int numRecords) {
-        Map<String, String> records = new HashMap<>();
+    public static List<ProducerRecord<byte[], byte[]>> generateRecords(String topicName, int numRecords) {
+        List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
         for (int i = 0; i < numRecords; i++) {
-            records.put("key-" + i, "message-" + i);
+            String key = "key-" + i;
+            String value = "message-" + i;
+            records.add(new ProducerRecord<>(topicName, null, key.getBytes(), value.getBytes()));
+        }
+        return records;
+    }
+
+    /**
+     * Assemble a collection of records evenly distributed across some partitions of the specified topic

Review Comment:
   Since the tests don't rely on this keying, and having the same key in multiple partitions is strange, I'll change this to make the keys unique.
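
   A hedged sketch of that change (hypothetical, not the PR's final code): fold the record index into a unique key while spreading the records round-robin across a fixed set of partitions.

import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class RecordGenSketch {
    // Keys are unique per record; records are spread evenly (round-robin)
    // across the first numPartitions partitions of the topic.
    static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic, int numRecords, int numPartitions) {
        List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
        for (int i = 0; i < numRecords; i++) {
            byte[] key = ("key-" + i).getBytes(StandardCharsets.UTF_8);
            byte[] value = ("message-" + i).getBytes(StandardCharsets.UTF_8);
            records.add(new ProducerRecord<>(topic, i % numPartitions, key, value));
        }
        return records;
    }
}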






[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-06 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1220212149


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends 
WrappedStateStore<RocksDBTimeOrderedKeyValueSegmentedBytesStore, Object, Object> implements TimeOrderedKeyValueBuffer<K, V> {
+
+private final Duration gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRec;
+private Serde<K> keySerde;
+private FullChangeSerde<V> valueSerde;
+private String topic;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueSegmentedBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod;
+minTimestamp = Long.MAX_VALUE;
+numRec = 0;
+bufferSize = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde<K>) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? FullChangeSerde.wrap((Serde<V>) 
getter.valueSerde()) : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier<Boolean> predicate, final 
Consumer<Eviction<K, V>> callback) {
+KeyValue<Bytes, byte[]> keyValue = null;
+
+if (predicate.get()) {
+final KeyValueIterator<Bytes, byte[]> iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - 
gracePeriod.toMillis());
+if (iterator.hasNext()) {
+keyValue = iterator.next();
+}
+if (keyValue == null) {
+if (numRecords() == 0) {
+minTimestamp = Long.MAX_VALUE;
+}
+return;
+}
+BufferValue bufferValue = 
BufferValue.deserialize(ByteBuffer.wrap(keyValue.value));
+K key = keySerde.deserializer().deserialize(topic,
+
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get()));
+
+Change<V> value = valueSerde.deserializeParts(
+topic,
+new Change<>(bufferValue.newValue(), bufferValue.oldValue())
+);
+while (keyValue != null && predicate.get() && 
wrapped().observedStreamTime - gracePeriod.toMillis() >= minTimestamp()) {
+if (bufferValue.context().timestamp() != minTimestamp) {
+throw new IllegalStateException(
+"minTimestamp [" + minTimestamp + "] did not match the 
actual min timestamp [" +
+bufferValue.context().timestamp() + "]"
+);
+}

[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-06 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1220161797


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends 
WrappedStateStore<RocksDBTimeOrderedKeyValueSegmentedBytesStore, Object, Object> implements TimeOrderedKeyValueBuffer<K, V> {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde<K> keySerde;
+private FullChangeSerde<V> valueSerde;
+private final String topic;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueSegmentedBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.MAX_VALUE;
+numRecords = 0;
+bufferSize = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde<K>) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? FullChangeSerde.wrap((Serde<V>) 
getter.valueSerde()) : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier<Boolean> predicate, final 
Consumer<Eviction<K, V>> callback) {
+KeyValue<Bytes, byte[]> keyValue;
+
+if (predicate.get()) {
+final KeyValueIterator<Bytes, byte[]> iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - gracePeriod);
+try {

Review Comment:
   I always forget that exists. Good call.
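
   (The construct referred to here appears to be try-with-resources: since
   `KeyValueIterator` extends `Closeable`, the fetch can be wrapped so the
   underlying iterator is released on every exit path. A minimal sketch:)

   ```java
   // Sketch: the iterator is closed even on early return or exception.
   try (final KeyValueIterator<Bytes, byte[]> iterator =
            wrapped().fetchAll(0, wrapped().observedStreamTime - gracePeriod)) {
       while (iterator.hasNext()) {
           final KeyValue<Bytes, byte[]> next = iterator.next();
           // ... deserialize and evict ...
       }
   }
   ```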



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-06 Thread via GitHub


wcarlson5 commented on PR #13756:
URL: https://github.com/apache/kafka/pull/13756#issuecomment-1579290388

   @vcrfxia Once you are good with this, can you take a look here: 
https://github.com/wcarlson5/kafka/pull/1? It's the second part and has the 
new joining logic in it. After that I should add the recovery logic in a 
third PR.
   
   I have it targeted to this feature branch until this gets merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-06 Thread via GitHub


vcrfxia commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1220108194


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends 
WrappedStateStore<RocksDBTimeOrderedKeyValueSegmentedBytesStore, Object, Object> implements TimeOrderedKeyValueBuffer<K, V> {
+
+private final Duration gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRec;
+private Serde<K> keySerde;
+private FullChangeSerde<V> valueSerde;
+private String topic;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueSegmentedBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod;
+minTimestamp = Long.MAX_VALUE;
+numRec = 0;
+bufferSize = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde<K>) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? FullChangeSerde.wrap((Serde<V>) 
getter.valueSerde()) : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier<Boolean> predicate, final 
Consumer<Eviction<K, V>> callback) {
+KeyValue<Bytes, byte[]> keyValue = null;
+
+if (predicate.get()) {
+final KeyValueIterator<Bytes, byte[]> iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - 
gracePeriod.toMillis());
+if (iterator.hasNext()) {
+keyValue = iterator.next();
+}
+if (keyValue == null) {
+if (numRecords() == 0) {
+minTimestamp = Long.MAX_VALUE;
+}
+return;
+}
+BufferValue bufferValue = 
BufferValue.deserialize(ByteBuffer.wrap(keyValue.value));
+K key = keySerde.deserializer().deserialize(topic,
+
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get()));
+
+Change<V> value = valueSerde.deserializeParts(
+topic,
+new Change<>(bufferValue.newValue(), bufferValue.oldValue())
+);
+while (keyValue != null && predicate.get() && 
wrapped().observedStreamTime - gracePeriod.toMillis() >= minTimestamp()) {
+if (bufferValue.context().timestamp() != minTimestamp) {
+throw new IllegalStateException(
+"minTimestamp [" + minTimestamp + "] did not match the 
actual min timestamp [" +
+bufferValue.context().timestamp() + "]"
+);
+}
+ 

[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-06 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1220078561


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -575,8 +575,12 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  def hasOngoingTransaction(producerId: Long): Boolean = {
-leaderLogIfLocal.exists(leaderLog => 
leaderLog.hasOngoingTransaction(producerId))
+  // Returns a verificationGuard object if we need to verify. This starts or 
continues the verification process. Otherwise return null.

Review Comment:
   I can swap all these to not use this term. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 opened a new pull request, #13821: MINOR: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner

2023-06-06 Thread via GitHub


gharris1727 opened a new pull request, #13821:
URL: https://github.com/apache/kafka/pull/13821

   In order to support multiple scanning modes, we should refactor the existing 
scanning mechanism out of the DelegatingClassLoader. This is because KIP-898 
will require more functionality that relies on the results of scanning, and it 
is not appropriate to add to the DCL itself.
   
   Scanning (and the PluginScanResult) is dependent on the ClassLoader 
instances used to load the plugins, so the DelegatingClassLoader is still 
responsible for processing the plugin path and instantiating the 
PluginClassLoader, and it emits these to the external scanner via PluginSource 
objects.
   
   In addition to pulling the existing reflection-based scanning out into a 
ReflectionScanner and superclass PluginScanner, add the ServiceLoaderScanner 
(currently unused) which shares some functionality with the ReflectionScanner.
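
   For readers unfamiliar with the second mechanism, this is the JDK primitive
   a ServiceLoaderScanner builds on; the wrapper class below is an illustrative
   stand-in, and only `ServiceLoader` and `Connector` are real:

   ```java
   import java.util.ServiceLoader;
   import org.apache.kafka.connect.connector.Connector;

   class ServiceLoaderSketch {
       // Lists Connector implementations that the given plugin ClassLoader
       // advertises via META-INF/services entries.
       static void listConnectors(ClassLoader pluginClassLoader) {
           ServiceLoader<Connector> loader = ServiceLoader.load(Connector.class, pluginClassLoader);
           for (Connector connector : loader) {
               System.out.println("found connector: " + connector.getClass().getName());
           }
       }
   }
   ```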
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12290: MINOR: Stop leaking threads in BlockingConnectorTest

2023-06-06 Thread via GitHub


C0urante commented on PR #12290:
URL: https://github.com/apache/kafka/pull/12290#issuecomment-1579174340

   @tombentley @viktorsomogyi if you have a moment, would you mind taking a 
look? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12444: KAFKA-14101: Improve documentation for consuming from embedded Kafka cluster topics in Connect integration testing framework

2023-06-06 Thread via GitHub


C0urante commented on PR #12444:
URL: https://github.com/apache/kafka/pull/12444#issuecomment-1579167201

   Downgrading to a draft until I can revisit and fix the merge conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12307: KAFKA-14006: Parameterize WorkerConnectorTest suite

2023-06-06 Thread via GitHub


C0urante commented on PR #12307:
URL: https://github.com/apache/kafka/pull/12307#issuecomment-1579164297

   @mimaison @showuon if you have a moment, would you mind taking a look? 
Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #11608: KAFKA-13533: Clean up resources on failed connector and task startup

2023-06-06 Thread via GitHub


C0urante commented on PR #11608:
URL: https://github.com/apache/kafka/pull/11608#issuecomment-1579162986

   Downgrading to a draft until I can revisit and fix the merge conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #11369: KAFKA-13327, KAFKA-13328, KAFKA-13329: Clean up preflight connector validation

2023-06-06 Thread via GitHub


C0urante commented on PR #11369:
URL: https://github.com/apache/kafka/pull/11369#issuecomment-1579162605

   Downgrading to a draft until I can revisit and fix the merge conflicts. I 
may also split this into several PRs to make review easier.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante closed pull request #11986: KAFKA-7509: Clean up incorrect warnings logged by Connect

2023-06-06 Thread via GitHub


C0urante closed pull request #11986: KAFKA-7509: Clean up incorrect warnings 
logged by Connect
URL: https://github.com/apache/kafka/pull/11986


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #11986: KAFKA-7509: Clean up incorrect warnings logged by Connect

2023-06-06 Thread via GitHub


C0urante commented on PR #11986:
URL: https://github.com/apache/kafka/pull/11986#issuecomment-1579157272

   Closing due to lack of review. We can revisit in the future, although the 
downgrade of these log messages from `WARN` to `INFO` level in 
https://github.com/apache/kafka/pull/13225 likely addresses the underlying 
concern here that the `WARN` level is highly polluted with unused config 
property messages in Kafka Connect.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante closed pull request #12666: KAFKA-14244: Add guard against accidental calls to halt JVM during testing

2023-06-06 Thread via GitHub


C0urante closed pull request #12666: KAFKA-14244: Add guard against accidental 
calls to halt JVM during testing
URL: https://github.com/apache/kafka/pull/12666


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12666: KAFKA-14244: Add guard against accidental calls to halt JVM during testing

2023-06-06 Thread via GitHub


C0urante commented on PR #12666:
URL: https://github.com/apache/kafka/pull/12666#issuecomment-1579153067

   Closing due to lack of interest; we can revisit this if necessary.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13816: MINOR: Optimize runtime of MM2 integration tests by batching transactions

2023-06-06 Thread via GitHub


C0urante commented on code in PR #13816:
URL: https://github.com/apache/kafka/pull/13816#discussion_r1219842657


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/TestUtils.java:
##
@@ -34,14 +38,39 @@ static Map<String, String> makeProps(String... keyValues) {
 }
 return props;
 }
-
-/*
- * return records with different but predictable key and value 
+
+/**
+ * Assemble a collection of records arbitrarily distributed across all 
partitions of the specified topic
+ * @param topicName Destination topic
+ * @param numRecords count of records to produce to the topic in total
+ * @return A batch of records that can be sent to a producer.
  */
-public static Map<String, String> generateRecords(int numRecords) {
-Map<String, String> records = new HashMap<>();
+public static List<ProducerRecord<byte[], byte[]>> generateRecords(String 
topicName, int numRecords) {
+List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
 for (int i = 0; i < numRecords; i++) {
-records.put("key-" + i, "message-" + i);
+String key = "key-" + i;
+String value = "message-" + i;
+records.add(new ProducerRecord<>(topicName, null, key.getBytes(), 
value.getBytes()));
+}
+return records;
+}
+
+/**
+ * Assemble a collection of records evenly distributed across some 
partitions of the specified topic

Review Comment:
   We may want to add a note here that, if `numPartitions` is greater than one, 
this will cause records with the same key to be written to different partitions.
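
   To make the hazard concrete: when a `ProducerRecord` carries an explicit
   partition, the partitioner never consults the key, so identical keys can
   land in different partitions. A sketch, assuming a byte-array `producer`
   is in scope:

   ```java
   // Both records share the key "key-0", but the explicit partition argument
   // bypasses key-hash partitioning, so they land in partitions 0 and 1.
   producer.send(new ProducerRecord<>("test-topic", 0, "key-0".getBytes(), "a".getBytes()));
   producer.send(new ProducerRecord<>("test-topic", 1, "key-0".getBytes(), "b".getBytes()));
   ```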



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -234,6 +245,8 @@ public void startClusters(Map<String, String> 
additionalMM2Config) throws Exception {
 @AfterEach
 public void shutdownClusters() throws Exception {
 try {
+primaryProducer.close();
+backupProducer.close();

Review Comment:
   We probably still want to try to gracefully shut down the cluster even if we 
encounter an error with stopping our producers:
   ```suggestion
   Utils.closeQuietly(primaryProducer);
   Utils.closeQuietly(backupProducer);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13712: KAFKA-14133: Migrate Admin mock in TaskManagerTest to Mockito

2023-06-06 Thread via GitHub


clolov commented on PR #13712:
URL: https://github.com/apache/kafka/pull/13712#issuecomment-1579134299

   Heya @cadonna! I hope I have addressed your comment, rebased and updated the 
overview!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks

2023-06-06 Thread via GitHub


C0urante commented on PR #13819:
URL: https://github.com/apache/kafka/pull/13819#issuecomment-1579102183

   The first CI run has finished with [no failing integration 
tests](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13819/1/tests/)
 for MirrorMaker 2 🎉
   
   Waiting for the [next 
run](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13819/2/pipeline)
 to conclude before marking ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-06 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1219946644


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends 
WrappedStateStore<RocksDBTimeOrderedKeyValueSegmentedBytesStore, Object, Object> implements TimeOrderedKeyValueBuffer<K, V> {
+
+private final Duration gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRec;
+private Serde<K> keySerde;
+private FullChangeSerde<V> valueSerde;

Review Comment:
   I agree for the most part. The issue comes with 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
   
   I was running into problems fixing it and the PR kept ballooning. I might 
just have been doing something wrong. When we come back to fix the 
serialization error I would rather break it out into a second PR, as it was 
getting messy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15021) KRaft controller increases leader epoch when shrinking ISR

2023-06-06 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-15021:
---
Fix Version/s: 3.6.0

> KRaft controller increases leader epoch when shrinking ISR
> --
>
> Key: KAFKA-15021
> URL: https://issues.apache.org/jira/browse/KAFKA-15021
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0
>
>
> When the KRaft controller shrinks the ISR it also forces the leader epoch to 
> increase. This is unnecessary and causes all of the follower replica fetches 
> to be invalidated.
> Here is an example trace of this behavior after replica 8 was shutdown:
> {code:java}
> kafka-dump-log --cluster-metadata-decoder --files 
> __cluster_metadata-0/38589501.log | grep Pd7wMb4lSkKI00--SrWNXw
> ...
> | offset: 38655592 CreateTime: 1683849857362 keySize: -1 valueSize: 41 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[3,1],"leader":1}}
> | offset: 38655593 CreateTime: 1683849857362 keySize: -1 valueSize: 41 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":5,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,4],"leader":4}}
> | offset: 38655594 CreateTime: 1683849857362 keySize: -1 valueSize: 41 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":6,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,1],"leader":0}}
> | offset: 38656159 CreateTime: 1683849974945 keySize: -1 valueSize: 39 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[3,1,8]}}
> | offset: 38656256 CreateTime: 1683849994297 keySize: -1 valueSize: 39 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":5,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,4,8]}}
> | offset: 38656299 CreateTime: 1683849997139 keySize: -1 valueSize: 39 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":6,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,1,8]}}
> | offset: 38657003 CreateTime: 1683850157379 keySize: -1 valueSize: 30 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","leader":8}}
>  {code}
> Also, notice how the leader epoch was not increased when the ISR was expanded.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-15021) KRaft controller increases leader epoch when shrinking ISR

2023-06-06 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio reopened KAFKA-15021:


> KRaft controller increases leader epoch when shrinking ISR
> --
>
> Key: KAFKA-15021
> URL: https://issues.apache.org/jira/browse/KAFKA-15021
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
>
> When the KRaft controller shrinks the ISR it also forces the leader epoch to 
> increase. This is unnecessary and causes all of the follower replica fetches 
> to be invalidated.
> Here is an example trace of this behavior after replica 8 was shutdown:
> {code:java}
> kafka-dump-log --cluster-metadata-decoder --files 
> __cluster_metadata-0/38589501.log | grep Pd7wMb4lSkKI00--SrWNXw
> ...
> | offset: 38655592 CreateTime: 1683849857362 keySize: -1 valueSize: 41 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[3,1],"leader":1}}
> | offset: 38655593 CreateTime: 1683849857362 keySize: -1 valueSize: 41 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":5,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,4],"leader":4}}
> | offset: 38655594 CreateTime: 1683849857362 keySize: -1 valueSize: 41 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":6,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,1],"leader":0}}
> | offset: 38656159 CreateTime: 1683849974945 keySize: -1 valueSize: 39 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[3,1,8]}}
> | offset: 38656256 CreateTime: 1683849994297 keySize: -1 valueSize: 39 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":5,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,4,8]}}
> | offset: 38656299 CreateTime: 1683849997139 keySize: -1 valueSize: 39 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":6,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,1,8]}}
> | offset: 38657003 CreateTime: 1683850157379 keySize: -1 valueSize: 30 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","leader":8}}
>  {code}
> Also, notice how the leader epoch was not increased when the ISR was expanded.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15021) KRaft controller increases leader epoch when shrinking ISR

2023-06-06 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio resolved KAFKA-15021.

Resolution: Fixed

> KRaft controller increases leader epoch when shrinking ISR
> --
>
> Key: KAFKA-15021
> URL: https://issues.apache.org/jira/browse/KAFKA-15021
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
>
> When the KRaft controller shrinks the ISR it also forces the leader epoch to 
> increase. This is unnecessary and causes all of the follower replica fetches 
> to be invalidated.
> Here is an example trace of this behavior after replica 8 was shutdown:
> {code:java}
> kafka-dump-log --cluster-metadata-decoder --files 
> __cluster_metadata-0/38589501.log | grep Pd7wMb4lSkKI00--SrWNXw
> ...
> | offset: 38655592 CreateTime: 1683849857362 keySize: -1 valueSize: 41 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[3,1],"leader":1}}
> | offset: 38655593 CreateTime: 1683849857362 keySize: -1 valueSize: 41 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":5,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,4],"leader":4}}
> | offset: 38655594 CreateTime: 1683849857362 keySize: -1 valueSize: 41 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":6,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,1],"leader":0}}
> | offset: 38656159 CreateTime: 1683849974945 keySize: -1 valueSize: 39 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[3,1,8]}}
> | offset: 38656256 CreateTime: 1683849994297 keySize: -1 valueSize: 39 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":5,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,4,8]}}
> | offset: 38656299 CreateTime: 1683849997139 keySize: -1 valueSize: 39 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":6,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,1,8]}}
> | offset: 38657003 CreateTime: 1683850157379 keySize: -1 valueSize: 30 
> sequence: -1 headerKeys: [] payload: 
> {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","leader":8}}
>  {code}
> Also, notice how the leader epoch was not increased when the ISR was expanded.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vvcephei commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…

2023-06-06 Thread via GitHub


vvcephei commented on PR #13796:
URL: https://github.com/apache/kafka/pull/13796#issuecomment-1578989020

   Just checking to see if the github/jenkins trigger still works. It looks 
like it does not.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…

2023-06-06 Thread via GitHub


vvcephei commented on PR #13796:
URL: https://github.com/apache/kafka/pull/13796#issuecomment-1578988387

   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio merged pull request #13772: MINOR: Add helper util `Snapshots.lastContainedLogTimestamp`

2023-06-06 Thread via GitHub


jsancio merged PR #13772:
URL: https://github.com/apache/kafka/pull/13772


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint

2023-06-06 Thread via GitHub


vvcephei commented on PR #13803:
URL: https://github.com/apache/kafka/pull/13803#issuecomment-1578986116

   Yeah, sorry for the weird drive-bys. I was indeed trying to see if those 
commands still work. Apparently, they do not.
   
   I think this is the plugin documentation, but it also looks like it was 
associated with security flaws, so maybe Infra disabled it: 
https://plugins.jenkins.io/ghprb/
   
   I'll raise a ticket with them. At the least, if it doesn't work anymore, 
they should remove the docs for that role in asf.yaml.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint

2023-06-06 Thread via GitHub


C0urante commented on PR #13803:
URL: https://github.com/apache/kafka/pull/13803#issuecomment-1578983184

   @vvcephei I kicked off another Jenkins build; I haven't seen GitHub comments 
work to re-trigger builds for a while. Are you trying to test that out?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint

2023-06-06 Thread via GitHub


vvcephei commented on PR #13803:
URL: https://github.com/apache/kafka/pull/13803#issuecomment-157898

   test this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint

2023-06-06 Thread via GitHub


vvcephei commented on PR #13803:
URL: https://github.com/apache/kafka/pull/13803#issuecomment-1578980743

   re-test this


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint

2023-06-06 Thread via GitHub


vvcephei commented on PR #13803:
URL: https://github.com/apache/kafka/pull/13803#issuecomment-1578980602

   re-test this, please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #13803: KAFKA-15051: add missing GET plugin/config endpoint

2023-06-06 Thread via GitHub


vvcephei commented on PR #13803:
URL: https://github.com/apache/kafka/pull/13803#issuecomment-1578980388

   Retest this


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

2023-06-06 Thread via GitHub


vvcephei commented on PR #13802:
URL: https://github.com/apache/kafka/pull/13802#issuecomment-1578974789

   retest this, please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

2023-06-06 Thread via GitHub


vvcephei commented on PR #13802:
URL: https://github.com/apache/kafka/pull/13802#issuecomment-1578973821

   retest this


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

2023-06-06 Thread via GitHub


vvcephei commented on PR #13802:
URL: https://github.com/apache/kafka/pull/13802#issuecomment-1578973157

   I'm going to try something out for triggering builds...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

2023-06-06 Thread via GitHub


mumrah commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1219834583


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -223,16 +223,16 @@ public void visitPartition(TopicIdPartition 
topicIdPartition, PartitionRegistrat
 );
 ConfigResource resource = new 
ConfigResource(ConfigResource.Type.TOPIC, topicName);
 operationConsumer.accept(
-UPDATE_TOPIC_CONFIG,
-"Updating Configs for Topic " + topicName + ", ID " + topicId,
+DELETE_TOPIC_CONFIG,

Review Comment:
   Kind of. The opType is just used in logging, so there was a bug in that 
regard.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

2023-06-06 Thread via GitHub


mumrah commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1219833727


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -267,6 +267,9 @@ void handleTopicsDelta(
 ) {
 topicsDelta.deletedTopicIds().forEach(topicId -> {
 String name = deletedTopicNameResolver.apply(topicId);
+if (name == null) {

Review Comment:
   I ran into an NPE here in my test code when I passed an empty map.
   
   In the production code, the function passed in here is the getter of the 
`topicsById` map in TopicsImage. I think it's impossible for there to be 
something in `deletedTopicIds` that's not also in `topicsById`. This check 
isn't strictly necessary, but the custom RuntimeException is better than an NPE.
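
   For illustration, the guard described here looks roughly like the
   following; the exception message is a stand-in, not the PR's exact text:

   ```java
   // Sketch: fail fast with a descriptive error instead of an NPE when a
   // deleted topic ID cannot be resolved to a name.
   String name = deletedTopicNameResolver.apply(topicId);
   if (name == null) {
       throw new RuntimeException("Cannot resolve name for deleted topic " + topicId);
   }
   ```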



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15063) Throttle number of active PIDs

2023-06-06 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-15063:
--
Description: 
{color:#172b4d}Ticket to track KIP-936. Since KIP-679 idempotent{color} 
{color:#172b4d}producers became the default in Kafka as a result of this all 
producer instances will be assigned PID. The increase of the number of PIDs 
stored in Kafka brokers by {color}{{ProducerStateManager}}{color:#172b4d} 
exposes the broker to OOM errors if it has a high number of producers, a rogue 
or misconfigured client(s).{color}

{color:#172b4d}The broker is still exposed to OOM{color} even after KIP-854 
introduced a separate config to expire PID from transaction IDs if there is a 
high number of PID before {{producer.id.expiration.ms}} is exceeded.

As a result of this, the broker will keep experiencing OOM and become offline. 
The only way to recover from this is to increase the heap.  

 

{color:#172b4d}KIP-936 is proposing throttling the number of PIDs per 
KafkaPrincipal {color}

{color:#172b4d}See the KIP-936 details here  
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs]
 {color}

  was:
Ticket to track KIP-936. Since KIP-679, idempotent producers became the 
default in Kafka; as a result, all producer instances will be assigned a PID. 
The increase in the number of PIDs stored in Kafka brokers by 
{{ProducerStateManager}} exposes the broker to OOM errors if it has a high 
number of producers, or a rogue or misconfigured client(s).

The broker is still exposed to OOM even after KIP-854 introduced a separate 
config to expire PIDs independently of transactional IDs, because a high 
number of PIDs can accumulate before {{producer.id.expiration.ms}} is 
exceeded.

As a result of this, the broker will keep experiencing OOM and go offline. 
The only way to recover from this is to increase the heap.

KIP-936 proposes throttling the number of PIDs per KafkaPrincipal.

See the KIP details here: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs]


> Throttle number of active PIDs
> --
>
> Key: KAFKA-15063
> URL: https://issues.apache.org/jira/browse/KAFKA-15063
> Project: Kafka
>  Issue Type: New Feature
>  Components: core, producer 
>Affects Versions: 2.8.0, 3.1.0, 3.0.0, 3.2.0, 3.3, 3.4.0
>Reporter: Omnia Ibrahim
>Priority: Major
>
> {color:#172b4d}Ticket to track KIP-936. Since KIP-679 idempotent{color} 
> {color:#172b4d}producers became the default in Kafka as a result of this all 
> producer instances will be assigned PID. The increase of the number of PIDs 
> stored in Kafka brokers by {color}{{ProducerStateManager}}{color:#172b4d} 
> exposes the broker to OOM errors if it has a high number of producers, a 
> rogue or misconfigured client(s).{color}
> {color:#172b4d}The broker is still exposed to OOM{color} even after KIP-854 
> introduced a separate config to expire PID from transaction IDs if there is a 
> high number of PID before {{producer.id.expiration.ms}} is exceeded.
> As a result of this, the broker will keep experiencing OOM and become 
> offline. The only way to recover from this is to increase the heap.  
>  
> {color:#172b4d}KIP-936 is proposing throttling the number of PIDs per 
> KafkaPrincipal {color}
> {color:#172b4d}See the KIP-936 details here  
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs]
>  {color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15063) Throttle number of active PIDs

2023-06-06 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-15063:
--
Description: 
{color:#172b4d}Ticket to track KIP-936. Since KIP-679 idempotent{color} 
{color:#172b4d}producers became the default in Kafka as a result of this all 
producer instances will be assigned PID. The increase of the number of PIDs 
stored in Kafka brokers by {color}{{ProducerStateManager}}{color:#172b4d} 
exposes the broker to OOM errors if it has a high number of producers, a rogue 
or misconfigured client(s).{color}

{color:#172b4d}The broker is still exposed to OOM{color} even after KIP-854 
introduced a separate config to expire PID from transaction IDs if there is a 
high number of PID before {{producer.id.expiration.ms}} is exceeded. 

As a result of this, the broker will keep experiencing OOM and become offline. 
The only way to recover from this is to increase the heap.  

 

{color:#172b4d}KIP-936 is proposing throttling the number of PIDs per 
KafkaPrincipal {color}

{color:#172b4d}See the KIP details here  
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs]
 {color}

  was:
Ticket to track KIP-936. Since KIP-679, idempotent producers became the 
default in Kafka; as a result, all producer instances will be assigned a PID. 
The increase in the number of PIDs stored in Kafka brokers by 
{{ProducerStateManager}} exposes the broker to OOM errors if it has a high 
number of producers, or rogue or misconfigured client(s).

The broker is still exposed to OOM even after KIP-854 introduced a separate 
config to expire PIDs from transactional IDs if there is a high number of 
PIDs before {{producer.id.expiration.ms}} is exceeded.

As a result of this, the broker will keep experiencing OOM and go offline. 
The only way to recover from this is to increase the heap.

KIP-936 proposes throttling the number of PIDs per KafkaPrincipal.

See 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs]


> Throttle number of active PIDs
> --
>
> Key: KAFKA-15063
> URL: https://issues.apache.org/jira/browse/KAFKA-15063
> Project: Kafka
>  Issue Type: New Feature
>  Components: core, producer 
>Affects Versions: 2.8.0, 3.1.0, 3.0.0, 3.2.0, 3.3, 3.4.0
>Reporter: Omnia Ibrahim
>Priority: Major
>
> Ticket to track KIP-936. Since KIP-679, idempotent producers became the 
> default in Kafka; as a result, all producer instances will be assigned a 
> PID. The increase in the number of PIDs stored in Kafka brokers by 
> {{ProducerStateManager}} exposes the broker to OOM errors if it has a high 
> number of producers, or a rogue or misconfigured client(s).
> The broker is still exposed to OOM even after KIP-854 introduced a separate 
> config to expire PIDs independently of transactional IDs, because a high 
> number of PIDs can accumulate before {{producer.id.expiration.ms}} is 
> exceeded.
> As a result of this, the broker will keep experiencing OOM and go offline. 
> The only way to recover from this is to increase the heap.
>  
> KIP-936 proposes throttling the number of PIDs per KafkaPrincipal.
> See the KIP details here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14791) Create a builder class for PartitionRegistration

2023-06-06 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant resolved KAFKA-14791.
--
Resolution: Fixed

> Create a builder class for PartitionRegistration
> 
>
> Key: KAFKA-14791
> URL: https://issues.apache.org/jira/browse/KAFKA-14791
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-06 Thread via GitHub


dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1219799829


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -579,9 +579,30 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 result
   }
 
+  /**
+   * Maybe create and return the verificationGuard object for the given 
producer ID if the transaction is not yet ongoing.

Review Comment:
   ditto.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -980,6 +1005,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   if (duplicateBatch.isPresent) {
 return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
   }
+
+  // Verify that if the record is transactional & the append origin is 
client, that we either have an ongoing transaction or verified transaction 
state.
+  // This guarantees that transactional records are never written to 
the log outside of the transaction coordinator's knowledge of an open 
transaction on
+  // the partition. If we do not have an ongoing transaction or 
correct guard, return an error and do not append.
+  // There are two phases -- the first append to the log and 
subsequent appends.
+  //
+  // 1. First append: Verification starts with creating a 
verificationGuard, sending a verification request to the transaction 
coordinator, and

Review Comment:
   nit: `verification guard` for consistency. There are a few other cases in 
this comment.



##
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##
@@ -1087,6 +1087,46 @@ class ProducerStateManagerTest {
 assertTrue(!manager.latestSnapshotOffset.isPresent)
   }
 
+  @Test
+  def testEntryForVerification(): Unit = {
+val originalEntry = stateManager.verificationStateEntry(producerId, true)
+val originalEntryVerificationGuard = originalEntry.verificationGuard()
+
+def verifyEntry(producerId: Long, newEntry: VerificationStateEntry): Unit 
= {
+  val entry = stateManager.verificationStateEntry(producerId, false)
+  assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
+  assertEquals(entry.verificationGuard, newEntry.verificationGuard)
+}
+
+// If we already have an entry, reuse it.
+val updatedEntry = stateManager.verificationStateEntry(producerId, true)
+verifyEntry(producerId, updatedEntry)
+
+// Before we add transactional data, we can't remove the entry.
+stateManager.clearVerificationStateEntry(producerId)
+verifyEntry(producerId, updatedEntry)
+
+// Add the transactional data and clear the entry
+append(stateManager, producerId, 0, 0, offset = 0, isTransactional = true)
+stateManager.clearVerificationStateEntry(producerId)
+assertEquals(null, stateManager.verificationStateEntry(producerId, false))
+

Review Comment:
   nit: Empty line could be removed.



##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -575,8 +575,12 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  def hasOngoingTransaction(producerId: Long): Boolean = {
-leaderLogIfLocal.exists(leaderLog => 
leaderLog.hasOngoingTransaction(producerId))
+  // Returns a verificationGuard object if we need to verify. This starts or 
continues the verification process. Otherwise return null.

Review Comment:
   nit: `verification guard object`? I find `verificationGuard` confusing 
because we actually return an `Object`.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -980,6 +1005,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   if (duplicateBatch.isPresent) {
 return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
   }
+
+  // Verify that if the record is transactional & the append origin is 
client, that we either have an ongoing transaction or verified transaction 
state.
+  // This guarantees that transactional records are never written to 
the log outside of the transaction coordinator's knowledge of an open 
transaction on
+  // the partition. If we do not have an ongoing transaction or 
correct guard, return an error and do not append.
+  // There are two phases -- the first append to the log and 
subsequent appends.
+  //
+  // 1. First append: Verification starts with creating a 
verificationGuard, sending a verification request to the transaction 
coordinator, and
+  // given a "verified" response, continuing the append path. (A 
non-verified response throws an error.) We create the unique verification guard 
for the transaction
+  // to ensure there is no race between the transaction coordinator 
response and an abort marker getting written to the log. We need a unique guard 
because we could
+  // have a sequence of events where we start a transaction 
verification, have the tran
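
To make the guard discussion above concrete, here is a minimal Java sketch of the pattern being described: a unique object whose reference identity ties an append to the verification round the coordinator approved. All names are illustrative, not the actual `UnifiedLog`/`ProducerStateManager` code.

```java
// Sketch: reference identity of a unique Object acts as the verification guard.
public class VerificationGuardSketch {
    private Object verificationGuard; // null when no verification is in flight

    // First append: create (or reuse) the guard before asking the coordinator.
    public synchronized Object maybeStartVerification() {
        if (verificationGuard == null) {
            verificationGuard = new Object();
        }
        return verificationGuard;
    }

    // On append: proceed only if the caller still holds the exact guard
    // instance, so a concurrent abort (which clears the guard) wins the race.
    public synchronized boolean verify(Object guard) {
        return guard != null && guard == verificationGuard;
    }

    // An abort marker or a completed transaction clears the guard.
    public synchronized void clear() {
        verificationGuard = null;
    }
}
```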

[GitHub] [kafka] jsancio merged pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration

2023-06-06 Thread via GitHub


jsancio merged PR #13788:
URL: https://github.com/apache/kafka/pull/13788


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration

2023-06-06 Thread via GitHub


jsancio commented on PR #13788:
URL: https://github.com/apache/kafka/pull/13788#issuecomment-1578924954

   Merging. Unrelated test failures:
   ```
   testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15063) Throttle number of active PIDs

2023-06-06 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-15063:
-

 Summary: Throttle number of active PIDs
 Key: KAFKA-15063
 URL: https://issues.apache.org/jira/browse/KAFKA-15063
 Project: Kafka
  Issue Type: New Feature
  Components: core, producer 
Affects Versions: 3.4.0, 3.2.0, 3.0.0, 3.1.0, 2.8.0, 3.3
Reporter: Omnia Ibrahim


Ticket to track KIP-936. Since KIP-679, idempotent producers became the default 
in Kafka; as a result, all producer instances will be assigned a PID. The 
increase of the number of PIDs stored in Kafka brokers by 
{{ProducerStateManager}} exposes the broker to OOM errors if it has a high 
number of producers, or a rogue or misconfigured client(s).

The broker is still exposed to OOM even after KIP-854 introduced a separate 
config to expire PIDs from transaction IDs if there is a high number of PIDs 
before {{producer.id.expiration.ms}} is exceeded.

As a result of this, the broker will keep experiencing OOM and become offline. 
The only way to recover from this is to increase the heap.

KIP-936 is proposing throttling the number of PIDs per KafkaPrincipal.

See 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs]
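
As a rough illustration of the KIP-936 idea, and not the KIP's actual design, a broker-side guard could cap how many distinct PIDs it tracks per principal; the class and method names below are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: refuse to track more than a fixed number of PIDs per principal.
public class PidThrottleSketch {
    private final int maxPidsPerPrincipal;
    private final Map<String, Set<Long>> pidsByPrincipal = new ConcurrentHashMap<>();

    public PidThrottleSketch(int maxPidsPerPrincipal) {
        this.maxPidsPerPrincipal = maxPidsPerPrincipal;
    }

    // Returns false when the principal is over its PID budget and should be throttled.
    public boolean tryTrackPid(String principal, long producerId) {
        Set<Long> pids = pidsByPrincipal.computeIfAbsent(principal, p -> ConcurrentHashMap.newKeySet());
        if (pids.contains(producerId)) {
            return true; // already tracked, no additional memory cost
        }
        if (pids.size() >= maxPidsPerPrincipal) {
            return false; // throttle instead of growing PID state further
        }
        return pids.add(producerId);
    }
}
```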



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

2023-06-06 Thread via GitHub


mumrah commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1219791584


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -475,12 +478,13 @@ void handleProducerIdSnapshot(ProducerIdsImage image, 
KRaftMigrationOperationCon
 void handleConfigsDelta(ConfigurationsImage configsImage, 
ConfigurationsDelta configsDelta, KRaftMigrationOperationConsumer 
operationConsumer) {
 Set<ConfigResource> updatedResources = configsDelta.changes().keySet();
 updatedResources.forEach(configResource -> {
+String opType = brokerOrTopicOpType(configResource, 
UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG);
 Map<String, String> props = 
configsImage.configMapForResource(configResource);
 if (props.isEmpty()) {
-operationConsumer.accept("DeleteConfig", "Delete configs for " 
+ configResource, migrationState ->
+operationConsumer.accept(opType, "Delete configs for " + 
configResource, migrationState ->

Review Comment:
   Yup, good catch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-06 Thread via GitHub


hudeqi commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1578890756

   > @hudeqi I added myself as a reviewer, I may not have time to review this 
today but will get to it this week.
   
   OK, thanks for your time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-06 Thread via GitHub


dajac merged PR #13675:
URL: https://github.com/apache/kafka/pull/13675


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-06 Thread via GitHub


viktorsomogyi commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1578862889

   @hudeqi I added myself as a reviewer, I may not have time to review this 
today but will get to it this week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-06 Thread via GitHub


dajac opened a new pull request, #13820:
URL: https://github.com/apache/kafka/pull/13820

   WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime

2023-06-06 Thread via GitHub


ijuma commented on code in PR #13313:
URL: https://github.com/apache/kafka/pull/13313#discussion_r1219692414


##
tests/kafkatest/directory_layout/kafka_path.py:
##
@@ -49,6 +49,11 @@
 CORE_DEPENDANT_TEST_LIBS_JAR_NAME: 
"core/build/dependant-testlibs/*.jar",
 TOOLS_JAR_NAME: "tools/build/libs/kafka-tools*.jar",
 TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "tools/build/dependant-libs*/*.jar"
+},
+# TODO remove with KAFKA-14762

Review Comment:
   Can we explain why we do this in this comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime

2023-06-06 Thread via GitHub


ijuma commented on PR #13313:
URL: https://github.com/apache/kafka/pull/13313#issuecomment-1578803245

   @gharris1727 do the system tests pass with this change?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison closed pull request #3955: KAFKA-5969: Use correct error message when the JSON file is invalid

2023-06-06 Thread via GitHub


mimaison closed pull request #3955: KAFKA-5969: Use correct error message when 
the JSON file is invalid
URL: https://github.com/apache/kafka/pull/3955


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #3955: KAFKA-5969: Use correct error message when the JSON file is invalid

2023-06-06 Thread via GitHub


mimaison commented on PR #3955:
URL: https://github.com/apache/kafka/pull/3955#issuecomment-1578794742

   `PreferredReplicaLeaderElectionCommand` does not exist anymore and 
`LeaderElectionCommand` does not have this issue so closing this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult

2023-06-06 Thread via GitHub


C0urante merged PR #13771:
URL: https://github.com/apache/kafka/pull/13771


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13771: MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult

2023-06-06 Thread via GitHub


C0urante commented on PR #13771:
URL: https://github.com/apache/kafka/pull/13771#issuecomment-1578788384

   Test failures appear unrelated (a known bug in the reflections library we 
use caused a test failure which may appear related, but is not). Merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-06 Thread via GitHub


jeffkbkim commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1219669199


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * A simple interface to write records to Partitions/Logs. It contains the 
minimum
+ * required for coordinators.
+ *
+ * @param <T> The record type.
+ */
+public interface PartitionWriter<T> {
+
+/**
+ * Serializer to translate T to bytes.
+ *
+ * @param <T> The record type.
+ */
+interface Serializer<T> {
+/**
+ * Serializes the key of the record.
+ */
+byte[] serializeKey(T record);
+
+/**
+ * Serializes the value of the record.
+ */
+byte[] serializeValue(T record);
+}
+
+/**
+ * Listener allowing to listen to high watermark changes. This is meant
+ * to be used in conjunction with {{@link 
PartitionWriter#append(TopicPartition, List)}}.

Review Comment:
   ah found HighWatermarkListener. thanks!
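
For illustration, a trivial implementation of the `Serializer` interface quoted above might look like this; `KeyValueRecord` is a made-up record type, not part of the PR.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical record type, invented for the example.
class KeyValueRecord {
    final String key;
    final String value;
    KeyValueRecord(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Splits a record into key and value bytes, as PartitionWriter expects.
class StringRecordSerializer implements PartitionWriter.Serializer<KeyValueRecord> {
    @Override
    public byte[] serializeKey(KeyValueRecord record) {
        return record.key.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(KeyValueRecord record) {
        return record.value.getBytes(StandardCharsets.UTF_8);
    }
}
```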



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-06 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1219648522


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -192,6 +197,220 @@ public void testCancelAfterAwaitFlush() throws Exception {
 flushFuture.get(1000, TimeUnit.MILLISECONDS);
 }
 
+@Test

Review Comment:
   All these new tests seem to be specific to the `ConnectorOffsetBackingStore` 
class - should we move them to a new `ConnectorOffsetBackingStoreTest` class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-06 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1219596953


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +280,33 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 

Review Comment:
   This `ConnectorOffsetBackingStore::set` method's Javadoc also needs to be 
updated to mention the special case handling for batches with `null` offsets 
since it currently states the following:
   
   ```
* If configured to use a connector-specific offset store, the 
returned {@link Future} corresponds to a
* write to that store, and the passed-in {@link Callback} is invoked 
once that write completes. If a worker-global
* store is provided, a secondary write is made to that store if the 
write to the connector-specific store
* succeeds. Errors with this secondary write are not reflected in the 
returned {@link Future} or the passed-in
* {@link Callback}; they are only logged as a warning to users.
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +280,33 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.entrySet()
+.stream()
+.anyMatch(offset -> offset.getValue() == null);

Review Comment:
   nit: can be simplified
   ```suggestion
   boolean containsTombstones = values.containsValue(null);
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +280,33 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.entrySet()
+.stream()
+.anyMatch(offset -> offset.getValue() == null);
+
+AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new 
AtomicReference<>();
+
+// If there are tombstone offsets, then the failure to write to 
secondary store will
+// not be ignored. Also, for tombstone records, we first write to 
secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+try (LoggingContext context = loggingContext()) {
+if (secondaryWriteError != null) {
+log.warn("Failed to write offsets with tombstone 
records to secondary backing store", secondaryWriteError);
+secondaryStoreTombstoneWriteError.compareAndSet(null, 
secondaryWriteError);
+} else {
+log.debug("Successfully flushed tombstone offsets to 
secondary backing store");
+}
+}
+});
+}
+
 return primaryStore.set(values, (primaryWriteError, ignored) -> {
-if (secondaryStore != null) {
+// Secondary store writes have already happened for tombstone 
records

Review Comment:
   How do we know this if we aren't blocking on the write to the secondary 
store above? I believe we should do a synchronous write to the secondary store 
in this tombstone offset case.



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -302,7 +326,12 @@ public Future set(Map 
values, Callback callb
 }
 }
 try (LoggingContext context = loggingContext()) {
-callback.onCompletion(primaryWriteError, ignored);
+Throwable secondaryWriteError = 
secondaryStoreTombstoneWriteError.get();
+if (secondaryStore != null && containsTombstones && 
secondaryWriteError != null) {

Review Comment:
   Same as above - we aren't blocking on the write to the secondary store, so 
we can't be sure that it has completed at this point.
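
A minimal sketch of the synchronous wait being suggested here, assuming only that the store's `set(...)` returns a `Future<Void>` as in the snippet above; the helper name and timeout handling are illustrative.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SyncWriteSketch {
    // Blocks on a store write so its outcome is known before the next write
    // starts. Returns null on success, or the failure that should fail the
    // whole flush.
    static Throwable awaitWrite(Future<Void> write, long timeoutMs) {
        try {
            write.get(timeoutMs, TimeUnit.MILLISECONDS);
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (TimeoutException e) {
            return e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }
}
```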



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13807: KAFKA-15040: trigger onLeadershipChange under KRaft mode

2023-06-06 Thread via GitHub


showuon commented on code in PR #13807:
URL: https://github.com/apache/kafka/pull/13807#discussion_r1219562194


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3701,6 +3721,11 @@ class ReplicaManagerTest {
 
   assertEquals(None, 
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
 
+  if (enableRemoteStorage) {

Review Comment:
   Good catch! I thought I'd covered every KRaft test case. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

2023-06-06 Thread via GitHub


clolov commented on PR #13711:
URL: https://github.com/apache/kafka/pull/13711#issuecomment-1578656781

   Heya @cadonna, I hope I have addressed all of your comments:
   * Updated the PR's overview
   * Rebased
   * Comments on the code itself
   
   Interestingly enough, the test `shouldInitializeNewStandbyTasks` fails 
locally when I run all tests, but succeeds when it is run in isolation. If the 
build passes I will blame it on something in my environment, but if it fails I 
would be glad for another pair of eyes on what might be causing it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…

2023-06-06 Thread via GitHub


viktorsomogyi commented on PR #13796:
URL: https://github.com/apache/kafka/pull/13796#issuecomment-1578650158

   I had a short chat with @urbandan yesterday to understand the scope of this 
fix. During the conversation we came to the conclusion that bumping the epoch 
without a safety check is generally unsafe, as there might be requests in the 
producer's queue that have timed out (per the request timeout) yet were 
successfully appended on the broker, and are still waiting in the queue for a 
retry while their epoch is being bumped.
   While this PR fixes one case, it would be good to review all usages where 
the epoch is being bumped. I don't insist on redoing all the other cases, so 
let me know, Daniel, if you want to expand the scope of this or do it in a 
follow-up PR.
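
To make the hazard concrete, here is a toy sketch (not the actual `TransactionManager` logic; all names are hypothetical) of the kind of safety check being discussed: gating the bump on there being no batches that could still be retried under the old epoch.

```java
// Toy model of guarding an epoch bump against in-flight/retriable batches.
final class EpochBumpSketch {
    private short epoch = 0;
    private int unresolvedBatches = 0; // in flight, or timed out but retriable

    synchronized void batchSent() { unresolvedBatches++; }
    synchronized void batchResolved() { unresolvedBatches--; }

    // Bump only when no batch produced under the old epoch can still be retried.
    synchronized boolean maybeBumpEpoch() {
        if (unresolvedBatches > 0) {
            return false; // unsafe: a timed-out batch may yet succeed or retry
        }
        epoch++;
        return true;
    }
}
```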


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-06 Thread via GitHub


yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1219524567


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map> adminFutures = new ArrayList<>();
-
-Map offsetsToAlter = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() != null)
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue(;
-
-if (!offsetsToAlter.isEmpty()) {
-log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-connName, offsetsToAlter);
-AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+Map offsetsToWrite;
+if (isReset) {
+offsetsToWrite = new HashMap<>();
+ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new 
ListConsumerGroupOffsetsOptions().timeoutMs(
 (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-alterConsumerGroupOffsetsOptions);
-
-
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+try {
+admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+.partitionsToOffsetAndMetadata()
+
.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+.forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+connName, groupId, 
offsetsToWrite.keySet());
+} catch (Exception e) {
+Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+log.error("Failed to list offsets prior to 
resetting sink connector offsets", e);
+cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting sink connector offsets", e), null);
+return;
+}
+} else {
+offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
 }
 
-Set partitionsToReset = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() == null)
-.map(Map.Entry::getKey)
-.collect(Collectors.toSet());
-
-if (!partitionsToReset.isEmpty()) {
-log.debug("Deleting the consumer group offsets for the 
following topic partitions using an admin client for sink connector {}: {}.",
-connName, partitionsToReset);
-DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
-(int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
-deleteConsumerGroupOffsetsOptions);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, offsetsToWrite);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to modify offsets 
for connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
 
-
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+// This should only occur for an offset reset request when:
+// 1. There was a prior attempt to reset offsets
+// OR
+// 2. No offsets have been committed yet
+if (offsetsToWrite.isEmpty()) {

Review Comment:
   I'm wondering whether we should go ah

[jira] [Commented] (KAFKA-15059) Exactly-once source tasks fail to start during pending rebalances

2023-06-06 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729709#comment-17729709
 ] 

Chris Egerton commented on KAFKA-15059:
---

On second thought, it may be unnecessary to check for a pending rebalance at 
all.

 

If the worker that we forward the zombie fencing request to is a zombie leader 
(i.e., a worker that believes it is the leader but in reality is not), it will 
fail to finish the round of zombie fencing because it won't be able to write to 
the config topic with a transactional producer.

If the connector has just been deleted, we'll still fail the request since we 
force a read-to-end of the config topic and refresh our snapshot of its 
contents before checking to see if the connector exists.

And regardless, the worker that owns the task will still do a read-to-end of 
the config topic and verify that (1) no new task configs have been generated 
for the connector and (2) the worker is still assigned the connector, before 
allowing the task to process any data.

> Exactly-once source tasks fail to start during pending rebalances
> -
>
> Key: KAFKA-15059
> URL: https://issues.apache.org/jira/browse/KAFKA-15059
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> When asked to perform a round of zombie fencing, the distributed herder will 
> [reject the 
> request|https://github.com/apache/kafka/blob/17fd30e6b457f097f6a524b516eca1a6a74a9144/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1249-L1250]
>  if a rebalance is pending, which can happen if (among other things) a config 
> for a new connector or a new set of task configs has been recently read from 
> the config topic.
> Normally this can be alleviated with a simple task restart, which isn't great 
> but isn't terrible.
> However, when running MirrorMaker 2 in dedicated mode, there is no API to 
> restart failed tasks, and it can be more common to see this kind of failure 
> on a fresh cluster because three connector configurations are written in 
> rapid succession to the config topic.
>  
> In order to provide a better experience for users of both vanilla Kafka 
> Connect and dedicated MirrorMaker 2 clusters, we can retry (likely with the 
> same exponential backoff introduced with KAFKA-14732) zombie fencing attempts 
> that fail due to a pending rebalance.
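
A generic sketch of the retry-with-exponential-backoff shape described above; the attempt bound, multiplier, and cap are assumptions for illustration, not the values KAFKA-14732 uses.

```java
import java.util.concurrent.Callable;

final class BackoffRetrySketch {
    // Retries op up to maxAttempts times, doubling the sleep between attempts.
    // maxAttempts must be >= 1; a real implementation would also need a
    // dedicated interruption and shutdown story.
    static <T> T retryWithBackoff(Callable<T> op, int maxAttempts, long initialBackoffMs) throws Exception {
        long backoffMs = initialBackoffMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                    backoffMs = Math.min(backoffMs * 2, 10_000L); // cap the backoff
                }
            }
        }
        throw last;
    }
}
```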



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] yashmayya commented on pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-06 Thread via GitHub


yashmayya commented on PR #13818:
URL: https://github.com/apache/kafka/pull/13818#issuecomment-1578630274

    Sink connector offsets alter requests:
   ```
   ConnectorsResource::alterConnectorOffsets -> 
AbstractHerder::alterConnectorOffsets -> 
(Distributed|Standalone)Herder::modifyConnectorOffsets -> 
Worker::alterConnectorOffsets -> Worker::modifySinkConnectorOffsets -> 
Worker::alterSinkConnectorOffset
   ```
   
    Source connector offsets alter requests:
   ```
   ConnectorsResource::alterConnectorOffsets -> 
AbstractHerder::alterConnectorOffsets -> 
(Distributed|Standalone)Herder::modifyConnectorOffsets -> 
Worker::alterConnectorOffsets -> Worker::modifySourceConnectorOffsets
   ```
   
    Sink connector offsets reset requests:
   ```
   ConnectorsResource::resetConnectorOffsets -> 
AbstractHerder::resetConnectorOffsets -> 
(Distributed|Standalone)Herder::modifyConnectorOffsets -> 
Worker::resetConnectorOffsets -> Worker::modifySinkConnectorOffsets -> 
Worker::resetSinkConnectorOffsets
   ```
   
    Source connector offsets reset requests:
   ```
   ConnectorsResource::resetConnectorOffsets -> 
AbstractHerder::resetConnectorOffsets -> 
(Distributed|Standalone)Herder::modifyConnectorOffsets -> 
Worker::resetConnectorOffsets -> Worker::modifySourceConnectorOffsets
   ```
   
   The current flows for altering and resetting offsets along with the use of 
`null` offsets in multiple places to distinguish between alter and reset 
offsets requests might seem a little clunky (especially for sink connectors), 
but I've tried to optimize for code re-use in both the herder implementations 
as well as the worker.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya opened a new pull request, #13818: KAFKA-14784: Connect offset reset REST API

2023-06-06 Thread via GitHub


yashmayya opened a new pull request, #13818:
URL: https://github.com/apache/kafka/pull/13818

   - https://issues.apache.org/jira/browse/KAFKA-14784
   - [KIP-875: First-class offsets support in Kafka 
Connect](https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect)
   - Implements the new `DELETE /connectors/{connector}/offsets` REST API
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14445) Producer doesn't request metadata update on REQUEST_TIMED_OUT

2023-06-06 Thread Haruki Okada (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haruki Okada reassigned KAFKA-14445:


Assignee: Haruki Okada

> Producer doesn't request metadata update on REQUEST_TIMED_OUT
> -
>
> Key: KAFKA-14445
> URL: https://issues.apache.org/jira/browse/KAFKA-14445
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Major
>
> Produce requests may fail with a timeout of `request.timeout.ms` in the two 
> cases below:
>  * Didn't receive a produce response within `request.timeout.ms`
>  * Produce response received, but it ended up with `REQUEST_TIMED_OUT` in the 
> broker
> The former case usually happens when a broker machine fails or there's a 
> network glitch etc.
> In this case, the connection will be disconnected and a metadata update will 
> be requested to discover the new leader: 
> [https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556]
>  
> The problem is in the latter case (REQUEST_TIMED_OUT on the broker).
> In this case, the produce request will end up with TimeoutException, which 
> doesn't inherit InvalidMetadataException, so it doesn't trigger a metadata 
> update.
>  
> A typical cause of REQUEST_TIMED_OUT is replication delay due to a 
> follower-side problem, for which a metadata update indeed doesn't make much 
> sense.
>  
> However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT 
> could cause produce requests to retry unnecessarily, which may end up with 
> batch expiration due to the delivery timeout.
> Below is the scenario we experienced:
>  * Environment:
>  ** Partition tp-0 has 3 replicas, 1, 2, 3. Leader is 1
>  ** min.insync.replicas=2
>  ** acks=all
>  * Scenario:
>  ** broker 1 "partially" failed
>  *** It lost its ZooKeeper connection and was kicked out of the cluster
>  **** There was a controller log like:
> {code:java}
> [2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , 
> deleted brokers: 1, bounced brokers: {code}
>  *** However, somehow the broker was able to continue receiving produce 
> requests
>  **** We're still working on investigating how this is possible though.
>  **** Indeed, broker 1 was somewhat "alive" and kept working according to 
> server.log
>  *** In other words, broker 1 became a "zombie"
>  ** broker 2 was elected as the new leader
>  *** broker 3 became a follower of broker 2
>  *** However, since broker 1 was still out of the cluster, it didn't receive 
> LeaderAndIsr, so broker 1 kept considering itself the leader of tp-0
>  ** Meanwhile, the producer kept sending produce requests to broker 1 and the 
> requests failed due to REQUEST_TIMED_OUT because no brokers replicate from 
> broker 1.
>  *** REQUEST_TIMED_OUT doesn't trigger a metadata update, so the producer 
> didn't have a chance to update its stale metadata
>  
> So I suggest requesting a metadata update even on a REQUEST_TIMED_OUT 
> exception, to address the case where the old leader became a "zombie"
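
As a schematic sketch of the suggested change, with minimal stand-in types rather than Kafka's real client classes (`Errors` and `Metadata` below are stubs invented for illustration):

```java
// Stand-ins for the real client-side types; invented for this illustration.
enum Errors { NONE, NOT_LEADER_OR_FOLLOWER, REQUEST_TIMED_OUT }

interface Metadata {
    void requestUpdate();
}

final class MetadataRefreshSketch {
    // Today only InvalidMetadataException-style errors trigger a refresh; the
    // proposal is to also treat REQUEST_TIMED_OUT as a refresh signal so a
    // producer stuck on a "zombie" leader can discover the new one.
    static void maybeRefreshMetadata(Errors error, Metadata metadata) {
        if (error == Errors.NOT_LEADER_OR_FOLLOWER || error == Errors.REQUEST_TIMED_OUT) {
            metadata.requestUpdate();
        }
    }
}
```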



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14445) Producer doesn't request metadata update on REQUEST_TIMED_OUT

2023-06-06 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729695#comment-17729695
 ] 

Haruki Okada commented on KAFKA-14445:
--

[~kirktrue] Thanks for your patch about 
https://issues.apache.org/jira/browse/KAFKA-14317 .

 

However, if I read the patch correctly, I guess our original issue should be 
addressed separately.

Our issue was the case where the producer receives a REQUEST_TIMED_OUT response 
(i.e. the request timed out inside the purgatory while waiting for 
replication), rather than a NetworkClient-level timeout.

So I think the || clause here 
([https://github.com/apache/kafka/pull/12813#discussion_r1048223644]) was 
necessary, contrary to the discussion.

Though this is kind of an extreme edge case, I would like to solve it anyway as 
it caused a batch expiration on our producer.

 

I'll submit a follow-up patch.

> Producer doesn't request metadata update on REQUEST_TIMED_OUT
> -
>
> Key: KAFKA-14445
> URL: https://issues.apache.org/jira/browse/KAFKA-14445
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Haruki Okada
>Priority: Major
>
> Produce requests may fail with a timeout of `request.timeout.ms` in the two 
> cases below:
>  * Didn't receive a produce response within `request.timeout.ms`
>  * Produce response received, but it ended up with `REQUEST_TIMED_OUT` in the 
> broker
> The former case usually happens when a broker machine fails or there's a 
> network glitch etc.
> In this case, the connection will be disconnected and a metadata update will 
> be requested to discover the new leader: 
> [https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556]
>  
> The problem is in the latter case (REQUEST_TIMED_OUT on the broker).
> In this case, the produce request will end up with TimeoutException, which 
> doesn't inherit InvalidMetadataException, so it doesn't trigger a metadata 
> update.
>  
> A typical cause of REQUEST_TIMED_OUT is replication delay due to a 
> follower-side problem, for which a metadata update indeed doesn't make much 
> sense.
>  
> However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT 
> could cause produce requests to retry unnecessarily, which may end up with 
> batch expiration due to the delivery timeout.
> Below is the scenario we experienced:
>  * Environment:
>  ** Partition tp-0 has 3 replicas, 1, 2, 3. Leader is 1
>  ** min.insync.replicas=2
>  ** acks=all
>  * Scenario:
>  ** broker 1 "partially" failed
>  *** It lost its ZooKeeper connection and was kicked out of the cluster
>  **** There was a controller log like:
> {code:java}
> [2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , 
> deleted brokers: 1, bounced brokers: {code}
>  *** However, somehow the broker was able to continue receiving produce 
> requests
>  **** We're still working on investigating how this is possible though.
>  **** Indeed, broker 1 was somewhat "alive" and kept working according to 
> server.log
>  *** In other words, broker 1 became a "zombie"
>  ** broker 2 was elected as the new leader
>  *** broker 3 became a follower of broker 2
>  *** However, since broker 1 was still out of the cluster, it didn't receive 
> LeaderAndIsr, so broker 1 kept considering itself the leader of tp-0
>  ** Meanwhile, the producer kept sending produce requests to broker 1 and the 
> requests failed due to REQUEST_TIMED_OUT because no brokers replicate from 
> broker 1.
>  *** REQUEST_TIMED_OUT doesn't trigger a metadata update, so the producer 
> didn't have a chance to update its stale metadata
>  
> So I suggest requesting a metadata update even on a REQUEST_TIMED_OUT 
> exception, to address the case where the old leader became a "zombie"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] urbandan commented on a diff in pull request #13813: KAFKA-13756: Connect validate endpoint should return proper validatio…

2023-06-06 Thread via GitHub


urbandan commented on code in PR #13813:
URL: https://github.com/apache/kafka/pull/13813#discussion_r1219366831


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -464,10 +465,22 @@ ConfigInfos validateConnectorConfig(Map 
connectorProps, boolean
 connectorProps = 
worker.configTransformer().transform(connectorProps);
 }
 String connType = 
connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-if (connType == null)
-throw new BadRequestException("Connector config " + connectorProps 
+ " contains no connector type");
+if (connType == null) {
+return createConnectorClassError("Config contains no connector 
type");
+}
+
+Connector connector;
+try {
+connector = getConnector(connType);
+} catch (ConnectException e) {
+return createConnectorClassError(e.getMessage());

Review Comment:
   I agree. I was tempted to just remove the list of connectors as a whole, but 
there are other call sites where the exception message is logged. In case it 
can be useful for diagnostics, I added a parameter instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Vaibhav-Nazare opened a new pull request, #13817: KAFKA-15062: Adding ppc64le build stage

2023-06-06 Thread via GitHub


Vaibhav-Nazare opened a new pull request, #13817:
URL: https://github.com/apache/kafka/pull/13817

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15062) Power(ppc64le) support for Kafka

2023-06-06 Thread Vaibhav (Jira)
Vaibhav created KAFKA-15062:
---

 Summary: Power(ppc64le) support for Kafka
 Key: KAFKA-15062
 URL: https://issues.apache.org/jira/browse/KAFKA-15062
 Project: Kafka
  Issue Type: Task
  Components: build
Reporter: Vaibhav


Support for the Power architecture (ppc64le) for Apache Kafka.

What is the IBM Power architecture?
It is a RISC architecture, and IBM has recently made its ISA (Instruction Set 
Architecture) open source; in doing so, they have significantly contributed 
back to the open-source community at large. Many of the pioneers of the banking 
and HPC industries today run on the ppc64le architecture.

As an ongoing effort to enable open-source projects where the Power 
architecture can add value, we are trying to enable Kafka on Power.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lucasbru commented on a diff in pull request #13811: KAFKA-14278: Fix InvalidProducerEpochException and InvalidTxnStateException handling in producer clients

2023-06-06 Thread via GitHub


lucasbru commented on code in PR #13811:
URL: https://github.com/apache/kafka/pull/13811#discussion_r1219320732


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1661,6 +1663,10 @@ public void handleResponse(AbstractResponse response) {
 || error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
 // If the topic is unknown or the coordinator is loading, 
retry with the current coordinator
 continue;
+} else if (error == Errors.INVALID_PRODUCER_EPOCH

Review Comment:
   Hmm, not sure. There shouldn't be any functional difference, so I moved it 
back down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on pull request #13811: KAFKA-14278: Fix InvalidProducerEpochException and InvalidTxnStateException handling in producer clients

2023-06-06 Thread via GitHub


lucasbru commented on PR #13811:
URL: https://github.com/apache/kafka/pull/13811#issuecomment-1578303825

   > Hey Lucas -- thanks for the PR. Just wanted to confirm -- these changes 
are in line with what is proposed as part of KIP-691? It looks to me that is 
the case, but wanted to confirm.
   
   Yes, exactly. This is one of the changes to clean up exceptions in 
preparation for KIP-691


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-06 Thread via GitHub


hudeqi commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1578277690

   Hello, are you free to help review this PR? @mimaison 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13807: KAFKA-15040: trigger onLeadershipChange under KRaft mode

2023-06-06 Thread via GitHub


satishd commented on code in PR #13807:
URL: https://github.com/apache/kafka/pull/13807#discussion_r1219258912


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3701,6 +3721,11 @@ class ReplicaManagerTest {
 
   assertEquals(None, 
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
 
+  if (enableRemoteStorage) {

Review Comment:
   Do you also want to add similar coverage for both the below tests?
   
   ```
   testFetcherAreNotRestartedIfLeaderEpochIsNotBumpedWithKRaftPath
   testReplicasAreStoppedWhileInControlledShutdownWithKRaft
   ```



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3677,20 +3682,35 @@ class ReplicaManagerTest {
 assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, 
BAR_UUID))
   }
 
-  @Test
-  def testDeltaFromLeaderToFollower(): Unit = {
+  private def verifyRLMonLeadershipChange(leaderPartitions: 
util.Set[Partition], followerPartitions: util.Set[Partition]): Unit = {

Review Comment:
   nit: `verifyRLMonLeadershipChange` to `verifyRLMOnLeadershipChange`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down

2023-06-06 Thread via GitHub


mimaison merged PR #13473:
URL: https://github.com/apache/kafka/pull/13473


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


