This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new 3b97ec9 [FLINK-14784][table] CsvTableSink miss delimiter when row start with null member. 3b97ec9 is described below commit 3b97ec942852327c6845ac42175ba5dc7421fc46 Author: Leonard Xu <xbjt...@gmail.com> AuthorDate: Thu Nov 14 23:19:30 2019 +0800 [FLINK-14784][table] CsvTableSink miss delimiter when row start with null member. This closes #10199 (cherry picked from commit 65ecfab8d5a3f9ef7877ea35ff22fb72ec2db0a5) --- .../org/apache/flink/table/sinks/CsvTableSink.java | 2 +- .../runtime/stream/table/TableSinkITCase.scala | 22 ++++++++++----------- .../runtime/stream/table/TableSinkITCase.scala | 23 +++++++++++----------- 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java index 56ae63b..7026ebc 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java @@ -178,7 +178,7 @@ public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink< StringBuilder builder = new StringBuilder(); Object o; for (int i = 0; i < row.getArity(); i++) { - if (builder.length() != 0) { + if (i > 0) { builder.append(fieldDelim); } if ((o = row.getField(i)) != null) { diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala index c889149..c48cf9f 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala @@ -90,8 +90,8 @@ class TableSinkITCase extends AbstractTestBase { tEnv.registerTableSink( "csvSink", new CsvTableSink(path).configure( - Array[String]("c", "b"), - Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP))) + Array[String]("nullableCol", "c", "b"), + Array[TypeInformation[_]](Types.INT, Types.STRING, Types.SQL_TIMESTAMP))) val input = env.fromCollection(tupleData3) .assignAscendingTimestamps(_._2) @@ -99,20 +99,20 @@ class TableSinkITCase extends AbstractTestBase { input.toTable(tEnv, 'a, 'b.rowtime, 'c) .where('a < 5 || 'a > 17) - .select('c, 'b) + .select(ifThenElse('a < 4, nullOf(Types.INT()), 'a), 'c, 'b) .insertInto("csvSink") env.execute() val expected = Seq( - "Hi,1970-01-01 00:00:00.001", - "Hello,1970-01-01 00:00:00.002", - "Hello world,1970-01-01 00:00:00.002", - "Hello world, how are you?,1970-01-01 00:00:00.003", - "Comment#12,1970-01-01 00:00:00.006", - "Comment#13,1970-01-01 00:00:00.006", - "Comment#14,1970-01-01 00:00:00.006", - "Comment#15,1970-01-01 00:00:00.006").mkString("\n") + ",Hello world,1970-01-01 00:00:00.002", + ",Hello,1970-01-01 00:00:00.002", + ",Hi,1970-01-01 00:00:00.001", + "18,Comment#12,1970-01-01 00:00:00.006", + "19,Comment#13,1970-01-01 00:00:00.006", + "20,Comment#14,1970-01-01 00:00:00.006", + "21,Comment#15,1970-01-01 00:00:00.006", + "4,Hello world, how are you?,1970-01-01 00:00:00.003").mkString("\n") TestBaseUtils.compareResultsByLinesInMemory(expected, path) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala index cf3fcbf..e6ff981 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala @@ -95,8 +95,8 @@ class TableSinkITCase extends AbstractTestBase { tEnv.registerTableSink( "csvSink", new CsvTableSink(path).configure( - Array[String]("c", "b"), - Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP))) + Array[String]("nullableCol", "c", "b"), + Array[TypeInformation[_]](Types.INT, Types.STRING, Types.SQL_TIMESTAMP))) val input = StreamTestData.get3TupleDataStream(env) .assignAscendingTimestamps(_._2) @@ -104,20 +104,21 @@ class TableSinkITCase extends AbstractTestBase { input.toTable(tEnv, 'a, 'b.rowtime, 'c) .where('a < 5 || 'a > 17) - .select('c, 'b) + .select(ifThenElse('a < 4, nullOf(Types.INT()), 'a), 'c, 'b) .insertInto("csvSink") env.execute() val expected = Seq( - "Hi,1970-01-01 00:00:00.001", - "Hello,1970-01-01 00:00:00.002", - "Hello world,1970-01-01 00:00:00.002", - "Hello world, how are you?,1970-01-01 00:00:00.003", - "Comment#12,1970-01-01 00:00:00.006", - "Comment#13,1970-01-01 00:00:00.006", - "Comment#14,1970-01-01 00:00:00.006", - "Comment#15,1970-01-01 00:00:00.006").mkString("\n") + ",Hello world,1970-01-01 00:00:00.002", + ",Hello,1970-01-01 00:00:00.002", + ",Hi,1970-01-01 00:00:00.001", + "18,Comment#12,1970-01-01 00:00:00.006", + "19,Comment#13,1970-01-01 00:00:00.006", + "20,Comment#14,1970-01-01 00:00:00.006", + "21,Comment#15,1970-01-01 00:00:00.006", + "4,Hello world, how are you?,1970-01-01 00:00:00.003" + ).mkString("\n") TestBaseUtils.compareResultsByLinesInMemory(expected, path) }