This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
commit c380e400d4c81c83dabf7ff42f4a5bb193492704 Author: Darcy <331046...@qq.com> AuthorDate: Sat Nov 30 04:02:56 2019 +0800 Add support "upsert part of columns of a kudu table" (#70) Sometimes we don't want to upsert all columns of a kudu table. So we need to support the function that upsert part of columns of a kudu table. --- .../flink/connectors/kudu/connector/KuduRow.java | 4 ++++ .../kudu/connector/writer/KuduWriter.java | 5 ++++- .../connectors/kudu/batch/KuduOuputFormatTest.java | 2 ++ .../connectors/kudu/connector/KuduDatabase.java | 26 ++++++++++++++++++++-- .../connectors/kudu/streaming/KuduSinkTest.java | 3 ++- 5 files changed, 36 insertions(+), 4 deletions(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java index af78361..78e6e6e 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java @@ -41,6 +41,10 @@ public class KuduRow extends Row { return super.getField(rowNames.get(name)); } + public boolean hasField(String name) { + return rowNames.get(name) != null; + } + public void setField(int pos, String name, Object value) { super.setField(pos, value); this.rowNames.put(name, pos); diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java index f4e2a8a..57c0741 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java @@ -152,7 +152,10 @@ public class KuduWriter implements AutoCloseable { table.getSchema().getColumns().forEach(column -> { String columnName = column.getName(); - Object value = row.getField(column.getName()); + if (!row.hasField(columnName)) { + return; + } + Object value = row.getField(columnName); if (value == null) { partialRow.setNull(columnName); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java index 963a8c0..f14eaa0 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java @@ -71,6 +71,7 @@ class KuduOuputFormatTest extends KuduDatabase { List<KuduRow> rows = readRows(tableInfo); Assertions.assertEquals(5, rows.size()); + kuduRowsTest(rows); cleanDatabase(tableInfo); } @@ -99,6 +100,7 @@ class KuduOuputFormatTest extends KuduDatabase { List<KuduRow> rows = readRows(tableInfo); Assertions.assertEquals(5, rows.size()); + kuduRowsTest(rows); cleanDatabase(tableInfo); } diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java index 3d02a1d..cda8c21 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java @@ -56,14 +56,16 @@ public class KuduDatabase { .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) - .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) - .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) + .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).asNullable().build()) + .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).asNullable().build()) .build(); } protected static List<KuduRow> booksDataRow() { return Arrays.stream(booksTableData) .map(row -> { + Integer rowId = (Integer)row[0]; + if (rowId % 2 == 1) { KuduRow values = new KuduRow(5); values.setField(0, "id", row[0]); values.setField(1, "title", row[1]); @@ -71,6 +73,13 @@ public class KuduDatabase { values.setField(3, "price", row[3]); values.setField(4, "quantity", row[4]); return values; + } else { + KuduRow values = new KuduRow(3); + values.setField(0, "id", row[0]); + values.setField(1, "title", row[1]); + values.setField(2, "author", row[2]); + return values; + } }) .collect(Collectors.toList()); } @@ -126,4 +135,17 @@ public class KuduDatabase { return rows; } + protected void kuduRowsTest(List<KuduRow> rows) { + for (KuduRow row: rows) { + Integer rowId = (Integer)row.getField("id"); + if (rowId % 2 == 1) { + Assertions.assertNotEquals(null, row.getField("price")); + Assertions.assertNotEquals(null, row.getField("quantity")); + } + else { + Assertions.assertNull(row.getField("price")); + Assertions.assertNull(row.getField("quantity")); + } + } + } } diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java index ea49a91..38d0115 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java @@ -88,7 +88,7 @@ class KuduSinkTest extends KuduDatabase { List<KuduRow> rows = readRows(tableInfo); Assertions.assertEquals(5, rows.size()); - + kuduRowsTest(rows); } @Test @@ -116,6 +116,7 @@ class KuduSinkTest extends KuduDatabase { List<KuduRow> rows = readRows(tableInfo); Assertions.assertEquals(5, rows.size()); + kuduRowsTest(rows); }