This is an automated email from the ASF dual-hosted git repository. hong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push: new 5b6f087 [FLINK-34260][Connectors/AWS] Update flink-connector-aws to be compatible with updated SinkV2 interfaces 5b6f087 is described below commit 5b6f087815bcf18cf62ba39b2ac1f84f5e72f951 Author: Aleksandr Pilipenko <z3d...@gmail.com> AuthorDate: Mon Jan 29 20:17:41 2024 +0000 [FLINK-34260][Connectors/AWS] Update flink-connector-aws to be compatible with updated SinkV2 interfaces --- .../flink-connector-aws-kinesis-firehose/pom.xml | 6 ++ .../sink/KinesisFirehoseSinkWriterTest.java | 14 ++--- .../kinesis/sink/KinesisStreamsSinkWriterTest.java | 15 ++--- .../flink-connector-dynamodb/pom.xml | 6 ++ .../connector/dynamodb/sink/DynamoDbSink.java | 22 +++++++- .../dynamodb/sink/DynamoDbSinkWriterTest.java | 64 ++++++++++++---------- .../flink-connector-kinesis/pom.xml | 6 ++ .../kinesis/FlinkKinesisConsumerTest.java | 2 +- .../GlueSchemaRegistryAvroSchemaCoderTest.java | 5 +- ...eSchemaRegistryInputStreamDeserializerTest.java | 3 +- .../flink-json-glue-schema-registry/pom.xml | 8 +++ .../GlueSchemaRegistryJsonSchemaCoderProvider.java | 8 +-- ...chemaRegistryJsonDeserializationSchemaTest.java | 5 +- pom.xml | 12 +++- 14 files changed, 116 insertions(+), 60 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-firehose/pom.xml b/flink-connector-aws/flink-connector-aws-kinesis-firehose/pom.xml index fd2e7b3..68df44e 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-firehose/pom.xml +++ b/flink-connector-aws/flink-connector-aws-kinesis-firehose/pom.xml @@ -71,6 +71,12 @@ under the License. <version>${flink.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> diff --git a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java index 29160f7..2d9ddef 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java @@ -48,22 +48,22 @@ public class KinesisFirehoseSinkWriterTest { .build(); @BeforeEach - void setup() { + void setup() throws IOException { TestSinkInitContext sinkInitContext = new TestSinkInitContext(); Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint"); - sinkWriter = - new KinesisFirehoseSinkWriter<>( + KinesisFirehoseSink<String> sink = + new KinesisFirehoseSink<>( ELEMENT_CONVERTER_PLACEHOLDER, - sinkInitContext, 50, 16, 10000, - 4 * 1024 * 1024, - 5000, - 1000 * 1024, + 4 * 1024 * 1024L, + 5000L, + 1000 * 1024L, true, "streamName", sinkProperties); + sinkWriter = (KinesisFirehoseSinkWriter<String>) sink.createWriter(sinkInitContext); } @Test diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java index eccfe0a..f3c13d4 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java @@ -18,7 +18,6 @@ package org.apache.flink.connector.kinesis.sink; import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; @@ -28,6 +27,7 @@ import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRat import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import java.io.IOException; import java.util.Properties; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -55,13 +55,13 @@ public class KinesisStreamsSinkWriterTest { .build(); @Test - void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters() { - Sink.InitContext sinkInitContext = new TestSinkInitContext(); + void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters() + throws IOException { + TestSinkInitContext sinkInitContext = new TestSinkInitContext(); Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint"); - sinkWriter = - new KinesisStreamsSinkWriter<String>( + KinesisStreamsSink<String> sink = + new KinesisStreamsSink<>( ELEMENT_CONVERTER_PLACEHOLDER, - sinkInitContext, MAX_BATCH_SIZE, MAX_INFLIGHT_REQUESTS, MAX_BUFFERED_REQUESTS, @@ -70,8 +70,9 @@ public class KinesisStreamsSinkWriterTest { MAX_RECORD_SIZE, FAIL_ON_ERROR, "streamName", - "StreamARN", + "arn:aws:kinesis:us-east-1:000000000000:stream/streamName", sinkProperties); + sinkWriter = (KinesisStreamsSinkWriter<String>) sink.createWriter(sinkInitContext); assertThat(sinkWriter) .extracting("rateLimitingStrategy") diff --git a/flink-connector-aws/flink-connector-dynamodb/pom.xml b/flink-connector-aws/flink-connector-dynamodb/pom.xml index c4bddcf..b4c6715 100644 --- a/flink-connector-aws/flink-connector-dynamodb/pom.xml +++ b/flink-connector-aws/flink-connector-dynamodb/pom.xml @@ -87,6 +87,12 @@ under the License. </dependency> <!-- Test dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java index c90aa19..8f64a67 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java @@ -20,12 +20,16 @@ package org.apache.flink.connector.dynamodb.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.connector.base.sink.AsyncSinkBase; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.connector.dynamodb.sink.client.DynamoDbAsyncClientProvider; +import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider; import org.apache.flink.core.io.SimpleVersionedSerializer; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; + import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -80,6 +84,7 @@ public class DynamoDbSink<InputT> extends AsyncSinkBase<InputT, DynamoDbWriteReq private final boolean failOnError; private final String tableName; private final List<String> overwriteByPartitionKeys; + private transient SdkClientProvider<DynamoDbAsyncClient> asyncClientSdkClientProviderOverride; protected DynamoDbSink( ElementConverter<InputT, DynamoDbWriteRequest> elementConverter, @@ -152,7 +157,7 @@ public class DynamoDbSink<InputT> extends AsyncSinkBase<InputT, DynamoDbWriteReq failOnError, tableName, overwriteByPartitionKeys, - new DynamoDbAsyncClientProvider(dynamoDbClientProperties), + getAsyncClientProvider(dynamoDbClientProperties), recoveredState); } @@ -162,4 +167,19 @@ public class DynamoDbSink<InputT> extends AsyncSinkBase<InputT, DynamoDbWriteReq getWriterStateSerializer() { return new DynamoDbWriterStateSerializer(); } + + private SdkClientProvider<DynamoDbAsyncClient> getAsyncClientProvider( + Properties clientProperties) { + if (asyncClientSdkClientProviderOverride != null) { + return asyncClientSdkClientProviderOverride; + } + return new DynamoDbAsyncClientProvider(clientProperties); + } + + @Internal + @VisibleForTesting + void setDynamoDbAsyncClientProvider( + SdkClientProvider<DynamoDbAsyncClient> asyncClientSdkClientProviderOverride) { + this.asyncClientSdkClientProviderOverride = asyncClientSdkClientProviderOverride; + } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java index d37e184..f0d139f 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java @@ -18,7 +18,6 @@ package org.apache.flink.connector.dynamodb.sink; -import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider; @@ -39,6 +38,7 @@ import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; import software.amazon.awssdk.services.dynamodb.model.WriteRequest; import software.amazon.awssdk.services.sts.model.StsException; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -174,7 +175,7 @@ public class DynamoDbSinkWriterTest { } @Test - public void testRetryableExceptionWhenFailOnErrorOnWillNotRetry() { + public void testRetryableExceptionWhenFailOnErrorOnWillNotRetry() throws IOException { Optional<Exception> exceptionToThrow = getGenericRetryableException(); boolean failOnError = true; @@ -234,7 +235,7 @@ public class DynamoDbSinkWriterTest { } @Test - public void testNonRetryableExceptionWhenFailOnErrorOnWillNotRetry() { + public void testNonRetryableExceptionWhenFailOnErrorOnWillNotRetry() throws IOException { Optional<Exception> exceptionToThrow = getGenericNonRetryableException(); boolean failOnError = true; @@ -242,7 +243,7 @@ public class DynamoDbSinkWriterTest { } @Test - public void testNonRetryableExceptionWhenFailOnErrorOffWillNotRetry() { + public void testNonRetryableExceptionWhenFailOnErrorOffWillNotRetry() throws IOException { Optional<Exception> exceptionToThrow = getGenericNonRetryableException(); boolean failOnError = false; @@ -250,7 +251,7 @@ public class DynamoDbSinkWriterTest { } @Test - public void testInterruptedExceptionIsNonRetryable() { + public void testInterruptedExceptionIsNonRetryable() throws IOException { Optional<Exception> exceptionToThrow = Optional.of(new InterruptedException()); boolean failOnError = false; @@ -258,7 +259,7 @@ public class DynamoDbSinkWriterTest { } @Test - public void testInvalidCredentialsExceptionIsNonRetryable() { + public void testInvalidCredentialsExceptionIsNonRetryable() throws IOException { Optional<Exception> exceptionToThrow = Optional.of(StsException.builder().build()); boolean failOnError = false; @@ -266,7 +267,7 @@ public class DynamoDbSinkWriterTest { } @Test - public void testResourceNotFoundExceptionIsNonRetryable() { + public void testResourceNotFoundExceptionIsNonRetryable() throws IOException { Optional<Exception> exceptionToThrow = Optional.of(ResourceNotFoundException.builder().build()); boolean failOnError = false; @@ -275,7 +276,7 @@ public class DynamoDbSinkWriterTest { } @Test - public void testConditionalCheckFailedExceptionIsNonRetryable() { + public void testConditionalCheckFailedExceptionIsNonRetryable() throws IOException { Optional<Exception> exceptionToThrow = Optional.of(ConditionalCheckFailedException.builder().build()); boolean failOnError = false; @@ -284,7 +285,7 @@ public class DynamoDbSinkWriterTest { } @Test - public void testValidationExceptionIsNonRetryable() { + public void testValidationExceptionIsNonRetryable() throws IOException { Optional<Exception> exceptionToThrow = Optional.of( DynamoDbException.builder() @@ -299,7 +300,7 @@ public class DynamoDbSinkWriterTest { } @Test - public void testSdkClientExceptionIsNonRetryable() { + public void testSdkClientExceptionIsNonRetryable() throws IOException { Optional<Exception> exceptionToThrow = Optional.of(SdkClientException.builder().build()); boolean failOnError = false; @@ -307,7 +308,7 @@ public class DynamoDbSinkWriterTest { } @Test - public void testGetSizeInBytesNotImplemented() { + public void testGetSizeInBytesNotImplemented() throws IOException { DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter = getDefaultSinkWriter( false, Collections.emptyList(), () -> new TrackingDynamoDbAsyncClient()); @@ -315,7 +316,7 @@ public class DynamoDbSinkWriterTest { } @Test - public void testClientClosesWhenWriterIsClosed() { + public void testClientClosesWhenWriterIsClosed() throws IOException { TestAsyncDynamoDbClientProvider testAsyncDynamoDbClientProvider = new TestAsyncDynamoDbClientProvider(new TrackingDynamoDbAsyncClient()); DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter = @@ -327,7 +328,7 @@ public class DynamoDbSinkWriterTest { } private void assertThatRequestsAreNotRetried( - boolean failOnError, Optional<Exception> exceptionToThrow) { + boolean failOnError, Optional<Exception> exceptionToThrow) throws IOException { ThrowingDynamoDbAsyncClient<Exception> throwingDynamoDbAsyncClient = new ThrowingDynamoDbAsyncClient<>(exceptionToThrow, str -> true); DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter = @@ -343,7 +344,8 @@ public class DynamoDbSinkWriterTest { private DynamoDbSinkWriter<Map<String, AttributeValue>> getDefaultSinkWriter( boolean failOnError, List<String> overwriteByPartitionKeys, - Supplier<DynamoDbAsyncClient> clientSupplier) { + Supplier<DynamoDbAsyncClient> clientSupplier) + throws IOException { return getDefaultSinkWriter( failOnError, overwriteByPartitionKeys, @@ -353,22 +355,24 @@ public class DynamoDbSinkWriterTest { private DynamoDbSinkWriter<Map<String, AttributeValue>> getDefaultSinkWriter( boolean failOnError, List<String> overwriteByPartitionKeys, - SdkClientProvider<DynamoDbAsyncClient> dynamoDbAsyncClientProvider) { - Sink.InitContext initContext = new TestSinkInitContext(); - return new DynamoDbSinkWriter( - new TestDynamoDbElementConverter(), - initContext, - 2, - 1, - 10, - 1024, - 1000, - 1024, - failOnError, - TABLE_NAME, - overwriteByPartitionKeys, - dynamoDbAsyncClientProvider, - Collections.emptyList()); + SdkClientProvider<DynamoDbAsyncClient> dynamoDbAsyncClientProvider) + throws IOException { + TestSinkInitContext initContext = new TestSinkInitContext(); + DynamoDbSink<Map<String, AttributeValue>> sink = + new DynamoDbSink<>( + new TestDynamoDbElementConverter(), + 2, + 1, + 10, + 1024, + 1000, + 1024, + failOnError, + TABLE_NAME, + overwriteByPartitionKeys, + new Properties()); + sink.setDynamoDbAsyncClientProvider(dynamoDbAsyncClientProvider); + return (DynamoDbSinkWriter<Map<String, AttributeValue>>) sink.createWriter(initContext); } private List<DynamoDbWriteRequest> getDefaultInputRequests() { diff --git a/flink-connector-aws/flink-connector-kinesis/pom.xml b/flink-connector-aws/flink-connector-kinesis/pom.xml index b345f4a..2ffd448 100644 --- a/flink-connector-aws/flink-connector-kinesis/pom.xml +++ b/flink-connector-aws/flink-connector-kinesis/pom.xml @@ -137,6 +137,12 @@ under the License. </dependency> <!-- Flink ecosystem --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index d873c4e..f8cd5ab 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -284,7 +284,7 @@ public class FlinkKinesisConsumerTest extends TestLogger { new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); FlinkKinesisConsumer<?> mockedConsumer = spy(consumer); - RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 1); + RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 0); mockedConsumer.setRuntimeContext(context); mockedConsumer.initializeState(initializationContext); diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java index b05eaa7..fa55a4a 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderTest.java @@ -25,7 +25,6 @@ import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDes import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException; import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializationFacade; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; -import lombok.NonNull; import org.apache.avro.Schema; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -208,9 +207,7 @@ class GlueSchemaRegistryAvroSchemaCoderTest { @Override public UUID getSchemaVersionIdByDefinition( - @NonNull String schemaDefinition, - @NonNull String schemaName, - @NonNull String dataFormat) { + String schemaDefinition, String schemaName, String dataFormat) { EntityNotFoundException entityNotFoundException = EntityNotFoundException.builder() .message(AWSSchemaRegistryConstants.SCHEMA_NOT_FOUND_MSG) diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java index 682e7bf..37bd90f 100644 --- a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java +++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java @@ -26,7 +26,6 @@ import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDes import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.COMPRESSION; -import lombok.NonNull; import org.apache.avro.Schema; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumWriter; @@ -293,7 +292,7 @@ class GlueSchemaRegistryInputStreamDeserializerTest { } @Override - public String getSchemaDefinition(@NonNull byte[] data) { + public String getSchemaDefinition(byte[] data) { return schema.getSchemaDefinition(); } diff --git a/flink-formats-aws/flink-json-glue-schema-registry/pom.xml b/flink-formats-aws/flink-json-glue-schema-registry/pom.xml index 0af1c75..7f34db6 100644 --- a/flink-formats-aws/flink-json-glue-schema-registry/pom.xml +++ b/flink-formats-aws/flink-json-glue-schema-registry/pom.xml @@ -61,6 +61,14 @@ under the License. <version>${glue.schema.registry.version}</version> </dependency> + <!-- Test dependencies --> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>1.18.30</version> + <scope>test</scope> + </dependency> + <!-- ArchUit test dependencies --> <dependency> diff --git a/flink-formats-aws/flink-json-glue-schema-registry/src/main/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderProvider.java b/flink-formats-aws/flink-json-glue-schema-registry/src/main/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderProvider.java index bb9902b..df45215 100644 --- a/flink-formats-aws/flink-json-glue-schema-registry/src/main/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderProvider.java +++ b/flink-formats-aws/flink-json-glue-schema-registry/src/main/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonSchemaCoderProvider.java @@ -20,11 +20,11 @@ package org.apache.flink.formats.json.glue.schema.registry; import org.apache.flink.annotation.PublicEvolving; -import lombok.NonNull; - import java.io.Serializable; import java.util.Map; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** Provider for {@link GlueSchemaRegistryJsonSchemaCoder}. */ @PublicEvolving public class GlueSchemaRegistryJsonSchemaCoderProvider implements Serializable { @@ -40,9 +40,9 @@ public class GlueSchemaRegistryJsonSchemaCoderProvider implements Serializable { * @param configs configurations for AWS Glue Schema Registry */ public GlueSchemaRegistryJsonSchemaCoderProvider( - String transportName, @NonNull Map<String, Object> configs) { + String transportName, Map<String, Object> configs) { this.transportName = transportName; - this.configs = configs; + this.configs = checkNotNull(configs); } public GlueSchemaRegistryJsonSchemaCoder get() { diff --git a/flink-formats-aws/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java b/flink-formats-aws/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java index 83c09eb..ec7f46e 100644 --- a/flink-formats-aws/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java +++ b/flink-formats-aws/flink-json-glue-schema-registry/src/test/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchemaTest.java @@ -23,7 +23,6 @@ import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDes import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException; import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; -import lombok.NonNull; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -160,7 +159,7 @@ class GlueSchemaRegistryJsonDeserializationSchemaTest { } @Override - public Object deserialize(@NonNull AWSDeserializerInput deserializerInput) + public Object deserialize(AWSDeserializerInput deserializerInput) throws AWSSchemaRegistryException { return userDefinedPojo; } @@ -174,7 +173,7 @@ class GlueSchemaRegistryJsonDeserializationSchemaTest { } @Override - public Object deserialize(@NonNull AWSDeserializerInput deserializerInput) + public Object deserialize(AWSDeserializerInput deserializerInput) throws AWSSchemaRegistryException { return userSchema; } diff --git a/pom.xml b/pom.xml index f9f0ade..54ec609 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ under the License. <netty.version>4.1.86.Final</netty.version> <flink.version>1.17.0</flink.version> <jackson-bom.version>2.14.3</jackson-bom.version> - <glue.schema.registry.version>1.1.14</glue.schema.registry.version> + <glue.schema.registry.version>1.1.18</glue.schema.registry.version> <guava.version>32.1.3-jre</guava.version> <junit5.version>5.8.1</junit5.version> @@ -341,6 +341,16 @@ under the License. <artifactId>amazon-kinesis-client</artifactId> <version>1.14.8</version> </dependency> + <dependency> + <groupId>com.squareup.okio</groupId> + <artifactId>okio</artifactId> + <version>3.4.0</version> + </dependency> + <dependency> + <groupId>com.squareup.okio</groupId> + <artifactId>okio-jvm</artifactId> + <version>3.4.0</version> + </dependency> <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-stdlib-common</artifactId>