This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 50b748b0247 MINOR: Cleanup Connect Module (2/n) (#19871)
50b748b0247 is described below

commit 50b748b024716ae84027b7030f955dcaeb8653f7
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Tue Jul 22 19:53:54 2025 +0530

    MINOR: Cleanup Connect Module (2/n) (#19871)
    
    Now that Kafka supports Java 17, this PR makes some cleanup changes in
    the connect module. The changes are limited to a subset of files;
    follow-up PRs will cover the remaining ones.
    The changes mostly include:
    - Collections.emptyList(), Collections.singletonList() and
    Arrays.asList() are replaced with List.of()
    - Collections.emptyMap() and Collections.singletonMap() are replaced
    with Map.of()
    - Collections.singleton() is replaced with Set.of() (a short sketch
    follows below)
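    
    As a rough before/after illustration (hypothetical snippet, not part
    of this commit; the names are made up, and the imports assumed are
    java.util.List, java.util.Map and java.util.Set):
    
        // Before (pre-Java 9 idioms):
        //   List<String> fields = Arrays.asList("a", "b");
        //   Map<String, Integer> conf = Collections.singletonMap("k", 1);
        //   Set<String> names = Collections.singleton("x");
        // After (Java 9+ immutable factories):
        List<String> fields = List.of("a", "b");
        Map<String, Integer> conf = Map.of("k", 1);
        Set<String> names = Set.of("x");
        // Caveat: the new factories throw NullPointerException on null
        // elements (Collections.singletonList(null) was legal), and
        // Map.of()/Set.of() reject duplicate keys/elements.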
    
    Modules targeted: test-plugins, transforms
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/connect/tools/MockConnector.java  |   3 +-
 .../apache/kafka/connect/tools/MockSourceTask.java |   3 +-
 .../kafka/connect/tools/SchemaSourceTask.java      |   9 +-
 .../kafka/connect/tools/VerifiableSourceTask.java  |   7 +-
 .../org/apache/kafka/connect/transforms/Cast.java  |  77 +++++-------
 .../kafka/connect/transforms/HeaderFrom.java       |  13 +-
 .../apache/kafka/connect/transforms/MaskField.java |  50 ++++----
 .../kafka/connect/transforms/ReplaceField.java     |   7 +-
 .../connect/transforms/TimestampConverter.java     |  39 +++---
 .../connect/transforms/field/SingleFieldPath.java  |   5 +-
 .../apache/kafka/connect/transforms/CastTest.java  |  83 ++++++------
 .../kafka/connect/transforms/DropHeadersTest.java  |   9 +-
 .../kafka/connect/transforms/ExtractFieldTest.java |  21 ++--
 .../kafka/connect/transforms/FlattenTest.java      |  47 ++++---
 .../kafka/connect/transforms/HeaderFromTest.java   |  36 +++---
 .../kafka/connect/transforms/HoistFieldTest.java   |   9 +-
 .../kafka/connect/transforms/InsertFieldTest.java  |   9 +-
 .../kafka/connect/transforms/InsertHeaderTest.java |   5 +-
 .../kafka/connect/transforms/MaskFieldTest.java    | 140 ++++++++++-----------
 .../connect/transforms/SetSchemaMetadataTest.java  |   5 +-
 .../connect/transforms/TimestampConverterTest.java |  43 ++++---
 .../connect/transforms/TimestampRouterTest.java    |   4 +-
 .../kafka/connect/transforms/ValueToKeyTest.java   |   7 +-
 .../transforms/field/FieldSyntaxVersionTest.java   |  10 +-
 .../transforms/predicates/HasHeaderKeyTest.java    |  10 +-
 .../predicates/TopicNameMatchesTest.java           |   7 +-
 .../transforms/util/NonEmptyListValidatorTest.java |   6 +-
 27 files changed, 302 insertions(+), 362 deletions(-)

diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockConnector.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
index 267466a4b0b..f598feede8c 100644
--- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
+++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
@@ -24,7 +24,6 @@ import org.apache.kafka.connect.connector.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
@@ -91,7 +90,7 @@ public class MockConnector extends Connector {
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         log.debug("Creating single task for MockConnector");
-        return Collections.singletonList(config);
+        return List.of(config);
     }
 
     @Override
diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
index f69c58b99ab..49dc5e8a7e6 100644
--- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
+++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
@@ -23,7 +23,6 @@ import org.apache.kafka.connect.source.SourceTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -67,7 +66,7 @@ public class MockSourceTask extends SourceTask {
                 throw new RuntimeException();
             }
         }
-        return Collections.emptyList();
+        return List.of();
     }
 
     @Override
diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
index c40e0932e53..d79c133f673 100644
--- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
+++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
@@ -27,7 +27,6 @@ import org.apache.kafka.server.util.ThroughputThrottler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -104,7 +103,7 @@ public class SchemaSourceTask extends SourceTask {
         }
 
         throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
-        partition = Collections.singletonMap(ID_FIELD, id);
+        partition = Map.of(ID_FIELD, id);
         Map<String, Object> previousOffset = this.context.offsetStorageReader().offset(partition);
         if (previousOffset != null) {
             seqno = (Long) previousOffset.get(SEQNO_FIELD) + 1;
@@ -124,7 +123,7 @@ public class SchemaSourceTask extends SourceTask {
                 throttler.throttle();
             }
 
-            Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
+            Map<String, Long> ccOffset = Map.of(SEQNO_FIELD, seqno);
             int partitionVal = (int) (seqno % partitionCount);
             final Struct data;
             final SourceRecord srcRecord;
@@ -158,10 +157,10 @@ public class SchemaSourceTask extends SourceTask {
             System.out.println("{\"task\": " + id + ", \"seqno\": " + seqno + "}");
             seqno++;
             count++;
-            return Collections.singletonList(srcRecord);
+            return List.of(srcRecord);
         } else {
             throttler.throttle();
-            return Collections.emptyList();
+            return List.of();
         }
     }
 
diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
index 49151b40d1e..1fe2bd31802 100644
--- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
+++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
@@ -31,7 +31,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -94,7 +93,7 @@ public class VerifiableSourceTask extends SourceTask {
             throw new ConnectException("Invalid VerifiableSourceTask configuration", e);
         }
 
-        partition = Collections.singletonMap(ID_FIELD, id);
+        partition = Map.of(ID_FIELD, id);
         Map<String, Object> previousOffset = this.context.offsetStorageReader().offset(partition);
         if (previousOffset != null)
             seqno = (Long) previousOffset.get(SEQNO_FIELD) + 1;
@@ -129,11 +128,11 @@ public class VerifiableSourceTask extends SourceTask {
         }
         System.out.println(dataJson);
 
-        Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
+        Map<String, Long> ccOffset = Map.of(SEQNO_FIELD, seqno);
         Schema valueSchema = completeRecordData ? COMPLETE_VALUE_SCHEMA : Schema.INT64_SCHEMA;
         Object value = completeRecordData ? completeValue(data) : seqno;
         SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, valueSchema, value);
-        List<SourceRecord> result = Collections.singletonList(srcRecord);
+        List<SourceRecord> result = List.of(srcRecord);
         seqno++;
         return result;
     }
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index 099e1a64882..7c13ef4d785 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -231,38 +231,26 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
     }
 
     private SchemaBuilder convertFieldType(Schema.Type type) {
-        switch (type) {
-            case INT8:
-                return SchemaBuilder.int8();
-            case INT16:
-                return SchemaBuilder.int16();
-            case INT32:
-                return SchemaBuilder.int32();
-            case INT64:
-                return SchemaBuilder.int64();
-            case FLOAT32:
-                return SchemaBuilder.float32();
-            case FLOAT64:
-                return SchemaBuilder.float64();
-            case BOOLEAN:
-                return SchemaBuilder.bool();
-            case STRING:
-                return SchemaBuilder.string();
-            default:
-                throw new DataException("Unexpected type in Cast transformation: " + type);
-        }
+        return switch (type) {
+            case INT8 -> SchemaBuilder.int8();
+            case INT16 -> SchemaBuilder.int16();
+            case INT32 -> SchemaBuilder.int32();
+            case INT64 -> SchemaBuilder.int64();
+            case FLOAT32 -> SchemaBuilder.float32();
+            case FLOAT64 -> SchemaBuilder.float64();
+            case BOOLEAN -> SchemaBuilder.bool();
+            case STRING -> SchemaBuilder.string();
+            default -> throw new DataException("Unexpected type in Cast transformation: " + type);
+        };
     }
 
     private static Object encodeLogicalType(Schema schema, Object value) {
-        switch (schema.name()) {
-            case Date.LOGICAL_NAME:
-                return Date.fromLogical(schema, (java.util.Date) value);
-            case Time.LOGICAL_NAME:
-                return Time.fromLogical(schema, (java.util.Date) value);
-            case Timestamp.LOGICAL_NAME:
-                return Timestamp.fromLogical(schema, (java.util.Date) value);
-        }
-        return value;
+        return switch (schema.name()) {
+            case Date.LOGICAL_NAME -> Date.fromLogical(schema, (java.util.Date) value);
+            case Time.LOGICAL_NAME -> Time.fromLogical(schema, (java.util.Date) value);
+            case Timestamp.LOGICAL_NAME -> Timestamp.fromLogical(schema, (java.util.Date) value);
+            default -> value;
+        };
     }
 
     private static Object castValueToType(Schema schema, Object value, Schema.Type targetType) {
@@ -283,26 +271,17 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
                 value = encodeLogicalType(schema, value);
             }
 
-            switch (targetType) {
-                case INT8:
-                    return castToInt8(value);
-                case INT16:
-                    return castToInt16(value);
-                case INT32:
-                    return castToInt32(value);
-                case INT64:
-                    return castToInt64(value);
-                case FLOAT32:
-                    return castToFloat32(value);
-                case FLOAT64:
-                    return castToFloat64(value);
-                case BOOLEAN:
-                    return castToBoolean(value);
-                case STRING:
-                    return castToString(value);
-                default:
-                    throw new DataException(targetType + " is not supported in the Cast transformation.");
-            }
+            return switch (targetType) {
+                case INT8 -> castToInt8(value);
+                case INT16 -> castToInt16(value);
+                case INT32 -> castToInt32(value);
+                case INT64 -> castToInt64(value);
+                case FLOAT32 -> castToFloat32(value);
+                case FLOAT64 -> castToFloat64(value);
+                case BOOLEAN -> castToBoolean(value);
+                case STRING -> castToString(value);
+                default -> throw new DataException(targetType + " is not supported in the Cast transformation.");
+            };
         } catch (NumberFormatException e) {
             throw new DataException("Value (" + value.toString() + ") was out of range for requested data type", e);
         }
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
index 1773233394e..8b9030066ae 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
@@ -87,14 +87,11 @@ public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transfor
         }
 
         static Operation fromName(String name) {
-            switch (name) {
-                case MOVE_OPERATION:
-                    return MOVE;
-                case COPY_OPERATION:
-                    return COPY;
-                default:
-                    throw new IllegalArgumentException();
-            }
+            return switch (name) {
+                case MOVE_OPERATION -> MOVE;
+                case COPY_OPERATION -> COPY;
+                default -> throw new IllegalArgumentException();
+            };
         }
 
         public String toString() {
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
index c3a45d9170e..df76a923d81 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
@@ -65,32 +65,30 @@ public abstract class MaskField<R extends ConnectRecord<R>> implements Transform
 
     private static final String PURPOSE = "mask fields";
 
-    private static final Map<Class<?>, Function<String, ?>> REPLACEMENT_MAPPING_FUNC = new HashMap<>();
-    private static final Map<Class<?>, Object> PRIMITIVE_VALUE_MAPPING = new HashMap<>();
-
-    static {
-        PRIMITIVE_VALUE_MAPPING.put(Boolean.class, Boolean.FALSE);
-        PRIMITIVE_VALUE_MAPPING.put(Byte.class, (byte) 0);
-        PRIMITIVE_VALUE_MAPPING.put(Short.class, (short) 0);
-        PRIMITIVE_VALUE_MAPPING.put(Integer.class, 0);
-        PRIMITIVE_VALUE_MAPPING.put(Long.class, 0L);
-        PRIMITIVE_VALUE_MAPPING.put(Float.class, 0f);
-        PRIMITIVE_VALUE_MAPPING.put(Double.class, 0d);
-        PRIMITIVE_VALUE_MAPPING.put(BigInteger.class, BigInteger.ZERO);
-        PRIMITIVE_VALUE_MAPPING.put(BigDecimal.class, BigDecimal.ZERO);
-        PRIMITIVE_VALUE_MAPPING.put(Date.class, new Date(0));
-        PRIMITIVE_VALUE_MAPPING.put(String.class, "");
-
-        REPLACEMENT_MAPPING_FUNC.put(Byte.class, v -> Values.convertToByte(null, v));
-        REPLACEMENT_MAPPING_FUNC.put(Short.class, v -> Values.convertToShort(null, v));
-        REPLACEMENT_MAPPING_FUNC.put(Integer.class, v -> Values.convertToInteger(null, v));
-        REPLACEMENT_MAPPING_FUNC.put(Long.class, v -> Values.convertToLong(null, v));
-        REPLACEMENT_MAPPING_FUNC.put(Float.class, v -> Values.convertToFloat(null, v));
-        REPLACEMENT_MAPPING_FUNC.put(Double.class, v -> Values.convertToDouble(null, v));
-        REPLACEMENT_MAPPING_FUNC.put(String.class, Function.identity());
-        REPLACEMENT_MAPPING_FUNC.put(BigDecimal.class, BigDecimal::new);
-        REPLACEMENT_MAPPING_FUNC.put(BigInteger.class, BigInteger::new);
-    }
+    private static final Map<Class<?>, Function<String, ?>> REPLACEMENT_MAPPING_FUNC = Map.of(
+        Byte.class, v -> Values.convertToByte(null, v),
+        Short.class, v -> Values.convertToShort(null, v),
+        Integer.class, v -> Values.convertToInteger(null, v),
+        Long.class, v -> Values.convertToLong(null, v),
+        Float.class, v -> Values.convertToFloat(null, v),
+        Double.class, v -> Values.convertToDouble(null, v),
+        String.class, Function.identity(),
+        BigDecimal.class, BigDecimal::new,
+        BigInteger.class, BigInteger::new
+    );
+    private static final Map<Class<?>, Object> PRIMITIVE_VALUE_MAPPING = Map.ofEntries(
+        Map.entry(Boolean.class, Boolean.FALSE),
+        Map.entry(Byte.class, (byte) 0),
+        Map.entry(Short.class, (short) 0),
+        Map.entry(Integer.class, 0),
+        Map.entry(Long.class, 0L),
+        Map.entry(Float.class, 0f),
+        Map.entry(Double.class, 0d),
+        Map.entry(BigInteger.class, BigInteger.ZERO),
+        Map.entry(BigDecimal.class, BigDecimal.ZERO),
+        Map.entry(Date.class, new Date(0)),
+        Map.entry(String.class, "")
+    );
 
     private Set<String> maskedFields;
     private String replacement;
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
index 38d27e8a818..9584ecfc978 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -31,7 +31,6 @@ import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.transforms.util.SchemaUtil;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -56,11 +55,11 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
     }
 
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(ConfigName.EXCLUDE, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
+            .define(ConfigName.EXCLUDE, ConfigDef.Type.LIST, List.of(), ConfigDef.Importance.MEDIUM,
                     "Fields to exclude. This takes precedence over the fields to include.")
-            .define(ConfigName.INCLUDE, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
+            .define(ConfigName.INCLUDE, ConfigDef.Type.LIST, List.of(), ConfigDef.Importance.MEDIUM,
                     "Fields to include. If specified, only these fields will be used.")
-            .define(ConfigName.RENAMES, ConfigDef.Type.LIST, Collections.emptyList(),
+            .define(ConfigName.RENAMES, ConfigDef.Type.LIST, List.of(),
                 ConfigDef.LambdaValidator.with(
                     (name, value) -> {
                         @SuppressWarnings("unchecked")
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index aeec9ea4189..940bb6045a9 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -166,17 +166,15 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
             public Date toRaw(Config config, Object orig) {
                 if (!(orig instanceof Long unixTime))
                     throw new DataException("Expected Unix timestamp to be a Long, but found " + orig.getClass());
-                switch (config.unixPrecision) {
-                    case UNIX_PRECISION_SECONDS:
-                        return Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.SECONDS.toMillis(unixTime));
-                    case UNIX_PRECISION_MICROS:
-                        return Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.MICROSECONDS.toMillis(unixTime));
-                    case UNIX_PRECISION_NANOS:
-                        return Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.NANOSECONDS.toMillis(unixTime));
-                    case UNIX_PRECISION_MILLIS:
-                    default:
-                        return Timestamp.toLogical(Timestamp.SCHEMA, unixTime);
-                }
+                return switch (config.unixPrecision) {
+                    case UNIX_PRECISION_SECONDS ->
+                        Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.SECONDS.toMillis(unixTime));
+                    case UNIX_PRECISION_MICROS ->
+                        Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.MICROSECONDS.toMillis(unixTime));
+                    case UNIX_PRECISION_NANOS ->
+                        Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.NANOSECONDS.toMillis(unixTime));
+                    default -> Timestamp.toLogical(Timestamp.SCHEMA, unixTime);
+                };
             }
 
             @Override
@@ -186,18 +184,13 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
 
             @Override
             public Long toType(Config config, Date orig) {
-                Long unixTimeMillis = Timestamp.fromLogical(Timestamp.SCHEMA, orig);
-                switch (config.unixPrecision) {
-                    case UNIX_PRECISION_SECONDS:
-                        return TimeUnit.MILLISECONDS.toSeconds(unixTimeMillis);
-                    case UNIX_PRECISION_MICROS:
-                        return TimeUnit.MILLISECONDS.toMicros(unixTimeMillis);
-                    case UNIX_PRECISION_NANOS:
-                        return TimeUnit.MILLISECONDS.toNanos(unixTimeMillis);
-                    case UNIX_PRECISION_MILLIS:
-                    default:
-                        return unixTimeMillis;
-                }
+                long unixTimeMillis = Timestamp.fromLogical(Timestamp.SCHEMA, orig);
+                return switch (config.unixPrecision) {
+                    case UNIX_PRECISION_SECONDS -> TimeUnit.MILLISECONDS.toSeconds(unixTimeMillis);
+                    case UNIX_PRECISION_MICROS -> TimeUnit.MILLISECONDS.toMicros(unixTimeMillis);
+                    case UNIX_PRECISION_NANOS -> TimeUnit.MILLISECONDS.toNanos(unixTimeMillis);
+                    default -> unixTimeMillis;
+                };
             }
         });
 
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java
index 326a844025d..6016707d367 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java
@@ -22,7 +22,6 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -53,7 +52,7 @@ public class SingleFieldPath {
         this.version = version;
         switch (version) {
             case V1: // backward compatibility
-                this.steps = Collections.singletonList(pathText);
+                this.steps = List.of(pathText);
                 break;
             case V2:
                 this.steps = buildFieldPathV2(pathText);
@@ -134,7 +133,7 @@ public class SingleFieldPath {
         // add last step if last char is a dot
         if (!path.isEmpty() && path.charAt(path.length() - 1) == DOT)
             steps.add("");
-        return Collections.unmodifiableList(steps);
+        return List.copyOf(steps);
     }
 
     private static void failWhenIncompleteBacktickPair(String path, int backtickAt) {
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
index e79e163b463..1a470095a41 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
@@ -39,7 +39,6 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -72,38 +71,38 @@ public class CastTest {
 
     @Test
     public void testConfigEmpty() {
-        assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "")));
+        assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "")));
     }
 
     @Test
     public void testConfigInvalidSchemaType() {
-        assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:faketype")));
+        assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:faketype")));
     }
 
     @Test
     public void testConfigInvalidTargetType() {
-        assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:array")));
-        assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "array")));
+        assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:array")));
+        assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "array")));
     }
 
     @Test
     public void testUnsupportedTargetType() {
-        assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:bytes")));
+        assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:bytes")));
     }
 
     @Test
     public void testConfigInvalidMap() {
-        assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra")));
+        assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:int8:extra")));
     }
 
     @Test
     public void testConfigMixWholeAndFieldTransformation() {
-        assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32")));
+        assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:int8,int32")));
     }
 
     @Test
     public void castNullValueRecordWithSchema() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "foo:int64"));
         SourceRecord original = new SourceRecord(null, null, "topic", 0,
             Schema.STRING_SCHEMA, "key", Schema.STRING_SCHEMA, null);
         SourceRecord transformed = xformValue.apply(original);
@@ -129,7 +128,7 @@ public class CastTest {
 
     @Test
     public void castNullValueRecordSchemaless() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "foo:int64"));
         SourceRecord original = new SourceRecord(null, null, "topic", 0,
             Schema.STRING_SCHEMA, "key", null, null);
         SourceRecord transformed = xformValue.apply(original);
@@ -138,7 +137,7 @@ public class CastTest {
 
     @Test
     public void castNullKeyRecordWithSchema() {
-        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
+        xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:int64"));
         SourceRecord original = new SourceRecord(null, null, "topic", 0,
             Schema.STRING_SCHEMA, null, Schema.STRING_SCHEMA, "value");
         SourceRecord transformed = xformKey.apply(original);
@@ -147,7 +146,7 @@ public class CastTest {
 
     @Test
     public void castNullKeyRecordSchemaless() {
-        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
+        xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:int64"));
         SourceRecord original = new SourceRecord(null, null, "topic", 0,
             null, null, Schema.STRING_SCHEMA, "value");
         SourceRecord transformed = xformKey.apply(original);
@@ -156,7 +155,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordKeyWithSchema() {
-        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        xformKey.configure(Map.of(Cast.SPEC_CONFIG, "int8"));
         SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42, Schema.STRING_SCHEMA, "bogus"));
 
@@ -166,7 +165,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaInt8() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int8"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
@@ -176,7 +175,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaInt16() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int16"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
@@ -186,7 +185,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaInt32() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int32"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
@@ -196,7 +195,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaInt64() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int64"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
@@ -206,7 +205,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaFloat32() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "float32"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
@@ -216,7 +215,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaFloat64() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "float64"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
@@ -226,7 +225,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaBooleanTrue() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "boolean"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
@@ -236,7 +235,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaBooleanFalse() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "boolean"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 0));
 
@@ -246,7 +245,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaString() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "string"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
@@ -257,7 +256,7 @@ public class CastTest {
     @Test
     public void castWholeBigDecimalRecordValueWithSchemaString() {
         BigDecimal bigDecimal = new BigDecimal(42);
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "string"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Decimal.schema(bigDecimal.scale()), bigDecimal));
 
@@ -268,7 +267,7 @@ public class CastTest {
     @Test
     public void castWholeDateRecordValueWithSchemaString() {
         Date timestamp = new Date(MILLIS_PER_DAY + 1); // day + 1msec to get a timestamp formatting.
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "string"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Timestamp.SCHEMA, timestamp));
 
@@ -279,7 +278,7 @@ public class CastTest {
     @Test
     public void castWholeRecordDefaultValue() {
         // Validate default value in schema is correctly converted
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int32"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 SchemaBuilder.float32().defaultValue(-42.125f).build(), 42.125f));
 
@@ -290,7 +289,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordKeySchemaless() {
-        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        xformKey.configure(Map.of(Cast.SPEC_CONFIG, "int8"));
         SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42, Schema.STRING_SCHEMA, "bogus"));
 
@@ -300,7 +299,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessInt8() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int8"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
@@ -310,7 +309,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessInt16() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int16"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
@@ -320,7 +319,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessInt32() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int32"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
@@ -330,7 +329,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessInt64() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int64"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
@@ -340,7 +339,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessFloat32() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "float32"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
@@ -350,7 +349,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessFloat64() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "float64"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
@@ -360,7 +359,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessBooleanTrue() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "boolean"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
@@ -370,7 +369,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessBooleanFalse() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "boolean"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 0));
 
@@ -380,7 +379,7 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessString() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "string"));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
@@ -390,15 +389,15 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessUnsupportedType() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int8"));
         assertThrows(DataException.class,
             () -> xformValue.apply(new SourceRecord(null, null, "topic", 0,
-                    null, Collections.singletonList("foo"))));
+                    null, List.of("foo"))));
     }
 
     @Test
     public void castLogicalToPrimitive() {
-        List<String> specParts = Arrays.asList(
+        List<String> specParts = List.of(
             "date_to_int32:int32",  // Cast to underlying representation
             "timestamp_to_int64:int64",  // Cast to underlying representation
             "time_to_int64:int64",  // Cast to wider datatype than underlying representation
@@ -408,7 +407,7 @@ public class CastTest {
         );
 
         Date day = new Date(MILLIS_PER_DAY);
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG,
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG,
             String.join(",", specParts)));
 
         SchemaBuilder builder = SchemaBuilder.struct();
@@ -455,7 +454,7 @@ public class CastTest {
         Date time = new Date(MILLIS_PER_HOUR);
         Date timestamp = new Date();
 
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG,
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG,
             "date:string,decimal:string,time:string,timestamp:string"));
 
         SchemaBuilder builder = SchemaBuilder.struct();
@@ -494,7 +493,7 @@ public class CastTest {
         byte[] byteArray = new byte[] {(byte) 0xFE, (byte) 0xDC, (byte) 0xBA, (byte) 0x98, 0x76, 0x54, 0x32, 0x10};
         ByteBuffer byteBuffer = ByteBuffer.wrap(Arrays.copyOf(byteArray, byteArray.length));
 
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG,
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG,
                 "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,bigdecimal:string,date:string,optional:int32,bytes:string,byteArray:string"));
 
         // Include an optional fields and fields with defaults to validate their values are passed through properly
@@ -578,7 +577,7 @@ public class CastTest {
     @SuppressWarnings("unchecked")
     @Test
     public void castFieldsSchemaless() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32"));
+        xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32"));
         Map<String, Object> recordValue = new HashMap<>();
         recordValue.put("int8", (byte) 8);
         recordValue.put("int16", (short) 16);
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java
index d164512897b..2def8f2e4d2 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java
@@ -25,10 +25,9 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-import static java.util.Arrays.asList;
-import static java.util.Collections.singletonMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -38,7 +37,7 @@ public class DropHeadersTest {
 
     private Map<String, ?> config(String... headers) {
         Map<String, Object> result = new HashMap<>();
-        result.put(DropHeaders.HEADERS_FIELD, asList(headers));
+        result.put(DropHeaders.HEADERS_FIELD, List.of(headers));
         return result;
     }
 
@@ -106,8 +105,8 @@ public class DropHeadersTest {
     }
 
     private SourceRecord sourceRecord(ConnectHeaders headers) {
-        Map<String, ?> sourcePartition = singletonMap("foo", "bar");
-        Map<String, ?> sourceOffset = singletonMap("baz", "quxx");
+        Map<String, ?> sourcePartition = Map.of("foo", "bar");
+        Map<String, ?> sourceOffset = Map.of("baz", "quxx");
         String topic = "topic";
         Integer partition = 0;
         Schema keySchema = null;
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
index ff11ffe4e85..414dec56095 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
@@ -29,7 +29,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -57,9 +56,9 @@ public class ExtractFieldTest {
 
     @Test
     public void schemaless() {
-        xformKey.configure(Collections.singletonMap("field", "magic"));
+        xformKey.configure(Map.of("field", "magic"));
 
-        final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0);
+        final SinkRecord record = new SinkRecord("test", 0, null, Map.of("magic", 42), null, null, 0);
         final SinkRecord transformedRecord = xformKey.apply(record);
 
         assertNull(transformedRecord.keySchema());
@@ -73,7 +72,7 @@ public class ExtractFieldTest {
         configs.put("field", "magic.foo");
         xformKey.configure(configs);
 
-        final Map<String, Object> key = Collections.singletonMap("magic", Collections.singletonMap("foo", 42));
+        final Map<String, Object> key = Map.of("magic", Map.of("foo", 42));
         final SinkRecord record = new SinkRecord("test", 0, null, key, null, null, 0);
         final SinkRecord transformedRecord = xformKey.apply(record);
 
@@ -83,7 +82,7 @@ public class ExtractFieldTest {
 
     @Test
     public void nullSchemaless() {
-        xformKey.configure(Collections.singletonMap("field", "magic"));
+        xformKey.configure(Map.of("field", "magic"));
 
         final Map<String, Object> key = null;
         final SinkRecord record = new SinkRecord("test", 0, null, key, null, null, 0);
@@ -95,7 +94,7 @@ public class ExtractFieldTest {
 
     @Test
     public void withSchema() {
-        xformKey.configure(Collections.singletonMap("field", "magic"));
+        xformKey.configure(Map.of("field", "magic"));
 
         final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build();
         final Struct key = new Struct(keySchema).put("magic", 42);
@@ -125,7 +124,7 @@ public class ExtractFieldTest {
 
     @Test
     public void testNullWithSchema() {
-        xformKey.configure(Collections.singletonMap("field", "magic"));
+        xformKey.configure(Map.of("field", "magic"));
 
         final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).optional().build();
         final Struct key = null;
@@ -138,9 +137,9 @@ public class ExtractFieldTest {
 
     @Test
     public void nonExistentFieldSchemalessShouldReturnNull() {
-        xformKey.configure(Collections.singletonMap("field", "nonexistent"));
+        xformKey.configure(Map.of("field", "nonexistent"));
 
-        final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0);
+        final SinkRecord record = new SinkRecord("test", 0, null, Map.of("magic", 42), null, null, 0);
         final SinkRecord transformedRecord = xformKey.apply(record);
 
         assertNull(transformedRecord.keySchema());
@@ -154,7 +153,7 @@ public class ExtractFieldTest {
         configs.put("field", "magic.nonexistent");
         xformKey.configure(configs);
 
-        final Map<String, Object> key = Collections.singletonMap("magic", Collections.singletonMap("foo", 42));
+        final Map<String, Object> key = Map.of("magic", Map.of("foo", 42));
         final SinkRecord record = new SinkRecord("test", 0, null, key, null, null, 0);
         final SinkRecord transformedRecord = xformKey.apply(record);
 
@@ -164,7 +163,7 @@ public class ExtractFieldTest {
 
     @Test
     public void nonExistentFieldWithSchemaShouldFail() {
-        xformKey.configure(Collections.singletonMap("field", "nonexistent"));
+        xformKey.configure(Map.of("field", "nonexistent"));
 
         final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build();
         final Struct key = new Struct(keySchema).put("magic", 42);
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index f771d4f0ac3..8873f4c03b0 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -27,10 +27,9 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -52,21 +51,21 @@ public class FlattenTest {
 
     @Test
     public void topLevelStructRequired() {
-        xformValue.configure(Collections.emptyMap());
+        xformValue.configure(Map.of());
         assertThrows(DataException.class, () -> xformValue.apply(new SourceRecord(null, null,
                 "topic", 0, Schema.INT32_SCHEMA, 42)));
     }
 
     @Test
     public void topLevelMapRequired() {
-        xformValue.configure(Collections.emptyMap());
+        xformValue.configure(Map.of());
         assertThrows(DataException.class, () -> xformValue.apply(new SourceRecord(null, null,
                 "topic", 0, null, 42)));
     }
 
     @Test
     public void testNestedStruct() {
-        xformValue.configure(Collections.emptyMap());
+        xformValue.configure(Map.of());
 
         SchemaBuilder builder = SchemaBuilder.struct();
         builder.field("int8", Schema.INT8_SCHEMA);
@@ -125,7 +124,7 @@ public class FlattenTest {
 
     @Test
     public void testNestedMapWithDelimiter() {
-        xformValue.configure(Collections.singletonMap("delimiter", "#"));
+        xformValue.configure(Map.of("delimiter", "#"));
 
         Map<String, Object> supportedTypes = new HashMap<>();
         supportedTypes.put("int8", (byte) 8);
@@ -138,8 +137,8 @@ public class FlattenTest {
         supportedTypes.put("string", "stringy");
         supportedTypes.put("bytes", "bytes".getBytes());
 
-        Map<String, Object> oneLevelNestedMap = Collections.singletonMap("B", supportedTypes);
-        Map<String, Object> twoLevelNestedMap = Collections.singletonMap("A", oneLevelNestedMap);
+        Map<String, Object> oneLevelNestedMap = Map.of("B", supportedTypes);
+        Map<String, Object> twoLevelNestedMap = Map.of("A", oneLevelNestedMap);
 
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
                 "topic", 0,
@@ -163,7 +162,7 @@ public class FlattenTest {
 
     @Test
     public void testOptionalFieldStruct() {
-        xformValue.configure(Collections.emptyMap());
+        xformValue.configure(Map.of());
 
         SchemaBuilder builder = SchemaBuilder.struct();
         builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
@@ -190,7 +189,7 @@ public class FlattenTest {
 
     @Test
     public void testOptionalStruct() {
-        xformValue.configure(Collections.emptyMap());
+        xformValue.configure(Map.of());
 
         SchemaBuilder builder = SchemaBuilder.struct().optional();
         builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
@@ -206,7 +205,7 @@ public class FlattenTest {
 
     @Test
     public void testOptionalNestedStruct() {
-        xformValue.configure(Collections.emptyMap());
+        xformValue.configure(Map.of());
 
         SchemaBuilder builder = SchemaBuilder.struct().optional();
         builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
@@ -230,12 +229,12 @@ public class FlattenTest {
 
     @Test
     public void testOptionalFieldMap() {
-        xformValue.configure(Collections.emptyMap());
+        xformValue.configure(Map.of());
 
         Map<String, Object> supportedTypes = new HashMap<>();
         supportedTypes.put("opt_int32", null);
 
-        Map<String, Object> oneLevelNestedMap = Collections.singletonMap("B", supportedTypes);
+        Map<String, Object> oneLevelNestedMap = Map.of("B", supportedTypes);
 
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
                 "topic", 0,
@@ -251,9 +250,9 @@ public class FlattenTest {
 
     @Test
     public void testKey() {
-        xformKey.configure(Collections.emptyMap());
+        xformKey.configure(Map.of());
 
-        Map<String, Map<String, Integer>> key = Collections.singletonMap("A", Collections.singletonMap("B", 12));
+        Map<String, Map<String, Integer>> key = Map.of("A", Map.of("B", 12));
         SourceRecord src = new SourceRecord(null, null, "topic", null, key, null, null);
         SourceRecord transformed = xformKey.apply(src);
 
@@ -266,14 +265,14 @@ public class FlattenTest {
 
     @Test
     public void testSchemalessArray() {
-        xformValue.configure(Collections.emptyMap());
-        Object value = Collections.singletonMap("foo", Arrays.asList("bar", Collections.singletonMap("baz", Collections.singletonMap("lfg", "lfg"))));
+        xformValue.configure(Map.of());
+        Object value = Map.of("foo", List.of("bar", Map.of("baz", Map.of("lfg", "lfg"))));
         assertEquals(value, xformValue.apply(new SourceRecord(null, null, "topic", null, null, null, value)).value());
     }
 
     @Test
     public void testArrayWithSchema() {
-        xformValue.configure(Collections.emptyMap());
+        xformValue.configure(Map.of());
         Schema nestedStructSchema = SchemaBuilder.struct().field("lfg", Schema.STRING_SCHEMA).build();
         Schema innerStructSchema = SchemaBuilder.struct().field("baz", nestedStructSchema).build();
         Schema structSchema = SchemaBuilder.struct()
@@ -284,7 +283,7 @@ public class FlattenTest {
         Struct innerValue = new Struct(innerStructSchema);
         innerValue.put("baz", nestedValue);
         Struct value = new Struct(structSchema);
-        value.put("foo", Collections.singletonList(innerValue));
+        value.put("foo", List.of(innerValue));
         SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", null, null, structSchema, value));
         assertEquals(value, transformed.value());
         assertEquals(structSchema, transformed.valueSchema());
@@ -296,7 +295,7 @@ public class FlattenTest {
         // children should also be optional. Similarly, if the parent Struct has a default value, the default value for
         // the flattened field
 
-        xformValue.configure(Collections.emptyMap());
+        xformValue.configure(Map.of());
 
         SchemaBuilder builder = SchemaBuilder.struct().optional();
         builder.field("req_field", Schema.STRING_SCHEMA);
@@ -325,7 +324,7 @@ public class FlattenTest {
 
     @Test
     public void tombstoneEventWithoutSchemaShouldPassThrough() {
-        xformValue.configure(Collections.emptyMap());
+        xformValue.configure(Map.of());
 
         final SourceRecord record = new SourceRecord(null, null, "test", 0,
                 null, null);
@@ -337,7 +336,7 @@ public class FlattenTest {
 
     @Test
     public void tombstoneEventWithSchemaShouldPassThrough() {
-        xformValue.configure(Collections.emptyMap());
+        xformValue.configure(Map.of());
 
         final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
         final SourceRecord record = new SourceRecord(null, null, "test", 0,
@@ -350,7 +349,7 @@ public class FlattenTest {
 
     @Test
     public void testMapWithNullFields() {
-        xformValue.configure(Collections.emptyMap());
+        xformValue.configure(Map.of());
 
         // Use a LinkedHashMap to ensure the SMT sees entries in a specific order
         Map<String, Object> value = new LinkedHashMap<>();
@@ -368,7 +367,7 @@ public class FlattenTest {
 
     @Test
     public void testStructWithNullFields() {
-        xformValue.configure(Collections.emptyMap());
+        xformValue.configure(Map.of());
 
         final Schema structSchema = SchemaBuilder.struct()
             .field("firstNull", Schema.OPTIONAL_STRING_SCHEMA)
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java
index da9e3584325..f68d7493a75 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java
@@ -36,10 +36,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static java.util.Arrays.asList;
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-import static java.util.Collections.singletonMap;
 import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -103,8 +99,8 @@ public class HeaderFromTest {
         }
 
         private SourceRecord sourceRecord(boolean keyTransform, Schema 
keyOrValueSchema, Object keyOrValue) {
-            Map<String, ?> sourcePartition = singletonMap("foo", "bar");
-            Map<String, ?> sourceOffset = singletonMap("baz", "quxx");
+            Map<String, ?> sourcePartition = Map.of("foo", "bar");
+            Map<String, ?> sourceOffset = Map.of("baz", "quxx");
             String topic = "topic";
             Integer partition = 0;
             Long timestamp = 0L;
@@ -140,7 +136,7 @@ public class HeaderFromTest {
 
         List<Arguments> result = new ArrayList<>();
 
-        for (Boolean testKeyTransform : asList(true, false)) {
+        for (Boolean testKeyTransform : List.of(true, false)) {
             result.add(
                 Arguments.of(
                     "basic copy",
@@ -149,7 +145,7 @@ public class HeaderFromTest {
                         .withField("field1", STRING_SCHEMA, "field1-value")
                         .withField("field2", STRING_SCHEMA, "field2-value")
                         .addHeader("header1", STRING_SCHEMA, "existing-value"),
-                    singletonList("field1"), singletonList("inserted1"), 
HeaderFrom.Operation.COPY, true,
+                    List.of("field1"), List.of("inserted1"), 
HeaderFrom.Operation.COPY, true,
                     new RecordBuilder()
                         .withField("field1", STRING_SCHEMA, "field1-value")
                         .withField("field2", STRING_SCHEMA, "field2-value")
@@ -164,7 +160,7 @@ public class HeaderFromTest {
                         .withField("field1", STRING_SCHEMA, "field1-value")
                         .withField("field2", STRING_SCHEMA, "field2-value")
                         .addHeader("header1", STRING_SCHEMA, "existing-value"),
-                    singletonList("field1"), singletonList("inserted1"), 
HeaderFrom.Operation.MOVE, true,
+                    List.of("field1"), List.of("inserted1"), 
HeaderFrom.Operation.MOVE, true,
                     new RecordBuilder()
                         // field1 got moved
                         .withField("field2", STRING_SCHEMA, "field2-value")
@@ -179,7 +175,7 @@ public class HeaderFromTest {
                         .withField("field1", STRING_SCHEMA, "field1-value")
                         .withField("field2", STRING_SCHEMA, "field2-value")
                         .addHeader("inserted1", STRING_SCHEMA, 
"existing-value"),
-                    singletonList("field1"), singletonList("inserted1"), 
HeaderFrom.Operation.COPY, true,
+                    List.of("field1"), List.of("inserted1"), 
HeaderFrom.Operation.COPY, true,
                     new RecordBuilder()
                         .withField("field1", STRING_SCHEMA, "field1-value")
                         .withField("field2", STRING_SCHEMA, "field2-value")
@@ -194,7 +190,7 @@ public class HeaderFromTest {
                         .withField("field1", STRING_SCHEMA, "field1-value")
                         .withField("field2", STRING_SCHEMA, "field2-value")
                         .addHeader("inserted1", STRING_SCHEMA, 
"existing-value"),
-                    singletonList("field1"), singletonList("inserted1"), 
HeaderFrom.Operation.MOVE, true,
+                    List.of("field1"), List.of("inserted1"), 
HeaderFrom.Operation.MOVE, true,
                     new RecordBuilder()
                         // field1 got moved
                         .withField("field2", STRING_SCHEMA, "field2-value")
@@ -211,7 +207,7 @@ public class HeaderFromTest {
                         .withField("field1", schema, struct)
                         .withField("field2", STRING_SCHEMA, "field2-value")
                         .addHeader("header1", STRING_SCHEMA, "existing-value"),
-                    singletonList("field1"), singletonList("inserted1"), 
HeaderFrom.Operation.COPY, true,
+                    List.of("field1"), List.of("inserted1"), 
HeaderFrom.Operation.COPY, true,
                     new RecordBuilder()
                         .withField("field1", schema, struct)
                         .withField("field2", STRING_SCHEMA, "field2-value")
@@ -226,7 +222,7 @@ public class HeaderFromTest {
                         .withField("field1", schema, struct)
                         .withField("field2", STRING_SCHEMA, "field2-value")
                         .addHeader("header1", STRING_SCHEMA, "existing-value"),
-                    singletonList("field1"), singletonList("inserted1"), 
HeaderFrom.Operation.MOVE, true,
+                    List.of("field1"), List.of("inserted1"), 
HeaderFrom.Operation.MOVE, true,
                     new RecordBuilder()
                         // field1 got moved
                         .withField("field2", STRING_SCHEMA, "field2-value")
@@ -242,7 +238,7 @@ public class HeaderFromTest {
                         .withField("field2", STRING_SCHEMA, "field2-value")
                         .addHeader("header1", STRING_SCHEMA, "existing-value"),
                     // two headers from the same field
-                    asList("field1", "field1"), asList("inserted1", 
"inserted2"), HeaderFrom.Operation.MOVE, true,
+                    List.of("field1", "field1"), List.of("inserted1", 
"inserted2"), HeaderFrom.Operation.MOVE, true,
                     new RecordBuilder()
                         // field1 got moved
                         .withField("field2", STRING_SCHEMA, "field2-value")
@@ -259,7 +255,7 @@ public class HeaderFromTest {
                         .withField("field2", STRING_SCHEMA, "field2-value")
                         .addHeader("header1", STRING_SCHEMA, "existing-value"),
                     // two headers from the same field
-                    asList("field1", "field2"), asList("inserted1", 
"inserted1"), HeaderFrom.Operation.MOVE, true,
+                    List.of("field1", "field2"), List.of("inserted1", 
"inserted1"), HeaderFrom.Operation.MOVE, true,
                     new RecordBuilder()
                         // field1 and field2 got moved
                         .addHeader("header1", STRING_SCHEMA, "existing-value")
@@ -274,7 +270,7 @@ public class HeaderFromTest {
                                     .withField("field1", 
SchemaBuilder.string().defaultValue("default").optional().build(), 
"field1-value")
                                     .withField("field2", 
SchemaBuilder.string().defaultValue("default").optional().build(), null)
                                     .addHeader("header1", STRING_SCHEMA, 
"existing-value"),
-                            asList("field1", "field2"), asList("inserted1", 
"inserted2"), HeaderFrom.Operation.COPY, false,
+                            List.of("field1", "field2"), List.of("inserted1", 
"inserted2"), HeaderFrom.Operation.COPY, false,
                             new RecordBuilder()
                                     .withField("field1", 
SchemaBuilder.string().defaultValue("default").optional().build(), 
"field1-value")
                                     .withField("field2", 
SchemaBuilder.string().defaultValue("default").optional().build(), null)
@@ -290,7 +286,7 @@ public class HeaderFromTest {
                                     .withField("field1", 
SchemaBuilder.string().defaultValue("default").optional().build(), 
"field1-value")
                                     .withField("field2", 
SchemaBuilder.string().defaultValue("default").optional().build(), null)
                                     .addHeader("header1", STRING_SCHEMA, 
"existing-value"),
-                            asList("field1", "field2"), asList("inserted1", 
"inserted2"), HeaderFrom.Operation.MOVE, false,
+                            List.of("field1", "field2"), List.of("inserted1", 
"inserted2"), HeaderFrom.Operation.MOVE, false,
                             new RecordBuilder()
                                     .addHeader("header1", STRING_SCHEMA, 
"existing-value")
                                     .addHeader("inserted1", 
SchemaBuilder.string().defaultValue("default").optional().build(), 
"field1-value")
@@ -353,7 +349,7 @@ public class HeaderFromTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void invalidConfigExtraHeaderConfig(boolean keyTransform) {
-        Map<String, Object> config = config(singletonList("foo"), 
asList("foo", "bar"), HeaderFrom.Operation.COPY, true);
+        Map<String, Object> config = config(List.of("foo"), List.of("foo", 
"bar"), HeaderFrom.Operation.COPY, true);
         HeaderFrom<?> xform = keyTransform ? new HeaderFrom.Key<>() : new 
HeaderFrom.Value<>();
         assertThrows(ConfigException.class, () -> xform.configure(config));
     }
@@ -361,7 +357,7 @@ public class HeaderFromTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void invalidConfigExtraFieldConfig(boolean keyTransform) {
-        Map<String, Object> config = config(asList("foo", "bar"), 
singletonList("foo"), HeaderFrom.Operation.COPY, true);
+        Map<String, Object> config = config(List.of("foo", "bar"), 
List.of("foo"), HeaderFrom.Operation.COPY, true);
         HeaderFrom<?> xform = keyTransform ? new HeaderFrom.Key<>() : new 
HeaderFrom.Value<>();
         assertThrows(ConfigException.class, () -> xform.configure(config));
     }
@@ -369,7 +365,7 @@ public class HeaderFromTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void invalidConfigEmptyHeadersAndFieldsConfig(boolean keyTransform) 
{
-        Map<String, Object> config = config(emptyList(), emptyList(), 
HeaderFrom.Operation.COPY, true);
+        Map<String, Object> config = config(List.of(), List.of(), 
HeaderFrom.Operation.COPY, true);
         HeaderFrom<?> xform = keyTransform ? new HeaderFrom.Key<>() : new 
HeaderFrom.Value<>();
         assertThrows(ConfigException.class, () -> xform.configure(config));
     }
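
The HeaderFromTest hunks replace Arrays.asList and Collections.singletonList
with List.of throughout. The two are not identical: Arrays.asList returns a
fixed-size view that still permits set(), and it accepts nulls, while List.of
is fully immutable and null-hostile. The swap is behavior-preserving here
because the lists are read-only test inputs. A small sketch of the difference
(class name hypothetical):

    import java.util.Arrays;
    import java.util.List;

    public class ListFactoryDemo {
        public static void main(String[] args) {
            List<String> legacy = Arrays.asList("field1", "field2");
            legacy.set(0, "other");        // allowed: fixed-size, yet mutable in place
            List<String> modern = List.of("field1", "field2");
            try {
                modern.set(0, "other");    // List.of is fully immutable
            } catch (UnsupportedOperationException e) {
                System.out.println("List.of rejects in-place mutation");
            }
        }
    }
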
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
index b72dddcdd15..93b69d5413d 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -41,18 +40,18 @@ public class HoistFieldTest {
 
     @Test
     public void schemaless() {
-        xform.configure(Collections.singletonMap("field", "magic"));
+        xform.configure(Map.of("field", "magic"));
 
         final SinkRecord record = new SinkRecord("test", 0, null, 42, null, 
null, 0);
         final SinkRecord transformedRecord = xform.apply(record);
 
         assertNull(transformedRecord.keySchema());
-        assertEquals(Collections.singletonMap("magic", 42), 
transformedRecord.key());
+        assertEquals(Map.of("magic", 42), transformedRecord.key());
     }
 
     @Test
     public void withSchema() {
-        xform.configure(Collections.singletonMap("field", "magic"));
+        xform.configure(Map.of("field", "magic"));
 
         final SinkRecord record = new SinkRecord("test", 0, 
Schema.INT32_SCHEMA, 42, null, null, 0);
         final SinkRecord transformedRecord = xform.apply(record);
@@ -64,7 +63,7 @@ public class HoistFieldTest {
 
     @Test
     public void testSchemalessMapIsMutable() {
-        xform.configure(Collections.singletonMap("field", "magic"));
+        xform.configure(Map.of("field", "magic"));
 
         final SinkRecord record = new SinkRecord("test", 0, null, 420, null, 
null, 0);
         final SinkRecord transformedRecord = xform.apply(record);
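
The assertions in HoistFieldTest keep passing after the swap because Map
equality is defined by entry contents, not by implementation class; an
immutable Map.of expected value compares equal to whatever mutable map the
transform actually returns (which testSchemalessMapIsMutable depends on).
A minimal sketch (class name hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    public class MapEqualityDemo {
        public static void main(String[] args) {
            Map<String, Integer> actual = new HashMap<>();  // mutable, like the transform's output
            actual.put("magic", 42);
            // Content-based equality: implementation classes differ, entries match.
            System.out.println(Map.of("magic", 42).equals(actual));  // true
        }
    }
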
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
index cb48fdd810f..705f60f5a2e 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
@@ -30,7 +30,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
@@ -59,7 +58,7 @@ public class InsertFieldTest {
 
     @Test
     public void topLevelStructRequired() {
-        xformValue.configure(Collections.singletonMap("topic.field", 
"topic_field"));
+        xformValue.configure(Map.of("topic.field", "topic_field"));
         assertThrows(DataException.class,
             () -> xformValue.apply(new SourceRecord(null, null, "", 0, 
Schema.INT32_SCHEMA, 42)));
     }
@@ -118,7 +117,7 @@ public class InsertFieldTest {
         xformValue.configure(props);
 
         final SourceRecord record = new SourceRecord(null, null, "test", 0,
-                null, null, null, Collections.singletonMap("magic", 42L), 
123L);
+                null, null, null, Map.of("magic", 42L), 123L);
 
         final SourceRecord transformedRecord = xformValue.apply(record);
 
@@ -183,7 +182,7 @@ public class InsertFieldTest {
         xformKey.configure(props);
 
         final SourceRecord record = new SourceRecord(null, null, "test", 0,
-            null, Collections.singletonMap("magic", 42L), null, null);
+            null, Map.of("magic", 42L), null, null);
 
         final SourceRecord transformedRecord = xformKey.apply(record);
 
@@ -207,7 +206,7 @@ public class InsertFieldTest {
         xformKey.configure(props);
 
         final SourceRecord record = new SourceRecord(null, null, "test", 0,
-            null, null, null, Collections.singletonMap("magic", 42L));
+            null, null, null, Map.of("magic", 42L));
 
         final SourceRecord transformedRecord = xformKey.apply(record);
 
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java
index 20c5b67a50a..190931b829b 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java
@@ -28,7 +28,6 @@ import org.junit.jupiter.api.Test;
 import java.util.HashMap;
 import java.util.Map;
 
-import static java.util.Collections.singletonMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -105,8 +104,8 @@ public class InsertHeaderTest {
     }
 
     private SourceRecord sourceRecord(ConnectHeaders headers) {
-        Map<String, ?> sourcePartition = singletonMap("foo", "bar");
-        Map<String, ?> sourceOffset = singletonMap("baz", "quxx");
+        Map<String, ?> sourcePartition = Map.of("foo", "bar");
+        Map<String, ?> sourceOffset = Map.of("baz", "quxx");
         String topic = "topic";
         Integer partition = 0;
         Schema keySchema = null;
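
One small type-inference point from the InsertHeaderTest hunk: Map.of("foo",
"bar") infers Map<String, String>, which is assignable to the declared
Map<String, ?> exactly as the old singletonMap result was, so no signatures
change. A sketch under that assumption (class and method names hypothetical):

    import java.util.Map;

    public class WildcardAssignmentDemo {
        // The inferred Map<String, String> widens to Map<String, ?> at the call site.
        static Map<String, ?> sourcePartition() {
            return Map.of("foo", "bar");
        }

        public static void main(String[] args) {
            System.out.println(sourcePartition());  // {foo=bar}
        }
    }
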
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java
index 05989af572f..3bdc1cd3b4c 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java
@@ -33,73 +33,69 @@ import org.junit.jupiter.api.Test;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static java.util.Collections.singletonList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class MaskFieldTest {
 
     private static final Schema SCHEMA = SchemaBuilder.struct()
-            .field("magic", Schema.INT32_SCHEMA)
-            .field("bool", Schema.BOOLEAN_SCHEMA)
-            .field("byte", Schema.INT8_SCHEMA)
-            .field("short", Schema.INT16_SCHEMA)
-            .field("int", Schema.INT32_SCHEMA)
-            .field("long", Schema.INT64_SCHEMA)
-            .field("float", Schema.FLOAT32_SCHEMA)
-            .field("double", Schema.FLOAT64_SCHEMA)
-            .field("string", Schema.STRING_SCHEMA)
-            .field("date", org.apache.kafka.connect.data.Date.SCHEMA)
-            .field("time", Time.SCHEMA)
-            .field("timestamp", Timestamp.SCHEMA)
-            .field("decimal", Decimal.schema(0))
-            .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA))
-            .field("map", SchemaBuilder.map(Schema.STRING_SCHEMA, 
Schema.STRING_SCHEMA))
-            .field("withDefault", 
SchemaBuilder.string().optional().defaultValue("default").build())
-            .build();
-    private static final Map<String, Object> VALUES = new HashMap<>();
-    private static final Struct VALUES_WITH_SCHEMA = new Struct(SCHEMA);
+        .field("magic", Schema.INT32_SCHEMA)
+        .field("bool", Schema.BOOLEAN_SCHEMA)
+        .field("byte", Schema.INT8_SCHEMA)
+        .field("short", Schema.INT16_SCHEMA)
+        .field("int", Schema.INT32_SCHEMA)
+        .field("long", Schema.INT64_SCHEMA)
+        .field("float", Schema.FLOAT32_SCHEMA)
+        .field("double", Schema.FLOAT64_SCHEMA)
+        .field("string", Schema.STRING_SCHEMA)
+        .field("date", org.apache.kafka.connect.data.Date.SCHEMA)
+        .field("time", Time.SCHEMA)
+        .field("timestamp", Timestamp.SCHEMA)
+        .field("decimal", Decimal.schema(0))
+        .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA))
+        .field("map", SchemaBuilder.map(Schema.STRING_SCHEMA, 
Schema.STRING_SCHEMA))
+        .field("withDefault", 
SchemaBuilder.string().optional().defaultValue("default").build())
+        .build();
 
-    static {
-        VALUES.put("magic", 42);
-        VALUES.put("bool", true);
-        VALUES.put("byte", (byte) 42);
-        VALUES.put("short", (short) 42);
-        VALUES.put("int", 42);
-        VALUES.put("long", 42L);
-        VALUES.put("float", 42f);
-        VALUES.put("double", 42d);
-        VALUES.put("string", "55.121.20.20");
-        VALUES.put("date", new Date());
-        VALUES.put("bigint", new BigInteger("42"));
-        VALUES.put("bigdec", new BigDecimal("42.0"));
-        VALUES.put("list", singletonList(42));
-        VALUES.put("map", Collections.singletonMap("key", "value"));
+    private static final Map<String, Object> VALUES = Map.ofEntries(
+        Map.entry("magic", 42),
+        Map.entry("bool", true),
+        Map.entry("byte", (byte) 42),
+        Map.entry("short", (short) 42),
+        Map.entry("int", 42),
+        Map.entry("long", 42L),
+        Map.entry("float", 42f),
+        Map.entry("double", 42d),
+        Map.entry("string", "55.121.20.20"),
+        Map.entry("date", new Date()),
+        Map.entry("bigint", new BigInteger("42")),
+        Map.entry("bigdec", new BigDecimal("42.0")),
+        Map.entry("list", List.of(42)),
+        Map.entry("map", Map.of("key", "value"))
+    );
 
-        VALUES_WITH_SCHEMA.put("magic", 42);
-        VALUES_WITH_SCHEMA.put("bool", true);
-        VALUES_WITH_SCHEMA.put("byte", (byte) 42);
-        VALUES_WITH_SCHEMA.put("short", (short) 42);
-        VALUES_WITH_SCHEMA.put("int", 42);
-        VALUES_WITH_SCHEMA.put("long", 42L);
-        VALUES_WITH_SCHEMA.put("float", 42f);
-        VALUES_WITH_SCHEMA.put("double", 42d);
-        VALUES_WITH_SCHEMA.put("string", "hmm");
-        VALUES_WITH_SCHEMA.put("date", new Date());
-        VALUES_WITH_SCHEMA.put("time", new Date());
-        VALUES_WITH_SCHEMA.put("timestamp", new Date());
-        VALUES_WITH_SCHEMA.put("decimal", new BigDecimal(42));
-        VALUES_WITH_SCHEMA.put("array", Arrays.asList(1, 2, 3));
-        VALUES_WITH_SCHEMA.put("map", Collections.singletonMap("what", 
"what"));
-        VALUES_WITH_SCHEMA.put("withDefault", null);
-    }
+    private static final Struct VALUES_WITH_SCHEMA = new Struct(SCHEMA)
+        .put("magic", 42)
+        .put("bool", true)
+        .put("byte", (byte) 42)
+        .put("short", (short) 42)
+        .put("int", 42)
+        .put("long", 42L)
+        .put("float", 42f)
+        .put("double", 42d)
+        .put("string", "hmm")
+        .put("date", new Date())
+        .put("time", new Date())
+        .put("timestamp", new Date())
+        .put("decimal", new BigDecimal(42))
+        .put("array", List.of(1, 2, 3))
+        .put("map", Map.of("what", "what"))
+        .put("withDefault", null);
 
     private static MaskField<SinkRecord> transform(List<String> fields, String 
replacement) {
         final MaskField<SinkRecord> xform = new MaskField.Value<>();
@@ -117,20 +113,20 @@ public class MaskFieldTest {
 
     private static void checkReplacementWithSchema(String maskField, Object 
replacement) {
         SinkRecord record = record(SCHEMA, VALUES_WITH_SCHEMA);
-        final Struct updatedValue = (Struct) 
transform(singletonList(maskField), 
String.valueOf(replacement)).apply(record).value();
+        final Struct updatedValue = (Struct) transform(List.of(maskField), 
String.valueOf(replacement)).apply(record).value();
         assertEquals(replacement, updatedValue.get(maskField), "Invalid 
replacement for " + maskField + " value");
     }
 
     private static void checkReplacementSchemaless(String maskField, Object 
replacement) {
-        checkReplacementSchemaless(singletonList(maskField), replacement);
+        checkReplacementSchemaless(List.of(maskField), replacement);
     }
 
     @SuppressWarnings("unchecked")
     private static void checkReplacementSchemaless(List<String> maskFields, 
Object replacement) {
         SinkRecord record = record(null, VALUES);
         final Map<String, Object> updatedValue = (Map) transform(maskFields, 
String.valueOf(replacement))
-                .apply(record)
-                .value();
+            .apply(record)
+            .value();
         for (String maskField : maskFields) {
             assertEquals(replacement, updatedValue.get(maskField), "Invalid 
replacement for " + maskField + " value");
         }
@@ -154,8 +150,8 @@ public class MaskFieldTest {
         assertEquals(new Date(0), updatedValue.get("date"));
         assertEquals(BigInteger.ZERO, updatedValue.get("bigint"));
         assertEquals(BigDecimal.ZERO, updatedValue.get("bigdec"));
-        assertEquals(Collections.emptyList(), updatedValue.get("list"));
-        assertEquals(Collections.emptyMap(), updatedValue.get("map"));
+        assertEquals(List.of(), updatedValue.get("list"));
+        assertEquals(Map.of(), updatedValue.get("map"));
     }
 
     @Test
@@ -182,8 +178,8 @@ public class MaskFieldTest {
         assertEquals(new Date(0), updatedValue.get("time"));
         assertEquals(new Date(0), updatedValue.get("timestamp"));
         assertEquals(BigDecimal.ZERO, updatedValue.get("decimal"));
-        assertEquals(Collections.emptyList(), updatedValue.get("array"));
-        assertEquals(Collections.emptyMap(), updatedValue.get("map"));
+        assertEquals(List.of(), updatedValue.get("array"));
+        assertEquals(Map.of(), updatedValue.get("map"));
         assertEquals(null, updatedValue.getWithoutDefault("withDefault"));
     }
 
@@ -206,10 +202,10 @@ public class MaskFieldTest {
         Class<DataException> exClass = DataException.class;
 
         assertThrows(exClass, () -> checkReplacementSchemaless("date", new 
Date()), exMessage);
-        assertThrows(exClass, () -> 
checkReplacementSchemaless(Arrays.asList("int", "date"), new Date()), 
exMessage);
+        assertThrows(exClass, () -> checkReplacementSchemaless(List.of("int", 
"date"), new Date()), exMessage);
         assertThrows(exClass, () -> checkReplacementSchemaless("bool", false), 
exMessage);
-        assertThrows(exClass, () -> checkReplacementSchemaless("list", 
singletonList("123")), exMessage);
-        assertThrows(exClass, () -> checkReplacementSchemaless("map", 
Collections.singletonMap("123", "321")), exMessage);
+        assertThrows(exClass, () -> checkReplacementSchemaless("list", 
List.of("123")), exMessage);
+        assertThrows(exClass, () -> checkReplacementSchemaless("map", 
Map.of("123", "321")), exMessage);
     }
 
     @Test
@@ -231,7 +227,7 @@ public class MaskFieldTest {
 
         assertThrows(exClass, () -> checkReplacementWithSchema("time", new 
Date()), exMessage);
         assertThrows(exClass, () -> checkReplacementWithSchema("timestamp", 
new Date()), exMessage);
-        assertThrows(exClass, () -> checkReplacementWithSchema("array", 
singletonList(123)), exMessage);
+        assertThrows(exClass, () -> checkReplacementWithSchema("array", 
List.of(123)), exMessage);
     }
 
     @Test
@@ -249,7 +245,7 @@ public class MaskFieldTest {
         assertThrows(exClass, () -> checkReplacementSchemaless("bigdec", 
"foo"), exMessage);
         assertThrows(exClass, () -> checkReplacementSchemaless("int", new 
Date()), exMessage);
         assertThrows(exClass, () -> checkReplacementSchemaless("int", new 
Object()), exMessage);
-        assertThrows(exClass, () -> 
checkReplacementSchemaless(Arrays.asList("string", "int"), "foo"), exMessage);
+        assertThrows(exClass, () -> 
checkReplacementSchemaless(List.of("string", "int"), "foo"), exMessage);
     }
 
     @Test
@@ -259,17 +255,17 @@ public class MaskFieldTest {
 
     @Test
     public void testNullListAndMapReplacementsAreMutable() {
-        final List<String> maskFields = Arrays.asList("array", "map");
+        final List<String> maskFields = List.of("array", "map");
         final Struct updatedValue = (Struct) transform(maskFields, 
null).apply(record(SCHEMA, VALUES_WITH_SCHEMA)).value();
         @SuppressWarnings("unchecked") List<Integer> actualList = 
(List<Integer>) updatedValue.get("array");
-        assertEquals(Collections.emptyList(), actualList);
+        assertEquals(List.of(), actualList);
         actualList.add(0);
-        assertEquals(Collections.singletonList(0), actualList);
+        assertEquals(List.of(0), actualList);
 
         @SuppressWarnings("unchecked") Map<String, String> actualMap = 
(Map<String, String>) updatedValue.get("map");
-        assertEquals(Collections.emptyMap(), actualMap);
+        assertEquals(Map.of(), actualMap);
         actualMap.put("k", "v");
-        assertEquals(Collections.singletonMap("k", "v"), actualMap);
+        assertEquals(Map.of("k", "v"), actualMap);
     }
 
     @Test
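
The MaskFieldTest hunk goes further than a factory swap: the old static
initializer block collapses into a single Map.ofEntries expression, and the
Struct is now built through Struct.put's fluent return value. One caveat:
Map.entry throws NullPointerException on null keys or values, so the nullable
"withDefault" entry can only live in the Struct, where put() accepts null for
optional fields. A minimal sketch of the map half (class name hypothetical):

    import java.util.Map;

    public class MapOfEntriesDemo {
        // One immutable expression replaces the old static-initializer block.
        // Map.entry rejects nulls, so nullable entries cannot be expressed here.
        private static final Map<String, Object> VALUES = Map.ofEntries(
            Map.entry("magic", 42),
            Map.entry("string", "55.121.20.20")
        );

        public static void main(String[] args) {
            System.out.println(VALUES.get("magic"));  // 42
        }
    }
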
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
index 5f0e51559bd..7f47dd0f8c0 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
@@ -30,7 +30,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -57,7 +56,7 @@ public class SetSchemaMetadataTest {
 
     @Test
     public void schemaNameUpdate() {
-        xform.configure(Collections.singletonMap("schema.name", "foo"));
+        xform.configure(Map.of("schema.name", "foo"));
         final SinkRecord record = new SinkRecord("", 0, null, null, 
SchemaBuilder.struct().build(), null, 0);
         final SinkRecord updatedRecord = xform.apply(record);
         assertEquals("foo", updatedRecord.valueSchema().name());
@@ -65,7 +64,7 @@ public class SetSchemaMetadataTest {
 
     @Test
     public void schemaVersionUpdate() {
-        xform.configure(Collections.singletonMap("schema.version", 42));
+        xform.configure(Map.of("schema.version", 42));
         final SinkRecord record = new SinkRecord("", 0, null, null, 
SchemaBuilder.struct().build(), null, 0);
         final SinkRecord updatedRecord = xform.apply(record);
         assertEquals(42, updatedRecord.valueSchema().version());
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
index a807ad1fc21..d67d031482d 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
@@ -34,7 +34,6 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Calendar;
-import java.util.Collections;
 import java.util.GregorianCalendar;
 import java.util.HashMap;
 import java.util.Map;
@@ -108,13 +107,13 @@ public class TimestampConverterTest {
 
     @Test
     public void testConfigNoTargetType() {
-        assertThrows(ConfigException.class, () -> 
xformValue.configure(Collections.emptyMap()));
+        assertThrows(ConfigException.class, () -> 
xformValue.configure(Map.of()));
     }
 
     @Test
     public void testConfigInvalidTargetType() {
         assertThrows(ConfigException.class,
-            () -> 
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "invalid")));
+            () -> 
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "invalid")));
     }
 
     @Test
@@ -136,7 +135,7 @@ public class TimestampConverterTest {
     @Test
     public void testConfigMissingFormat() {
         assertThrows(ConfigException.class,
-            () -> 
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "string")));
+            () -> 
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "string")));
     }
 
     @Test
@@ -151,7 +150,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessIdentity() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
         SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
@@ -160,7 +159,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimestampToDate() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Date"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"Date"));
         SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
@@ -169,7 +168,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimestampToTime() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Time"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"Time"));
         SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
@@ -178,7 +177,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimestampToUnix() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "unix"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"unix"));
         SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
@@ -202,7 +201,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessDateToTimestamp() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
         SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE.getTime()));
 
         assertNull(transformed.valueSchema());
@@ -212,7 +211,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimeToTimestamp() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
         SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(TIME.getTime()));
 
         assertNull(transformed.valueSchema());
@@ -222,7 +221,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessUnixToTimestamp() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
         SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_UNIX));
 
         assertNull(transformed.valueSchema());
@@ -246,7 +245,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaIdentity() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
         SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, 
DATE_PLUS_TIME.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
@@ -255,7 +254,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimestampToDate() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Date"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"Date"));
         SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, 
DATE_PLUS_TIME.getTime()));
 
         assertEquals(Date.SCHEMA, transformed.valueSchema());
@@ -264,7 +263,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimestampToTime() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Time"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"Time"));
         SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, 
DATE_PLUS_TIME.getTime()));
 
         assertEquals(Time.SCHEMA, transformed.valueSchema());
@@ -273,7 +272,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimestampToUnix() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "unix"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"unix"));
         SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, 
DATE_PLUS_TIME.getTime()));
 
         assertEquals(Schema.INT64_SCHEMA, transformed.valueSchema());
@@ -348,7 +347,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaDateToTimestamp() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
         SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Date.SCHEMA, DATE.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
@@ -358,7 +357,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimeToTimestamp() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
         SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Time.SCHEMA, TIME.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
@@ -368,7 +367,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaUnixToTimestamp() {
-        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
         SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Schema.INT64_SCHEMA, 
DATE_PLUS_TIME_UNIX));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
@@ -530,11 +529,11 @@ public class TimestampConverterTest {
         config.put(TimestampConverter.FIELD_CONFIG, "ts");
         xformValue.configure(config);
 
-        Object value = Collections.singletonMap("ts", 
DATE_PLUS_TIME.getTime());
+        Object value = Map.of("ts", DATE_PLUS_TIME.getTime());
         SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(value));
 
         assertNull(transformed.valueSchema());
-        assertEquals(Collections.singletonMap("ts", DATE.getTime()), 
transformed.value());
+        assertEquals(Map.of("ts", DATE.getTime()), transformed.value());
     }
 
     @Test
@@ -590,7 +589,7 @@ public class TimestampConverterTest {
                 .build();
 
         assertEquals(expectedSchema, transformed.valueSchema());
-        assertEquals(null, ((Struct) transformed.value()).get("ts"));
+        assertNull(((Struct) transformed.value()).get("ts"));
         assertEquals("test", ((Struct) transformed.value()).get("other"));
     }
 
@@ -716,7 +715,7 @@ public class TimestampConverterTest {
 
     @Test
     public void testKey() {
-        
xformKey.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        xformKey.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
         SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, 
"topic", 0, null, DATE_PLUS_TIME.getTime(), null, null));
 
         assertNull(transformed.keySchema());
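
Alongside the factory swaps, the TimestampConverterTest hunk replaces
assertEquals(null, ...) with assertNull(...). The outcome is identical, but
assertNull states the intent directly and yields a clearer failure message.
A trivial sketch (class name hypothetical; assumes junit-jupiter on the
classpath):

    import static org.junit.jupiter.api.Assertions.assertNull;

    public class AssertNullDemo {
        public static void main(String[] args) {
            Object ts = null;
            assertNull(ts);  // on failure: "expected: <null> but was: <...>"
            System.out.println("passed");
        }
    }
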
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
index 43b3b1f384f..a98c4406ad9 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
@@ -23,7 +23,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
+import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -32,7 +32,7 @@ public class TimestampRouterTest {
     @BeforeEach
     public void setup() {
         xform = new TimestampRouter<>();
-        xform.configure(Collections.emptyMap()); // defaults
+        xform.configure(Map.of()); // defaults
     }
 
     @AfterEach
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
index df528cf518a..775bfbabac2 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
@@ -29,7 +29,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -55,7 +54,7 @@ public class ValueToKeyTest {
 
     @Test
     public void schemaless() {
-        xform.configure(Collections.singletonMap("fields", "a,b"));
+        xform.configure(Map.of("fields", "a,b"));
 
         final HashMap<String, Integer> value = new HashMap<>();
         value.put("a", 1);
@@ -75,7 +74,7 @@ public class ValueToKeyTest {
 
     @Test
     public void withSchema() {
-        xform.configure(Collections.singletonMap("fields", "a,b"));
+        xform.configure(Map.of("fields", "a,b"));
 
         final Schema valueSchema = SchemaBuilder.struct()
                 .field("a", Schema.INT32_SCHEMA)
@@ -106,7 +105,7 @@ public class ValueToKeyTest {
 
     @Test
     public void nonExistingField() {
-        xform.configure(Collections.singletonMap("fields", "not_exist"));
+        xform.configure(Map.of("fields", "not_exist"));
 
         final Schema valueSchema = SchemaBuilder.struct()
             .field("a", Schema.INT32_SCHEMA)
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldSyntaxVersionTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldSyntaxVersionTest.java
index d400141c95b..a0c2e2c4861 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldSyntaxVersionTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldSyntaxVersionTest.java
@@ -35,18 +35,18 @@ public class FieldSyntaxVersionTest {
     @Test
     void shouldAppendConfigToDef() {
         ConfigDef def = FieldSyntaxVersion.appendConfigTo(new ConfigDef());
-        assertEquals(def.configKeys().size(), 1);
+        assertEquals(1, def.configKeys().size());
         final ConfigDef.ConfigKey configKey = 
def.configKeys().get("field.syntax.version");
-        assertEquals(configKey.name, "field.syntax.version");
-        assertEquals(configKey.defaultValue, "V1");
+        assertEquals("field.syntax.version", configKey.name);
+        assertEquals("V1", configKey.defaultValue);
     }
 
     @Test
     void shouldFailWhenAppendConfigToDefAgain() {
         ConfigDef def = FieldSyntaxVersion.appendConfigTo(new ConfigDef());
-        assertEquals(def.configKeys().size(), 1);
+        assertEquals(1, def.configKeys().size());
         ConfigException e = assertThrows(ConfigException.class, () -> 
FieldSyntaxVersion.appendConfigTo(def));
-        assertEquals(e.getMessage(), "Configuration field.syntax.version is 
defined twice.");
+        assertEquals("Configuration field.syntax.version is defined twice.", 
e.getMessage());
     }
 
     @ParameterizedTest
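
The FieldSyntaxVersionTest hunk is a different kind of cleanup: JUnit's
assertEquals takes (expected, actual), and the old calls had the arguments
reversed. The pass/fail outcome is unchanged, but a reversed call reports the
two values backwards in its failure message. A small illustration (class name
hypothetical; assumes junit-jupiter on the classpath):

    import static org.junit.jupiter.api.Assertions.assertEquals;

    public class ArgumentOrderDemo {
        public static void main(String[] args) {
            int actualSize = 1;
            // Expected value first, observed value second; on failure the
            // message then reads "expected: <1> but was: <...>" correctly.
            assertEquals(1, actualSize);
            System.out.println("passed");
        }
    }
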
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
index e3e3920858d..f6b98aab8c4 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKeyTest.java
@@ -26,13 +26,11 @@ import 
org.apache.kafka.connect.transforms.util.SimpleConfig;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.singletonList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -58,16 +56,16 @@ public class HasHeaderKeyTest {
     @Test
     public void testConfig() {
         HasHeaderKey<SourceRecord> predicate = new HasHeaderKey<>();
-        predicate.config().validate(Collections.singletonMap("name", "foo"));
+        predicate.config().validate(Map.of("name", "foo"));
 
-        List<ConfigValue> configs = 
predicate.config().validate(Collections.singletonMap("name", ""));
-        assertEquals(singletonList("Invalid value  for configuration name: 
String must be non-empty"), configs.get(0).errorMessages());
+        List<ConfigValue> configs = predicate.config().validate(Map.of("name", 
""));
+        assertEquals(List.of("Invalid value  for configuration name: String 
must be non-empty"), configs.get(0).errorMessages());
     }
 
     @Test
     public void testTest() {
         HasHeaderKey<SourceRecord> predicate = new HasHeaderKey<>();
-        predicate.configure(Collections.singletonMap("name", "foo"));
+        predicate.configure(Map.of("name", "foo"));
 
         assertTrue(predicate.test(recordWithHeaders("foo")));
         assertTrue(predicate.test(recordWithHeaders("foo", "bar")));
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatchesTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatchesTest.java
index 3d9ac4dba90..140d0d6c30f 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatchesTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatchesTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,9 +60,9 @@ public class TopicNameMatchesTest {
     @Test
     public void testConfig() {
         TopicNameMatches<SourceRecord> predicate = new TopicNameMatches<>();
-        predicate.config().validate(Collections.singletonMap("pattern", 
"my-prefix-.*"));
+        predicate.config().validate(Map.of("pattern", "my-prefix-.*"));
 
-        List<ConfigValue> configs = 
predicate.config().validate(Collections.singletonMap("pattern", "*"));
+        List<ConfigValue> configs = 
predicate.config().validate(Map.of("pattern", "*"));
         List<String> errorMsgs = configs.get(0).errorMessages();
         assertEquals(1, errorMsgs.size());
         assertTrue(errorMsgs.get(0).contains("Invalid regex"));
@@ -72,7 +71,7 @@ public class TopicNameMatchesTest {
     @Test
     public void testTest() {
         TopicNameMatches<SourceRecord> predicate = new TopicNameMatches<>();
-        predicate.configure(Collections.singletonMap("pattern", 
"my-prefix-.*"));
+        predicate.configure(Map.of("pattern", "my-prefix-.*"));
 
         assertTrue(predicate.test(recordWithTopicName("my-prefix-")));
         assertTrue(predicate.test(recordWithTopicName("my-prefix-foo")));
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidatorTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidatorTest.java
index 5060346a2d9..3a9ef48f8dd 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidatorTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidatorTest.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.config.ConfigException;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -34,11 +34,11 @@ public class NonEmptyListValidatorTest {
     @Test
     public void testEmptyList() {
         assertThrows(ConfigException.class,
-            () -> new NonEmptyListValidator().ensureValid("foo", 
Collections.emptyList()));
+            () -> new NonEmptyListValidator().ensureValid("foo", List.of()));
     }
 
     @Test
     public void testValidList() {
-        new NonEmptyListValidator().ensureValid("foo", 
Collections.singletonList("foo"));
+        new NonEmptyListValidator().ensureValid("foo", List.of("foo"));
     }
 }
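
A closing note on NonEmptyListValidatorTest: the validator only inspects the
value it is given, so the immutable lists from List.of() are safe drop-ins for
the Collections factories they replace. A sketch (class name hypothetical;
assumes connect-transforms on the classpath):

    import java.util.List;

    import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;

    public class NonEmptyListDemo {
        public static void main(String[] args) {
            NonEmptyListValidator validator = new NonEmptyListValidator();
            validator.ensureValid("foo", List.of("foo"));  // non-empty: passes
            validator.ensureValid("foo", List.of());       // empty: throws ConfigException
        }
    }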
