This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
The following commit(s) were added to refs/heads/main by this push: new 298d816 [FLINK-33164] Support write option sink.ignore-null-valus. This closes #21 298d816 is described below commit 298d8164495732f59d18c54d4d40b601b6d44f21 Author: Tan-JiaLiang <55388933+tan-jiali...@users.noreply.github.com> AuthorDate: Fri Nov 3 23:51:29 2023 +0800 [FLINK-33164] Support write option sink.ignore-null-valus. This closes #21 Co-authored-by: tanjialiang <tanjiali...@52tt.com> --- .../hbase1/HBase1DynamicTableFactory.java | 3 ++ .../hbase1/sink/HBaseDynamicTableSink.java | 5 +- .../hbase1/HBaseDynamicTableFactoryTest.java | 12 +++++ .../hbase2/HBase2DynamicTableFactory.java | 5 +- .../hbase2/sink/HBaseDynamicTableSink.java | 5 +- .../hbase2/HBaseDynamicTableFactoryTest.java | 12 +++++ .../connector/hbase/options/HBaseWriteOptions.java | 20 +++++++ .../hbase/sink/RowDataToMutationConverter.java | 7 ++- .../hbase/table/HBaseConnectorOptions.java | 6 +++ .../hbase/table/HBaseConnectorOptionsUtil.java | 2 + .../flink/connector/hbase/util/HBaseSerde.java | 15 ++++++ .../flink/connector/hbase/util/HBaseSerdeTest.java | 62 ++++++++++++++++++++-- 12 files changed, 144 insertions(+), 10 deletions(-) diff --git a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java index fbc793c..5321bf2 100644 --- a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java +++ b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java @@ -50,6 +50,7 @@ import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.NULL_ import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM; @@ -149,6 +150,7 @@ public class HBase1DynamicTableFactory set.add(SINK_BUFFER_FLUSH_MAX_SIZE); set.add(SINK_BUFFER_FLUSH_MAX_ROWS); set.add(SINK_BUFFER_FLUSH_INTERVAL); + set.add(SINK_IGNORE_NULL_VALUE); set.add(SINK_PARALLELISM); set.add(LOOKUP_ASYNC); set.add(LOOKUP_CACHE_MAX_ROWS); @@ -173,6 +175,7 @@ public class HBase1DynamicTableFactory SINK_BUFFER_FLUSH_MAX_SIZE, SINK_BUFFER_FLUSH_MAX_ROWS, SINK_BUFFER_FLUSH_INTERVAL, + SINK_IGNORE_NULL_VALUE, LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL, LOOKUP_MAX_RETRIES) diff --git a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java index 2b9e87c..0dec937 100644 --- a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java +++ b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java @@ -61,7 +61,10 @@ public class HBaseDynamicTableSink implements DynamicTableSink { new HBaseSinkFunction<>( tableName, hbaseConf, - new RowDataToMutationConverter(hbaseTableSchema, nullStringLiteral), + new RowDataToMutationConverter( + hbaseTableSchema, + nullStringLiteral, + writeOptions.isIgnoreNullValue()), writeOptions.getBufferFlushMaxSizeInBytes(), writeOptions.getBufferFlushMaxRows(), writeOptions.getBufferFlushIntervalMillis()); diff --git 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java index 8a8f1d7..7c4ed8e 100644 --- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java +++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java @@ -256,6 +256,18 @@ public class HBaseDynamicTableFactoryTest { assertEquals(expected, actual); } + @Test + public void testSinkIgnoreNullValueOptions() { + Map<String, String> options = getAllOptions(); + options.put("sink.ignore-null-value", "true"); + + ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING())); + + DynamicTableSink sink = createTableSink(schema, options); + HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions(); + assertThat(actual.isIgnoreNullValue()).isTrue(); + } + @Test public void testParallelismOptions() { Map<String, String> options = getAllOptions(); diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java index 8c55137..8a10a1c 100644 --- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java +++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java @@ -50,6 +50,7 @@ import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.NULL_ import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE; +import 
static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM; @@ -152,6 +153,7 @@ public class HBase2DynamicTableFactory set.add(SINK_BUFFER_FLUSH_MAX_ROWS); set.add(SINK_BUFFER_FLUSH_INTERVAL); set.add(SINK_PARALLELISM); + set.add(SINK_IGNORE_NULL_VALUE); set.add(LOOKUP_ASYNC); set.add(LOOKUP_CACHE_MAX_ROWS); set.add(LOOKUP_CACHE_TTL); @@ -177,7 +179,8 @@ public class HBase2DynamicTableFactory LOOKUP_MAX_RETRIES, SINK_BUFFER_FLUSH_MAX_SIZE, SINK_BUFFER_FLUSH_MAX_ROWS, - SINK_BUFFER_FLUSH_INTERVAL) + SINK_BUFFER_FLUSH_INTERVAL, + SINK_IGNORE_NULL_VALUE) .collect(Collectors.toSet()); } } diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java index 6ea9ba3..299a457 100644 --- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java +++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java @@ -62,7 +62,10 @@ public class HBaseDynamicTableSink implements DynamicTableSink { new HBaseSinkFunction<>( tableName, hbaseConf, - new RowDataToMutationConverter(hbaseTableSchema, nullStringLiteral), + new RowDataToMutationConverter( + hbaseTableSchema, + nullStringLiteral, + writeOptions.isIgnoreNullValue()), writeOptions.getBufferFlushMaxSizeInBytes(), writeOptions.getBufferFlushMaxRows(), writeOptions.getBufferFlushIntervalMillis()); diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java index a535f0e..baf068a 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java @@ -260,6 +260,18 @@ public class HBaseDynamicTableFactoryTest { assertEquals(expected, actual); } + @Test + public void testSinkIgnoreNullValueOptions() { + Map<String, String> options = getAllOptions(); + options.put("sink.ignore-null-value", "true"); + + ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING())); + + DynamicTableSink sink = createTableSink(schema, options); + HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions(); + assertThat(actual.isIgnoreNullValue()).isTrue(); + } + @Test public void testParallelismOptions() { Map<String, String> options = getAllOptions(); diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java index 0ba2f88..9462783 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java @@ -34,16 +34,19 @@ public class HBaseWriteOptions implements Serializable { private final long bufferFlushMaxSizeInBytes; private final long bufferFlushMaxRows; private final long bufferFlushIntervalMillis; + private final boolean ignoreNullValue; private final Integer parallelism; private HBaseWriteOptions( long bufferFlushMaxSizeInBytes, long bufferFlushMaxMutations, long bufferFlushIntervalMillis, + boolean ignoreNullValue, Integer parallelism) { this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes; 
this.bufferFlushMaxRows = bufferFlushMaxMutations; this.bufferFlushIntervalMillis = bufferFlushIntervalMillis; + this.ignoreNullValue = ignoreNullValue; this.parallelism = parallelism; } @@ -59,6 +62,10 @@ public class HBaseWriteOptions implements Serializable { return bufferFlushIntervalMillis; } + public boolean isIgnoreNullValue() { + return ignoreNullValue; + } + public Integer getParallelism() { return parallelism; } @@ -72,6 +79,8 @@ public class HBaseWriteOptions implements Serializable { + bufferFlushMaxRows + ", bufferFlushIntervalMillis=" + bufferFlushIntervalMillis + + ", ignoreNullValue=" + + ignoreNullValue + ", parallelism=" + parallelism + '}'; @@ -89,6 +98,7 @@ public class HBaseWriteOptions implements Serializable { return bufferFlushMaxSizeInBytes == that.bufferFlushMaxSizeInBytes && bufferFlushMaxRows == that.bufferFlushMaxRows && bufferFlushIntervalMillis == that.bufferFlushIntervalMillis + && ignoreNullValue == that.ignoreNullValue && parallelism == that.parallelism; } @@ -112,6 +122,7 @@ public class HBaseWriteOptions implements Serializable { private long bufferFlushMaxSizeInBytes = ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT; private long bufferFlushMaxRows = 0; private long bufferFlushIntervalMillis = 0; + private boolean ignoreNullValue; private Integer parallelism; /** @@ -141,6 +152,14 @@ public class HBaseWriteOptions implements Serializable { return this; } + /** + * Optional. Sets whether to ignore null values or not. By default, null values will be written. + */ + public Builder setIgnoreNullValue(boolean ignoreNullValue) { + this.ignoreNullValue = ignoreNullValue; + return this; + } + /** * Optional. Defines the parallelism of the HBase sink operator.
By default, the parallelism * is determined by the framework using the same parallelism of the upstream chained @@ -157,6 +176,7 @@ public class HBaseWriteOptions implements Serializable { bufferFlushMaxSizeInBytes, bufferFlushMaxRows, bufferFlushIntervalMillis, + ignoreNullValue, parallelism); } } diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java index 406a996..f07377c 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java @@ -34,16 +34,19 @@ public class RowDataToMutationConverter implements HBaseMutationConverter<RowDat private final HBaseTableSchema schema; private final String nullStringLiteral; + private final boolean ignoreNullValue; private transient HBaseSerde serde; - public RowDataToMutationConverter(HBaseTableSchema schema, final String nullStringLiteral) { + public RowDataToMutationConverter( + HBaseTableSchema schema, final String nullStringLiteral, boolean ignoreNullValue) { this.schema = schema; this.nullStringLiteral = nullStringLiteral; + this.ignoreNullValue = ignoreNullValue; } @Override public void open() { - this.serde = new HBaseSerde(schema, nullStringLiteral); + this.serde = new HBaseSerde(schema, nullStringLiteral, ignoreNullValue); } @Override diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java index 0c8dc97..d760c03 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java +++ 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java @@ -88,6 +88,12 @@ public class HBaseConnectorOptions { + "Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' " + "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions."); + public static final ConfigOption<Boolean> SINK_IGNORE_NULL_VALUE = + ConfigOptions.key("sink.ignore-null-value") + .booleanType() + .defaultValue(false) + .withDescription("Writing option, whether ignore null value or not."); + public static final ConfigOption<Boolean> LOOKUP_ASYNC = ConfigOptions.key("lookup.async") .booleanType() diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java index 2141fe1..482644f 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java @@ -34,6 +34,7 @@ import java.util.Properties; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT; @@ -89,6 +90,7 @@ public class 
HBaseConnectorOptionsUtil { builder.setBufferFlushMaxRows(tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS)); builder.setBufferFlushMaxSizeInBytes( tableOptions.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes()); + builder.setIgnoreNullValue(tableOptions.get(SINK_IGNORE_NULL_VALUE)); builder.setParallelism(tableOptions.getOptional(SINK_PARALLELISM).orElse(null)); return builder.build(); } diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java index d21cc4a..458b25d 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java @@ -57,6 +57,8 @@ public class HBaseSerde { private final byte[] nullStringBytes; + private final boolean writeIgnoreNullValue; + // row key index in output row private final int rowkeyIndex; @@ -77,6 +79,13 @@ public class HBaseSerde { private final GenericRowData rowWithRowKey; public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) { + this(hbaseSchema, nullStringLiteral, false); + } + + public HBaseSerde( + HBaseTableSchema hbaseSchema, + final String nullStringLiteral, + boolean writeIgnoreNullValue) { this.families = hbaseSchema.getFamilyKeys(); this.rowkeyIndex = hbaseSchema.getRowKeyIndex(); LogicalType rowkeyType = @@ -93,6 +102,7 @@ public class HBaseSerde { this.keyDecoder = null; } this.nullStringBytes = nullStringLiteral.getBytes(StandardCharsets.UTF_8); + this.writeIgnoreNullValue = writeIgnoreNullValue; // prepare output rows this.reusedRow = new GenericRowData(fieldLength); @@ -141,6 +151,11 @@ public class HBaseSerde { byte[] familyKey = families[f]; RowData familyRow = row.getRow(i, qualifiers[f].length); for (int q = 0; q < this.qualifiers[f].length; q++) { + // ignore null value or not + if (writeIgnoreNullValue && 
familyRow.isNullAt(q)) { + continue; + } + + // get qualifier key byte[] qualifier = qualifiers[f][q]; // serialize value diff --git a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java index 3e9afb1..e370809 100644 --- a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java +++ b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java @@ -19,11 +19,14 @@ package org.apache.flink.connector.hbase.util; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.DataType; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.junit.jupiter.api.Test; @@ -58,7 +61,7 @@ class HBaseSerdeTest { @Test void convertToNewRowTest() { - HBaseSerde serde = createHBaseSerde(); + HBaseSerde serde = createHBaseSerde(false); List<List<Cell>> cellsList = prepareCells(); List<RowData> resultRowDatas = new ArrayList<>(); List<String> resultRowDataStr = new ArrayList<>(); @@ -79,7 +82,7 @@ class HBaseSerdeTest { @Test void convertToReusedRowTest() { - HBaseSerde serde = createHBaseSerde(); + HBaseSerde serde = createHBaseSerde(false); List<List<Cell>> cellsList = prepareCells(); List<RowData> resultRowDatas = new ArrayList<>(); List<String> resultRowDataStr = new ArrayList<>(); @@ -99,7 +102,32 @@ class HBaseSerdeTest { "+I(2,+I(20),+I(Hello-2,200),+I(2.02,true,Welt-2))"); } - private HBaseSerde createHBaseSerde() { + @Test + public void writeIgnoreNullValueTest() { + HBaseSerde serde = createHBaseSerde(false); + Put m1 = 
serde.createPutMutation(prepareRowData()); + assert m1 != null; + assertThat(m1.getRow()).isNotEmpty(); + assertThat(m1.get(FAMILY1.getBytes(), F1COL1.getBytes())).isNotEmpty(); + assertThat(m1.get(FAMILY2.getBytes(), F2COL1.getBytes())).isNotEmpty(); + assertThat(m1.get(FAMILY2.getBytes(), F2COL2.getBytes())).isNotEmpty(); + assertThat(m1.get(FAMILY3.getBytes(), F3COL1.getBytes())).isNotEmpty(); + assertThat(m1.get(FAMILY3.getBytes(), F3COL2.getBytes())).isNotEmpty(); + assertThat(m1.get(FAMILY3.getBytes(), F3COL3.getBytes())).isNotEmpty(); + + HBaseSerde writeIgnoreNullValueSerde = createHBaseSerde(true); + Put m2 = writeIgnoreNullValueSerde.createPutMutation(prepareRowData()); + assert m2 != null; + assertThat(m2.getRow()).isNotEmpty(); + assertThat(m2.get(FAMILY1.getBytes(), F1COL1.getBytes())).isEmpty(); + assertThat(m2.get(FAMILY2.getBytes(), F2COL1.getBytes())).isNotEmpty(); + assertThat(m2.get(FAMILY2.getBytes(), F2COL2.getBytes())).isEmpty(); + assertThat(m2.get(FAMILY3.getBytes(), F3COL1.getBytes())).isNotEmpty(); + assertThat(m2.get(FAMILY3.getBytes(), F3COL2.getBytes())).isNotEmpty(); + assertThat(m2.get(FAMILY3.getBytes(), F3COL3.getBytes())).isEmpty(); + } + + private HBaseTableSchema createHBaseTableSchema() { DataType dataType = ROW( FIELD(ROW_KEY, INT()), @@ -111,8 +139,11 @@ class HBaseSerdeTest { FIELD(F3COL1, DOUBLE()), FIELD(F3COL2, DataTypes.BOOLEAN()), FIELD(F3COL3, STRING())))); - HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType); - return new HBaseSerde(hbaseSchema, "null"); + return HBaseTableSchema.fromDataType(dataType); + } + + private HBaseSerde createHBaseSerde(boolean writeIgnoreNullValue) { + return new HBaseSerde(createHBaseTableSchema(), "null", writeIgnoreNullValue); } private List<List<Cell>> prepareCells() { @@ -163,4 +194,25 @@ class HBaseSerdeTest { cellList.add(cells2); return cellList; } + + private RowData prepareRowData() { + GenericRowData fam1Row = new GenericRowData(1); + fam1Row.setField(0, null); 
+ + GenericRowData fam2Row = new GenericRowData(2); + fam2Row.setField(0, StringData.fromString("Hello-1")); + fam2Row.setField(1, null); + + GenericRowData fam3Row = new GenericRowData(3); + fam3Row.setField(0, 2.02); + fam3Row.setField(1, true); + fam3Row.setField(2, null); + + GenericRowData row = new GenericRowData(4); + row.setField(0, 10); + row.setField(1, fam1Row); + row.setField(2, fam2Row); + row.setField(3, fam3Row); + return row; + } }