[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

2021-02-07 Thread GitBox


anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r571761491



##
File path: 
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptions.java
##
@@ -94,6 +94,33 @@
 + "Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
 + "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.");
 
+public static final ConfigOption<Boolean> LOOKUP_ASYNC =
+ConfigOptions.key("lookup.async")
+.booleanType()
+.defaultValue(false)
+.withDescription("whether to set async lookup.");
+
+public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+ConfigOptions.key("lookup.cache.max-rows")
+.longType()
+.defaultValue(-1L)
+.withDescription(
+"the max number of rows of lookup cache, over this value, the oldest rows will "
++ "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is "
++ "specified. Cache is not enabled as default.");
+
+public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+ConfigOptions.key("lookup.cache.ttl")
+.durationType()
+.defaultValue(Duration.ofSeconds(0))
+.withDescription("the cache time to live.");
+
+public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+ConfigOptions.key("lookup.max-retries")
+.intType()
+.defaultValue(3)
+.withDescription("the max retry times if lookup database failed.");

Review comment:
   Good idea. Can I add cache support to HBaseRowDataLookupFunction in the current patch?
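The `lookup.cache.max-rows` and `lookup.cache.ttl` options describe a bounded cache: once the row limit is exceeded the oldest rows are evicted, entries expire after the TTL, and caching is off by default. Below is a minimal, hypothetical sketch of those semantics using only the JDK. `LookupCacheSketch` is an illustrative name (the patch itself imports Guava's `Cache`), and the rule that caching counts as enabled only when both a positive row limit and a positive TTL are set is an assumption that matches the defaults (`-1` rows, `0s` TTL).

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/**
 * Hypothetical illustration of 'lookup.cache.max-rows' / 'lookup.cache.ttl':
 * evict the oldest write past maxRows, treat entries older than the TTL as absent.
 * Not the connector's implementation.
 */
final class LookupCacheSketch<K, V> {
    private static final class Timestamped<V> {
        final V value;
        final long writtenAtMs;

        Timestamped(V value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final long ttlMs;
    private final LongSupplier clockMs;
    private final LinkedHashMap<K, Timestamped<V>> entries;

    LookupCacheSketch(long maxRows, Duration ttl, LongSupplier clockMs) {
        this.ttlMs = ttl.toMillis();
        this.clockMs = clockMs;
        // Insertion-ordered map, so the eldest entry is the oldest write.
        this.entries = new LinkedHashMap<K, Timestamped<V>>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timestamped<V>> eldest) {
                return size() > maxRows;
            }
        };
    }

    /** Assumed rule: the cache is used only when both options are set to positive values. */
    static boolean cacheEnabled(long maxRows, Duration ttl) {
        return maxRows > 0 && !ttl.isZero() && !ttl.isNegative();
    }

    V getIfPresent(K key) {
        Timestamped<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (clockMs.getAsLong() - entry.writtenAtMs >= ttlMs) {
            entries.remove(key); // expired: drop it and report a miss
            return null;
        }
        return entry.value;
    }

    void put(K key, V value) {
        entries.remove(key); // re-insert so a rewrite counts as the newest row
        entries.put(key, new Timestamped<>(value, clockMs.getAsLong()));
    }
}
```

Guava's `CacheBuilder.maximumSize(...)` and `expireAfterWrite(...)` express the same two limits, although Guava's size-based eviction is roughly least-recently-used rather than strictly oldest-write.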





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

2021-02-18 Thread GitBox


anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r57288



##
File path: 
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptions.java
##

Review comment:
   @leonardBang Hello, can I add cache support to HBaseRowDataLookupFunction in the current patch?









[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

2021-02-19 Thread GitBox


anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r579195220



##
File path: 
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptions.java
##

Review comment:
   Thanks, I will do that.
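A hedged sketch of how a `lookup.max-retries` setting (default 3 in the option definition under review) might be applied to an asynchronous lookup: one initial attempt, then up to `maxRetries` re-attempts before the result future fails. `AsyncRetrySketch` and `withRetries` are hypothetical names, and reading the option as "extra attempts after the first" is an assumption, not something this thread confirms.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper: retries an async lookup up to maxRetries extra times. */
final class AsyncRetrySketch {
    private AsyncRetrySketch() {}

    static <T> CompletableFuture<T> withRetries(
            Supplier<CompletableFuture<T>> attempt, int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        run(attempt, maxRetries, 0, result);
        return result;
    }

    private static <T> void run(
            Supplier<CompletableFuture<T>> attempt,
            int maxRetries,
            int retry,
            CompletableFuture<T> result) {
        attempt.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value); // success: hand the row to the caller
            } else if (retry >= maxRetries) {
                result.completeExceptionally(error); // retry budget exhausted
            } else {
                // A real connector would likely back off here before trying again.
                run(attempt, maxRetries, retry + 1, result);
            }
        });
    }
}
```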









[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

2021-03-04 Thread GitBox


anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r587463451



##
File path: 
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
##
@@ -225,6 +217,13 @@ public Get createGet(Object rowKey) {
 
 /** Converts HBase {@link Result} into {@link RowData}. */
 public RowData convertToRow(Result result) {
+// The output rows need to be initialized each time

Review comment:
   @leonardBang Sounds good. I think adding a reuse flag to this function would complicate the method.
   Maybe we could add a method specifically for the cache. Or do you have a better idea?
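To illustrate why a reused output row conflicts with caching: if the converter keeps refilling one mutable row object, every cached reference points at that same object and is overwritten by the next conversion. A separate method that always allocates a new row avoids this without threading a flag through the hot path. The sketch is hypothetical: `Object[]` stands in for `GenericRowData`, `String[]` for an HBase `Result`, and `convertToReusedRow` / `convertToNewRow` are illustrative names.

```java
/** Hypothetical stand-in for a serde that turns a raw result into a row. */
final class RowReuseSketch {
    // One mutable output object shared by every call to convertToReusedRow.
    private final Object[] reusedRow = new Object[2];

    /** Reuses one output object: cheap, but callers must not keep the reference. */
    Object[] convertToReusedRow(String[] result) {
        reusedRow[0] = result[0];
        reusedRow[1] = result[1];
        return reusedRow;
    }

    /** Allocates a fresh output object: safe to store in a cache. */
    Object[] convertToNewRow(String[] result) {
        return new Object[] {result[0], result[1]};
    }
}
```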





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

2021-03-04 Thread GitBox


anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r587466648



##
File path: 
flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.connector.hbase.util.PlannerType;
+import org.apache.flink.connector.hbase2.util.HBaseTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
+public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {

Review comment:
   Yes, I will do it.
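A minimal sketch of the shape an async lookup test can take, assuming a hypothetical map-backed lookup (`InMemoryAsyncLookup`) in place of a real HBase table: issue one future per key through an `eval(future, key)` callback, wait for all of them with `CompletableFuture.allOf`, then assert on the collected rows. Joining the futures in the order they were issued keeps the assertion deterministic even though completion order is not.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/** Hypothetical async lookup backed by a map instead of an HBase table. */
final class InMemoryAsyncLookup {
    private final Map<Integer, String> table;

    InMemoryAsyncLookup(Map<Integer, String> table) {
        this.table = table;
    }

    /** Callback style similar to an async table function: complete the given future. */
    void eval(CompletableFuture<Collection<String>> future, Integer rowKey) {
        CompletableFuture.runAsync(() -> {
            String row = table.get(rowKey);
            Collection<String> rows =
                    row == null ? Collections.<String>emptyList() : Collections.singletonList(row);
            future.complete(rows);
        });
    }

    /** Issues all lookups concurrently, waits for them, and flattens results in key order. */
    static List<String> lookupAll(InMemoryAsyncLookup fn, List<Integer> keys) {
        List<CompletableFuture<Collection<String>>> futures = keys.stream()
                .map(k -> {
                    CompletableFuture<Collection<String>> f = new CompletableFuture<>();
                    fn.eval(f, k);
                    return f;
                })
                .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream()
                .flatMap(f -> f.join().stream())
                .collect(Collectors.toList());
    }
}
```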









[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

2021-03-08 Thread GitBox


anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r589323153



##
File path: 
flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
##

Review comment:
   > Could we add an ITCase for `AsyncLookupFunction` like `HBaseConnectorITCase.testHBaseLookupTableSource`?
   
   @leonardBang I tried to run HBaseConnectorITCase.testHBaseLookupTableSource, but an exception was raised. Is this a bug?
   Detailed exception information:
   `java.lang.NoClassDefFoundError: com.google.common.base.MoreObjects
   at org.apache.calcite.config.CalciteSystemProperty.loadProperties(CalciteSystemProperty.java:404)
   at org.apache.calcite.config.CalciteSystemProperty.(CalciteSystemProperty.java:47)
   at org.apache.calcite.util.Util.(Util.java:152)
   at org.apache.calcite.sql.type.SqlTypeName.(SqlTypeName.java:142)
   at org.apache.calcite.sql.type.SqlTypeFamily.getTypeNames(SqlTypeFamily.java:163)
   at org.apache.calcite.sql.type.ReturnTypes.(ReturnTypes.java:127)
   at org.apache.calcite.sql.SqlSetOperator.(SqlSetOperator.java:45)
   at org.apache.calcite.sql.fun.SqlStdOperatorTable.(SqlStdOperatorTable.java:97)
   at org.apache.calcite.sql2rel.StandardConvertletTable.(StandardConvertletTable.java:101)
   at org.apache.calcite.sql2rel.StandardConvertletTable.(StandardConvertletTable.java:91)
   at org.apache.calcite.tools.Frameworks$ConfigBuilder.(Frameworks.java:234)
   at org.apache.calcite.tools.Frameworks$ConfigBuilder.(Frameworks.java:215)
   at org.apache.calcite.tools.Frameworks.newConfigBuilder(Frameworks.java:199)
   at org.apache.flink.table.planner.delegation.PlannerContext.createFrameworkConfig(PlannerContext.java:135)
   at org.apache.flink.table.planner.delegation.PlannerContext.(PlannerContext.java:115)
   at org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:113)
   at org.apache.flink.table.planner.delegation.StreamPlanner.(StreamPlanner.scala:49)
   at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:48)
   at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:143)
   at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:113)
   at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:85)`
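A `NoClassDefFoundError` for `com.google.common.base.MoreObjects` frequently means that a Guava older than 18 (the release that introduced `MoreObjects`) is shadowing the version Calcite expects on the test classpath. That is a plausible cause here, not a confirmed one. One way to check is to print which jar a class is actually loaded from. `ClassOriginSketch` is a hypothetical, JDK-only helper written for this purpose.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical helper: reports where a class is loaded from, to spot conflicting jars. */
final class ClassOriginSketch {
    private ClassOriginSketch() {}

    static String describe(String className) {
        try {
            // Load without initializing, so a broken static initializer does not interfere.
            Class<?> clazz =
                    Class.forName(className, false, ClassOriginSketch.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            URL location = source == null ? null : source.getLocation();
            // JDK bootstrap classes have no code source.
            return location == null ? "bootstrap/unknown" : location.toString();
        } catch (ClassNotFoundException e) {
            return "not found";
        }
    }
}
```

For example, calling `describe("com.google.common.base.Objects")` inside the failing test would show which Guava jar the test picked up.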
   










[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

2021-03-09 Thread GitBox


anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r590317782



##
File path: 
flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
##

Review comment:
   Thank you very much!









[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

2021-03-11 Thread GitBox


anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r592276055



##
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table function; it can be used in
+ * the Table API and also for temporal table join plans in SQL. It looks up the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+private static final long serialVersionUID = 1L;
+
+private final String hTableName;
+private final byte[] serializedConfig;
+private final HBaseTableSchema hbaseTableSchema;
+private final String nullStringLiteral;
+
+private transient AsyncConnection asyncConnection;
+private transient AsyncTable<ScanResultConsumer> table;
+private transient HBaseSerde serde;
+
+private final long cacheMaxSize;
+private final long cacheExpireMs;
+private final int maxRetryTimes;
+private transient Cache<Object, RowData> cache;
+
+public HBaseRowDataAsyncLookupFunction(
+Configuration configuration,
+String hTableName,
+HBaseTableSchema hbaseTableSchema,
+String nullStringLiteral,
+HBaseLookupOptions lookupOptions) {
+this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+this.hTableName = hTableName;
+this.hbaseTableSchema = hbaseTableSchema;
+this.nullStringLiteral = nullStringLiteral;
+this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+}
+
+@Override
+public void open(FunctionContext context) {
+LOG.info("start open ...");
+Configuration config = prepareRuntimeConfiguration();
+CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+try {
+asyncConnection = asyncConnectionFuture.get();
+table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());
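The fields in this class (an `AsyncTable`, an optional `Cache`, and a retry limit) suggest a cache-aside flow inside `eval`: answer from the cache when possible, otherwise issue an asynchronous get, store the converted row, and complete the caller's future. Below is a minimal, hypothetical sketch of that flow. A `Function<K, CompletableFuture<V>>` stands in for `AsyncTable#get`, and a `ConcurrentHashMap` stands in for the Guava cache (no size bound or TTL, and misses are not cached). `CachedAsyncLookupSketch` is an illustrative name, not the class under review.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical cache-aside async lookup; not the connector's implementation. */
final class CachedAsyncLookupSketch<K, V> {
    // Stand-in for an async HBase get; completes with null when the row key is absent.
    private final Function<K, CompletableFuture<V>> fetch;
    // Null when caching is disabled.
    private final Map<K, V> cache;

    CachedAsyncLookupSketch(Function<K, CompletableFuture<V>> fetch, boolean cacheEnabled) {
        this.fetch = fetch;
        this.cache = cacheEnabled ? new ConcurrentHashMap<K, V>() : null;
    }

    void eval(CompletableFuture<Collection<V>> future, K rowKey) {
        if (cache != null) {
            V cached = cache.get(rowKey);
            if (cached != null) {
                future.complete(Collections.singletonList(cached)); // cache hit: no remote call
                return;
            }
        }
        fetch.apply(rowKey).whenComplete((row, error) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else if (row == null) {
                future.complete(Collections.<V>emptyList()); // miss: empty result, not cached
            } else {
                if (cache != null) {
                    cache.put(rowKey, row);
                }
                future.complete(Collections.singletonList(row));
            }
        });
    }
}
```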

[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

2021-03-11 Thread GitBox


anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r592279270



##
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##

[GitHub] [flink] anlen321 commented on a change in pull request #14684: [FLINK-20460][Connector-HBase] Support async lookup for HBase connector

2021-03-11 Thread GitBox


anlen321 commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r592324220



##
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is an asynchronous user-defined table function. It can be
+ * used in the Table API and in temporal table joins in SQL, and it returns lookup results as
+ * {@link RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+private static final long serialVersionUID = 1L;
+
+private final String hTableName;
+private final byte[] serializedConfig;
+private final HBaseTableSchema hbaseTableSchema;
+private final String nullStringLiteral;
+
+private transient AsyncConnection asyncConnection;
+private transient AsyncTable<ScanResultConsumer> table;
+private transient HBaseSerde serde;
+
+private final long cacheMaxSize;
+private final long cacheExpireMs;
+private final int maxRetryTimes;
+private transient Cache<Object, RowData> cache;
+
+public HBaseRowDataAsyncLookupFunction(
+Configuration configuration,
+String hTableName,
+HBaseTableSchema hbaseTableSchema,
+String nullStringLiteral,
+HBaseLookupOptions lookupOptions) {
+this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+this.hTableName = hTableName;
+this.hbaseTableSchema = hbaseTableSchema;
+this.nullStringLiteral = nullStringLiteral;
+this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+}
+
+@Override
+public void open(FunctionContext context) {
+LOG.info("start open ...");
+Configuration config = prepareRuntimeConfiguration();
+CompletableFuture<AsyncConnection> asyncConnectionFuture = ConnectionFactory.createAsyncConnection(config);
+try {
+asyncConnection = asyncConnectionFuture.get();
+table = asyncConnection.getTable(TableName.valueOf(hTableName), (ExecutorService) Executors.directExecutor());