This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git


The following commit(s) were added to refs/heads/main by this push:
     new e0971c3  [FLINK-33304] Introduce the DeduplicatedMutator to resolve 
the mutation write conflict problem. This closes #30
e0971c3 is described below

commit e0971c3888db03243b08e5684b7690150276ef2c
Author: Tan-JiaLiang <55388933+tan-jiali...@users.noreply.github.com>
AuthorDate: Fri Nov 3 23:12:14 2023 +0800

    [FLINK-33304] Introduce the DeduplicatedMutator to resolve the mutation 
write conflict problem. This closes #30
    
    Co-authored-by: tanjialiang <tanjiali...@52tt.com>
---
 .../connector/hbase1/HBaseConnectorITCase.java     | 59 ++++++++++++++++++++++
 .../flink/connector/hbase1/util/HBaseTestBase.java |  9 ++++
 .../connector/hbase2/HBaseConnectorITCase.java     | 57 +++++++++++++++++++++
 .../flink/connector/hbase2/util/HBaseTestBase.java |  9 ++++
 .../connector/hbase/sink/HBaseSinkFunction.java    | 48 ++++++++++++++++--
 5 files changed, 179 insertions(+), 3 deletions(-)

diff --git 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
index 35be358..0ea0002 100644
--- 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
+++ 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
@@ -29,10 +29,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CollectionUtil;
 
 import org.apache.hadoop.hbase.TableName;
@@ -41,11 +44,14 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.Expressions.$;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -280,6 +286,59 @@ public class HBaseConnectorITCase extends HBaseTestBase {
         TestBaseUtils.compareResultAsText(results, expected);
     }
 
