This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
commit 6ac4a3faea7596b91432c1213a5487916c9a7e63 Author: Miklos Gergely <mgerg...@cloudera.com> AuthorDate: Tue Aug 25 13:19:04 2020 +0200 [FLINK-18795][hbase] Support for HBase 2 This closes #13128 --- flink-connector-hbase-e2e-tests/pom.xml | 20 +++++++++-- .../flink/tests/util/hbase/HBaseResource.java | 5 +-- .../tests/util/hbase/HBaseResourceFactory.java | 3 +- .../util/hbase/LocalStandaloneHBaseResource.java | 8 +++-- .../hbase/LocalStandaloneHBaseResourceFactory.java | 4 +-- .../tests/util/hbase/SQLClientHBaseITCase.java | 42 ++++++++++++++++++---- .../src/test/resources/hbase_e2e.sql | 4 +-- 7 files changed, 67 insertions(+), 19 deletions(-) diff --git a/flink-connector-hbase-e2e-tests/pom.xml b/flink-connector-hbase-e2e-tests/pom.xml index c280259..4875e6f 100644 --- a/flink-connector-hbase-e2e-tests/pom.xml +++ b/flink-connector-hbase-e2e-tests/pom.xml @@ -49,7 +49,13 @@ under the License. <!--using hbase shade jar to execute end-to-end test--> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-sql-connector-hbase_${scala.binary.version}</artifactId> + <artifactId>flink-sql-connector-hbase-1.4_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-connector-hbase-2.2_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> </dependency> @@ -186,9 +192,17 @@ under the License. </artifactItem> <artifactItem> <groupId>org.apache.flink</groupId> - <artifactId>flink-sql-connector-hbase_${scala.binary.version}</artifactId> + <artifactId>flink-sql-connector-hbase-1.4_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <destFileName>sql-hbase-1.4.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-connector-hbase-2.2_${scala.binary.version}</artifactId> <version>${project.version}</version> - <destFileName>sql-hbase.jar</destFileName> + <destFileName>sql-hbase-2.2.jar</destFileName> <type>jar</type> <outputDirectory>${project.build.directory}/dependencies</outputDirectory> </artifactItem> diff --git a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java index 4b9e091..83bf249 100644 --- a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java +++ b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResource.java @@ -61,12 +61,13 @@ public interface HBaseResource extends ExternalResource { /** * Returns the configured HBaseResource implementation, or a {@link LocalStandaloneHBaseResource} if none is configured. * + * @param version The hbase version * @return configured HbaseResource, or {@link LocalStandaloneHBaseResource} if none is configured */ - static HBaseResource get() { + static HBaseResource get(String version) { return FactoryUtils.loadAndInvokeFactory( HBaseResourceFactory.class, - HBaseResourceFactory::create, + factory -> factory.create(version), LocalStandaloneHBaseResourceFactory::new); } } diff --git a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java index bd873fa..27b3864 100644 --- a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java +++ b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/HBaseResourceFactory.java @@ -30,8 +30,9 @@ public interface HBaseResourceFactory { * Returns a {@link HBaseResource} instance. If the instance could not be instantiated (for example, because a * mandatory parameter was missing), then an empty {@link Optional} should be returned. * + * @param version The hbase version * @return HBaseResource instance * @throws Exception if the instance could not be instantiated */ - HBaseResource create() throws Exception; + HBaseResource create(String version) throws Exception; } diff --git a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java index c1b6b7b..2478432 100644 --- a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java +++ b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java @@ -49,16 +49,18 @@ public class LocalStandaloneHBaseResource implements HBaseResource { private final TemporaryFolder tmp = new TemporaryFolder(); private final DownloadCache downloadCache = DownloadCache.get(); + private final String hbaseVersion; private Path hbaseDir; - LocalStandaloneHBaseResource() { + LocalStandaloneHBaseResource(String hbaseVersion) { OperatingSystemRestriction.forbid( String.format("The %s relies on UNIX utils and shell scripts.", getClass().getSimpleName()), OperatingSystem.WINDOWS); + this.hbaseVersion = hbaseVersion; } - private static String getHBaseDownloadUrl() { - return "https://archive.apache.org/dist/hbase/1.4.3/hbase-1.4.3-bin.tar.gz"; + private String getHBaseDownloadUrl() { + return String.format("https://archive.apache.org/dist/hbase/%1$s/hbase-%1$s-bin.tar.gz", hbaseVersion); } @Override diff --git a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java index 9eb1ab9..2e57f0f 100644 --- a/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java +++ b/flink-connector-hbase-e2e-tests/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResourceFactory.java @@ -24,7 +24,7 @@ package org.apache.flink.tests.util.hbase; public class LocalStandaloneHBaseResourceFactory implements HBaseResourceFactory { @Override - public HBaseResource create() { - return new LocalStandaloneHBaseResource(); + public HBaseResource create(String version) { + return new LocalStandaloneHBaseResource(version); } } diff --git a/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java b/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java index ad5f4f7..ca603f4 100644 --- a/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java +++ b/flink-connector-hbase-e2e-tests/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java @@ -40,6 +40,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,8 +53,12 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; @@ -62,6 +68,7 @@ import static org.junit.Assert.assertThat; /** * End-to-end test for the HBase connectors. */ +@RunWith(Parameterized.class) @Category(value = {TravisGroup1.class, PreCommit.class, FailsOnJava11.class}) public class SQLClientHBaseITCase extends TestLogger { @@ -69,6 +76,14 @@ public class SQLClientHBaseITCase extends TestLogger { private static final String HBASE_E2E_SQL = "hbase_e2e.sql"; + @Parameterized.Parameters(name = "{index}: hbase-version:{0}") + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][]{ + {"1.4.3", "hbase-1.4"}, + {"2.2.3", "hbase-2.2"} + }); + } + @Rule public final HBaseResource hbase; @@ -79,16 +94,20 @@ public class SQLClientHBaseITCase extends TestLogger { @Rule public final TemporaryFolder tmp = new TemporaryFolder(); + private final String hbaseConnector; + private final Path sqlConnectorHBaseJar; + @ClassRule public static final DownloadCache DOWNLOAD_CACHE = DownloadCache.get(); private static final Path sqlToolBoxJar = TestUtils.getResource(".*SqlToolbox.jar"); - private static final Path sqlConnectorHBaseJar = TestUtils.getResource(".*hbase.jar"); private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath"); private List<Path> hadoopClasspathJars; - public SQLClientHBaseITCase() { - this.hbase = HBaseResource.get(); + public SQLClientHBaseITCase(String hbaseVersion, String hbaseConnector) { + this.hbase = HBaseResource.get(hbaseVersion); + this.hbaseConnector = hbaseConnector; + this.sqlConnectorHBaseJar = TestUtils.getResource(".*sql-" + hbaseConnector + ".jar"); } @Before @@ -125,7 +144,9 @@ public class SQLClientHBaseITCase extends TestLogger { hbase.putData("source", "row2", "family2", "f2c2", "v6"); // Initialize the SQL statements from "hbase_e2e.sql" file - List<String> sqlLines = initializeSqlLines(); + Map<String, String> varsMap = new HashMap<>(); + varsMap.put("$HBASE_CONNECTOR", hbaseConnector); + List<String> sqlLines = initializeSqlLines(varsMap); // Execute SQL statements in "hbase_e2e.sql" file executeSqlStatements(clusterController, sqlLines); @@ -172,12 +193,21 @@ public class SQLClientHBaseITCase extends TestLogger { Assert.assertTrue("Did not get expected results before timeout.", success); } - private List<String> initializeSqlLines() throws IOException { + private List<String> initializeSqlLines(Map<String, String> vars) throws IOException { URL url = SQLClientHBaseITCase.class.getClassLoader().getResource(HBASE_E2E_SQL); if (url == null) { throw new FileNotFoundException(HBASE_E2E_SQL); } - return Files.readAllLines(new File(url.getFile()).toPath()); + List<String> lines = Files.readAllLines(new File(url.getFile()).toPath()); + List<String> result = new ArrayList<>(); + for (String line : lines) { + for (Map.Entry<String, String> var : vars.entrySet()) { + line = line.replace(var.getKey(), var.getValue()); + } + result.add(line); + } + + return result; } private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines) throws IOException { diff --git a/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql b/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql index a5a8ec1..5ae3845 100644 --- a/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql +++ b/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql @@ -19,7 +19,7 @@ CREATE TABLE MyHBaseSource ( family1 ROW<f1c1 STRING>, family2 ROW<f2c1 STRING, f2c2 STRING> ) WITH ( - 'connector' = 'hbase-1.4', + 'connector' = '$HBASE_CONNECTOR', 'table-name' = 'source', 'zookeeper.quorum' = 'localhost:2181', 'zookeeper.znode.parent' = '/hbase' @@ -30,7 +30,7 @@ CREATE TABLE MyHBaseSink ( family1 ROW<f1c1 STRING>, family2 ROW<f2c1 STRING, f2c2 STRING> ) WITH ( - 'connector' = 'hbase-1.4', + 'connector' = '$HBASE_CONNECTOR', 'table-name' = 'sink', 'zookeeper.quorum' = 'localhost:2181', 'zookeeper.znode.parent' = '/hbase',