This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9a967c010a58e0b2277516068256ef45ee711edc Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Thu Jul 28 16:09:43 2022 +0200 [FLINK-28621][formats] Initialize mappers in open() --- .../JSONKeyValueDeserializationSchema.java | 9 ++++-- .../csv/CsvRowDataDeserializationSchema.java | 8 +++-- .../formats/csv/CsvRowDataSerializationSchema.java | 37 ++++++++++++++++------ .../formats/csv/CsvRowDeserializationSchema.java | 8 +++-- .../formats/csv/CsvRowSerializationSchema.java | 10 ++++-- .../json/JsonRowDataDeserializationSchema.java | 17 +++++++--- .../json/JsonRowDataSerializationSchema.java | 12 +++++-- .../formats/json/JsonRowDeserializationSchema.java | 12 +++++-- .../formats/json/JsonRowSerializationSchema.java | 7 +++- 9 files changed, 90 insertions(+), 30 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java index cebd40aac26..86817433a6f 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.util.serialization; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; @@ -52,11 +53,13 @@ public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSc this.includeMetadata = includeMetadata; } + @Override + public void open(DeserializationSchema.InitializationContext context) throws Exception { + mapper = new ObjectMapper(); + } + @Override public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception { - if (mapper == null) { - mapper = new ObjectMapper(); - } ObjectNode node = mapper.createObjectNode(); if (record.key() != null) { node.set("key", mapper.readValue(record.key(), JsonNode.class)); diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java index f5830f31f83..0c3f68e97f8 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java @@ -59,7 +59,7 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch private final CsvSchema csvSchema; /** Object reader used to read rows. It is configured by {@link CsvSchema}. */ - private final ObjectReader objectReader; + private transient ObjectReader objectReader; /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */ private final boolean ignoreParseErrors; @@ -72,10 +72,14 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch this.resultTypeInfo = resultTypeInfo; this.runtimeConverter = runtimeConverter; this.csvSchema = csvSchema; - this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema); this.ignoreParseErrors = ignoreParseErrors; } + @Override + public void open(InitializationContext context) { + this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema); + } + /** A builder for creating a {@link CsvRowDataDeserializationSchema}. */ @Internal public static class Builder { diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java index 25673fcc3d8..f7fd810e691 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SerializableSupplier; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -54,14 +55,16 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema< /** Runtime instance that performs the actual work. */ private final RowDataToCsvConverters.RowDataToCsvConverter runtimeConverter; + private final SerializableSupplier<CsvMapper> csvMapperSuppler; + /** CsvMapper used to write {@link JsonNode} into bytes. */ - private final CsvMapper csvMapper; + private transient CsvMapper csvMapper; /** Schema describing the input CSV data. */ private final CsvSchema csvSchema; /** Object writer used to write rows. It is configured by {@link CsvSchema}. */ - private final ObjectWriter objectWriter; + private transient ObjectWriter objectWriter; /** Reusable object node. */ private transient ObjectNode root; @@ -72,11 +75,18 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema< converterContext; private CsvRowDataSerializationSchema( - RowType rowType, CsvSchema csvSchema, CsvMapper csvMapper) { + RowType rowType, + CsvSchema csvSchema, + SerializableSupplier<CsvMapper> csvMapperSupplier) { this.rowType = rowType; this.runtimeConverter = RowDataToCsvConverters.createRowConverter(rowType); - this.csvMapper = csvMapper; this.csvSchema = csvSchema.withLineSeparator(""); + this.csvMapperSuppler = csvMapperSupplier; + } + + @Override + public void open(InitializationContext context) throws Exception { + this.csvMapper = csvMapperSuppler.get(); this.objectWriter = csvMapper.writer(this.csvSchema); } @@ -86,7 +96,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema< private final RowType rowType; private CsvSchema csvSchema; - private CsvMapper csvMapper; + private boolean isScientificNotation; /** * Creates a {@link CsvRowDataSerializationSchema} expecting the given {@link RowType}. @@ -98,7 +108,6 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema< this.rowType = rowType; this.csvSchema = CsvRowSchemaConverter.convert(rowType); - this.csvMapper = new CsvMapper(); } public Builder setFieldDelimiter(char c) { @@ -133,12 +142,22 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema< } public void setWriteBigDecimalInScientificNotation(boolean isScientificNotation) { - this.csvMapper.configure( - JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, !isScientificNotation); + this.isScientificNotation = isScientificNotation; } public CsvRowDataSerializationSchema build() { - return new CsvRowDataSerializationSchema(rowType, csvSchema, csvMapper); + // assign to local variable to avoid reference to non-serializable builder + final boolean isScientificNotation = this.isScientificNotation; + return new CsvRowDataSerializationSchema( + rowType, + csvSchema, + () -> { + final CsvMapper csvMapper = new CsvMapper(); + csvMapper.configure( + JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, + !isScientificNotation); + return csvMapper; + }); } } diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java index 17ab683acb3..24153b4de46 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java @@ -78,7 +78,7 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema< private final CsvSchema csvSchema; /** Object reader used to read rows. It is configured by {@link CsvSchema}. */ - private final ObjectReader objectReader; + private transient ObjectReader objectReader; /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */ private final boolean ignoreParseErrors; @@ -88,10 +88,14 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema< this.typeInfo = typeInfo; this.runtimeConverter = createRowRuntimeConverter(typeInfo, ignoreParseErrors, true); this.csvSchema = csvSchema; - this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema); this.ignoreParseErrors = ignoreParseErrors; } + @Override + public void open(InitializationContext context) throws Exception { + objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema); + } + /** A builder for creating a {@link CsvRowDeserializationSchema}. */ @PublicEvolving public static class Builder { diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java index 279c435e255..eca56958d96 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java @@ -74,13 +74,13 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row> private final RuntimeConverter runtimeConverter; /** CsvMapper used to write {@link JsonNode} into bytes. */ - private final CsvMapper csvMapper; + private transient CsvMapper csvMapper; /** Schema describing the input CSV data. */ private final CsvSchema csvSchema; /** Object writer used to write rows. It is configured by {@link CsvSchema}. */ - private final ObjectWriter objectWriter; + private transient ObjectWriter objectWriter; /** Reusable object node. */ private transient ObjectNode root; @@ -88,8 +88,12 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row> private CsvRowSerializationSchema(RowTypeInfo typeInfo, CsvSchema csvSchema) { this.typeInfo = typeInfo; this.runtimeConverter = createRowRuntimeConverter(typeInfo, true); - this.csvMapper = new CsvMapper(); this.csvSchema = csvSchema; + } + + @Override + public void open(InitializationContext context) throws Exception { + this.csvMapper = new CsvMapper(); this.objectWriter = csvMapper.writer(csvSchema); } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java index 83d1b2dc3e8..805b299c11c 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java @@ -67,11 +67,13 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R private final JsonToRowDataConverters.JsonToRowDataConverter runtimeConverter; /** Object mapper for parsing the JSON. */ - private final ObjectMapper objectMapper = new ObjectMapper(); + private transient ObjectMapper objectMapper; /** Timestamp format specification which is used to parse timestamp. */ private final TimestampFormat timestampFormat; + private final boolean hasDecimalType; + public JsonRowDataDeserializationSchema( RowType rowType, TypeInformation<RowData> resultTypeInfo, @@ -89,12 +91,19 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat) .createConverter(checkNotNull(rowType)); this.timestampFormat = timestampFormat; - boolean hasDecimalType = - LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType); + this.hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType); + } + + @Override + public void open(InitializationContext context) throws Exception { + objectMapper = + new ObjectMapper() + .configure( + JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), + true); if (hasDecimalType) { objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); } - objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true); } @Override diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java index 1b77aab1cf4..6a8c619eeac 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java @@ -50,7 +50,7 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa private final RowDataToJsonConverters.RowDataToJsonConverter runtimeConverter; /** Object mapper that is used to create output JSON objects. */ - private final ObjectMapper mapper = new ObjectMapper(); + private transient ObjectMapper mapper; /** Reusable object node. */ private transient ObjectNode node; @@ -81,9 +81,15 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa this.runtimeConverter = new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral) .createConverter(rowType); + } - mapper.configure( - JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, encodeDecimalAsPlainNumber); + @Override + public void open(InitializationContext context) throws Exception { + mapper = + new ObjectMapper() + .configure( + JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, + encodeDecimalAsPlainNumber); } @Override diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java index a337a78afed..b2b7e6dada5 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java @@ -93,8 +93,10 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row> private boolean failOnMissingField; + private final boolean hasDecimalType; + /** Object mapper for parsing the JSON. */ - private final ObjectMapper objectMapper = new ObjectMapper(); + private transient ObjectMapper objectMapper; private DeserializationRuntimeConverter runtimeConverter; @@ -114,8 +116,12 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row> this.runtimeConverter = createConverter(this.typeInfo); this.ignoreParseErrors = ignoreParseErrors; RowType rowType = (RowType) fromLegacyInfoToDataType(this.typeInfo).getLogicalType(); - boolean hasDecimalType = - LogicalTypeChecks.hasNested(rowType, t -> t.getTypeRoot().equals(DECIMAL)); + hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t.getTypeRoot().equals(DECIMAL)); + } + + @Override + public void open(InitializationContext context) throws Exception { + objectMapper = new ObjectMapper(); if (hasDecimalType) { objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java index 3dd8be1fd37..e789307967e 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java @@ -79,7 +79,7 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> { private final RowTypeInfo typeInfo; /** Object mapper that is used to create output JSON objects. */ - private final ObjectMapper mapper = new ObjectMapper(); + private transient ObjectMapper mapper; private final SerializationRuntimeConverter runtimeConverter; @@ -94,6 +94,11 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> { this.runtimeConverter = createConverter(typeInfo); } + @Override + public void open(InitializationContext context) throws Exception { + mapper = new ObjectMapper(); + } + /** Builder for {@link JsonRowSerializationSchema}. */ @PublicEvolving public static class Builder {