This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 3b97ec9  [FLINK-14784][table] CsvTableSink miss delimiter when row 
start with null member.
3b97ec9 is described below

commit 3b97ec942852327c6845ac42175ba5dc7421fc46
Author: Leonard Xu <xbjt...@gmail.com>
AuthorDate: Thu Nov 14 23:19:30 2019 +0800

    [FLINK-14784][table] CsvTableSink miss delimiter when row start with null 
member.
    
    This closes #10199
    
    (cherry picked from commit 65ecfab8d5a3f9ef7877ea35ff22fb72ec2db0a5)
---
 .../org/apache/flink/table/sinks/CsvTableSink.java |  2 +-
 .../runtime/stream/table/TableSinkITCase.scala     | 22 ++++++++++-----------
 .../runtime/stream/table/TableSinkITCase.scala     | 23 +++++++++++-----------
 3 files changed, 24 insertions(+), 23 deletions(-)

diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
index 56ae63b..7026ebc 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
@@ -178,7 +178,7 @@ public class CsvTableSink implements BatchTableSink<Row>, 
AppendStreamTableSink<
                        StringBuilder builder = new StringBuilder();
                        Object o;
                        for (int i = 0; i < row.getArity(); i++) {
-                               if (builder.length() != 0) {
+                               if (i > 0) {
                                        builder.append(fieldDelim);
                                }
                                if ((o = row.getField(i)) != null) {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index c889149..c48cf9f 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -90,8 +90,8 @@ class TableSinkITCase extends AbstractTestBase {
     tEnv.registerTableSink(
       "csvSink",
       new CsvTableSink(path).configure(
-        Array[String]("c", "b"),
-        Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP)))
+        Array[String]("nullableCol", "c", "b"),
+        Array[TypeInformation[_]](Types.INT, Types.STRING, 
Types.SQL_TIMESTAMP)))
 
     val input = env.fromCollection(tupleData3)
       .assignAscendingTimestamps(_._2)
@@ -99,20 +99,20 @@ class TableSinkITCase extends AbstractTestBase {
 
     input.toTable(tEnv, 'a, 'b.rowtime, 'c)
       .where('a < 5 || 'a > 17)
-      .select('c, 'b)
+      .select(ifThenElse('a < 4, nullOf(Types.INT()), 'a), 'c, 'b)
       .insertInto("csvSink")
 
     env.execute()
 
     val expected = Seq(
-      "Hi,1970-01-01 00:00:00.001",
-      "Hello,1970-01-01 00:00:00.002",
-      "Hello world,1970-01-01 00:00:00.002",
-      "Hello world, how are you?,1970-01-01 00:00:00.003",
-      "Comment#12,1970-01-01 00:00:00.006",
-      "Comment#13,1970-01-01 00:00:00.006",
-      "Comment#14,1970-01-01 00:00:00.006",
-      "Comment#15,1970-01-01 00:00:00.006").mkString("\n")
+      ",Hello world,1970-01-01 00:00:00.002",
+      ",Hello,1970-01-01 00:00:00.002",
+      ",Hi,1970-01-01 00:00:00.001",
+      "18,Comment#12,1970-01-01 00:00:00.006",
+      "19,Comment#13,1970-01-01 00:00:00.006",
+      "20,Comment#14,1970-01-01 00:00:00.006",
+      "21,Comment#15,1970-01-01 00:00:00.006",
+      "4,Hello world, how are you?,1970-01-01 00:00:00.003").mkString("\n")
 
     TestBaseUtils.compareResultsByLinesInMemory(expected, path)
   }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index cf3fcbf..e6ff981 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -95,8 +95,8 @@ class TableSinkITCase extends AbstractTestBase {
     tEnv.registerTableSink(
       "csvSink",
       new CsvTableSink(path).configure(
-        Array[String]("c", "b"),
-        Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP)))
+        Array[String]("nullableCol", "c", "b"),
+        Array[TypeInformation[_]](Types.INT, Types.STRING, 
Types.SQL_TIMESTAMP)))
 
     val input = StreamTestData.get3TupleDataStream(env)
       .assignAscendingTimestamps(_._2)
@@ -104,20 +104,21 @@ class TableSinkITCase extends AbstractTestBase {
 
     input.toTable(tEnv, 'a, 'b.rowtime, 'c)
       .where('a < 5 || 'a > 17)
-      .select('c, 'b)
+      .select(ifThenElse('a < 4, nullOf(Types.INT()), 'a), 'c, 'b)
       .insertInto("csvSink")
 
     env.execute()
 
     val expected = Seq(
-      "Hi,1970-01-01 00:00:00.001",
-      "Hello,1970-01-01 00:00:00.002",
-      "Hello world,1970-01-01 00:00:00.002",
-      "Hello world, how are you?,1970-01-01 00:00:00.003",
-      "Comment#12,1970-01-01 00:00:00.006",
-      "Comment#13,1970-01-01 00:00:00.006",
-      "Comment#14,1970-01-01 00:00:00.006",
-      "Comment#15,1970-01-01 00:00:00.006").mkString("\n")
+      ",Hello world,1970-01-01 00:00:00.002",
+      ",Hello,1970-01-01 00:00:00.002",
+      ",Hi,1970-01-01 00:00:00.001",
+      "18,Comment#12,1970-01-01 00:00:00.006",
+      "19,Comment#13,1970-01-01 00:00:00.006",
+      "20,Comment#14,1970-01-01 00:00:00.006",
+      "21,Comment#15,1970-01-01 00:00:00.006",
+      "4,Hello world, how are you?,1970-01-01 00:00:00.003"
+    ).mkString("\n")
 
     TestBaseUtils.compareResultsByLinesInMemory(expected, path)
   }

Reply via email to