This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 5b6f087  [FLINK-34260][Connectors/AWS] Update flink-connector-aws to be compatible with updated SinkV2 interfaces
5b6f087 is described below

commit 5b6f087815bcf18cf62ba39b2ac1f84f5e72f951
Author: Aleksandr Pilipenko <z3d...@gmail.com>
AuthorDate: Mon Jan 29 20:17:41 2024 +0000

    [FLINK-34260][Connectors/AWS] Update flink-connector-aws to be compatible with updated SinkV2 interfaces
---
 .../flink-connector-aws-kinesis-firehose/pom.xml   |  6 ++
 .../sink/KinesisFirehoseSinkWriterTest.java        | 14 ++---
 .../kinesis/sink/KinesisStreamsSinkWriterTest.java | 15 ++---
 .../flink-connector-dynamodb/pom.xml               |  6 ++
 .../connector/dynamodb/sink/DynamoDbSink.java      | 22 +++++++-
 .../dynamodb/sink/DynamoDbSinkWriterTest.java      | 64 ++++++++++++----------
 .../flink-connector-kinesis/pom.xml                |  6 ++
 .../kinesis/FlinkKinesisConsumerTest.java          |  2 +-
 .../GlueSchemaRegistryAvroSchemaCoderTest.java     |  5 +-
 ...eSchemaRegistryInputStreamDeserializerTest.java |  3 +-
 .../flink-json-glue-schema-registry/pom.xml        |  8 +++
 .../GlueSchemaRegistryJsonSchemaCoderProvider.java |  8 +--
 ...chemaRegistryJsonDeserializationSchemaTest.java |  5 +-
 pom.xml                                            | 12 +++-
 14 files changed, 116 insertions(+), 60 deletions(-)
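
For reference, the recurring change across the sink writer tests below is that writers are no longer constructed directly with an init context: each test now builds the sink and obtains the writer through Sink#createWriter, which declares IOException (hence the added "throws IOException" clauses). A minimal sketch of that pattern, assuming Flink's SinkV2 interfaces and the TestSinkInitContext test utility; the class and helper names in the sketch are illustrative only and are not part of this commit:

    import org.apache.flink.api.connector.sink2.Sink;
    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;

    import java.io.IOException;

    class WriterCreationSketch {
        // Hypothetical helper mirroring the test changes in this commit:
        // build the sink first, then ask it for its writer via createWriter
        // instead of invoking the concrete *SinkWriter constructor with an
        // InitContext.
        static <T> SinkWriter<T> writerUnderTest(Sink<T> sink) throws IOException {
            TestSinkInitContext initContext = new TestSinkInitContext();
            return sink.createWriter(initContext);
        }
    }
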

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-firehose/pom.xml b/flink-connector-aws/flink-connector-aws-kinesis-firehose/pom.xml
index fd2e7b3..68df44e 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-firehose/pom.xml
+++ b/flink-connector-aws/flink-connector-aws-kinesis-firehose/pom.xml
@@ -71,6 +71,12 @@ under the License.
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
index 29160f7..2d9ddef 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
@@ -48,22 +48,22 @@ public class KinesisFirehoseSinkWriterTest {
                     .build();
 
     @BeforeEach
-    void setup() {
+    void setup() throws IOException {
         TestSinkInitContext sinkInitContext = new TestSinkInitContext();
         Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
-        sinkWriter =
-                new KinesisFirehoseSinkWriter<>(
+        KinesisFirehoseSink<String> sink =
+                new KinesisFirehoseSink<>(
                         ELEMENT_CONVERTER_PLACEHOLDER,
-                        sinkInitContext,
                         50,
                         16,
                         10000,
-                        4 * 1024 * 1024,
-                        5000,
-                        1000 * 1024,
+                        4 * 1024 * 1024L,
+                        5000L,
+                        1000 * 1024L,
                         true,
                         "streamName",
                         sinkProperties);
+        sinkWriter = (KinesisFirehoseSinkWriter<String>) sink.createWriter(sinkInitContext);
     }
 
     @Test
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java
index eccfe0a..f3c13d4 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java
@@ -18,7 +18,6 @@
 package org.apache.flink.connector.kinesis.sink;
 
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
@@ -28,6 +27,7 @@ import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRat
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
 
+import java.io.IOException;
 import java.util.Properties;
 
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -55,13 +55,13 @@ public class KinesisStreamsSinkWriterTest {
                             .build();
 
     @Test
-    void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters() {
-        Sink.InitContext sinkInitContext = new TestSinkInitContext();
+    void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters()
+            throws IOException {
+        TestSinkInitContext sinkInitContext = new TestSinkInitContext();
         Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
-        sinkWriter =
-                new KinesisStreamsSinkWriter<String>(
+        KinesisStreamsSink<String> sink =
+                new KinesisStreamsSink<>(
                         ELEMENT_CONVERTER_PLACEHOLDER,
-                        sinkInitContext,
                         MAX_BATCH_SIZE,
                         MAX_INFLIGHT_REQUESTS,
                         MAX_BUFFERED_REQUESTS,
@@ -70,8 +70,9 @@ public class KinesisStreamsSinkWriterTest {
                         MAX_RECORD_SIZE,
                         FAIL_ON_ERROR,
                         "streamName",
-                        "StreamARN",
+                        "arn:aws:kinesis:us-east-1:000000000000:stream/streamName",
                         sinkProperties);
+        sinkWriter = (KinesisStreamsSinkWriter<String>) sink.createWriter(sinkInitContext);
 
         assertThat(sinkWriter)
                 .extracting("rateLimitingStrategy")
diff --git a/flink-connector-aws/flink-connector-dynamodb/pom.xml b/flink-connector-aws/flink-connector-dynamodb/pom.xml
index c4bddcf..b4c6715 100644
--- a/flink-connector-aws/flink-connector-dynamodb/pom.xml
+++ b/flink-connector-aws/flink-connector-dynamodb/pom.xml
@@ -87,6 +87,12 @@ under the License.
         </dependency>
 
         <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-base</artifactId>
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
index c90aa19..8f64a67 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
@@ -20,12 +20,16 @@ package org.apache.flink.connector.dynamodb.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.base.sink.AsyncSinkBase;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.dynamodb.sink.client.DynamoDbAsyncClientProvider;
+import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -80,6 +84,7 @@ public class DynamoDbSink<InputT> extends AsyncSinkBase<InputT, DynamoDbWriteReq
     private final boolean failOnError;
     private final String tableName;
     private final List<String> overwriteByPartitionKeys;
+    private transient SdkClientProvider<DynamoDbAsyncClient> asyncClientSdkClientProviderOverride;
 
     protected DynamoDbSink(
             ElementConverter<InputT, DynamoDbWriteRequest> elementConverter,
@@ -152,7 +157,7 @@ public class DynamoDbSink<InputT> extends AsyncSinkBase<InputT, DynamoDbWriteReq
                 failOnError,
                 tableName,
                 overwriteByPartitionKeys,
-                new DynamoDbAsyncClientProvider(dynamoDbClientProperties),
+                getAsyncClientProvider(dynamoDbClientProperties),
                 recoveredState);
     }
 
@@ -162,4 +167,19 @@ public class DynamoDbSink<InputT> extends AsyncSinkBase<InputT, DynamoDbWriteReq
             getWriterStateSerializer() {
         return new DynamoDbWriterStateSerializer();
     }
+
+    private SdkClientProvider<DynamoDbAsyncClient> getAsyncClientProvider(
+            Properties clientProperties) {
+        if (asyncClientSdkClientProviderOverride != null) {
+            return asyncClientSdkClientProviderOverride;
+        }
+        return new DynamoDbAsyncClientProvider(clientProperties);
+    }
+
+    @Internal
+    @VisibleForTesting
+    void setDynamoDbAsyncClientProvider(
+            SdkClientProvider<DynamoDbAsyncClient> asyncClientSdkClientProviderOverride) {
+        this.asyncClientSdkClientProviderOverride = asyncClientSdkClientProviderOverride;
+    }
 }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
index d37e184..f0d139f 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.connector.dynamodb.sink;
 
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
 
@@ -39,6 +38,7 @@ import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
 import software.amazon.awssdk.services.sts.model.StsException;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -46,6 +46,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -174,7 +175,7 @@ public class DynamoDbSinkWriterTest {
     }
 
     @Test
-    public void testRetryableExceptionWhenFailOnErrorOnWillNotRetry() {
+    public void testRetryableExceptionWhenFailOnErrorOnWillNotRetry() throws IOException {
         Optional<Exception> exceptionToThrow = getGenericRetryableException();
         boolean failOnError = true;
 
@@ -234,7 +235,7 @@ public class DynamoDbSinkWriterTest {
     }
 
     @Test
-    public void testNonRetryableExceptionWhenFailOnErrorOnWillNotRetry() {
+    public void testNonRetryableExceptionWhenFailOnErrorOnWillNotRetry() throws IOException {
         Optional<Exception> exceptionToThrow = getGenericNonRetryableException();
         boolean failOnError = true;
 
@@ -242,7 +243,7 @@ public class DynamoDbSinkWriterTest {
     }
 
     @Test
-    public void testNonRetryableExceptionWhenFailOnErrorOffWillNotRetry() {
+    public void testNonRetryableExceptionWhenFailOnErrorOffWillNotRetry() throws IOException {
         Optional<Exception> exceptionToThrow = getGenericNonRetryableException();
         boolean failOnError = false;
 
@@ -250,7 +251,7 @@ public class DynamoDbSinkWriterTest {
     }
 
     @Test
-    public void testInterruptedExceptionIsNonRetryable() {
+    public void testInterruptedExceptionIsNonRetryable() throws IOException {
         Optional<Exception> exceptionToThrow = Optional.of(new InterruptedException());
         boolean failOnError = false;
 
@@ -258,7 +259,7 @@ public class DynamoDbSinkWriterTest {
     }
 
     @Test
-    public void testInvalidCredentialsExceptionIsNonRetryable() {
+    public void testInvalidCredentialsExceptionIsNonRetryable() throws IOException {
         Optional<Exception> exceptionToThrow = Optional.of(StsException.builder().build());
         boolean failOnError = false;
 
@@ -266,7 +267,7 @@ public class DynamoDbSinkWriterTest {
     }
 
     @Test
-    public void testResourceNotFoundExceptionIsNonRetryable() {
+    public void testResourceNotFoundExceptionIsNonRetryable() throws IOException {
         Optional<Exception> exceptionToThrow =
                 Optional.of(ResourceNotFoundException.builder().build());
         boolean failOnError = false;
@@ -275,7 +276,7 @@ public class DynamoDbSinkWriterTest {
     }
 
     @Test
-    public void testConditionalCheckFailedExceptionIsNonRetryable() {
+    public void testConditionalCheckFailedExceptionIsNonRetryable() throws IOException {
         Optional<Exception> exceptionToThrow =
                 Optional.of(ConditionalCheckFailedException.builder().build());
         boolean failOnError = false;
@@ -284,7 +285,7 @@ public class DynamoDbSinkWriterTest {
     }
 
     @Test
-    public void testValidationExceptionIsNonRetryable() {
+    public void testValidationExceptionIsNonRetryable() throws IOException {
         Optional<Exception> exceptionToThrow =
                 Optional.of(
                         DynamoDbException.builder()
@@ -299,7 +300,7 @@ public class DynamoDbSinkWriterTest {
     }
 
     @Test
-    public void testSdkClientExceptionIsNonRetryable() {
+    public void testSdkClientExceptionIsNonRetryable() throws IOException {
         Optional<Exception> exceptionToThrow = Optional.of(SdkClientException.builder().build());
         boolean failOnError = false;
 
@@ -307,7 +308,7 @@ public class DynamoDbSinkWriterTest {
     }
 
     @Test
-    public void testGetSizeInBytesNotImplemented() {
+    public void testGetSizeInBytesNotImplemented() throws IOException {
         DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
                 getDefaultSinkWriter(
                         false, Collections.emptyList(), () -> new TrackingDynamoDbAsyncClient());
@@ -315,7 +316,7 @@ public class DynamoDbSinkWriterTest {
     }
 
     @Test
-    public void testClientClosesWhenWriterIsClosed() {
+    public void testClientClosesWhenWriterIsClosed() throws IOException {
         TestAsyncDynamoDbClientProvider testAsyncDynamoDbClientProvider =
                 new TestAsyncDynamoDbClientProvider(new TrackingDynamoDbAsyncClient());
         DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
@@ -327,7 +328,7 @@ public class DynamoDbSinkWriterTest {
     }
 
     private void assertThatRequestsAreNotRetried(
-            boolean failOnError, Optional<Exception> exceptionToThrow) {
+            boolean failOnError, Optional<Exception> exceptionToThrow) throws IOException {
         ThrowingDynamoDbAsyncClient<Exception> throwingDynamoDbAsyncClient =
                 new ThrowingDynamoDbAsyncClient<>(exceptionToThrow, str -> true);
         DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
@@ -343,7 +344,8 @@ public class DynamoDbSinkWriterTest {
     private DynamoDbSinkWriter<Map<String, AttributeValue>> getDefaultSinkWriter(
             boolean failOnError,
             List<String> overwriteByPartitionKeys,
-            Supplier<DynamoDbAsyncClient> clientSupplier) {
+            Supplier<DynamoDbAsyncClient> clientSupplier)
+            throws IOException {
         return getDefaultSinkWriter(
                 failOnError,
                 overwriteByPartitionKeys,
@@ -353,22 +355,24 @@ public class DynamoDbSinkWriterTest {
     private DynamoDbSinkWriter<Map<String, AttributeValue>> getDefaultSinkWriter(
             boolean failOnError,
             List<String> overwriteByPartitionKeys,
-            SdkClientProvider<DynamoDbAsyncClient> dynamoDbAsyncClientProvider) {
-        Sink.InitContext initContext = new TestSinkInitContext();
-        return new DynamoDbSinkWriter(
-                new TestDynamoDbElementConverter(),
-                initContext,
-                2,
-                1,
-                10,
-                1024,
-                1000,
-                1024,
-                failOnError,
-                TABLE_NAME,
-                overwriteByPartitionKeys,
-                dynamoDbAsyncClientProvider,
-                Collections.emptyList());
+            SdkClientProvider<DynamoDbAsyncClient> dynamoDbAsyncClientProvider)
+            throws IOException {
+        TestSinkInitContext initContext = new TestSinkInitContext();
+        DynamoDbSink<Map<String, AttributeValue>> sink =
+                new DynamoDbSink<>(
+                        new TestDynamoDbElementConverter(),
+                        2,
+                        1,
+                        10,
+                        1024,
+                        1000,
+                        1024,
+                        failOnError,
+                        TABLE_NAME,
+                        overwriteByPartitionKeys,
+                        new Properties());
+        sink.setDynamoDbAsyncClientProvider(dynamoDbAsyncClientProvider);
+        return (DynamoDbSinkWriter<Map<String, AttributeValue>>) sink.createWriter(initContext);
     }
 
     private List<DynamoDbWriteRequest> getDefaultInputRequests() {
diff --git a/flink-connector-aws/flink-connector-kinesis/pom.xml b/flink-connector-aws/flink-connector-kinesis/pom.xml
index b345f4a..2ffd448 100644
--- a/flink-connector-aws/flink-connector-kinesis/pom.xml
+++ b/flink-connector-aws/flink-connector-kinesis/pom.xml
@@ -137,6 +137,12 @@ under the License.
         </dependency>
 
         <!-- Flink ecosystem -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java</artifactId>
diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index d873c4e..f8cd5ab 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -284,7 +284,7 @@ public class FlinkKinesisConsumerTest extends TestLogger {
                 new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
         FlinkKinesisConsumer<?> mockedConsumer = spy(consumer);
 
-        RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 1);
+        RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 0);
 
         mockedConsumer.setRuntimeContext(context);
         mockedConsumer.initializeState(initializationContext);
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java
index b05eaa7..fa55a4a 100644
--- a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java
@@ -25,7 +25,6 @@ import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDes
 import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
 import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializationFacade;
 import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
-import lombok.NonNull;
 import org.apache.avro.Schema;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -208,9 +207,7 @@ class GlueSchemaRegistryAvroSchemaCoderTest {
 
         @Override
         public UUID getSchemaVersionIdByDefinition(
-                @NonNull String schemaDefinition,
-                @NonNull String schemaName,
-                @NonNull String dataFormat) {
+                String schemaDefinition, String schemaName, String dataFormat) {
             EntityNotFoundException entityNotFoundException =
                     EntityNotFoundException.builder()
                             .message(AWSSchemaRegistryConstants.SCHEMA_NOT_FOUND_MSG)
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java
index 682e7bf..37bd90f 100644
--- a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java
@@ -26,7 +26,6 @@ import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDes
 import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
 import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
 import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.COMPRESSION;
-import lombok.NonNull;
 import org.apache.avro.Schema;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
@@ -293,7 +292,7 @@ class GlueSchemaRegistryInputStreamDeserializerTest {
         }
 
         @Override
-        public String getSchemaDefinition(@NonNull byte[] data) {
+        public String getSchemaDefinition(byte[] data) {
             return schema.getSchemaDefinition();
         }
 
diff --git a/flink-formats-aws/flink-json-glue-schema-registry/pom.xml b/flink-formats-aws/flink-json-glue-schema-registry/pom.xml
index 0af1c75..7f34db6 100644
--- a/flink-formats-aws/flink-json-glue-schema-registry/pom.xml
+++ b/flink-formats-aws/flink-json-glue-schema-registry/pom.xml
@@ -61,6 +61,14 @@ under the License.
             <version>${glue.schema.registry.version}</version>
         </dependency>
 
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.30</version>
+            <scope>test</scope>
+        </dependency>
+
         <!-- ArchUit test dependencies -->
 
         <dependency>
diff --git a/flink-formats-aws/flink-json-glue-schema-registry/src/main/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderProvider.java b/flink-formats-aws/flink-json-glue-schema-registry/src/main/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderProvider.java
index bb9902b..df45215 100644
--- a/flink-formats-aws/flink-json-glue-schema-registry/src/main/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderProvider.java
+++ b/flink-formats-aws/flink-json-glue-schema-registry/src/main/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderProvider.java
@@ -20,11 +20,11 @@ package org.apache.flink.formats.json.glue.schema.registry;
 
 import org.apache.flink.annotation.PublicEvolving;
 
-import lombok.NonNull;
-
 import java.io.Serializable;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** Provider for {@link GlueSchemaRegistryJsonSchemaCoder}. */
 @PublicEvolving
 public class GlueSchemaRegistryJsonSchemaCoderProvider implements Serializable {
@@ -40,9 +40,9 @@ public class GlueSchemaRegistryJsonSchemaCoderProvider implements Serializable {
      * @param configs configurations for AWS Glue Schema Registry
      */
     public GlueSchemaRegistryJsonSchemaCoderProvider(
-            String transportName, @NonNull Map<String, Object> configs) {
+            String transportName, Map<String, Object> configs) {
         this.transportName = transportName;
-        this.configs = configs;
+        this.configs = checkNotNull(configs);
     }
 
     public GlueSchemaRegistryJsonSchemaCoder get() {
diff --git a/flink-formats-aws/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java b/flink-formats-aws/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java
index 83c09eb..ec7f46e 100644
--- a/flink-formats-aws/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java
+++ b/flink-formats-aws/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java
@@ -23,7 +23,6 @@ import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDes
 import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
 import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
 import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
-import lombok.NonNull;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -160,7 +159,7 @@ class GlueSchemaRegistryJsonDeserializationSchemaTest {
         }
 
         @Override
-        public Object deserialize(@NonNull AWSDeserializerInput deserializerInput)
+        public Object deserialize(AWSDeserializerInput deserializerInput)
                 throws AWSSchemaRegistryException {
             return userDefinedPojo;
         }
@@ -174,7 +173,7 @@ class GlueSchemaRegistryJsonDeserializationSchemaTest {
         }
 
         @Override
-        public Object deserialize(@NonNull AWSDeserializerInput deserializerInput)
+        public Object deserialize(AWSDeserializerInput deserializerInput)
                 throws AWSSchemaRegistryException {
             return userSchema;
         }
diff --git a/pom.xml b/pom.xml
index f9f0ade..54ec609 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,7 @@ under the License.
         <netty.version>4.1.86.Final</netty.version>
         <flink.version>1.17.0</flink.version>
         <jackson-bom.version>2.14.3</jackson-bom.version>
-        <glue.schema.registry.version>1.1.14</glue.schema.registry.version>
+        <glue.schema.registry.version>1.1.18</glue.schema.registry.version>
         <guava.version>32.1.3-jre</guava.version>
 
         <junit5.version>5.8.1</junit5.version>
@@ -341,6 +341,16 @@ under the License.
                 <artifactId>amazon-kinesis-client</artifactId>
                 <version>1.14.8</version>
             </dependency>
+            <dependency>
+                <groupId>com.squareup.okio</groupId>
+                <artifactId>okio</artifactId>
+                <version>3.4.0</version>
+            </dependency>
+            <dependency>
+                <groupId>com.squareup.okio</groupId>
+                <artifactId>okio-jvm</artifactId>
+                <version>3.4.0</version>
+            </dependency>
             <dependency>
                 <groupId>org.jetbrains.kotlin</groupId>
                 <artifactId>kotlin-stdlib-common</artifactId>
