XComp commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1084051452


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -71,56 +72,56 @@ public void open(Configuration parameters) throws Exception 
{
     }
 
     @Override
-    protected void reloadCache() throws Exception {
+    protected boolean updateCache() throws Exception {
         InputSplit[] inputSplits = createInputSplits();
         int numSplits = inputSplits.length;
+        int concurrencyLevel = getConcurrencyLevel(numSplits);
         // load data into the another copy of cache
-        // notice: it requires twice more memory, but on the other hand we 
don't need any blocking
+        // notice: it requires twice more memory, but on the other hand we 
don't need any blocking;
         // cache has default initialCapacity and loadFactor, but overridden 
concurrencyLevel
         ConcurrentHashMap<RowData, Collection<RowData>> newCache =
-                new ConcurrentHashMap<>(16, 0.75f, 
getConcurrencyLevel(numSplits));
-        this.cacheLoadTasks =
+                new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel);
+        Deque<InputSplitCacheLoadTask> cacheLoadTasks =
                 Arrays.stream(inputSplits)
                         .map(split -> createCacheLoadTask(split, newCache))
-                        .collect(Collectors.toList());
-        if (isStopped) {
-            // check for cases when #close was called during reload before 
creating cacheLoadTasks
-            return;
-        }
-        // run first task or create numSplits threads to run all tasks
+                        .collect(Collectors.toCollection(ArrayDeque::new));
+        // run first task and create concurrencyLevel - 1 threads to run 
remaining tasks
         ExecutorService cacheLoadTaskService = null;
+        boolean wasInterrupted;

Review Comment:
   ```suggestion
           boolean wasInterrupted = false;
   ```
   I know that `false` is the default value but for readability purposes it's 
good to make this explicit, I think.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java:
##########
@@ -107,12 +107,12 @@ public void open(FunctionContext context) throws 
Exception {
             numLoadFailuresCounter = new SimpleCounter();
             cacheMetricGroup.numLoadFailuresCounter(numLoadFailuresCounter);
         }
-        // Initialize cache and the delegating function
-        cache.open(cacheMetricGroup);
         if (cache instanceof LookupFullCache) {
             // TODO add Configuration into FunctionContext
-            ((LookupFullCache) cache).open(new Configuration());
+            ((LookupFullCache) cache).setParameters(new Configuration());

Review Comment:
   What value does this call bring here? Can't we get rid of the if block 
entirely? We're setting an empty Conflguration here which is already the 
default within `CachingLookupFunction`. I don't see 
`LookupFullCache.setParameter` being called anywhere else? :thinking:  



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingLookupFunction.java:
##########
@@ -107,12 +107,12 @@ public void open(FunctionContext context) throws 
Exception {
             numLoadFailuresCounter = new SimpleCounter();
             cacheMetricGroup.numLoadFailuresCounter(numLoadFailuresCounter);
         }
-        // Initialize cache and the delegating function
-        cache.open(cacheMetricGroup);
         if (cache instanceof LookupFullCache) {
             // TODO add Configuration into FunctionContext

Review Comment:
   I know that this is not part of the current PR. But just as a side-note: 
It's better to create follow-up Jira issues rather than hiding planned changes 
in a TODO comment.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -71,58 +72,58 @@ public void open(Configuration parameters) throws Exception 
{
     }
 
     @Override
-    protected void reloadCache() throws Exception {
+    protected void updateCache() throws Exception {
         InputSplit[] inputSplits = createInputSplits();
         int numSplits = inputSplits.length;
+        int concurrencyLevel = getConcurrencyLevel(numSplits);
         // load data into the another copy of cache
-        // notice: it requires twice more memory, but on the other hand we 
don't need any blocking
+        // notice: it requires twice more memory, but on the other hand we 
don't need any blocking;
         // cache has default initialCapacity and loadFactor, but overridden 
concurrencyLevel
         ConcurrentHashMap<RowData, Collection<RowData>> newCache =
-                new ConcurrentHashMap<>(16, 0.75f, 
getConcurrencyLevel(numSplits));
-        this.cacheLoadTasks =
+                new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel);
+        Deque<InputSplitCacheLoadTask> cacheLoadTasks =
                 Arrays.stream(inputSplits)
                         .map(split -> createCacheLoadTask(split, newCache))
-                        .collect(Collectors.toList());
-        if (isStopped) {
-            // check for cases when #close was called during reload before 
creating cacheLoadTasks
-            return;
-        }
-        // run first task or create numSplits threads to run all tasks
+                        .collect(Collectors.toCollection(ArrayDeque::new));
+        // run first task and create concurrencyLevel - 1 threads to run 
remaining tasks
         ExecutorService cacheLoadTaskService = null;
         try {
-            if (numSplits > 1) {
-                int numThreads = getConcurrencyLevel(numSplits);
-                cacheLoadTaskService = 
Executors.newFixedThreadPool(numThreads);
-                ExecutorService finalCacheLoadTaskService = 
cacheLoadTaskService;
-                List<Future<?>> futures =
+            InputSplitCacheLoadTask firstTask = cacheLoadTasks.pop();
+            CompletableFuture<?> otherTasksFuture = null;
+            if (!cacheLoadTasks.isEmpty()) {
+                cacheLoadTaskService = 
Executors.newFixedThreadPool(concurrencyLevel - 1);
+                ExecutorService finalExecutor = cacheLoadTaskService;

Review Comment:
   ah true, I overlooked that one. 8)



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java:
##########
@@ -109,7 +116,9 @@ public long size() {
 
     @Override
     public void close() throws Exception {
-        reloadTrigger.close(); // firstly try to interrupt reload thread
+        // in default triggers shutdowns scheduled thread pool used for 
periodic triggers of reloads

Review Comment:
   ```suggestion
           // stops scheduled thread pool that's responsible for scheduling 
cache updates
   ```
   Just as a proposal



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java:
##########
@@ -109,7 +116,9 @@ public long size() {
 
     @Override
     public void close() throws Exception {
-        reloadTrigger.close(); // firstly try to interrupt reload thread
+        // in default triggers shutdowns scheduled thread pool used for 
periodic triggers of reloads
+        reloadTrigger.close();
+        // shutdowns the reload thread and interrupts active reload task, if 
present

Review Comment:
   ```suggestion
           // stops thread pool that's responsible for executing the actual 
cache update
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to