This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 3be64d73d28fc376b8df0dfa71c0f5c566524876 Author: Martijn Visser <mvis...@confluent.io> AuthorDate: Mon Oct 9 15:23:15 2023 +0200 [FLINK-33191][Connector/Kafka] Remove dependency on Flink Shaded --- .../connector/kafka/util/JacksonMapperFactory.java | 49 ++++++++++++++++++++++ .../JSONKeyValueDeserializationSchema.java | 9 ++-- .../KafkaRecordDeserializationSchemaTest.java | 9 ++-- .../JSONKeyValueDeserializationSchemaTest.java | 7 ++-- pom.xml | 24 ++++++++--- 5 files changed, 78 insertions(+), 20 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/JacksonMapperFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/JacksonMapperFactory.java new file mode 100644 index 00000000..c9301c79 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/JacksonMapperFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.util; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +/** Factory for Jackson mappers. */ +public final class JacksonMapperFactory { + + public static ObjectMapper createObjectMapper() { + final ObjectMapper objectMapper = new ObjectMapper(); + registerModules(objectMapper); + return objectMapper; + } + + public static ObjectMapper createObjectMapper(JsonFactory jsonFactory) { + final ObjectMapper objectMapper = new ObjectMapper(jsonFactory); + registerModules(objectMapper); + return objectMapper; + } + + private static void registerModules(ObjectMapper mapper) { + mapper.registerModule(new JavaTimeModule()) + .registerModule(new Jdk8Module().configureAbsentsAsNulls(true)) + .disable(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + private JacksonMapperFactory() {} +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java index e2b428ee..970bad1c 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java @@ -20,13 +20,12 @@ package org.apache.flink.streaming.util.serialization; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.kafka.util.JacksonMapperFactory; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; -import org.apache.flink.util.jackson.JacksonMapperFactory; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.clients.consumer.ConsumerRecord; import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass; diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java index e764c860..d61b7f83 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java @@ -18,17 +18,16 @@ package org.apache.flink.connector.kafka.source.reader.deserializer; +import org.apache.flink.connector.kafka.util.JacksonMapperFactory; import org.apache.flink.connector.testutils.formats.DummyInitializationContext; import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext; import org.apache.flink.formats.json.JsonDeserializationSchema; import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; import org.apache.flink.util.Collector; -import org.apache.flink.util.jackson.JacksonMapperFactory; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.serialization.StringDeserializer; diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java index ddbcf1c9..a5abb5e6 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java @@ -17,13 +17,12 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.connector.kafka.util.JacksonMapperFactory; import org.apache.flink.connector.testutils.formats.DummyInitializationContext; import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; -import org.apache.flink.util.jackson.JacksonMapperFactory; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Test; diff --git a/pom.xml b/pom.xml index c7db3228..00a2ed82 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ under the License. <kafka.version>3.2.3</kafka.version> <zookeeper.version>3.5.9</zookeeper.version> - <jackson-bom.version>2.13.4.20221013</jackson-bom.version> + <jackson-bom.version>2.15.2</jackson-bom.version> <junit4.version>4.13.2</junit4.version> <junit5.version>5.9.1</junit5.version> <assertj.version>3.23.1</assertj.version> @@ -77,13 +77,25 @@ under the License. </properties> <dependencies> + <!-- Root dependencies for all projects --> <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-shaded-jackson</artifactId> - <version>2.13.4-16.1</version> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <!-- Java 8 Date/time --> + <groupId>com.fasterxml.jackson.datatype</groupId> + <artifactId>jackson-datatype-jsr310</artifactId> + </dependency> + <dependency> + <!-- Java 8 Datatypes --> + <groupId>com.fasterxml.jackson.datatype</groupId> + <artifactId>jackson-datatype-jdk8</artifactId> </dependency> - - <!-- Root dependencies for all projects --> <!-- Logging API --> <dependency>