This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git

commit 6ac4a3faea7596b91432c1213a5487916c9a7e63
Author: Miklos Gergely <mgerg...@cloudera.com>
AuthorDate: Tue Aug 25 13:19:04 2020 +0200

    [FLINK-18795][hbase] Support for HBase 2
    
    This closes #13128
---
 flink-connector-hbase-e2e-tests/pom.xml            | 20 +++++++++--
 .../flink/tests/util/hbase/HBaseResource.java      |  5 +--
 .../tests/util/hbase/HBaseResourceFactory.java     |  3 +-
 .../util/hbase/LocalStandaloneHBaseResource.java   |  8 +++--
 .../hbase/LocalStandaloneHBaseResourceFactory.java |  4 +--
 .../tests/util/hbase/SQLClientHBaseITCase.java     | 42 ++++++++++++++++++----
 .../src/test/resources/hbase_e2e.sql               |  4 +--
 7 files changed, 67 insertions(+), 19 deletions(-)

diff --git a/flink-connector-hbase-e2e-tests/pom.xml 
b/flink-connector-hbase-e2e-tests/pom.xml
index c280259..4875e6f 100644
--- a/flink-connector-hbase-e2e-tests/pom.xml
+++ b/flink-connector-hbase-e2e-tests/pom.xml
@@ -49,7 +49,13 @@ under the License.
                <!--using hbase shade jar to execute end-to-end test-->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-sql-connector-hbase_${scala.binary.version}</artifactId>
+                       
<artifactId>flink-sql-connector-hbase-1.4_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-sql-connector-hbase-2.2_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
@@ -186,9 +192,17 @@ under the License.
                                                                </artifactItem>
                                                                <artifactItem>
                                                                        
<groupId>org.apache.flink</groupId>
-                                                                       
<artifactId>flink-sql-connector-hbase_${scala.binary.version}</artifactId>
+                                                                       
<artifactId>flink-sql-connector-hbase-1.4_${scala.binary.version}</artifactId>
+                                                                       
<version>${project.version}</version>
+                                                                       
<destFileName>sql-hbase-1.4.jar</destFileName>
+                                                                       
<type>jar</type>
+                                                                       
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                                               </artifactItem>
+                                                               <artifactItem>
+                                                                       
<groupId>org.apache.flink</groupId>
+                                                                       
<artifactId>flink-sql-connector-hbase-2.2_${scala.binary.version}</artifactId>
                                                                        
<version>${project.version}</version>
-                                                                       
<destFileName>sql-hbase.jar</destFileName>
+                                                                       
<destFileName>sql-hbase-2.2.jar</destFileName>
                                                                        
<type>jar</type>
                                                                        
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                                                                </artifactItem>
diff --git 
a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java
 
