[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307465052
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/sources/decorator/CachedLookupFunctionDecoratorTest.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * CachedLookupTableSourceTest.
+ */
+public class CachedLookupFunctionDecoratorTest {
 
 Review comment:
   How about adding a test to verify that cached entries expire?
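
A minimal sketch of what such a test could check. It assumes a hypothetical `ExpiringCache` with an injectable clock, written here only so the example runs without dependencies; it is not the PR's Guava-backed cache. With Guava, one option would be to pass a fake `Ticker` to `CacheBuilder.ticker(...)` instead of sleeping. All class and method names below are illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for an expire-after-write cache; not the PR's implementation. */
public class ExpiringCacheSketch {

    /** Tiny expire-after-write cache driven by an injectable millisecond clock. */
    static final class ExpiringCache<K, V> {
        private final Map<K, V> values = new HashMap<>();
        private final Map<K, Long> writeTimes = new HashMap<>();
        private final long expireMs;
        private final LongSupplier clock;

        ExpiringCache(long expireMs, LongSupplier clock) {
            this.expireMs = expireMs;
            this.clock = clock;
        }

        void put(K key, V value) {
            values.put(key, value);
            writeTimes.put(key, clock.getAsLong());
        }

        /** Returns the cached value, or null if it is absent or at least expireMs old. */
        V getIfPresent(K key) {
            Long writtenAt = writeTimes.get(key);
            if (writtenAt == null) {
                return null;
            }
            if (clock.getAsLong() - writtenAt >= expireMs) {
                values.remove(key);
                writeTimes.remove(key);
                return null;
            }
            return values.get(key);
        }
    }

    /** Returns true if an entry is visible before the expiry time and gone after it. */
    static boolean entryExpires() {
        AtomicLong now = new AtomicLong(0L);
        ExpiringCache<String, String> cache = new ExpiringCache<>(1000L, now::get);
        cache.put("1", "row");
        boolean presentBefore = "row".equals(cache.getIfPresent("1"));
        now.set(1000L); // advance the fake clock instead of sleeping
        boolean absentAfter = cache.getIfPresent("1") == null;
        return presentBefore && absentAfter;
    }

    public static void main(String[] args) {
        System.out.println("entry expires: " + entryExpires());
    }
}
```

Driving time through an injected clock keeps the test deterministic and fast, which is usually preferable to `Thread.sleep` in an expiry test.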


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307457693
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/decorator/LookupFunctionInvoker.java
 ##
 @@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.table.functions.TableFunction;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+/**
+ * LookupFunctionInvoker is mainly to handle the parameter type.
+ */
+class LookupFunctionInvoker implements InvocationHandler {
 
 Review comment:
   Why not make this class `public`, or a private nested class of 
`CachedLookupFunctionDecorator`?




[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307459072
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/decorator/LookupFunctionInvoker.java
 ##
 @@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.table.functions.TableFunction;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+/**
+ * LookupFunctionInvoker is mainly to handle the parameter type.
+ */
+class LookupFunctionInvoker implements InvocationHandler {
+
+   private final TableFunction tableFunction;
+   private volatile Method realMethod = null;
 
 Review comment:
   Currently, `realMethod` can only ever refer to `eval`. How about 
renaming it to `evalMethod`?
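
To illustrate why the narrower name fits, here is a sketch that resolves the single `eval` method reflectively and caches it in a field. `EvalMethodHolder` and `SampleFunction` are hypothetical names chosen for this example; the PR's `LookupFunctionInvoker` may resolve the method differently.

```java
import java.lang.reflect.Method;

public class EvalMethodSketch {

    /** Hypothetical lookup function exposing a single public eval method. */
    public static class SampleFunction {
        public String eval(Object key) {
            return "value-for-" + key;
        }
    }

    /** Resolves the target's eval method once and caches it; illustrative only. */
    static final class EvalMethodHolder {
        private final Object target;
        private volatile Method evalMethod = null;

        EvalMethodHolder(Object target) {
            this.target = target;
        }

        Method getEvalMethod() {
            Method resolved = evalMethod;
            if (resolved == null) {
                // The only method this holder ever looks for is "eval".
                for (Method candidate : target.getClass().getMethods()) {
                    if ("eval".equals(candidate.getName())) {
                        resolved = candidate;
                        break;
                    }
                }
                if (resolved == null) {
                    throw new IllegalStateException("No eval method on " + target.getClass());
                }
                evalMethod = resolved;
            }
            return resolved;
        }

        Object invoke(Object key) {
            try {
                return getEvalMethod().invoke(target, key);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("eval invocation failed", e);
            }
        }
    }

    /** Convenience entry point: looks up a key through the cached eval method. */
    static Object lookup(Object key) {
        return new EvalMethodHolder(new SampleFunction()).invoke(key);
    }

    public static void main(String[] args) {
        System.out.println(lookup("1"));
    }
}
```

Because the field can only ever hold the `eval` method, a name such as `evalMethod` describes its content more precisely than `realMethod`.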




[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307456719
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/decorator/CachedLookupFunctionDecorator.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * CachedLookupableTableSource.
+ * LIMITATION: now the function eval of the lookupTableSource implementation 
only supports parameter as Object or Object...
+ * TODO: in the future, to extract the parameter type from the Method, but I 
think it's not much urgent.
 
 Review comment:
   IMO, it would be enough to leave the TODO in the implementation code rather 
than in the Javadoc of this class.




[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307450367
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/decorator/CachedLookupFunctionDecorator.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * CachedLookupableTableSource.
+ * LIMITATION: now the function eval of the lookupTableSource implementation 
only supports parameter as Object or Object...
+ * TODO: in the future, to extract the parameter type from the Method, but I 
think it's not much urgent.
+ */
+public class CachedLookupFunctionDecorator extends TableFunction {
+   //default 1day.
+   private static final long EXPIRED_TIME_MS_DEFAULT = 24 * 60 * 60 * 1000;
+   private final TableFunction lookupTableSource;
+   private transient Cache> cache;
+   private transient LookupFunctionInvoker.Evaluation realEval;
+   private CollectorProxy collectorProxy;
+   private final long expireTimeMS;
+   private final long maximumSize;
+   private final boolean recordStat;
+   private final boolean isVariable;
 
 Review comment:
   How about changing this field name to `isVarArgs`?
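
The suggested name matches the JDK's own terminology: `java.lang.reflect.Method#isVarArgs()` reports whether a method was declared with a variable number of arguments. Below is a sketch of how a check such as `checkMethodVariable("eval", ...)` might be written on top of it. The helper `hasVarArgsMethod` and the two sample classes are assumptions made for this example, not code from the PR.

```java
import java.lang.reflect.Method;

public class VarArgsSketch {

    /** Hypothetical lookup function whose eval takes a single key. */
    public static class SingleKeyLookup {
        public void eval(Object key) {
            // no-op: only the signature matters for this example
        }
    }

    /** Hypothetical lookup function whose eval takes a variable number of keys. */
    public static class MultiKeyLookup {
        public void eval(Object... keys) {
            // no-op: only the signature matters for this example
        }
    }

    /** Returns true if clazz has a public method with the given name declared as varargs. */
    static boolean hasVarArgsMethod(String methodName, Class<?> clazz) {
        for (Method method : clazz.getMethods()) {
            if (methodName.equals(method.getName()) && method.isVarArgs()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("single: " + hasVarArgsMethod("eval", SingleKeyLookup.class));
        System.out.println("multi: " + hasVarArgsMethod("eval", MultiKeyLookup.class));
    }
}
```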




[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307463620
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/sources/decorator/CachedLookupFunctionDecoratorTest.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * CachedLookupTableSourceTest.
+ */
+public class CachedLookupFunctionDecoratorTest {
+   private final long maximnCacheSize = 1 * 1024 * 1024L;
+   List result = new ArrayList<>();
+   Collector testCollector = new Collector() {
+   @Override
+   public void collect(Row record) {
+   result.add(record);
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   @Test
+   public void testEvalVariableObjectKey() throws Exception {
+   result.clear();
+   CachedLookupFunctionDecorator cachedLookupTableSource = 
new CachedLookupFunctionDecorator<>(new TestLookupFucntion(),
+   maximnCacheSize);
+   cachedLookupTableSource.setCollector(testCollector);
+   cachedLookupTableSource.open(mock(FunctionContext.class));
+
+   Cache<Row, List<Row>> cache = 
cachedLookupTableSource.getCache();
+   //cache still have no data.
+   Assert.assertEquals(0, cache.size());
+   cachedLookupTableSource.eval("1");
+   //load into cache and emit correctly.
+   Assert.assertEquals(1, cache.size());
+   Assert.assertEquals(5, result.size());
+   Assert.assertEquals(cache.getIfPresent(Row.of("1")), result);
+   List expected = Lists.newArrayList(Row.of("1", "0"),
+   Row.of("1", "1"),
+   Row.of("1", "2"),
+   Row.of("1", "3"),
+   Row.of("1", "4"));
+   Assert.assertEquals(expected, result);
+
+   // cache hit.
+   cachedLookupTableSource.eval("1");
+   Assert.assertEquals(1, cache.size());
+   Assert.assertEquals(10, result.size());
+   List expected2 = cache.getIfPresent(Row.of("1"));
+   expected2.addAll(cache.getIfPresent(Row.of("1")));
+   Assert.assertEquals(expected2, result);
+
+   cachedLookupTableSource.eval("2");
+   Assert.assertEquals(2, cache.size());
+   Assert.assertEquals(15, result.size());
+   expected2.addAll(cache.getIfPresent(Row.of("2")));
+   Assert.assertEquals(expected2, result);
+
+   cachedLookupTableSource.eval("3", "4");
+   Assert.assertEquals(3, cache.size());
+   Assert.assertEquals(20, result.size());
+   expected2.addAll(cache.getIfPresent(Row.of("3", "4")));
+   Assert.assertEquals(expected2, result);
+
+   expected = Lists.newArrayList(Row.of("3", "4", "0"),
+   Row.of("3", "4", "1"),
+   Row.of("3", "4", "2"),
+   Row.of("3", "4", "3"),
+   Row.of("3", "4", "4"));
+   List multKey = cache.getIfPresent(Row.of("3", "4"));
+   Assert.assertEquals(expected, multKey);
+   cachedLookupTableSource.close();
+
+   }
+
+   @Test
+   public void testWithMaxSize() throws Exception {
+   result.clear();
+   //set maxSize is 1, that means only can hold one key in 

[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307464222
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/sources/decorator/LookupFunctionInvokerTest.java
 ##
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * LookupFunctionInvokerTest.
+ */
+public class LookupFunctionInvokerTest {
+   @Test
+   public void testNormal() {
+   AtomicInteger vaviableObjectCount = new AtomicInteger(0);
+   TableFunction functionMulit = new TableFunction() {
 
 Review comment:
   Typo: this should be `functionMulti`.




[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307463872
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/sources/decorator/LookupFunctionInvokerTest.java
 ##
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * LookupFunctionInvokerTest.
+ */
+public class LookupFunctionInvokerTest {
+   @Test
+   public void testNormal() {
+   AtomicInteger vaviableObjectCount = new AtomicInteger(0);
 
 Review comment:
   Typo: this should be `variableObjectCount`.




[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307464504
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/sources/decorator/CachedLookupFunctionDecoratorTest.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * CachedLookupTableSourceTest.
+ */
+public class CachedLookupFunctionDecoratorTest {
+   private final long maximnCacheSize = 1 * 1024 * 1024L;
+   List result = new ArrayList<>();
+   Collector testCollector = new Collector() {
+   @Override
+   public void collect(Row record) {
+   result.add(record);
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   @Test
+   public void testEvalVariableObjectKey() throws Exception {
+   result.clear();
+   CachedLookupFunctionDecorator cachedLookupTableSource = 
new CachedLookupFunctionDecorator<>(new TestLookupFucntion(),
+   maximnCacheSize);
+   cachedLookupTableSource.setCollector(testCollector);
+   cachedLookupTableSource.open(mock(FunctionContext.class));
+
+   Cache<Row, List<Row>> cache = 
cachedLookupTableSource.getCache();
+   //cache still have no data.
+   Assert.assertEquals(0, cache.size());
+   cachedLookupTableSource.eval("1");
+   //load into cache and emit correctly.
+   Assert.assertEquals(1, cache.size());
+   Assert.assertEquals(5, result.size());
+   Assert.assertEquals(cache.getIfPresent(Row.of("1")), result);
+   List expected = Lists.newArrayList(Row.of("1", "0"),
+   Row.of("1", "1"),
+   Row.of("1", "2"),
+   Row.of("1", "3"),
+   Row.of("1", "4"));
+   Assert.assertEquals(expected, result);
+
+   // cache hit.
+   cachedLookupTableSource.eval("1");
+   Assert.assertEquals(1, cache.size());
+   Assert.assertEquals(10, result.size());
+   List expected2 = cache.getIfPresent(Row.of("1"));
+   expected2.addAll(cache.getIfPresent(Row.of("1")));
+   Assert.assertEquals(expected2, result);
+
+   cachedLookupTableSource.eval("2");
+   Assert.assertEquals(2, cache.size());
+   Assert.assertEquals(15, result.size());
+   expected2.addAll(cache.getIfPresent(Row.of("2")));
+   Assert.assertEquals(expected2, result);
+
+   cachedLookupTableSource.eval("3", "4");
+   Assert.assertEquals(3, cache.size());
+   Assert.assertEquals(20, result.size());
+   expected2.addAll(cache.getIfPresent(Row.of("3", "4")));
+   Assert.assertEquals(expected2, result);
+
+   expected = Lists.newArrayList(Row.of("3", "4", "0"),
+   Row.of("3", "4", "1"),
+   Row.of("3", "4", "2"),
+   Row.of("3", "4", "3"),
+   Row.of("3", "4", "4"));
+   List multKey = cache.getIfPresent(Row.of("3", "4"));
+   Assert.assertEquals(expected, multKey);
+   cachedLookupTableSource.close();
+
+   }
+
+   @Test
+   public void testWithMaxSize() throws Exception {
+   result.clear();
+   //set maxSize is 1, that means only can hold one key in 

[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307460899
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/decorator/CachedLookupFunctionDecorator.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * CachedLookupableTableSource.
+ * LIMITATION: now the function eval of the lookupTableSource implementation 
only supports parameter as Object or Object...
+ * TODO: in the future, to extract the parameter type from the Method, but I 
think it's not much urgent.
+ */
+public class CachedLookupFunctionDecorator extends TableFunction {
+   //default 1day.
+   private static final long EXPIRED_TIME_MS_DEFAULT = 24 * 60 * 60 * 1000;
+   private final TableFunction lookupTableSource;
+   private transient Cache> cache;
+   private transient LookupFunctionInvoker.Evaluation realEval;
+   private CollectorProxy collectorProxy;
+   private final long expireTimeMS;
+   private final long maximumSize;
+   private final boolean recordStat;
 
 Review comment:
   How about renaming this to `recordStats`?




[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307451707
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/decorator/CachedLookupFunctionDecorator.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * CachedLookupableTableSource.
+ * LIMITATION: now the function eval of the lookupTableSource implementation 
only supports parameter as Object or Object...
+ * TODO: in the future, to extract the parameter type from the Method, but I 
think it's not much urgent.
+ */
+public class CachedLookupFunctionDecorator extends TableFunction {
+   //default 1day.
+   private static final long EXPIRED_TIME_MS_DEFAULT = 24 * 60 * 60 * 1000;
+   private final TableFunction lookupTableSource;
+   private transient Cache> cache;
+   private transient LookupFunctionInvoker.Evaluation realEval;
+   private CollectorProxy collectorProxy;
+   private final long expireTimeMS;
+   private final long maximumSize;
+   private final boolean recordStat;
+   private final boolean isVariable;
+
+   public CachedLookupFunctionDecorator(TableFunction 
lookupTableSource, long maximumSize) {
+   this(lookupTableSource, maximumSize, EXPIRED_TIME_MS_DEFAULT);
+   }
+
+   public CachedLookupFunctionDecorator(TableFunction 
lookupTableSource, long maximumSize, long expireTimeMs) {
+   this(lookupTableSource, maximumSize, expireTimeMs, true);
+   }
+
+   public CachedLookupFunctionDecorator(
+   TableFunction lookupTableSource, long maximumSize, long 
expireTimeMs, boolean recordStat) {
+   this.lookupTableSource = lookupTableSource;
+   this.maximumSize = maximumSize;
+   this.expireTimeMS = expireTimeMs;
+   this.recordStat = recordStat;
+   this.isVariable = checkMethodVariable("eval", 
lookupTableSource.getClass());
+   }
+
+   @Override
+   public void open(FunctionContext context) throws Exception {
+   lookupTableSource.open(context);
+   collectorProxy = new CollectorProxy();
+   lookupTableSource.setCollector(collectorProxy);
+   LookupFunctionInvoker lookupFunctionInvoker = new LookupFunctionInvoker(lookupTableSource);
+   realEval = lookupFunctionInvoker.getProxy();
+   CacheBuilder cacheBuilder = CacheBuilder.newBuilder().expireAfterWrite(expireTimeMS,
+   TimeUnit.MILLISECONDS).maximumSize(maximumSize);
+   if (this.recordStat) {
+   cacheBuilder.recordStats();
+   }
+   this.cache = cacheBuilder.build();
+   }
+
+   @Override
+   public void close() throws Exception {
+   if (cache != null) {
+   cache.cleanUp();
+   cache = null;
+   }
+   lookupTableSource.close();
+   }
+
+   @Override
+   public TypeInformation getResultType() {
+   return lookupTableSource.getResultType();
+   }
+
+   @Override
+   public TypeInformation[] getParameterTypes(Class[] signature) {
+   return lookupTableSource.getParameterTypes(signature);
+   }
+
+   @VisibleForTesting
+   Cache> getCache() {
+   return cache;
+   }
+
+   public void eval(Object... keys) {
+

[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307464111
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/sources/decorator/LookupFunctionInvokerTest.java
 ##
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * LookupFunctionInvokerTest.
+ */
+public class LookupFunctionInvokerTest {
+   @Test
+   public void testNormal() {
+   AtomicInteger vaviableObjectCount = new AtomicInteger(0);
+   TableFunction functionMulit = new TableFunction() {
+   public void eval(Object... keys) {
+   vaviableObjectCount.incrementAndGet();
+   System.out.println(Arrays.asList(keys));
 
 Review comment:
   This looks like a leftover debug message. If so, it could be removed.
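
   One way to drop the `println` while still checking what the function received is to record the keys and assert on them. The following is only a minimal sketch in plain Java, with no Flink or JUnit dependency: `EvalCallRecorder` and `VarArgsFunction` are hypothetical names that stand in for the test's anonymous `TableFunction` with `eval(Object... keys)`, and are not part of the pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: records eval calls so a test can assert on them. */
public class EvalCallRecorder {

    /** Hypothetical stand-in for a lookup function's eval(Object... keys). */
    public interface VarArgsFunction {
        void eval(Object... keys);
    }

    private final AtomicInteger callCount = new AtomicInteger(0);
    private final List<List<Object>> seenKeys = new ArrayList<>();

    /** Returns a function that stores each call's keys instead of printing them. */
    public VarArgsFunction recordingFunction() {
        return keys -> {
            callCount.incrementAndGet();
            seenKeys.add(Arrays.asList(keys));
        };
    }

    public int getCallCount() {
        return callCount.get();
    }

    public List<List<Object>> getSeenKeys() {
        return seenKeys;
    }
}
```

   A test could then call the recorded function and compare `getCallCount()` and `getSeenKeys()` against the expected values, so a failure reports a mismatch rather than relying on console output.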


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307456029
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/decorator/CachedLookupFunctionDecorator.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * CachedLookupableTableSource.
 
 Review comment:
   How about `A general lookup function decorator enhanced with the ability to cache recent query results.` or another, more detailed description?
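
   To illustrate what such a description would be documenting, here is a rough, self-contained sketch of a lookup decorator that caches recent results with a size bound and expire-after-write semantics. It is an assumption-laden stand-in, not the code in this pull request: it uses a plain `LinkedHashMap` instead of the shaded Guava `Cache`, and `CachingLookup`, `Timed`, `lookup`, and the injected `clockMs` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch: a general lookup decorator enhanced with the ability
 * to cache recent query results. Not the Guava-based class under review.
 */
public class CachingLookup<K, V> {

    /** A cached value together with the time it was written. */
    private static final class Timed<T> {
        final T value;
        final long writtenAtMs;

        Timed(T value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final Function<K, V> delegate;
    private final long expireMs;
    private final LongSupplier clockMs;
    private final Map<K, Timed<V>> cache;

    public CachingLookup(Function<K, V> delegate, int maximumSize, long expireMs, LongSupplier clockMs) {
        this.delegate = delegate;
        this.expireMs = expireMs;
        this.clockMs = clockMs;
        // Access-ordered map that drops the least recently used entry once it grows past maximumSize.
        this.cache = new LinkedHashMap<K, Timed<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timed<V>> eldest) {
                return size() > maximumSize;
            }
        };
    }

    /** Returns the cached value if it is still fresh, otherwise queries the delegate and caches the result. */
    public V lookup(K key) {
        long now = clockMs.getAsLong();
        Timed<V> cached = cache.get(key);
        if (cached != null && now - cached.writtenAtMs < expireMs) {
            return cached.value;
        }
        V fresh = delegate.apply(key);
        cache.put(key, new Timed<>(fresh, now));
        return fresh;
    }
}
```

   Passing the clock in as a `LongSupplier` is a design choice of this sketch only: it lets a test advance time by hand and check that an entry is looked up again after it expires, without sleeping.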




[GitHub] [flink] Myasuka commented on a change in pull request #9231: [FLINK-13252] Common CachedLookupFunction for All connector

2019-07-25 Thread GitBox
Myasuka commented on a change in pull request #9231: [FLINK-13252] Common 
CachedLookupFunction for All connector
URL: https://github.com/apache/flink/pull/9231#discussion_r307464890
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/sources/decorator/CachedLookupFunctionDecoratorTest.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.decorator;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * CachedLookupTableSourceTest.
+ */
+public class CachedLookupFunctionDecoratorTest {
+   private final long maximnCacheSize = 1 * 1024 * 1024L;
 
 Review comment:
   Typo: this should be `maximumCacheSize`.

