This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 61f1f0f  [FLINK-23073][formats / CSV] Fix space handling in Row CSV timestamp parser
61f1f0f is described below

commit 61f1f0f4f09875b5d8f4c2db956aa520facc4b2c
Author: Seth Wiesman <sjwies...@gmail.com>
AuthorDate: Mon Jun 21 13:37:41 2021 -0500

    [FLINK-23073][formats / CSV] Fix space handling in Row CSV timestamp parser

    This closes #16246
---
 .../org/apache/flink/formats/csv/CsvRowDeserializationSchema.java | 2 +-
 .../java/org/apache/flink/formats/csv/CsvToRowDataConverters.java | 2 +-
 .../apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java | 6 ++++++
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
index 535767a..783e1f1 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
@@ -321,7 +321,7 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
         } else if (info.equals(Types.LOCAL_TIME)) {
             return (node) -> Time.valueOf(node.asText()).toLocalTime();
         } else if (info.equals(Types.LOCAL_DATE_TIME)) {
-            return (node) -> LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_FORMAT);
+            return (node) -> LocalDateTime.parse(node.asText().trim(), SQL_TIMESTAMP_FORMAT);
         } else if (info.equals(Types.INSTANT)) {
             return (node) ->
                     LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT)
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
index ce19755..64ddc68 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
@@ -254,7 +254,7 @@ public class CsvToRowDataConverters implements Serializable {
     private TimestampData convertToTimestamp(
             JsonNode jsonNode, DateTimeFormatter dateTimeFormatter) {
         return TimestampData.fromLocalDateTime(
-                LocalDateTime.parse(jsonNode.asText(), dateTimeFormatter));
+                LocalDateTime.parse(jsonNode.asText().trim(), dateTimeFormatter));
     }
 
     private StringData convertToString(JsonNode jsonNode) {
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
index 3e064ea..80a32d0 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
@@ -123,6 +123,12 @@ public class CsvRowDeSerializationSchemaTest {
         testField(Types.INT, " 12 ", 12, deserConfig, ";");
         testField(Types.INT, "12", 12, serConfig, deserConfig, ";");
         testField(
+                Types.LOCAL_DATE_TIME,
+                " 2018-10-12 12:12:12 ",
+                LocalDateTime.parse("2018-10-12T12:12:12"),
+                deserConfig,
+                ";");
+        testField(
                 Types.ROW(Types.STRING, Types.STRING),
                 "1:hello",
                 Row.of("1", "hello"),