b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java
index 4b9e091..83bf249 100644
--- 
a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java
+++ 
b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java
@@ -61,12 +61,13 @@ public interface HBaseResource extends ExternalResource {
        /**
         * Returns the configured HBaseResource implementation, or a {@link 
LocalStandaloneHBaseResource} if none is configured.
         *
+        * @param version The hbase version
         * @return configured HbaseResource, or {@link 
LocalStandaloneHBaseResource} if none is configured
         */
-       static HBaseResource get() {
+       static HBaseResource get(String version) {
                return FactoryUtils.loadAndInvokeFactory(
                        HBaseResourceFactory.class,
-                       HBaseResourceFactory::create,
+                       factory -> factory.create(version),
                        LocalStandaloneHBaseResourceFactory::new);
        }
 }
diff --git 
a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java
 
b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java
index bd873fa..27b3864 100644
--- 
a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java
+++ 
b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java
@@ -30,8 +30,9 @@ public interface HBaseResourceFactory {
         * Returns a {@link HBaseResource} instance. If the instance could not 
be instantiated (for example, because a
         * mandatory parameter was missing), then an empty {@link Optional} 
should be returned.
         *
+        * @param version The hbase version
         * @return HBaseResource instance
         * @throws Exception if the instance could not be instantiated
         */
-       HBaseResource create() throws Exception;
+       HBaseResource create(String version) throws Exception;
 }
diff --git 
a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
 
b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
index c1b6b7b..2478432 100644
--- 
a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
+++ 
b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java
@@ -49,16 +49,18 @@ public class LocalStandaloneHBaseResource implements 
HBaseResource {
        private final TemporaryFolder tmp = new TemporaryFolder();
 
        private final DownloadCache downloadCache = DownloadCache.get();
+       private final String hbaseVersion;
        private Path hbaseDir;
 
-       LocalStandaloneHBaseResource() {
+       LocalStandaloneHBaseResource(String hbaseVersion) {
                OperatingSystemRestriction.forbid(
                        String.format("The %s relies on UNIX utils and shell 
scripts.", getClass().getSimpleName()),
                        OperatingSystem.WINDOWS);
+               this.hbaseVersion = hbaseVersion;
        }
 
-       private static String getHBaseDownloadUrl() {
-               return 
"https://archive.apache.org/dist/hbase/1.4.3/hbase-1.4.3-bin.tar.gz";;
+       private String getHBaseDownloadUrl() {
+               return 
String.format("https://archive.apache.org/dist/hbase/%1$s/hbase-%1$s-bin.tar.gz";,
 hbaseVersion);
        }
 
        @Override
diff --git 
a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java
 
b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java
index 9eb1ab9..2e57f0f 100644
--- 
a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java
+++ 
b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java
@@ -24,7 +24,7 @@ package org.apache.flink.tests.util.hbase;
 public class LocalStandaloneHBaseResourceFactory implements 
HBaseResourceFactory {
 
        @Override
-       public HBaseResource create() {
-               return new LocalStandaloneHBaseResource();
+       public HBaseResource create(String version) {
+               return new LocalStandaloneHBaseResource(version);
        }
 }
diff --git 
a/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
 
b/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
index ad5f4f7..ca603f4 100644
--- 
a/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
+++ 
b/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java
@@ -40,6 +40,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,8 +53,12 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
@@ -62,6 +68,7 @@ import static org.junit.Assert.assertThat;
 /**
  * End-to-end test for the HBase connectors.
  */
+@RunWith(Parameterized.class)
 @Category(value = {TravisGroup1.class, PreCommit.class, FailsOnJava11.class})
 public class SQLClientHBaseITCase extends TestLogger {
 
@@ -69,6 +76,14 @@ public class SQLClientHBaseITCase extends TestLogger {
 
        private static final String HBASE_E2E_SQL = "hbase_e2e.sql";
 
+       @Parameterized.Parameters(name = "{index}: hbase-version:{0}")
+       public static Collection<Object[]> data() {
+               return Arrays.asList(new Object[][]{
+                               {"1.4.3", "hbase-1.4"},
+                               {"2.2.3", "hbase-2.2"}
+               });
+       }
+
        @Rule
        public final HBaseResource hbase;
 
@@ -79,16 +94,20 @@ public class SQLClientHBaseITCase extends TestLogger {
        @Rule
        public final TemporaryFolder tmp = new TemporaryFolder();
 
+       private final String hbaseConnector;
+       private final Path sqlConnectorHBaseJar;
+
        @ClassRule
        public static final DownloadCache DOWNLOAD_CACHE = DownloadCache.get();
 
        private static final Path sqlToolBoxJar = 
TestUtils.getResource(".*SqlToolbox.jar");
-       private static final Path sqlConnectorHBaseJar = 
TestUtils.getResource(".*hbase.jar");
        private static final Path hadoopClasspath = 
TestUtils.getResource(".*hadoop.classpath");
        private List<Path> hadoopClasspathJars;
 
-       public SQLClientHBaseITCase() {
-               this.hbase = HBaseResource.get();
+       public SQLClientHBaseITCase(String hbaseVersion, String hbaseConnector) 
{
+               this.hbase = HBaseResource.get(hbaseVersion);
+               this.hbaseConnector = hbaseConnector;
+               this.sqlConnectorHBaseJar = TestUtils.getResource(".*sql-" + 
hbaseConnector + ".jar");
        }
 
        @Before
@@ -125,7 +144,9 @@ public class SQLClientHBaseITCase extends TestLogger {
                        hbase.putData("source", "row2", "family2", "f2c2", 
"v6");
 
                        // Initialize the SQL statements from "hbase_e2e.sql" 
file
-                       List<String> sqlLines = initializeSqlLines();
+                       Map<String, String> varsMap = new HashMap<>();
+                       varsMap.put("$HBASE_CONNECTOR", hbaseConnector);
+                       List<String> sqlLines = initializeSqlLines(varsMap);
 
                        // Execute SQL statements in "hbase_e2e.sql" file
                        executeSqlStatements(clusterController, sqlLines);
@@ -172,12 +193,21 @@ public class SQLClientHBaseITCase extends TestLogger {
                Assert.assertTrue("Did not get expected results before 
timeout.", success);
        }
 
-       private List<String> initializeSqlLines() throws IOException {
+       private List<String> initializeSqlLines(Map<String, String> vars) 
throws IOException {
                URL url = 
SQLClientHBaseITCase.class.getClassLoader().getResource(HBASE_E2E_SQL);
                if (url == null) {
                        throw new FileNotFoundException(HBASE_E2E_SQL);
                }
-               return Files.readAllLines(new File(url.getFile()).toPath());
+               List<String> lines = Files.readAllLines(new 
File(url.getFile()).toPath());
+               List<String> result = new ArrayList<>();
+               for (String line : lines) {
+                       for (Map.Entry<String, String> var : vars.entrySet()) {
+                               line = line.replace(var.getKey(), 
var.getValue());
+                       }
+                       result.add(line);
+               }
+
+               return result;
        }
 
        private void executeSqlStatements(ClusterController clusterController, 
List<String> sqlLines) throws IOException {
diff --git a/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql 
b/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql
index a5a8ec1..5ae3845 100644
--- a/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql
+++ b/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql
@@ -19,7 +19,7 @@ CREATE TABLE MyHBaseSource (
   family1 ROW<f1c1 STRING>,
   family2 ROW<f2c1 STRING, f2c2 STRING>
 ) WITH (
-  'connector' = 'hbase-1.4',
+  'connector' = '$HBASE_CONNECTOR',
   'table-name' = 'source',
   'zookeeper.quorum' = 'localhost:2181',
   'zookeeper.znode.parent' = '/hbase'
@@ -30,7 +30,7 @@ CREATE TABLE MyHBaseSink (
   family1 ROW<f1c1 STRING>,
   family2 ROW<f2c1 STRING, f2c2 STRING>
 ) WITH (
-  'connector' = 'hbase-1.4',
+  'connector' = '$HBASE_CONNECTOR',
   'table-name' = 'sink',
   'zookeeper.quorum' = 'localhost:2181',
   'zookeeper.znode.parent' = '/hbase',

Reply via email to