This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9a967c010a58e0b2277516068256ef45ee711edc
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Thu Jul 28 16:09:43 2022 +0200

    [FLINK-28621][formats] Initialize mappers in open()
---
 .../JSONKeyValueDeserializationSchema.java         |  9 ++++--
 .../csv/CsvRowDataDeserializationSchema.java       |  8 +++--
 .../formats/csv/CsvRowDataSerializationSchema.java | 37 ++++++++++++++++------
 .../formats/csv/CsvRowDeserializationSchema.java   |  8 +++--
 .../formats/csv/CsvRowSerializationSchema.java     | 10 ++++--
 .../json/JsonRowDataDeserializationSchema.java     | 17 +++++++---
 .../json/JsonRowDataSerializationSchema.java       | 12 +++++--
 .../formats/json/JsonRowDeserializationSchema.java | 12 +++++--
 .../formats/json/JsonRowSerializationSchema.java   |  7 +++-
 9 files changed, 90 insertions(+), 30 deletions(-)
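
Context for the change: serialization schemas are instantiated on the client and shipped to the task managers, and this commit moves Jackson mapper construction into open(), which Flink invokes once on the task manager before the first record is processed, so the mappers are built where they are used instead of travelling with the serialized schema object. A minimal sketch of the pattern; the class name and Row payload handling below are illustrative, not part of the commit:

    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.types.Row;

    public class ExampleJsonSerializationSchema implements SerializationSchema<Row> {

        // transient: excluded when the schema object itself is serialized
        private transient ObjectMapper mapper;

        @Override
        public void open(InitializationContext context) throws Exception {
            // created once per schema instance, after deserialization on the worker
            mapper = new ObjectMapper();
        }

        @Override
        public byte[] serialize(Row row) {
            try {
                return mapper.writeValueAsBytes(row.getField(0));
            } catch (Exception e) {
                throw new RuntimeException("Could not serialize row: " + row, e);
            }
        }
    }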

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
index cebd40aac26..86817433a6f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
 
@@ -52,11 +53,13 @@ public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSc
         this.includeMetadata = includeMetadata;
     }
 
+    @Override
+    public void open(DeserializationSchema.InitializationContext context) throws Exception {
+        mapper = new ObjectMapper();
+    }
+
     @Override
     public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
-        if (mapper == null) {
-            mapper = new ObjectMapper();
-        }
         ObjectNode node = mapper.createObjectNode();
         if (record.key() != null) {
             node.set("key", mapper.readValue(record.key(), JsonNode.class));
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
index f5830f31f83..0c3f68e97f8 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
@@ -59,7 +59,7 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
     private final CsvSchema csvSchema;
 
     /** Object reader used to read rows. It is configured by {@link CsvSchema}. */
-    private final ObjectReader objectReader;
+    private transient ObjectReader objectReader;
 
     /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
     private final boolean ignoreParseErrors;
@@ -72,10 +72,14 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
         this.resultTypeInfo = resultTypeInfo;
         this.runtimeConverter = runtimeConverter;
         this.csvSchema = csvSchema;
-        this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
         this.ignoreParseErrors = ignoreParseErrors;
     }
 
+    @Override
+    public void open(InitializationContext context) {
+        this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+    }
+
     /** A builder for creating a {@link CsvRowDataDeserializationSchema}. */
     @Internal
     public static class Builder {
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
index 25673fcc3d8..f7fd810e691 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -54,14 +55,16 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
     /** Runtime instance that performs the actual work. */
     private final RowDataToCsvConverters.RowDataToCsvConverter runtimeConverter;
 
+    private final SerializableSupplier<CsvMapper> csvMapperSupplier;
+
     /** CsvMapper used to write {@link JsonNode} into bytes. */
-    private final CsvMapper csvMapper;
+    private transient CsvMapper csvMapper;
 
     /** Schema describing the input CSV data. */
     private final CsvSchema csvSchema;
 
     /** Object writer used to write rows. It is configured by {@link CsvSchema}. */
-    private final ObjectWriter objectWriter;
+    private transient ObjectWriter objectWriter;
 
     /** Reusable object node. */
     private transient ObjectNode root;
@@ -72,11 +75,18 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
             converterContext;
 
     private CsvRowDataSerializationSchema(
-            RowType rowType, CsvSchema csvSchema, CsvMapper csvMapper) {
+            RowType rowType,
+            CsvSchema csvSchema,
+            SerializableSupplier<CsvMapper> csvMapperSupplier) {
         this.rowType = rowType;
         this.runtimeConverter = RowDataToCsvConverters.createRowConverter(rowType);
-        this.csvMapper = csvMapper;
         this.csvSchema = csvSchema.withLineSeparator("");
+        this.csvMapperSupplier = csvMapperSupplier;
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        this.csvMapper = csvMapperSupplier.get();
         this.objectWriter = csvMapper.writer(this.csvSchema);
     }
 
@@ -86,7 +96,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
 
         private final RowType rowType;
         private CsvSchema csvSchema;
-        private CsvMapper csvMapper;
+        private boolean isScientificNotation;
 
         /**
          * Creates a {@link CsvRowDataSerializationSchema} expecting the given {@link RowType}.
@@ -98,7 +108,6 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
 
             this.rowType = rowType;
             this.csvSchema = CsvRowSchemaConverter.convert(rowType);
-            this.csvMapper = new CsvMapper();
         }
 
         public Builder setFieldDelimiter(char c) {
@@ -133,12 +142,22 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
         }
 
         public void setWriteBigDecimalInScientificNotation(boolean isScientificNotation) {
-            this.csvMapper.configure(
-                    JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, !isScientificNotation);
+            this.isScientificNotation = isScientificNotation;
         }
 
         public CsvRowDataSerializationSchema build() {
-            return new CsvRowDataSerializationSchema(rowType, csvSchema, csvMapper);
+            // assign to local variable to avoid reference to non-serializable builder
+            final boolean isScientificNotation = this.isScientificNotation;
+            return new CsvRowDataSerializationSchema(
+                    rowType,
+                    csvSchema,
+                    () -> {
+                        final CsvMapper csvMapper = new CsvMapper();
+                        csvMapper.configure(
+                                JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
+                                !isScientificNotation);
+                        return csvMapper;
+                    });
         }
     }
 
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
index 17ab683acb3..24153b4de46 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
@@ -78,7 +78,7 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
     private final CsvSchema csvSchema;
 
     /** Object reader used to read rows. It is configured by {@link CsvSchema}. */
-    private final ObjectReader objectReader;
+    private transient ObjectReader objectReader;
 
     /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
     private final boolean ignoreParseErrors;
@@ -88,10 +88,14 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
         this.typeInfo = typeInfo;
         this.runtimeConverter = createRowRuntimeConverter(typeInfo, ignoreParseErrors, true);
         this.csvSchema = csvSchema;
-        this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
         this.ignoreParseErrors = ignoreParseErrors;
     }
 
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+    }
+
     /** A builder for creating a {@link CsvRowDeserializationSchema}. */
     @PublicEvolving
     public static class Builder {
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
index 279c435e255..eca56958d96 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
@@ -74,13 +74,13 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row>
     private final RuntimeConverter runtimeConverter;
 
     /** CsvMapper used to write {@link JsonNode} into bytes. */
-    private final CsvMapper csvMapper;
+    private transient CsvMapper csvMapper;
 
     /** Schema describing the input CSV data. */
     private final CsvSchema csvSchema;
 
     /** Object writer used to write rows. It is configured by {@link CsvSchema}. */
-    private final ObjectWriter objectWriter;
+    private transient ObjectWriter objectWriter;
 
     /** Reusable object node. */
     private transient ObjectNode root;
@@ -88,8 +88,12 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row>
     private CsvRowSerializationSchema(RowTypeInfo typeInfo, CsvSchema csvSchema) {
         this.typeInfo = typeInfo;
         this.runtimeConverter = createRowRuntimeConverter(typeInfo, true);
-        this.csvMapper = new CsvMapper();
         this.csvSchema = csvSchema;
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        this.csvMapper = new CsvMapper();
         this.objectWriter = csvMapper.writer(csvSchema);
     }
 
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 83d1b2dc3e8..805b299c11c 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -67,11 +67,13 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R
     private final JsonToRowDataConverters.JsonToRowDataConverter runtimeConverter;
 
     /** Object mapper for parsing the JSON. */
-    private final ObjectMapper objectMapper = new ObjectMapper();
+    private transient ObjectMapper objectMapper;
 
     /** Timestamp format specification which is used to parse timestamp. */
     private final TimestampFormat timestampFormat;
 
+    private final boolean hasDecimalType;
+
     public JsonRowDataDeserializationSchema(
             RowType rowType,
             TypeInformation<RowData> resultTypeInfo,
@@ -89,12 +91,19 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R
                 new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
                         .createConverter(checkNotNull(rowType));
         this.timestampFormat = timestampFormat;
-        boolean hasDecimalType =
-                LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
+        this.hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        objectMapper =
+                new ObjectMapper()
+                        .configure(
+                                JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(),
+                                true);
         if (hasDecimalType) {
             objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
         }
-        objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
     }
 
     @Override
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 1b77aab1cf4..6a8c619eeac 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -50,7 +50,7 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
     private final RowDataToJsonConverters.RowDataToJsonConverter runtimeConverter;
 
     /** Object mapper that is used to create output JSON objects. */
-    private final ObjectMapper mapper = new ObjectMapper();
+    private transient ObjectMapper mapper;
 
     /** Reusable object node. */
     private transient ObjectNode node;
@@ -81,9 +81,15 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
         this.runtimeConverter =
                 new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
                         .createConverter(rowType);
+    }
 
-        mapper.configure(
-                JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, encodeDecimalAsPlainNumber);
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        mapper =
+                new ObjectMapper()
+                        .configure(
+                                JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
+                                encodeDecimalAsPlainNumber);
     }
 
     @Override
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index a337a78afed..b2b7e6dada5 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -93,8 +93,10 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 
     private boolean failOnMissingField;
 
+    private final boolean hasDecimalType;
+
     /** Object mapper for parsing the JSON. */
-    private final ObjectMapper objectMapper = new ObjectMapper();
+    private transient ObjectMapper objectMapper;
 
     private DeserializationRuntimeConverter runtimeConverter;
 
@@ -114,8 +116,12 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
         this.runtimeConverter = createConverter(this.typeInfo);
         this.ignoreParseErrors = ignoreParseErrors;
         RowType rowType = (RowType) fromLegacyInfoToDataType(this.typeInfo).getLogicalType();
-        boolean hasDecimalType =
-                LogicalTypeChecks.hasNested(rowType, t -> t.getTypeRoot().equals(DECIMAL));
+        hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t.getTypeRoot().equals(DECIMAL));
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        objectMapper = new ObjectMapper();
         if (hasDecimalType) {
             objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
         }
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
index 3dd8be1fd37..e789307967e 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
@@ -79,7 +79,7 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
     private final RowTypeInfo typeInfo;
 
     /** Object mapper that is used to create output JSON objects. */
-    private final ObjectMapper mapper = new ObjectMapper();
+    private transient ObjectMapper mapper;
 
     private final SerializationRuntimeConverter runtimeConverter;
 
@@ -94,6 +94,11 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
         this.runtimeConverter = createConverter(typeInfo);
     }
 
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        mapper = new ObjectMapper();
+    }
+
     /** Builder for {@link JsonRowSerializationSchema}. */
     @PublicEvolving
     public static class Builder {

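For CsvRowDataSerializationSchema the builder previously configured a CsvMapper directly, so deferring mapper creation needed one extra step: the builder now captures its only setting in a SerializableSupplier<CsvMapper>, and open() calls get() to build the mapper on the task manager. A sketch of that idea in isolation, assuming the shaded Jackson import paths used elsewhere in this diff; the class name and example value are not from the commit:

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
    import org.apache.flink.util.function.SerializableSupplier;

    public class SupplierSketch {
        public static void main(String[] args) {
            // Capture only serializable state (a boolean) in the lambda; the
            // CsvMapper is built from it later, inside open().
            final boolean isScientificNotation = false; // example value
            final SerializableSupplier<CsvMapper> csvMapperSupplier =
                    () -> {
                        final CsvMapper csvMapper = new CsvMapper();
                        csvMapper.configure(
                                JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
                                !isScientificNotation);
                        return csvMapper;
                    };
            // In open(), the schema then does: this.csvMapper = csvMapperSupplier.get();
            System.out.println(csvMapperSupplier.get());
        }
    }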