This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git


The following commit(s) were added to refs/heads/v3.0 by this push:
     new d35b703  [FLINK-30460][Connector/HBase] Support the writable metadata 
TTL. This closes #33
d35b703 is described below

commit d35b703974a32cbb3a00d479ac186fcf73003a07
Author: Tan-JiaLiang <tanjialiang1...@gmail.com>
AuthorDate: Thu Nov 16 23:57:00 2023 +0800

    [FLINK-30460][Connector/HBase] Support the writable metadata TTL. This 
closes #33
    
    (cherry picked from commit 334bd965d299aca2fa597f08bdc89e0275562419)
---
 docs/content.zh/docs/connectors/table/hbase.md     |  6 ++
 docs/content/docs/connectors/table/hbase.md        |  6 ++
 .../connector/hbase1/HBaseConnectorITCase.java     | 69 ++++++++++++++++++++++
 .../flink/connector/hbase1/util/HBaseTestBase.java |  9 +++
 .../connector/hbase2/HBaseConnectorITCase.java     | 69 ++++++++++++++++++++++
 .../flink/connector/hbase2/util/HBaseTestBase.java |  9 +++
 .../hbase/sink/RowDataToMutationConverter.java     |  8 ++-
 .../connector/hbase/sink/WritableMetadata.java     | 44 ++++++++++----
 .../flink/connector/hbase/util/HBaseSerde.java     |  5 +-
 .../flink/connector/hbase/util/HBaseSerdeTest.java |  4 +-
 10 files changed, 212 insertions(+), 17 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/hbase.md 
b/docs/content.zh/docs/connectors/table/hbase.md
index db68d37..8782e36 100644
--- a/docs/content.zh/docs/connectors/table/hbase.md
+++ b/docs/content.zh/docs/connectors/table/hbase.md
@@ -99,6 +99,12 @@ ON myTopic.key = hTable.rowkey;
       <td>HBase记录的时间戳。</td>
       <td><code>W</code></td>
     </tr>
