This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git


The following commit(s) were added to refs/heads/v3.0 by this push:
     new 6544697  [FLINK-33164] Support write option sink.ignore-null-value. 
This closes #21
6544697 is described below

commit 654469780f0ac85f156525575c8b522227559f03
Author: Tan-JiaLiang <55388933+tan-jiali...@users.noreply.github.com>
AuthorDate: Fri Nov 3 23:51:29 2023 +0800

    [FLINK-33164] Support write option sink.ignore-null-value. This closes #21
    
    Co-authored-by: tanjialiang <tanjiali...@52tt.com>
    (cherry picked from commit 298d8164495732f59d18c54d4d40b601b6d44f21)
---
 .../hbase1/HBase1DynamicTableFactory.java          |  3 ++
 .../hbase1/sink/HBaseDynamicTableSink.java         |  5 +-
 .../hbase1/HBaseDynamicTableFactoryTest.java       | 12 +++++
 .../hbase2/HBase2DynamicTableFactory.java          |  5 +-
 .../hbase2/sink/HBaseDynamicTableSink.java         |  5 +-
 .../hbase2/HBaseDynamicTableFactoryTest.java       | 12 +++++
 .../connector/hbase/options/HBaseWriteOptions.java | 20 +++++++
 .../hbase/sink/RowDataToMutationConverter.java     |  7 ++-
 .../hbase/table/HBaseConnectorOptions.java         |  6 +++
 .../hbase/table/HBaseConnectorOptionsUtil.java     |  2 +
 .../flink/connector/hbase/util/HBaseSerde.java     | 15 ++++++
 .../flink/connector/hbase/util/HBaseSerdeTest.java | 62 ++++++++++++++++++++--
 12 files changed, 144 insertions(+), 10 deletions(-)

diff --git 
a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
 
b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
index fbc793c..5321bf2 100644
--- 
a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
+++ 
b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
@@ -50,6 +50,7 @@ import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.NULL_
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
+import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
@@ -149,6 +150,7 @@ public class HBase1DynamicTableFactory
         set.add(SINK_BUFFER_FLUSH_MAX_SIZE);
         set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
         set.add(SINK_BUFFER_FLUSH_INTERVAL);
+        set.add(SINK_IGNORE_NULL_VALUE);
         set.add(SINK_PARALLELISM);
         set.add(LOOKUP_ASYNC);
         set.add(LOOKUP_CACHE_MAX_ROWS);
@@ -173,6 +175,7 @@ public class HBase1DynamicTableFactory
                         SINK_BUFFER_FLUSH_MAX_SIZE,
                         SINK_BUFFER_FLUSH_MAX_ROWS,
                         SINK_BUFFER_FLUSH_INTERVAL,
+                        SINK_IGNORE_NULL_VALUE,
                         LOOKUP_CACHE_MAX_ROWS,
                         LOOKUP_CACHE_TTL,
                         LOOKUP_MAX_RETRIES)
diff --git 
a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
 