+    @Test
+    public void testTableSinkWithChangelog() throws Exception {
+        StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, 
streamSettings);
+
+        // register values table for source
+        String dataId =
+                TestValuesTableFactory.registerData(
+                        Arrays.asList(
+                                Row.ofKind(RowKind.INSERT, 1, 
Row.of("Hello1")),
+                                Row.ofKind(RowKind.DELETE, 1, 
Row.of("Hello2")),
+                                Row.ofKind(RowKind.INSERT, 2, 
Row.of("Hello1")),
+                                Row.ofKind(RowKind.INSERT, 2, 
Row.of("Hello2")),
+                                Row.ofKind(RowKind.INSERT, 2, 
Row.of("Hello3")),
+                                Row.ofKind(RowKind.DELETE, 2, 
Row.of("Hello3")),
+                                Row.ofKind(RowKind.INSERT, 1, 
Row.of("Hello3"))));
+        tEnv.executeSql(
+                "CREATE TABLE source_table ("
+                        + " rowkey INT,"
+                        + " family1 ROW<name STRING>,"
+                        + " PRIMARY KEY (rowkey) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'data-id' = '"
+                        + dataId
+                        + "',"
+                        + " 'changelog-mode'='I,UA,UB,D'"
+                        + ")");
+
+        // register HBase table for sink
+        tEnv.executeSql(
+                "CREATE TABLE sink_table ("
+                        + " rowkey INT,"
+                        + " family1 ROW<name STRING>,"
+                        + " PRIMARY KEY (rowkey) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-1.4',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_4
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+
+        tEnv.executeSql("INSERT INTO sink_table SELECT * FROM 
source_table").await();
+
+        TableResult result = tEnv.executeSql("SELECT * FROM sink_table");
+
+        List<Row> actual = CollectionUtil.iteratorToList(result.collect());
+        assertThat(actual).isEqualTo(Collections.singletonList(Row.of(1, 
Row.of("Hello3"))));
+    }
+
     @Test
     public void testTableSourceSinkWithDDL() throws Exception {
         StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
index cc2de79..86110c8 100644
--- 
a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
+++ 
b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
@@ -43,6 +43,7 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
     protected static final String TEST_TABLE_1 = "testTable1";
     protected static final String TEST_TABLE_2 = "testTable2";
     protected static final String TEST_TABLE_3 = "testTable3";
+    protected static final String TEST_TABLE_4 = "testTable4";
 
     protected static final String ROW_KEY = "rowkey";
 
@@ -92,6 +93,7 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
         createHBaseTable1();
         createHBaseTable2();
         createHBaseTable3();
+        createHBaseTable4();
     }
 
     private static void createHBaseTable1() throws IOException {
@@ -232,6 +234,13 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
         createTable(tableName, families, SPLIT_KEYS);
     }
 
+    private static void createHBaseTable4() {
+        // create a table
+        byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+        TableName tableName = TableName.valueOf(TEST_TABLE_4);
+        createTable(tableName, families, SPLIT_KEYS);
+    }
+
     private static Put putRow(
             int rowKey,
             int f1c1,
diff --git 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index c00b83a..64a9875 100644
--- 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
@@ -47,6 +48,8 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Spliterator;
@@ -55,6 +58,7 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import static org.apache.flink.table.api.Expressions.$;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -312,6 +316,59 @@ public class HBaseConnectorITCase extends HBaseTestBase {
         TestBaseUtils.compareResultAsText(results, String.join("", expected));
     }
 
+    @Test
+    public void testTableSinkWithChangelog() throws Exception {
+        StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, 
streamSettings);
+
+        // register values table for source
+        String dataId =
+                TestValuesTableFactory.registerData(
+                        Arrays.asList(
+                                Row.ofKind(RowKind.INSERT, 1, 
Row.of("Hello1")),
+                                Row.ofKind(RowKind.DELETE, 1, 
Row.of("Hello2")),
+                                Row.ofKind(RowKind.INSERT, 2, 
Row.of("Hello1")),
+                                Row.ofKind(RowKind.INSERT, 2, 
Row.of("Hello2")),
+                                Row.ofKind(RowKind.INSERT, 2, 
Row.of("Hello3")),
+                                Row.ofKind(RowKind.DELETE, 2, 
Row.of("Hello3")),
+                                Row.ofKind(RowKind.INSERT, 1, 
Row.of("Hello3"))));
+        tEnv.executeSql(
+                "CREATE TABLE source_table ("
+                        + " rowkey INT,"
+                        + " family1 ROW<name STRING>,"
+                        + " PRIMARY KEY (rowkey) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'data-id' = '"
+                        + dataId
+                        + "',"
+                        + " 'changelog-mode'='I,UA,UB,D'"
+                        + ")");
+
+        // register HBase table for sink
+        tEnv.executeSql(
+                "CREATE TABLE sink_table ("
+                        + " rowkey INT,"
+                        + " family1 ROW<name STRING>,"
+                        + " PRIMARY KEY (rowkey) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-2.2',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_4
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+
+        tEnv.executeSql("INSERT INTO sink_table SELECT * FROM 
source_table").await();
+
+        TableResult result = tEnv.executeSql("SELECT * FROM sink_table");
+
+        List<Row> actual = CollectionUtil.iteratorToList(result.collect());
+        assertThat(actual).isEqualTo(Collections.singletonList(Row.of(1, 
Row.of("Hello3"))));
+    }
+
     @Test
     public void testTableSourceSinkWithDDL() throws Exception {
         StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index 1e639ba..1301ee1 100644
--- 
a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++ 
b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -43,6 +43,7 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
     protected static final String TEST_TABLE_1 = "testTable1";
     protected static final String TEST_TABLE_2 = "testTable2";
     protected static final String TEST_TABLE_3 = "testTable3";
+    protected static final String TEST_TABLE_4 = "testTable4";
 
     protected static final String ROW_KEY = "rowkey";
 
@@ -92,6 +93,7 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
         createHBaseTable1();
         createHBaseTable2();
         createHBaseTable3();
+        createHBaseTable4();
     }
 
     private static void createHBaseTable1() throws IOException {
@@ -232,6 +234,13 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
         createTable(tableName, families, SPLIT_KEYS);
     }
 
+    private static void createHBaseTable4() {
+        // create a table
+        byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+        TableName tableName = TableName.valueOf(TEST_TABLE_4);
+        createTable(tableName, families, SPLIT_KEYS);
+    }
+
     private static Put putRow(
             int rowKey,
             int f1c1,
diff --git 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
index 4dd30b3..0c4de1a 100644
--- 
a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
+++ 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
@@ -35,11 +35,16 @@ import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.BufferedMutatorParams;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -71,7 +76,7 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
     private final HBaseMutationConverter<T> mutationConverter;
 
     private transient Connection connection;
-    private transient BufferedMutator mutator;
+    private transient DeduplicatedMutator mutator;
 
     private transient ScheduledExecutorService executor;
     private transient ScheduledFuture scheduledFuture;
@@ -121,7 +126,9 @@ public class HBaseSinkFunction<T> extends 
RichSinkFunction<T>
             if (bufferFlushMaxSizeInBytes > 0) {
                 params.writeBufferSize(bufferFlushMaxSizeInBytes);
             }
-            this.mutator = connection.getBufferedMutator(params);
+            this.mutator =
+                    new DeduplicatedMutator(
+                            (int) bufferFlushMaxMutations, 
connection.getBufferedMutator(params));
 
             if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) 
{
                 this.executor =
@@ -201,7 +208,7 @@ public class HBaseSinkFunction<T> extends 
RichSinkFunction<T>
     }
 
     private void flush() throws IOException {
-        // BufferedMutator is thread-safe
+        // DeduplicatedMutator is thread-safe
         mutator.flush();
         numPendingRequests.set(0);
         checkErrorAndRethrow();
@@ -256,4 +263,39 @@ public class HBaseSinkFunction<T> extends 
RichSinkFunction<T>
         // if the failure handler decides to throw an exception
         failureThrowable.compareAndSet(null, exception);
     }
+
+    /**
+     * Thread-safe class, grouped mutations by rows and keep the latest 
mutation. For more info, see
+     * <a 
href="https://issues.apache.org/jira/browse/HBASE-8626";>HBASE-8626</a>.
+     */
+    private static class DeduplicatedMutator {
+
+        private final BufferedMutator mutator;
+        private final Map<ByteBuffer, Mutation> mutations;
+
+        DeduplicatedMutator(int size, BufferedMutator mutator) {
+            this.mutator = mutator;
+            this.mutations = new HashMap<>(size);
+        }
+
+        synchronized void mutate(Mutation current) {
+            ByteBuffer key = ByteBuffer.wrap(current.getRow());
+            Mutation old = mutations.get(key);
+            if (old == null || current.getTimeStamp() >= old.getTimeStamp()) {
+                mutations.put(key, current);
+            }
+        }
+
+        synchronized void flush() throws IOException {
+            mutator.mutate(new ArrayList<>(mutations.values()));
+            mutator.flush();
+            mutations.clear();
+        }
+
+        synchronized void close() throws IOException {
+            mutator.mutate(new ArrayList<>(mutations.values()));
+            mutator.close();
+            mutations.clear();
+        }
+    }
 }

Reply via email to