This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 3be64d73d28fc376b8df0dfa71c0f5c566524876
Author: Martijn Visser <mvis...@confluent.io>
AuthorDate: Mon Oct 9 15:23:15 2023 +0200

    [FLINK-33191][Connector/Kafka] Remove dependency on Flink Shaded
---
 .../connector/kafka/util/JacksonMapperFactory.java | 49 ++++++++++++++++++++++
 .../JSONKeyValueDeserializationSchema.java         |  9 ++--
 .../KafkaRecordDeserializationSchemaTest.java      |  9 ++--
 .../JSONKeyValueDeserializationSchemaTest.java     |  7 ++--
 pom.xml                                            | 24 ++++++++---
 5 files changed, 78 insertions(+), 20 deletions(-)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/JacksonMapperFactory.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/JacksonMapperFactory.java
new file mode 100644
index 00000000..c9301c79
--- /dev/null
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/util/JacksonMapperFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.util;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+/** Factory for Jackson mappers. */
+public final class JacksonMapperFactory {
+
+    public static ObjectMapper createObjectMapper() {
+        final ObjectMapper objectMapper = new ObjectMapper();
+        registerModules(objectMapper);
+        return objectMapper;
+    }
+
+    public static ObjectMapper createObjectMapper(JsonFactory jsonFactory) {
+        final ObjectMapper objectMapper = new ObjectMapper(jsonFactory);
+        registerModules(objectMapper);
+        return objectMapper;
+    }
+
+    private static void registerModules(ObjectMapper mapper) {
+        mapper.registerModule(new JavaTimeModule())
+                .registerModule(new Jdk8Module().configureAbsentsAsNulls(true))
+                .disable(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS)
+                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+    }
+
+    private JacksonMapperFactory() {}
+}
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
index e2b428ee..970bad1c 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -20,13 +20,12 @@ package org.apache.flink.streaming.util.serialization;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.util.JacksonMapperFactory;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
-import org.apache.flink.util.jackson.JacksonMapperFactory;
-
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
index e764c860..d61b7f83 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
@@ -18,17 +18,16 @@
 
 package org.apache.flink.connector.kafka.source.reader.deserializer;
 
+import org.apache.flink.connector.kafka.util.JacksonMapperFactory;
 import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
 import 
org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
 import org.apache.flink.formats.json.JsonDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.jackson.JacksonMapperFactory;
-
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.serialization.StringDeserializer;
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
index ddbcf1c9..a5abb5e6 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
@@ -17,13 +17,12 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.connector.kafka.util.JacksonMapperFactory;
 import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
 import 
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
-import org.apache.flink.util.jackson.JacksonMapperFactory;
-
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Test;
 
diff --git a/pom.xml b/pom.xml
index c7db3228..00a2ed82 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@ under the License.
         <kafka.version>3.2.3</kafka.version>
         <zookeeper.version>3.5.9</zookeeper.version>
 
-        <jackson-bom.version>2.13.4.20221013</jackson-bom.version>
+        <jackson-bom.version>2.15.2</jackson-bom.version>
         <junit4.version>4.13.2</junit4.version>
         <junit5.version>5.9.1</junit5.version>
         <assertj.version>3.23.1</assertj.version>
@@ -77,13 +77,25 @@ under the License.
     </properties>
 
     <dependencies>
+        <!-- Root dependencies for all projects -->
         <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-shaded-jackson</artifactId>
-            <version>2.13.4-16.1</version>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <!--       Java 8 Date/time        -->
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+        </dependency>
+        <dependency>
+            <!--       Java 8 Datatypes        -->
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jdk8</artifactId>
         </dependency>
-
-        <!-- Root dependencies for all projects -->
 
         <!-- Logging API -->
         <dependency>

Reply via email to