b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
index 2b9e87c..0dec937 100644
--- 
a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
+++ 
b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
@@ -61,7 +61,10 @@ public class HBaseDynamicTableSink implements 
DynamicTableSink {
                 new HBaseSinkFunction<>(
                         tableName,
                         hbaseConf,
-                        new RowDataToMutationConverter(hbaseTableSchema, 
nullStringLiteral),
+                        new RowDataToMutationConverter(
+                                hbaseTableSchema,
+                                nullStringLiteral,
+                                writeOptions.isIgnoreNullValue()),
                         writeOptions.getBufferFlushMaxSizeInBytes(),
                         writeOptions.getBufferFlushMaxRows(),
                         writeOptions.getBufferFlushIntervalMillis());
diff --git 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
index 8a8f1d7..7c4ed8e 100644
--- 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
+++ 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
@@ -256,6 +256,18 @@ public class HBaseDynamicTableFactoryTest {
         assertEquals(expected, actual);
     }
 
+    @Test
+    public void testSinkIgnoreNullValueOptions() {
+        Map<String, String> options = getAllOptions();
+        options.put("sink.ignore-null-value", "true");
+
+        ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, 
STRING()));
+
+        DynamicTableSink sink = createTableSink(schema, options);
+        HBaseWriteOptions actual = ((HBaseDynamicTableSink) 
sink).getWriteOptions();
+        assertThat(actual.isIgnoreNullValue()).isTrue();
+    }
+
     @Test
     public void testParallelismOptions() {
         Map<String, String> options = getAllOptions();
diff --git 
a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
 
b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
index 8c55137..8a10a1c 100644
--- 
a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
+++ 
b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
@@ -50,6 +50,7 @@ import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.NULL_
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
+import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
@@ -152,6 +153,7 @@ public class HBase2DynamicTableFactory
         set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
         set.add(SINK_BUFFER_FLUSH_INTERVAL);
         set.add(SINK_PARALLELISM);
+        set.add(SINK_IGNORE_NULL_VALUE);
         set.add(LOOKUP_ASYNC);
         set.add(LOOKUP_CACHE_MAX_ROWS);
         set.add(LOOKUP_CACHE_TTL);
@@ -177,7 +179,8 @@ public class HBase2DynamicTableFactory
                         LOOKUP_MAX_RETRIES,
                         SINK_BUFFER_FLUSH_MAX_SIZE,
                         SINK_BUFFER_FLUSH_MAX_ROWS,
-                        SINK_BUFFER_FLUSH_INTERVAL)
+                        SINK_BUFFER_FLUSH_INTERVAL,
+                        SINK_IGNORE_NULL_VALUE)
                 .collect(Collectors.toSet());
     }
 }
diff --git 
a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
 
