This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
The following commit(s) were added to refs/heads/main by this push: new e0971c3 [FLINK-33304] Introduce the DeduplicatedMutator to resolve the mutation write conflict problem. This closes #30 e0971c3 is described below commit e0971c3888db03243b08e5684b7690150276ef2c Author: Tan-JiaLiang <55388933+tan-jiali...@users.noreply.github.com> AuthorDate: Fri Nov 3 23:12:14 2023 +0800 [FLINK-33304] Introduce the DeduplicatedMutator to resolve the mutation write conflict problem. This closes #30 Co-authored-by: tanjialiang <tanjiali...@52tt.com> --- .../connector/hbase1/HBaseConnectorITCase.java | 59 ++++++++++++++++++++++ .../flink/connector/hbase1/util/HBaseTestBase.java | 9 ++++ .../connector/hbase2/HBaseConnectorITCase.java | 57 +++++++++++++++++++++ .../flink/connector/hbase2/util/HBaseTestBase.java | 9 ++++ .../connector/hbase/sink/HBaseSinkFunction.java | 48 ++++++++++++++++-- 5 files changed, 179 insertions(+), 3 deletions(-) diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java index 35be358..0ea0002 100644 --- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java +++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java @@ -29,10 +29,13 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import org.apache.flink.util.CollectionUtil; import org.apache.hadoop.hbase.TableName; @@ -41,11 +44,14 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; import static org.apache.flink.table.api.Expressions.$; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -280,6 +286,59 @@ public class HBaseConnectorITCase extends HBaseTestBase { TestBaseUtils.compareResultAsText(results, expected); } + @Test + public void testTableSinkWithChangelog() throws Exception { + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + + // register values table for source + String dataId = + TestValuesTableFactory.registerData( + Arrays.asList( + Row.ofKind(RowKind.INSERT, 1, Row.of("Hello1")), + Row.ofKind(RowKind.DELETE, 1, Row.of("Hello2")), + Row.ofKind(RowKind.INSERT, 2, Row.of("Hello1")), + Row.ofKind(RowKind.INSERT, 2, Row.of("Hello2")), + Row.ofKind(RowKind.INSERT, 2, Row.of("Hello3")), + Row.ofKind(RowKind.DELETE, 2, Row.of("Hello3")), + Row.ofKind(RowKind.INSERT, 1, Row.of("Hello3")))); + tEnv.executeSql( + "CREATE TABLE source_table (" + + " rowkey INT," + + " family1 ROW<name STRING>," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'data-id' = '" + + dataId + + "'," + + " 'changelog-mode'='I,UA,UB,D'" + + ")"); + + // register HBase table for sink + tEnv.executeSql( + "CREATE TABLE sink_table (" + + " rowkey INT," + + " family1 ROW<name STRING>," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'hbase-1.4'," + + " 'table-name' = '" + + TEST_TABLE_4 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + + tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table").await(); + + TableResult result = tEnv.executeSql("SELECT * FROM sink_table"); + + List<Row> actual = CollectionUtil.iteratorToList(result.collect()); + assertThat(actual).isEqualTo(Collections.singletonList(Row.of(1, Row.of("Hello3")))); + } + @Test public void testTableSourceSinkWithDDL() throws Exception { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java index cc2de79..86110c8 100644 --- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java +++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java @@ -43,6 +43,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String TEST_TABLE_1 = "testTable1"; protected static final String TEST_TABLE_2 = "testTable2"; protected static final String TEST_TABLE_3 = "testTable3"; + protected static final String TEST_TABLE_4 = "testTable4"; protected static final String ROW_KEY = "rowkey"; @@ -92,6 +93,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createHBaseTable1(); createHBaseTable2(); createHBaseTable3(); + createHBaseTable4(); } private static void createHBaseTable1() throws IOException { @@ -232,6 +234,13 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createTable(tableName, families, SPLIT_KEYS); } + private static void createHBaseTable4() { + // create a table + byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)}; + TableName tableName = TableName.valueOf(TEST_TABLE_4); + createTable(tableName, families, SPLIT_KEYS); + } + private static Put putRow( int rowKey, int f1c1, diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java index c00b83a..64a9875 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.Row; @@ -47,6 +48,8 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Spliterator; @@ -55,6 +58,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import static org.apache.flink.table.api.Expressions.$; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -312,6 +316,59 @@ public class HBaseConnectorITCase extends HBaseTestBase { TestBaseUtils.compareResultAsText(results, String.join("", expected)); } + @Test + public void testTableSinkWithChangelog() throws Exception { + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + + // register values table for source + String dataId = + TestValuesTableFactory.registerData( + Arrays.asList( + Row.ofKind(RowKind.INSERT, 1, Row.of("Hello1")), + Row.ofKind(RowKind.DELETE, 1, Row.of("Hello2")), + Row.ofKind(RowKind.INSERT, 2, Row.of("Hello1")), + Row.ofKind(RowKind.INSERT, 2, Row.of("Hello2")), + Row.ofKind(RowKind.INSERT, 2, Row.of("Hello3")), + Row.ofKind(RowKind.DELETE, 2, Row.of("Hello3")), + Row.ofKind(RowKind.INSERT, 1, Row.of("Hello3")))); + tEnv.executeSql( + "CREATE TABLE source_table (" + + " rowkey INT," + + " family1 ROW<name STRING>," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'data-id' = '" + + dataId + + "'," + + " 'changelog-mode'='I,UA,UB,D'" + + ")"); + + // register HBase table for sink + tEnv.executeSql( + "CREATE TABLE sink_table (" + + " rowkey INT," + + " family1 ROW<name STRING>," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'hbase-2.2'," + + " 'table-name' = '" + + TEST_TABLE_4 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + + tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table").await(); + + TableResult result = tEnv.executeSql("SELECT * FROM sink_table"); + + List<Row> actual = CollectionUtil.iteratorToList(result.collect()); + assertThat(actual).isEqualTo(Collections.singletonList(Row.of(1, Row.of("Hello3")))); + } + @Test public void testTableSourceSinkWithDDL() throws Exception { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java index 1e639ba..1301ee1 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java @@ -43,6 +43,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String TEST_TABLE_1 = "testTable1"; protected static final String TEST_TABLE_2 = "testTable2"; protected static final String TEST_TABLE_3 = "testTable3"; + protected static final String TEST_TABLE_4 = "testTable4"; protected static final String ROW_KEY = "rowkey"; @@ -92,6 +93,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createHBaseTable1(); createHBaseTable2(); createHBaseTable3(); + createHBaseTable4(); } private static void createHBaseTable1() throws IOException { @@ -232,6 +234,13 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { createTable(tableName, families, SPLIT_KEYS); } + private static void createHBaseTable4() { + // create a table + byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)}; + TableName tableName = TableName.valueOf(TEST_TABLE_4); + createTable(tableName, families, SPLIT_KEYS); + } + private static Put putRow( int rowKey, int f1c1, diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java index 4dd30b3..0c4de1a 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java @@ -35,11 +35,16 @@ import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.BufferedMutatorParams; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -71,7 +76,7 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> private final HBaseMutationConverter<T> mutationConverter; private transient Connection connection; - private transient BufferedMutator mutator; + private transient DeduplicatedMutator mutator; private transient ScheduledExecutorService executor; private transient ScheduledFuture scheduledFuture; @@ -121,7 +126,9 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> if (bufferFlushMaxSizeInBytes > 0) { params.writeBufferSize(bufferFlushMaxSizeInBytes); } - this.mutator = connection.getBufferedMutator(params); + this.mutator = + new DeduplicatedMutator( + (int) bufferFlushMaxMutations, connection.getBufferedMutator(params)); if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) { this.executor = @@ -201,7 +208,7 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> } private void flush() throws IOException { - // BufferedMutator is thread-safe + // DeduplicatedMutator is thread-safe mutator.flush(); numPendingRequests.set(0); checkErrorAndRethrow(); @@ -256,4 +263,39 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> // if the failure handler decides to throw an exception failureThrowable.compareAndSet(null, exception); } + + /** + * Thread-safe class, grouped mutations by rows and keep the latest mutation. For more info, see + * <a href="https://issues.apache.org/jira/browse/HBASE-8626">HBASE-8626</a>. + */ + private static class DeduplicatedMutator { + + private final BufferedMutator mutator; + private final Map<ByteBuffer, Mutation> mutations; + + DeduplicatedMutator(int size, BufferedMutator mutator) { + this.mutator = mutator; + this.mutations = new HashMap<>(size); + } + + synchronized void mutate(Mutation current) { + ByteBuffer key = ByteBuffer.wrap(current.getRow()); + Mutation old = mutations.get(key); + if (old == null || current.getTimeStamp() >= old.getTimeStamp()) { + mutations.put(key, current); + } + } + + synchronized void flush() throws IOException { + mutator.mutate(new ArrayList<>(mutations.values())); + mutator.flush(); + mutations.clear(); + } + + synchronized void close() throws IOException { + mutator.mutate(new ArrayList<>(mutations.values())); + mutator.close(); + mutations.clear(); + } + } }