This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 61f1f0f  [FLINK-23073][formats / CSV] Fix space handling in Row CSV timestamp parser
61f1f0f is described below

commit 61f1f0f4f09875b5d8f4c2db956aa520facc4b2c
Author: Seth Wiesman <sjwies...@gmail.com>
AuthorDate: Mon Jun 21 13:37:41 2021 -0500

    [FLINK-23073][formats / CSV] Fix space handling in Row CSV timestamp parser
    
    This closes #16246
---
 .../org/apache/flink/formats/csv/CsvRowDeserializationSchema.java   | 2 +-
 .../java/org/apache/flink/formats/csv/CsvToRowDataConverters.java   | 2 +-
 .../apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java   | 6 ++++++
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
index 535767a..783e1f1 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
@@ -321,7 +321,7 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
         } else if (info.equals(Types.LOCAL_TIME)) {
             return (node) -> Time.valueOf(node.asText()).toLocalTime();
         } else if (info.equals(Types.LOCAL_DATE_TIME)) {
-            return (node) -> LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_FORMAT);
+            return (node) -> LocalDateTime.parse(node.asText().trim(), SQL_TIMESTAMP_FORMAT);
         } else if (info.equals(Types.INSTANT)) {
             return (node) ->
                     LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT)
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
index ce19755..64ddc68 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
@@ -254,7 +254,7 @@ public class CsvToRowDataConverters implements Serializable {
     private TimestampData convertToTimestamp(
             JsonNode jsonNode, DateTimeFormatter dateTimeFormatter) {
         return TimestampData.fromLocalDateTime(
-                LocalDateTime.parse(jsonNode.asText(), dateTimeFormatter));
+                LocalDateTime.parse(jsonNode.asText().trim(), dateTimeFormatter));
     }
 
     private StringData convertToString(JsonNode jsonNode) {
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
index 3e064ea..80a32d0 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
@@ -123,6 +123,12 @@ public class CsvRowDeSerializationSchemaTest {
         testField(Types.INT, "       12          ", 12, deserConfig, ";");
         testField(Types.INT, "12", 12, serConfig, deserConfig, ";");
         testField(
+                Types.LOCAL_DATE_TIME,
+                "    2018-10-12 12:12:12     ",
+                LocalDateTime.parse("2018-10-12T12:12:12"),
+                deserConfig,
+                ";");
+        testField(
                 Types.ROW(Types.STRING, Types.STRING),
                 "1:hello",
                 Row.of("1", "hello"),

Reply via email to