b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
index 6ea9ba3..299a457 100644
--- 
a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
+++ 
b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
@@ -62,7 +62,10 @@ public class HBaseDynamicTableSink implements 
DynamicTableSink {
                 new HBaseSinkFunction<>(
                         tableName,
                         hbaseConf,
-                        new RowDataToMutationConverter(hbaseTableSchema, 
nullStringLiteral),
+                        new RowDataToMutationConverter(
+                                hbaseTableSchema,
+                                nullStringLiteral,
+                                writeOptions.isIgnoreNullValue()),
                         writeOptions.getBufferFlushMaxSizeInBytes(),
                         writeOptions.getBufferFlushMaxRows(),
                         writeOptions.getBufferFlushIntervalMillis());
diff --git 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
index a535f0e..baf068a 100644
--- 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
+++ 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
@@ -260,6 +260,18 @@ public class HBaseDynamicTableFactoryTest {
         assertEquals(expected, actual);
     }
 
+    @Test
+    public void testSinkIgnoreNullValueOptions() {
+        Map<String, String> options = getAllOptions();
+        options.put("sink.ignore-null-value", "true");
+
+        ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, 
STRING()));
+
+        DynamicTableSink sink = createTableSink(schema, options);
+        HBaseWriteOptions actual = ((HBaseDynamicTableSink) 
sink).getWriteOptions();
+        assertThat(actual.isIgnoreNullValue()).isTrue();
+    }
+
     @Test
     public void testParallelismOptions() {
         Map<String, String> options = getAllOptions();
diff --git 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
index 0ba2f88..9462783 100644
--- 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
+++ 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
@@ -34,16 +34,19 @@ public class HBaseWriteOptions implements Serializable {
     private final long bufferFlushMaxSizeInBytes;
     private final long bufferFlushMaxRows;
     private final long bufferFlushIntervalMillis;
+    private final boolean ignoreNullValue;
     private final Integer parallelism;
 
     private HBaseWriteOptions(
             long bufferFlushMaxSizeInBytes,
             long bufferFlushMaxMutations,
             long bufferFlushIntervalMillis,
+            boolean ignoreNullValue,
             Integer parallelism) {
         this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
         this.bufferFlushMaxRows = bufferFlushMaxMutations;
         this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
+        this.ignoreNullValue = ignoreNullValue;
         this.parallelism = parallelism;
     }
 
@@ -59,6 +62,10 @@ public class HBaseWriteOptions implements Serializable {
         return bufferFlushIntervalMillis;
     }
 
+    public boolean isIgnoreNullValue() {
+        return ignoreNullValue;
+    }
+
     public Integer getParallelism() {
         return parallelism;
     }
@@ -72,6 +79,8 @@ public class HBaseWriteOptions implements Serializable {
                 + bufferFlushMaxRows
                 + ", bufferFlushIntervalMillis="
                 + bufferFlushIntervalMillis
+                + ", ignoreNullValue="
+                + ignoreNullValue
                 + ", parallelism="
                 + parallelism
                 + '}';
@@ -89,6 +98,7 @@ public class HBaseWriteOptions implements Serializable {
         return bufferFlushMaxSizeInBytes == that.bufferFlushMaxSizeInBytes
                 && bufferFlushMaxRows == that.bufferFlushMaxRows
                 && bufferFlushIntervalMillis == that.bufferFlushIntervalMillis
+                && ignoreNullValue == that.ignoreNullValue
                 && parallelism == that.parallelism;
     }
 
@@ -112,6 +122,7 @@ public class HBaseWriteOptions implements Serializable {
         private long bufferFlushMaxSizeInBytes = 
ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
         private long bufferFlushMaxRows = 0;
         private long bufferFlushIntervalMillis = 0;
+        private boolean ignoreNullValue;
         private Integer parallelism;
 
         /**
@@ -141,6 +152,14 @@ public class HBaseWriteOptions implements Serializable {
             return this;
         }
 
+        /**
+         * Optional. Sets whether to ignore null values. By default, null 
values are written.
+         */
+        public Builder setIgnoreNullValue(boolean ignoreNullValue) {
+            this.ignoreNullValue = ignoreNullValue;
+            return this;
+        }
+
         /**
          * Optional. Defines the parallelism of the HBase sink operator. By 
default, the parallelism
          * is determined by the framework using the same parallelism of the 
upstream chained
@@ -157,6 +176,7 @@ public class HBaseWriteOptions implements Serializable {
                     bufferFlushMaxSizeInBytes,
                     bufferFlushMaxRows,
                     bufferFlushIntervalMillis,
+                    ignoreNullValue,
                     parallelism);
         }
     }
diff --git 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
index 406a996..f07377c 100644
--- 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
+++ 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
@@ -34,16 +34,19 @@ public class RowDataToMutationConverter implements 
HBaseMutationConverter<RowDat
 
     private final HBaseTableSchema schema;
     private final String nullStringLiteral;
+    private final boolean ignoreNullValue;
     private transient HBaseSerde serde;
 
-    public RowDataToMutationConverter(HBaseTableSchema schema, final String 
nullStringLiteral) {
+    public RowDataToMutationConverter(
+            HBaseTableSchema schema, final String nullStringLiteral, boolean 
ignoreNullValue) {
         this.schema = schema;
         this.nullStringLiteral = nullStringLiteral;
+        this.ignoreNullValue = ignoreNullValue;
     }
 
     @Override
     public void open() {
-        this.serde = new HBaseSerde(schema, nullStringLiteral);
+        this.serde = new HBaseSerde(schema, nullStringLiteral, 
ignoreNullValue);
     }
 
     @Override
diff --git 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
index 0c8dc97..d760c03 100644
--- 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
+++ 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
@@ -88,6 +88,12 @@ public class HBaseConnectorOptions {
                                     + "Can be set to '0' to disable it. Note, 
both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
                                     + "can be set to '0' with the flush 
interval set allowing for complete async processing of buffered actions.");
 
+    public static final ConfigOption<Boolean> SINK_IGNORE_NULL_VALUE =
+            ConfigOptions.key("sink.ignore-null-value")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Writing option, whether ignore null 
value or not.");
+
     public static final ConfigOption<Boolean> LOOKUP_ASYNC =
             ConfigOptions.key("lookup.async")
                     .booleanType()
diff --git 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
index 2141fe1..482644f 100644
--- 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
+++ 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
@@ -34,6 +34,7 @@ import java.util.Properties;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
+import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
@@ -89,6 +90,7 @@ public class HBaseConnectorOptionsUtil {
         
builder.setBufferFlushMaxRows(tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS));
         builder.setBufferFlushMaxSizeInBytes(
                 tableOptions.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes());
+        builder.setIgnoreNullValue(tableOptions.get(SINK_IGNORE_NULL_VALUE));
         
builder.setParallelism(tableOptions.getOptional(SINK_PARALLELISM).orElse(null));
         return builder.build();
     }
diff --git 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index d21cc4a..458b25d 100644
--- 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -57,6 +57,8 @@ public class HBaseSerde {
 
     private final byte[] nullStringBytes;
 
+    private final boolean writeIgnoreNullValue;
+
     // row key index in output row
     private final int rowkeyIndex;
 
@@ -77,6 +79,13 @@ public class HBaseSerde {
     private final GenericRowData rowWithRowKey;
 
     public HBaseSerde(HBaseTableSchema hbaseSchema, final String 
nullStringLiteral) {
+        this(hbaseSchema, nullStringLiteral, false);
+    }
+
+    public HBaseSerde(
+            HBaseTableSchema hbaseSchema,
+            final String nullStringLiteral,
+            boolean writeIgnoreNullValue) {
         this.families = hbaseSchema.getFamilyKeys();
         this.rowkeyIndex = hbaseSchema.getRowKeyIndex();
         LogicalType rowkeyType =
@@ -93,6 +102,7 @@ public class HBaseSerde {
             this.keyDecoder = null;
         }
         this.nullStringBytes = 
nullStringLiteral.getBytes(StandardCharsets.UTF_8);
+        this.writeIgnoreNullValue = writeIgnoreNullValue;
 
         // prepare output rows
         this.reusedRow = new GenericRowData(fieldLength);
@@ -141,6 +151,11 @@ public class HBaseSerde {
                 byte[] familyKey = families[f];
                 RowData familyRow = row.getRow(i, qualifiers[f].length);
                 for (int q = 0; q < this.qualifiers[f].length; q++) {
+                    // skip null-valued qualifiers when ignoring null values is enabled
+                    if (writeIgnoreNullValue && familyRow.isNullAt(q)) {
+                        continue;
+                    }
+
                     // get quantifier key
                     byte[] qualifier = qualifiers[f][q];
                     // serialize value
diff --git 
a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
 
b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
index 3e9afb1..e370809 100644
--- 
a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
+++ 
b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
@@ -19,11 +19,14 @@
 package org.apache.flink.connector.hbase.util;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.jupiter.api.Test;
@@ -58,7 +61,7 @@ class HBaseSerdeTest {
 
     @Test
     void convertToNewRowTest() {
-        HBaseSerde serde = createHBaseSerde();
+        HBaseSerde serde = createHBaseSerde(false);
         List<List<Cell>> cellsList = prepareCells();
         List<RowData> resultRowDatas = new ArrayList<>();
         List<String> resultRowDataStr = new ArrayList<>();
@@ -79,7 +82,7 @@ class HBaseSerdeTest {
 
     @Test
     void convertToReusedRowTest() {
-        HBaseSerde serde = createHBaseSerde();
+        HBaseSerde serde = createHBaseSerde(false);
         List<List<Cell>> cellsList = prepareCells();
         List<RowData> resultRowDatas = new ArrayList<>();
         List<String> resultRowDataStr = new ArrayList<>();
@@ -99,7 +102,32 @@ class HBaseSerdeTest {
                         "+I(2,+I(20),+I(Hello-2,200),+I(2.02,true,Welt-2))");
     }
 
-    private HBaseSerde createHBaseSerde() {
+    @Test
+    public void writeIgnoreNullValueTest() {
+        HBaseSerde serde = createHBaseSerde(false);
+        Put m1 = serde.createPutMutation(prepareRowData());
+        assert m1 != null;
+        assertThat(m1.getRow()).isNotEmpty();
+        assertThat(m1.get(FAMILY1.getBytes(), F1COL1.getBytes())).isNotEmpty();
+        assertThat(m1.get(FAMILY2.getBytes(), F2COL1.getBytes())).isNotEmpty();
+        assertThat(m1.get(FAMILY2.getBytes(), F2COL2.getBytes())).isNotEmpty();
+        assertThat(m1.get(FAMILY3.getBytes(), F3COL1.getBytes())).isNotEmpty();
+        assertThat(m1.get(FAMILY3.getBytes(), F3COL2.getBytes())).isNotEmpty();
+        assertThat(m1.get(FAMILY3.getBytes(), F3COL3.getBytes())).isNotEmpty();
+
+        HBaseSerde writeIgnoreNullValueSerde = createHBaseSerde(true);
+        Put m2 = writeIgnoreNullValueSerde.createPutMutation(prepareRowData());
+        assert m2 != null;
+        assertThat(m2.getRow()).isNotEmpty();
+        assertThat(m2.get(FAMILY1.getBytes(), F1COL1.getBytes())).isEmpty();
+        assertThat(m2.get(FAMILY2.getBytes(), F2COL1.getBytes())).isNotEmpty();
+        assertThat(m2.get(FAMILY2.getBytes(), F2COL2.getBytes())).isEmpty();
+        assertThat(m2.get(FAMILY3.getBytes(), F2COL1.getBytes())).isNotEmpty();
+        assertThat(m2.get(FAMILY3.getBytes(), F3COL2.getBytes())).isNotEmpty();
+        assertThat(m2.get(FAMILY3.getBytes(), F3COL3.getBytes())).isEmpty();
+    }
+
+    private HBaseTableSchema createHBaseTableSchema() {
         DataType dataType =
                 ROW(
                         FIELD(ROW_KEY, INT()),
@@ -111,8 +139,11 @@ class HBaseSerdeTest {
                                         FIELD(F3COL1, DOUBLE()),
                                         FIELD(F3COL2, DataTypes.BOOLEAN()),
                                         FIELD(F3COL3, STRING()))));
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType);
-        return new HBaseSerde(hbaseSchema, "null");
+        return HBaseTableSchema.fromDataType(dataType);
+    }
+
+    private HBaseSerde createHBaseSerde(boolean writeIgnoreNullValue) {
+        return new HBaseSerde(createHBaseTableSchema(), "null", 
writeIgnoreNullValue);
     }
 
     private List<List<Cell>> prepareCells() {
@@ -163,4 +194,25 @@ class HBaseSerdeTest {
         cellList.add(cells2);
         return cellList;
     }
+
+    private RowData prepareRowData() {
+        GenericRowData fam1Row = new GenericRowData(1);
+        fam1Row.setField(0, null);
+
+        GenericRowData fam2Row = new GenericRowData(2);
+        fam2Row.setField(0, StringData.fromString("Hello-1"));
+        fam2Row.setField(1, null);
+
+        GenericRowData fam3Row = new GenericRowData(3);
+        fam3Row.setField(0, 2.02);
+        fam3Row.setField(1, true);
+        fam3Row.setField(2, null);
+
+        GenericRowData row = new GenericRowData(4);
+        row.setField(0, 10);
+        row.setField(1, fam1Row);
+        row.setField(2, fam2Row);
+        row.setField(3, fam3Row);
+        return row;
+    }
 }

Reply via email to