This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new d1b381a185e KAFKA-17639 Add Java 23 to CI (#19396)
d1b381a185e is described below
commit d1b381a185ea97ef0170bda8a1d57ea5722a3e91
Author: Luke Chen <[email protected]>
AuthorDate: Wed Apr 9 19:10:07 2025 +0900
KAFKA-17639 Add Java 23 to CI (#19396)
KAFKA-17639 Add Java 23 to CI.
Backported from Commit 76a9df4 and updated Jenkinsfile.
Reviewers: Mickael Maison <[email protected]>
---
Jenkinsfile | 19 +++++++++++++++
.../kafka/common/internals/KafkaFutureImpl.java | 28 ++++++++++++++++++++--
.../SaslPlainSslEndToEndAuthorizationTest.scala | 5 ++--
.../apache/kafka/jmh/util/ByteUtilsBenchmark.java | 8 +++----
4 files changed, 51 insertions(+), 9 deletions(-)
diff --git a/Jenkinsfile b/Jenkinsfile
index 0a795637ff6..0023b76a8e7 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -180,6 +180,25 @@ pipeline {
echo 'Skipping Kafka Streams archetype test for Java 21'
}
}
+
+ stage('JDK 23 and Scala 2.13') {
+ agent { label 'ubuntu' }
+ tools {
+ jdk 'jdk_23_latest'
+ }
+ options {
+ timeout(time: 8, unit: 'HOURS')
+ timestamps()
+ }
+ environment {
+ SCALA_VERSION=2.13
+ }
+ steps {
+ doValidation()
+ doTest(env)
+ echo 'Skipping Kafka Streams archetype test for Java 23'
+ }
+ }
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index fbcde6a1e24..0d88c91740d 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -163,7 +163,10 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
public T get() throws InterruptedException, ExecutionException {
try {
return completableFuture.get();
- } catch (ExecutionException e) {
+ // In Java 23, when a CompletableFuture is cancelled, get() will throw a CancellationException wrapping a
+ // CancellationException, thus we need to unwrap it to maintain the KafkaFuture behaviour.
+ // see https://bugs.openjdk.org/browse/JDK-8331987
+ } catch (ExecutionException | CancellationException e) {
maybeThrowCancellationException(e.getCause());
throw e;
}
@@ -178,7 +181,10 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
TimeoutException {
try {
return completableFuture.get(timeout, unit);
- } catch (ExecutionException e) {
+ // In Java 23, when a CompletableFuture is cancelled, get() will throw a CancellationException wrapping a
+ // CancellationException, thus we need to unwrap it to maintain the KafkaFuture behaviour.
+ // see https://bugs.openjdk.org/browse/JDK-8331987
+ } catch (ExecutionException | CancellationException e) {
maybeThrowCancellationException(e.getCause());
throw e;
}
@@ -192,6 +198,15 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
public T getNow(T valueIfAbsent) throws ExecutionException {
try {
return completableFuture.getNow(valueIfAbsent);
+ } catch (CancellationException e) {
+ // In Java 23, when a CompletableFuture is cancelled, getNow() will throw a CancellationException wrapping a
+ // CancellationException, whereas in Java < 23, it throws a CompletionException directly.
+ // see https://bugs.openjdk.org/browse/JDK-8331987
+ if (e.getCause() instanceof CancellationException) {
+ throw (CancellationException) e.getCause();
+ } else {
+ throw e;
+ }
} catch (CompletionException e) {
maybeThrowCancellationException(e.getCause());
// Note, unlike CompletableFuture#get() which throws ExecutionException, CompletableFuture#getNow()
@@ -247,6 +262,15 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
Throwable exception = null;
try {
value = completableFuture.getNow(null);
+ } catch (CancellationException e) {
+ // In Java 23, when a CompletableFuture is cancelled, getNow() will throw a CancellationException wrapping a
+ // CancellationException, whereas in Java < 23, it throws a CompletionException directly.
+ // see https://bugs.openjdk.org/browse/JDK-8331987
+ if (e.getCause() instanceof CancellationException) {
+ exception = e.getCause();
+ } else {
+ exception = e;
+ }
} catch (CompletionException e) {
exception = e.getCause();
} catch (Exception e) {
diff --git
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index b4772aa3cc5..de0d1a3260c 100644
---
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -22,6 +22,7 @@ import kafka.utils.TestUtils.isAclSecure
import kafka.zk.ZkData
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.internals.SecurityManagerCompatibility
import org.apache.kafka.common.network.ConnectionMode
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
@@ -29,9 +30,7 @@ import org.apache.kafka.common.security.plain.PlainAuthenticateCallback
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test
-import java.security.AccessController
import java.util.Properties
-import javax.security.auth.Subject
import javax.security.auth.callback._
import javax.security.auth.login.AppConfigurationEntry
import scala.collection.Seq
@@ -85,7 +84,7 @@ object SaslPlainSslEndToEndAuthorizationTest {
class TestClientCallbackHandler extends AuthenticateCallbackHandler {
def configure(configs: java.util.Map[String, _], saslMechanism: String,
jaasConfigEntries: java.util.List[AppConfigurationEntry]): Unit = {}
def handle(callbacks: Array[Callback]): Unit = {
- val subject = Subject.getSubject(AccessController.getContext)
+ val subject = SecurityManagerCompatibility.get().current()
val username = subject.getPublicCredentials(classOf[String]).iterator().next()
for (callback <- callbacks) {
callback match {
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
index e0184efcf42..cf83f78b866 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
@@ -41,10 +41,6 @@ import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.concurrent.TimeUnit;
-@OutputTimeUnit(TimeUnit.SECONDS)
-@Fork(3)
-@Warmup(iterations = 3, time = 1)
-@Measurement(iterations = 5, time = 1)
/**
* This benchmark calculates the empirical evidence of different implementation for encoding/decoding a protobuf
* <a href="https://protobuf.dev/programming-guides/encoding/#varints">VarInt</a> and VarLong.
@@ -52,6 +48,10 @@ import java.util.concurrent.TimeUnit;
* The benchmark uses JMH and calculates results for different sizes of variable length integer. We expect most of the
* usage in Kafka code base to be 1 or 2 byte integers.
*/
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Fork(3)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
public class ByteUtilsBenchmark {
private static final int DATA_SET_SAMPLE_SIZE = 16384;