+    <tr>
+      <td><code>ttl</code></td>
+      <td><code>BIGINT NOT NULL</code></td>
+      <td>HBase记录的生存时间(毫秒)。</td>
+      <td><code>W</code></td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/content/docs/connectors/table/hbase.md 
b/docs/content/docs/connectors/table/hbase.md
index 8c723da..85371fa 100644
--- a/docs/content/docs/connectors/table/hbase.md
+++ b/docs/content/docs/connectors/table/hbase.md
@@ -101,6 +101,12 @@ Read-only columns must be declared `VIRTUAL` to exclude 
them during an `INSERT I
       <td>Timestamp for the HBase mutation.</td>
       <td><code>W</code></td>
     </tr>
+    <tr>
+      <td><code>ttl</code></td>
+      <td><code>BIGINT NOT NULL</code></td>
+      <td>Time-to-live for the HBase mutation, in milliseconds.</td>
+      <td><code>W</code></td>
+    </tr>
     </tbody>
 </table>
 
diff --git 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
index 1c53187..e924d58 100644
--- 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
+++ 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
@@ -55,6 +55,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.Expressions.$;
@@ -422,6 +423,74 @@ public class HBaseConnectorITCase extends HBaseTestBase {
         TestBaseUtils.compareResultAsText(results, expected);
     }
 
+    @Test
+    public void testTableSinkWithTTLMetadata() throws Exception {
+        StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, 
streamSettings);
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForSink ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>,"
+                        + " ttl BIGINT NOT NULL METADATA FROM 'ttl'"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-1.4',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_6
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+
+        String insert =
+                "INSERT INTO hTableForSink VALUES"
+                        + "(1, ROW(1), 2000),"
+                        + "(2, ROW(2), 9000),"
+                        + "(3, ROW(3), 5000)";
+        tEnv.executeSql(insert).await();
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForQuery ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-1.4',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_6
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+        String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";
+
+        TableResult firstResult = tEnv.executeSql(query);
+        List<Row> firstResults = 
CollectionUtil.iteratorToList(firstResult.collect());
+        String firstExpected = "+I[1, 1]\n+I[2, 2]\n+I[3, 3]\n";
+        TestBaseUtils.compareResultAsText(firstResults, firstExpected);
+
+        TimeUnit.SECONDS.sleep(3);
+
+        TableResult secondResult = tEnv.executeSql(query);
+        List<Row> secondResults = 
CollectionUtil.iteratorToList(secondResult.collect());
+        String secondExpected = "+I[2, 2]\n+I[3, 3]\n";
+        TestBaseUtils.compareResultAsText(secondResults, secondExpected);
+
+        TimeUnit.SECONDS.sleep(3);
+
+        TableResult thirdResult = tEnv.executeSql(query);
+        List<Row> thirdResults = 
CollectionUtil.iteratorToList(thirdResult.collect());
+        String thirdExpected = "+I[2, 2]";
+        TestBaseUtils.compareResultAsText(thirdResults, thirdExpected);
+
+        TimeUnit.SECONDS.sleep(4);
+
+        TableResult lastResult = tEnv.executeSql(query);
+        List<Row> lastResults = 
CollectionUtil.iteratorToList(lastResult.collect());
+        assertThat(lastResults).isEmpty();
+    }
+
     @Test
     public void testTableSourceSinkWithDDL() throws Exception {
         StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
index c3512a9..05e01e9 100644
--- 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
+++ 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
@@ -45,6 +45,7 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
     protected static final String TEST_TABLE_3 = "testTable3";
     protected static final String TEST_TABLE_4 = "testTable4";
     protected static final String TEST_TABLE_5 = "testTable5";
+    protected static final String TEST_TABLE_6 = "testTable6";
     protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
     protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
 
@@ -98,6 +99,7 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
         createHBaseTable3();
         createHBaseTable4();
         createHBaseTable5();
+        createHBaseTable6();
         createEmptyHBaseTable();
     }
 
@@ -253,6 +255,13 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
         createTable(tableName, families, SPLIT_KEYS);
     }
 
+    private static void createHBaseTable6() {
+        // create a table
+        byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+        TableName tableName = TableName.valueOf(TEST_TABLE_6);
+        createTable(tableName, families, SPLIT_KEYS);
+    }
+
     private static void createEmptyHBaseTable() {
         // create a table
         byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
diff --git 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index c73bbc3..4bb6f2c 100644
--- 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -60,6 +60,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Spliterator;
 import java.util.Spliterators;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -451,6 +452,74 @@ public class HBaseConnectorITCase extends HBaseTestBase {
         TestBaseUtils.compareResultAsText(results, expected);
     }
 
+    @Test
+    public void testTableSinkWithTTLMetadata() throws Exception {
+        StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, 
streamSettings);
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForSink ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>,"
+                        + " ttl BIGINT NOT NULL METADATA FROM 'ttl'"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-2.2',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_6
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+
+        String insert =
+                "INSERT INTO hTableForSink VALUES"
+                        + "(1, ROW(1), 2000),"
+                        + "(2, ROW(2), 9000),"
+                        + "(3, ROW(3), 5000)";
+        tEnv.executeSql(insert).await();
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForQuery ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-2.2',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_6
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+        String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";
+
+        TableResult firstResult = tEnv.executeSql(query);
+        List<Row> firstResults = 
CollectionUtil.iteratorToList(firstResult.collect());
+        String firstExpected = "+I[1, 1]\n+I[2, 2]\n+I[3, 3]\n";
+        TestBaseUtils.compareResultAsText(firstResults, firstExpected);
+
+        TimeUnit.SECONDS.sleep(3);
+
+        TableResult secondResult = tEnv.executeSql(query);
+        List<Row> secondResults = 
CollectionUtil.iteratorToList(secondResult.collect());
+        String secondExpected = "+I[2, 2]\n+I[3, 3]\n";
+        TestBaseUtils.compareResultAsText(secondResults, secondExpected);
+
+        TimeUnit.SECONDS.sleep(3);
+
+        TableResult thirdResult = tEnv.executeSql(query);
+        List<Row> thirdResults = 
CollectionUtil.iteratorToList(thirdResult.collect());
+        String thirdExpected = "+I[2, 2]";
+        TestBaseUtils.compareResultAsText(thirdResults, thirdExpected);
+
+        TimeUnit.SECONDS.sleep(4);
+
+        TableResult lastResult = tEnv.executeSql(query);
+        List<Row> lastResults = 
CollectionUtil.iteratorToList(lastResult.collect());
+        assertThat(lastResults).isEmpty();
+    }
+
     @Test
     public void testTableSourceSinkWithDDL() throws Exception {
         StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index d6a0a9e..3434179 100644
--- 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++ 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -45,6 +45,7 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
     protected static final String TEST_TABLE_3 = "testTable3";
     protected static final String TEST_TABLE_4 = "testTable4";
     protected static final String TEST_TABLE_5 = "testTable5";
+    protected static final String TEST_TABLE_6 = "testTable6";
     protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
     protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
 
@@ -98,6 +99,7 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
         createHBaseTable3();
         createHBaseTable4();
         createHBaseTable5();
+        createHBaseTable6();
         createEmptyHBaseTable();
     }
 
@@ -253,6 +255,13 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
         createTable(tableName, families, SPLIT_KEYS);
     }
 
+    private static void createHBaseTable6() {
+        // create a table
+        byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+        TableName tableName = TableName.valueOf(TEST_TABLE_6);
+        createTable(tableName, families, SPLIT_KEYS);
+    }
+
     private static void createEmptyHBaseTable() {
         // create a table
         byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
diff --git 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
index f9a13c8..5796d5c 100644
--- 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
+++ 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.hbase.sink;
 
+import 
org.apache.flink.connector.hbase.sink.WritableMetadata.TimeToLiveMetadata;
 import 
org.apache.flink.connector.hbase.sink.WritableMetadata.TimestampMetadata;
 import org.apache.flink.connector.hbase.util.HBaseSerde;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
@@ -40,6 +41,7 @@ public class RowDataToMutationConverter implements 
HBaseMutationConverter<RowDat
     private final String nullStringLiteral;
     private final boolean ignoreNullValue;
     private final TimestampMetadata timestampMetadata;
+    private final TimeToLiveMetadata timeToLiveMetadata;
     private transient HBaseSerde serde;
 
     public RowDataToMutationConverter(
@@ -51,7 +53,8 @@ public class RowDataToMutationConverter implements 
HBaseMutationConverter<RowDat
         this.schema = schema;
         this.nullStringLiteral = nullStringLiteral;
         this.ignoreNullValue = ignoreNullValue;
-        this.timestampMetadata = TimestampMetadata.of(metadataKeys, 
physicalDataType);
+        this.timestampMetadata = new TimestampMetadata(metadataKeys, 
physicalDataType);
+        this.timeToLiveMetadata = new TimeToLiveMetadata(metadataKeys, 
physicalDataType);
     }
 
     @Override
@@ -62,9 +65,10 @@ public class RowDataToMutationConverter implements 
HBaseMutationConverter<RowDat
     @Override
     public Mutation convertToMutation(RowData record) {
         Long timestamp = timestampMetadata.read(record);
+        Long timeToLive = timeToLiveMetadata.read(record);
         RowKind kind = record.getRowKind();
         if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
-            return serde.createPutMutation(record, timestamp);
+            return serde.createPutMutation(record, timestamp, timeToLive);
         } else {
             return serde.createDeleteMutation(record, timestamp);
         }
diff --git 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
index c7e9e98..9b3ab36 100644
--- 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
+++ 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
@@ -35,6 +35,8 @@ public abstract class WritableMetadata<T> implements 
Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    public abstract T read(RowData row);
+
     /**
      * Returns the map of metadata keys and their corresponding data types 
that can be consumed by
      * HBase sink for writing.
@@ -44,10 +46,16 @@ public abstract class WritableMetadata<T> implements 
Serializable {
     public static Map<String, DataType> list() {
         Map<String, DataType> metadataMap = new HashMap<>();
         metadataMap.put(TimestampMetadata.KEY, TimestampMetadata.DATA_TYPE);
+        metadataMap.put(TimeToLiveMetadata.KEY, TimeToLiveMetadata.DATA_TYPE);
         return Collections.unmodifiableMap(metadataMap);
     }
 
-    public abstract T read(RowData row);
+    private static void validateNotNull(RowData row, int pos, String key) {
+        if (row.isNullAt(pos)) {
+            throw new IllegalArgumentException(
+                    String.format("Writable metadata '%s' can not accept null 
value", key));
+        }
+    }
 
     /** Timestamp metadata for HBase. */
     public static class TimestampMetadata extends WritableMetadata<Long> {
@@ -58,8 +66,9 @@ public abstract class WritableMetadata<T> implements 
Serializable {
 
         private final int pos;
 
-        public TimestampMetadata(int pos) {
-            this.pos = pos;
+        public TimestampMetadata(List<String> metadataKeys, DataType 
physicalDataType) {
+            int idx = metadataKeys.indexOf(KEY);
+            this.pos = idx < 0 ? -1 : idx + 
physicalDataType.getLogicalType().getChildren().size();
         }
 
         @Override
@@ -67,20 +76,31 @@ public abstract class WritableMetadata<T> implements 
Serializable {
             if (pos < 0) {
                 return HConstants.LATEST_TIMESTAMP;
             }
-            if (row.isNullAt(pos)) {
-                throw new IllegalArgumentException(
-                        String.format("Writable metadata '%s' can not accept 
null value", KEY));
-            }
+            validateNotNull(row, pos, KEY);
             return row.getTimestamp(pos, 3).getMillisecond();
         }
+    }
+
+    /** Time-to-live metadata for HBase. */
+    public static class TimeToLiveMetadata extends WritableMetadata<Long> {
 
-        public static TimestampMetadata of(List<String> metadataKeys, DataType 
physicalDataType) {
-            int pos = metadataKeys.indexOf(TimestampMetadata.KEY);
+        public static final String KEY = "ttl";
+        public static final DataType DATA_TYPE = DataTypes.BIGINT().nullable();
+
+        private final int pos;
+
+        public TimeToLiveMetadata(List<String> metadataKeys, DataType 
physicalDataType) {
+            int idx = metadataKeys.indexOf(KEY);
+            this.pos = idx < 0 ? -1 : idx + 
physicalDataType.getLogicalType().getChildren().size();
+        }
+
+        @Override
+        public Long read(RowData row) {
             if (pos < 0) {
-                return new TimestampMetadata(-1);
+                return null;
             }
-            return new TimestampMetadata(
-                    pos + 
physicalDataType.getLogicalType().getChildren().size());
+            validateNotNull(row, pos, KEY);
+            return row.getLong(pos);
         }
     }
 }
diff --git 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index d381033..362dc96 100644
--- 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -135,7 +135,7 @@ public class HBaseSerde {
      *
      * @return The appropriate instance of Put for this use case.
      */
-    public @Nullable Put createPutMutation(RowData row, long timestamp) {
+    public @Nullable Put createPutMutation(RowData row, long timestamp, 
@Nullable Long timeToLive) {
         checkArgument(keyEncoder != null, "row key is not set.");
         byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
         if (rowkey.length == 0) {
@@ -144,6 +144,9 @@ public class HBaseSerde {
         }
         // upsert
         Put put = new Put(rowkey, timestamp);
+        if (timeToLive != null) {
+            put.setTTL(timeToLive);
+        }
         for (int i = 0; i < fieldLength; i++) {
             if (i != rowkeyIndex) {
                 int f = i > rowkeyIndex ? i - 1 : i;
diff --git 
a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
 
b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
index 85de7c8..7c46d04 100644
--- 
a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
+++ 
b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
@@ -106,7 +106,7 @@ class HBaseSerdeTest {
     @Test
     public void writeIgnoreNullValueTest() {
         HBaseSerde serde = createHBaseSerde(false);
-        Put m1 = serde.createPutMutation(prepareRowData(), 
HConstants.LATEST_TIMESTAMP);
+        Put m1 = serde.createPutMutation(prepareRowData(), 
HConstants.LATEST_TIMESTAMP, null);
         assert m1 != null;
         assertThat(m1.getRow()).isNotEmpty();
         assertThat(m1.get(FAMILY1.getBytes(), F1COL1.getBytes())).isNotEmpty();
@@ -119,7 +119,7 @@ class HBaseSerdeTest {
         HBaseSerde writeIgnoreNullValueSerde = createHBaseSerde(true);
         Put m2 =
                 writeIgnoreNullValueSerde.createPutMutation(
-                        prepareRowData(), HConstants.LATEST_TIMESTAMP);
+                        prepareRowData(), HConstants.LATEST_TIMESTAMP, null);
         assert m2 != null;
         assertThat(m2.getRow()).isNotEmpty();
         assertThat(m2.get(FAMILY1.getBytes(), F1COL1.getBytes())).isEmpty();

Reply via email to