This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hbase.git
The following commit(s) were added to refs/heads/main by this push:
     new 5e81d4b  [FLINK-33250][Connector/HBase] Remove dependency on `flink-shaded`. This closes #27
5e81d4b is described below

commit 5e81d4b788bb0b5c8a9f0115723a30a0754ab97a
Author: MartijnVisser <martijnvis...@apache.org>
AuthorDate: Thu Oct 12 15:37:11 2023 +0200

    [FLINK-33250][Connector/HBase] Remove dependency on `flink-shaded`. This closes #27

    * [FLINK-33250][Connector/HBase] Remove direct dependency on `flink-shaded` and refactor tests to use regular Java
---
 .../connector/hbase2/HBaseConnectorITCase.java     | 14 ++++++++----
 .../HBaseRowDataAsyncLookupFunctionTest.java       |  8 +++----
 .../connector/hbase/util/HBaseTableSchema.java     | 25 +++++++++-------------
 pom.xml                                            |  1 -
 4 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index fbd7b2c..c00b83a 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -40,8 +40,6 @@ import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CollectionUtil;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.ClassRule;
@@ -51,7 +49,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.flink.table.api.Expressions.$;
 import static org.junit.Assert.assertEquals;
@@ -385,7 +386,10 @@ public class HBaseConnectorITCase extends HBaseTestBase {
         TableResult tableResult3 = batchEnv.executeSql(query);
 
         List<String> result =
-                Lists.newArrayList(tableResult3.collect()).stream()
+                StreamSupport.stream(
+                                Spliterators.spliteratorUnknownSize(
+                                        tableResult3.collect(), Spliterator.ORDERED),
+                                false)
                         .map(Row::toString)
                         .sorted()
                         .collect(Collectors.toList());
@@ -470,7 +474,9 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                         + " FOR SYSTEM_TIME AS OF src.proc as h ON src.a = h.rowkey";
         Iterator<Row> collected = tEnv.executeSql(dimJoinQuery).collect();
         List<String> result =
-                Lists.newArrayList(collected).stream()
+                StreamSupport.stream(
+                                Spliterators.spliteratorUnknownSize(collected, Spliterator.ORDERED),
+                                false)
                         .map(Row::toString)
                         .sorted()
                         .collect(Collectors.toList());
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
index 312041f..98ec481 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
@@ -25,16 +25,14 @@ import org.apache.flink.table.connector.source.lookup.LookupOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.DataTypes.BIGINT;
 import static org.apache.flink.table.api.DataTypes.DOUBLE;
@@ -74,8 +72,8 @@ public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {
         assertTrue(result.size() < rowkeys.length);
         latch.await();
         lookupFunction.close();
-        List<String> sortResult =
-                Lists.newArrayList(result).stream().sorted().collect(Collectors.toList());
+        List<String> sortResult = new ArrayList<>(result);
+        Collections.sort(sortResult);
         List<String> expected = new ArrayList<>();
         expected.add("12: null");
         expected.add("12: null");
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
index 546e5ce..d1a04bc 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
@@ -29,11 +29,8 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.Streams;
-
 import java.io.Serializable;
 import java.nio.charset.Charset;
-import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -279,12 +276,11 @@ public class HBaseTableSchema implements Serializable {
                                     getQualifierNames(family), getQualifierDataTypes(family));
                 }
             }
-            return DataTypes.ROW(
-                    Streams.zip(
-                                    Arrays.stream(fieldNames),
-                                    Arrays.stream(fieldTypes),
-                                    DataTypes::FIELD)
-                            .toArray(DataTypes.Field[]::new));
+            DataTypes.Field[] fields = new DataTypes.Field[fieldNames.length];
+            for (int i = 0; i < fields.length; i++) {
+                fields[i] = DataTypes.FIELD(fieldNames[i], fieldTypes[i]);
+            }
+            return DataTypes.ROW(fields);
         } else {
             String[] fieldNames = new String[familyNames.length];
             DataType[] fieldTypes = new DataType[familyNames.length];
@@ -294,12 +290,11 @@ public class HBaseTableSchema implements Serializable {
                 fieldTypes[i] =
                         getRowDataType(
                                 getQualifierNames(family), getQualifierDataTypes(family));
             }
-            return DataTypes.ROW(
-                    Streams.zip(
-                                    Arrays.stream(fieldNames),
-                                    Arrays.stream(fieldTypes),
-                                    DataTypes::FIELD)
-                            .toArray(DataTypes.Field[]::new));
+            DataTypes.Field[] fields = new DataTypes.Field[fieldNames.length];
+            for (int i = 0; i < fields.length; i++) {
+                fields[i] = DataTypes.FIELD(fieldNames[i], fieldTypes[i]);
+            }
+            return DataTypes.ROW(fields);
         }
     }
diff --git a/pom.xml b/pom.xml
index bf6103c..4f28754 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,6 @@ under the License.
     <properties>
         <flink.version>1.16.2</flink.version>
-        <flink.shaded.version>15.0</flink.shaded.version>
         <scala.binary.version>2.12</scala.binary.version>
         <scala.version>2.12.7</scala.version>
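
The two patterns the patch swaps in for the removed Guava helpers are generic enough to show in isolation: an Iterator can be turned into an ordered Stream with plain JDK Spliterators/StreamSupport instead of Lists.newArrayList(...).stream(), and zipping two parallel arrays only needs an indexed loop instead of Streams.zip. Below is a minimal, self-contained sketch of both; the class name GuavaFreeSketch, the sample data, and the string-pair stand-in for DataTypes.FIELD are illustrative and not part of the patch.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Illustrative sketch only; names and data are not from the patch.
public class GuavaFreeSketch {

    // Plain-JDK replacement for Lists.newArrayList(iterator).stream():
    // wrap the Iterator in an ordered Spliterator and stream it.
    static List<String> sortedStrings(Iterator<?> rows) {
        return StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(rows, Spliterator.ORDERED), false)
                .map(Object::toString)
                .sorted()
                .collect(Collectors.toList());
    }

    // Plain-JDK replacement for Streams.zip(names, values, factory):
    // pair up two equal-length arrays with an indexed loop
    // (the patch builds DataTypes.Field this way; a String pair stands in here).
    static String[] zip(String[] names, Object[] values) {
        String[] pairs = new String[names.length];
        for (int i = 0; i < pairs.length; i++) {
            pairs[i] = names[i] + "=" + values[i];
        }
        return pairs;
    }

    public static void main(String[] args) {
        Iterator<String> it = Arrays.asList("b", "a", "c").iterator();
        System.out.println(sortedStrings(it)); // [a, b, c]
        System.out.println(Arrays.toString(zip(new String[] {"x", "y"}, new Object[] {1, 2}))); // [x=1, y=2]
    }
}

Both replacements keep the tests and schema code on the JDK only, which is what lets the flink-shaded dependency be dropped from the pom.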