This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
The following commit(s) were added to refs/heads/v3.0 by this push: new d35b703 [FLINK-30460][Connector/HBase] Support the writable metadata TTL. This closes #33 d35b703 is described below commit d35b703974a32cbb3a00d479ac186fcf73003a07 Author: Tan-JiaLiang <tanjialiang1...@gmail.com> AuthorDate: Thu Nov 16 23:57:00 2023 +0800 [FLINK-30460][Connector/HBase] Support the writable metadata TTL. This closes #33 (cherry picked from commit 334bd965d299aca2fa597f08bdc89e0275562419) --- docs/content.zh/docs/connectors/table/hbase.md | 6 ++ docs/content/docs/connectors/table/hbase.md | 6 ++ .../connector/hbase1/HBaseConnectorITCase.java | 69 ++++++++++++++++++++++ .../flink/connector/hbase1/util/HBaseTestBase.java | 9 +++ .../connector/hbase2/HBaseConnectorITCase.java | 69 ++++++++++++++++++++++ .../flink/connector/hbase2/util/HBaseTestBase.java | 9 +++ .../hbase/sink/RowDataToMutationConverter.java | 8 ++- .../connector/hbase/sink/WritableMetadata.java | 44 ++++++++++---- .../flink/connector/hbase/util/HBaseSerde.java | 5 +- .../flink/connector/hbase/util/HBaseSerdeTest.java | 4 +- 10 files changed, 212 insertions(+), 17 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/hbase.md b/docs/content.zh/docs/connectors/table/hbase.md index db68d37..8782e36 100644 --- a/docs/content.zh/docs/connectors/table/hbase.md +++ b/docs/content.zh/docs/connectors/table/hbase.md @@ -99,6 +99,12 @@ ON myTopic.key = hTable.rowkey; <td>HBase记录的时间戳。</td> <td><code>W</code></td> </tr> + <tr> + <td><code>ttl</code></td> + <td><code>BIGINT NOT NULL</code></td> + <td>HBase记录的生存时间(毫秒)。</td> + <td><code>W</code></td> + </tr> </tbody> </table> diff --git a/docs/content/docs/connectors/table/hbase.md b/docs/content/docs/connectors/table/hbase.md index 8c723da..85371fa 100644 --- a/docs/content/docs/connectors/table/hbase.md +++ b/docs/content/docs/connectors/table/hbase.md @@ -101,6 +101,12 @@ Read-only columns must be declared `VIRTUAL` to exclude them during an `INSERT I <td>Timestamp for the HBase mutation.</td> <td><code>W</code></td> </tr> + <tr> + <td><code>ttl</code></td> + <td><code>BIGINT NOT NULL</code></td> + <td>Time-to-live for the HBase mutation, in milliseconds.</td> + <td><code>W</code></td> + </tr> </tbody> </table> diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java index 1c53187..e924d58 100644 --- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java +++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java @@ -55,6 +55,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.flink.table.api.Expressions.$; @@ -422,6 +423,74 @@ public class HBaseConnectorITCase extends HBaseTestBase { TestBaseUtils.compareResultAsText(results, expected); } + @Test + public void testTableSinkWithTTLMetadata() throws Exception { + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + + tEnv.executeSql( + "CREATE TABLE hTableForSink (" + + " rowkey INT PRIMARY KEY NOT ENFORCED," + + " family1 ROW<col1 INT>," + + " ttl BIGINT NOT NULL METADATA FROM 'ttl'" + + ") WITH (" + + " 'connector' = 'hbase-1.4'," + + " 'table-name' = '" + + TEST_TABLE_6 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + + String insert = + "INSERT INTO hTableForSink VALUES" + + "(1, ROW(1), 2000)," + + "(2, ROW(2), 9000)," + + "(3, ROW(3), 5000)"; + tEnv.executeSql(insert).await(); + + tEnv.executeSql( + "CREATE TABLE hTableForQuery (" + + " rowkey INT PRIMARY KEY NOT ENFORCED," + + " family1 ROW<col1 INT>" + + ") WITH (" + + " 'connector' = 'hbase-1.4'," + + " 'table-name' = '" + + TEST_TABLE_6 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + String query = "SELECT rowkey, family1.col1 FROM hTableForQuery"; + + TableResult firstResult = tEnv.executeSql(query); + List<Row> firstResults = CollectionUtil.iteratorToList(firstResult.collect()); + String firstExpected = "+I[1, 1]\n+I[2, 2]\n+I[3, 3]\n"; + TestBaseUtils.compareResultAsText(firstResults, firstExpected); + + TimeUnit.SECONDS.sleep(3); + + TableResult secondResult = tEnv.executeSql(query); + List<Row> secondResults = CollectionUtil.iteratorToList(secondResult.collect()); + String secondExpected = "+I[2, 2]\n+I[3, 3]\n"; + TestBaseUtils.compareResultAsText(secondResults, secondExpected); + + TimeUnit.SECONDS.sleep(3); + + TableResult thirdResult = tEnv.executeSql(query); + List<Row> thirdResults = CollectionUtil.iteratorToList(thirdResult.collect()); + String thirdExpected = "+I[2, 2]"; + TestBaseUtils.compareResultAsText(thirdResults, thirdExpected); + + TimeUnit.SECONDS.sleep(4); + + TableResult lastResult = tEnv.executeSql(query); + List<Row> lastResults = CollectionUtil.iteratorToList(lastResult.collect()); + assertThat(lastResults).isEmpty(); + } + @Test public void testTableSourceSinkWithDDL() throws Exception { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java index c3512a9..05e01e9 100644 --- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java +++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java @@ -45,6 +45,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String TEST_TABLE_3 = "testTable3"; protected static final String TEST_TABLE_4 = "testTable4"; protected static final String TEST_TABLE_5 = "testTable5"; + protected static final String TEST_TABLE_6 = "testTable6"; protected static final String TEST_EMPTY_TABLE = "testEmptyTable"; protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable"; @@ -98,6 +99,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createHBaseTable3(); createHBaseTable4(); createHBaseTable5(); + createHBaseTable6(); createEmptyHBaseTable(); } @@ -253,6 +255,13 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createTable(tableName, families, SPLIT_KEYS); } + private static void createHBaseTable6() { + // create a table + byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)}; + TableName tableName = TableName.valueOf(TEST_TABLE_6); + createTable(tableName, families, SPLIT_KEYS); + } + private static void createEmptyHBaseTable() { // create a table byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)}; diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java index c73bbc3..4bb6f2c 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java @@ -60,6 +60,7 @@ import java.util.Iterator; import java.util.List; import java.util.Spliterator; import java.util.Spliterators; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -451,6 +452,74 @@ public class HBaseConnectorITCase extends HBaseTestBase { TestBaseUtils.compareResultAsText(results, expected); } + @Test + public void testTableSinkWithTTLMetadata() throws Exception { + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + + tEnv.executeSql( + "CREATE TABLE hTableForSink (" + + " rowkey INT PRIMARY KEY NOT ENFORCED," + + " family1 ROW<col1 INT>," + + " ttl BIGINT NOT NULL METADATA FROM 'ttl'" + + ") WITH (" + + " 'connector' = 'hbase-2.2'," + + " 'table-name' = '" + + TEST_TABLE_6 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + + String insert = + "INSERT INTO hTableForSink VALUES" + + "(1, ROW(1), 2000)," + + "(2, ROW(2), 9000)," + + "(3, ROW(3), 5000)"; + tEnv.executeSql(insert).await(); + + tEnv.executeSql( + "CREATE TABLE hTableForQuery (" + + " rowkey INT PRIMARY KEY NOT ENFORCED," + + " family1 ROW<col1 INT>" + + ") WITH (" + + " 'connector' = 'hbase-2.2'," + + " 'table-name' = '" + + TEST_TABLE_6 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + String query = "SELECT rowkey, family1.col1 FROM hTableForQuery"; + + TableResult firstResult = tEnv.executeSql(query); + List<Row> firstResults = CollectionUtil.iteratorToList(firstResult.collect()); + String firstExpected = "+I[1, 1]\n+I[2, 2]\n+I[3, 3]\n"; + TestBaseUtils.compareResultAsText(firstResults, firstExpected); + + TimeUnit.SECONDS.sleep(3); + + TableResult secondResult = tEnv.executeSql(query); + List<Row> secondResults = CollectionUtil.iteratorToList(secondResult.collect()); + String secondExpected = "+I[2, 2]\n+I[3, 3]\n"; + TestBaseUtils.compareResultAsText(secondResults, secondExpected); + + TimeUnit.SECONDS.sleep(3); + + TableResult thirdResult = tEnv.executeSql(query); + List<Row> thirdResults = CollectionUtil.iteratorToList(thirdResult.collect()); + String thirdExpected = "+I[2, 2]"; + TestBaseUtils.compareResultAsText(thirdResults, thirdExpected); + + TimeUnit.SECONDS.sleep(4); + + TableResult lastResult = tEnv.executeSql(query); + List<Row> lastResults = CollectionUtil.iteratorToList(lastResult.collect()); + assertThat(lastResults).isEmpty(); + } + @Test public void testTableSourceSinkWithDDL() throws Exception { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java index d6a0a9e..3434179 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java @@ -45,6 +45,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String TEST_TABLE_3 = "testTable3"; protected static final String TEST_TABLE_4 = "testTable4"; protected static final String TEST_TABLE_5 = "testTable5"; + protected static final String TEST_TABLE_6 = "testTable6"; protected static final String TEST_EMPTY_TABLE = "testEmptyTable"; protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable"; @@ -98,6 +99,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createHBaseTable3(); createHBaseTable4(); createHBaseTable5(); + createHBaseTable6(); createEmptyHBaseTable(); } @@ -253,6 +255,13 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createTable(tableName, families, SPLIT_KEYS); } + private static void createHBaseTable6() { + // create a table + byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)}; + TableName tableName = TableName.valueOf(TEST_TABLE_6); + createTable(tableName, families, SPLIT_KEYS); + } + private static void createEmptyHBaseTable() { // create a table byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)}; diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java index f9a13c8..5796d5c 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.hbase.sink; +import org.apache.flink.connector.hbase.sink.WritableMetadata.TimeToLiveMetadata; import org.apache.flink.connector.hbase.sink.WritableMetadata.TimestampMetadata; import org.apache.flink.connector.hbase.util.HBaseSerde; import org.apache.flink.connector.hbase.util.HBaseTableSchema; @@ -40,6 +41,7 @@ public class RowDataToMutationConverter implements HBaseMutationConverter<RowDat private final String nullStringLiteral; private final boolean ignoreNullValue; private final TimestampMetadata timestampMetadata; + private final TimeToLiveMetadata timeToLiveMetadata; private transient HBaseSerde serde; public RowDataToMutationConverter( @@ -51,7 +53,8 @@ public class RowDataToMutationConverter implements HBaseMutationConverter<RowDat this.schema = schema; this.nullStringLiteral = nullStringLiteral; this.ignoreNullValue = ignoreNullValue; - this.timestampMetadata = TimestampMetadata.of(metadataKeys, physicalDataType); + this.timestampMetadata = new TimestampMetadata(metadataKeys, physicalDataType); + this.timeToLiveMetadata = new TimeToLiveMetadata(metadataKeys, physicalDataType); } @Override @@ -62,9 +65,10 @@ public class RowDataToMutationConverter implements HBaseMutationConverter<RowDat @Override public Mutation convertToMutation(RowData record) { Long timestamp = timestampMetadata.read(record); + Long timeToLive = timeToLiveMetadata.read(record); RowKind kind = record.getRowKind(); if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) { - return serde.createPutMutation(record, timestamp); + return serde.createPutMutation(record, timestamp, timeToLive); } else { return serde.createDeleteMutation(record, timestamp); } diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java index c7e9e98..9b3ab36 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java @@ -35,6 +35,8 @@ public abstract class WritableMetadata<T> implements Serializable { private static final long serialVersionUID = 1L; + public abstract T read(RowData row); + /** * Returns the map of metadata keys and their corresponding data types that can be consumed by * HBase sink for writing. @@ -44,10 +46,16 @@ public abstract class WritableMetadata<T> implements Serializable { public static Map<String, DataType> list() { Map<String, DataType> metadataMap = new HashMap<>(); metadataMap.put(TimestampMetadata.KEY, TimestampMetadata.DATA_TYPE); + metadataMap.put(TimeToLiveMetadata.KEY, TimeToLiveMetadata.DATA_TYPE); return Collections.unmodifiableMap(metadataMap); } - public abstract T read(RowData row); + private static void validateNotNull(RowData row, int pos, String key) { + if (row.isNullAt(pos)) { + throw new IllegalArgumentException( + String.format("Writable metadata '%s' can not accept null value", key)); + } + } /** Timestamp metadata for HBase. */ public static class TimestampMetadata extends WritableMetadata<Long> { @@ -58,8 +66,9 @@ public abstract class WritableMetadata<T> implements Serializable { private final int pos; - public TimestampMetadata(int pos) { - this.pos = pos; + public TimestampMetadata(List<String> metadataKeys, DataType physicalDataType) { + int idx = metadataKeys.indexOf(KEY); + this.pos = idx < 0 ? -1 : idx + physicalDataType.getLogicalType().getChildren().size(); } @Override @@ -67,20 +76,31 @@ public abstract class WritableMetadata<T> implements Serializable { if (pos < 0) { return HConstants.LATEST_TIMESTAMP; } - if (row.isNullAt(pos)) { - throw new IllegalArgumentException( - String.format("Writable metadata '%s' can not accept null value", KEY)); - } + validateNotNull(row, pos, KEY); return row.getTimestamp(pos, 3).getMillisecond(); } + } + + /** Time-to-live metadata for HBase. */ + public static class TimeToLiveMetadata extends WritableMetadata<Long> { - public static TimestampMetadata of(List<String> metadataKeys, DataType physicalDataType) { - int pos = metadataKeys.indexOf(TimestampMetadata.KEY); + public static final String KEY = "ttl"; + public static final DataType DATA_TYPE = DataTypes.BIGINT().nullable(); + + private final int pos; + + public TimeToLiveMetadata(List<String> metadataKeys, DataType physicalDataType) { + int idx = metadataKeys.indexOf(KEY); + this.pos = idx < 0 ? -1 : idx + physicalDataType.getLogicalType().getChildren().size(); + } + + @Override + public Long read(RowData row) { if (pos < 0) { - return new TimestampMetadata(-1); + return null; } - return new TimestampMetadata( - pos + physicalDataType.getLogicalType().getChildren().size()); + validateNotNull(row, pos, KEY); + return row.getLong(pos); } } } diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java index d381033..362dc96 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java @@ -135,7 +135,7 @@ public class HBaseSerde { * * @return The appropriate instance of Put for this use case. */ - public @Nullable Put createPutMutation(RowData row, long timestamp) { + public @Nullable Put createPutMutation(RowData row, long timestamp, @Nullable Long timeToLive) { checkArgument(keyEncoder != null, "row key is not set."); byte[] rowkey = keyEncoder.encode(row, rowkeyIndex); if (rowkey.length == 0) { @@ -144,6 +144,9 @@ public class HBaseSerde { } // upsert Put put = new Put(rowkey, timestamp); + if (timeToLive != null) { + put.setTTL(timeToLive); + } for (int i = 0; i < fieldLength; i++) { if (i != rowkeyIndex) { int f = i > rowkeyIndex ? i - 1 : i; diff --git a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java index 85de7c8..7c46d04 100644 --- a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java +++ b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java @@ -106,7 +106,7 @@ class HBaseSerdeTest { @Test public void writeIgnoreNullValueTest() { HBaseSerde serde = createHBaseSerde(false); - Put m1 = serde.createPutMutation(prepareRowData(), HConstants.LATEST_TIMESTAMP); + Put m1 = serde.createPutMutation(prepareRowData(), HConstants.LATEST_TIMESTAMP, null); assert m1 != null; assertThat(m1.getRow()).isNotEmpty(); assertThat(m1.get(FAMILY1.getBytes(), F1COL1.getBytes())).isNotEmpty(); @@ -119,7 +119,7 @@ class HBaseSerdeTest { HBaseSerde writeIgnoreNullValueSerde = createHBaseSerde(true); Put m2 = writeIgnoreNullValueSerde.createPutMutation( - prepareRowData(), HConstants.LATEST_TIMESTAMP); + prepareRowData(), HConstants.LATEST_TIMESTAMP, null); assert m2 != null; assertThat(m2.getRow()).isNotEmpty(); assertThat(m2.get(FAMILY1.getBytes(), F1COL1.getBytes())).isEmpty();