[ 
https://issues.apache.org/jira/browse/HIVE-25582?focusedWorklogId=683200&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-683200
 ]

ASF GitHub Bot logged work on HIVE-25582:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Nov/21 10:15
            Start Date: 18/Nov/21 10:15
    Worklog Time Spent: 10m 
      Work Description: kgyrtkirk commented on a change in pull request #2693:
URL: https://github.com/apache/hive/pull/2693#discussion_r752093206



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
##########
@@ -18,82 +18,66 @@
 
 package org.apache.hadoop.hive.ql.exec.mr;
 
+import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 /**
- * ObjectCache. No-op implementation on MR we don't have a means to reuse
- * Objects between runs of the same task.
+ * ObjectCache. Simple implementation on MR we don't have a means to reuse
+ * Objects between runs of the same task, this acts as a local cache.
  *
  */
 public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
 
   private static final Logger LOG = LoggerFactory.getLogger(ObjectCache.class.getName());
 
+  private final Map<String, Object> cache = new ConcurrentHashMap<>();
+
+  private static ExecutorService staticPool = Executors.newCachedThreadPool();
+
   @Override
   public void release(String key) {
     // nothing to do
     LOG.debug("{} no longer needed", key);
+    cache.remove(key);
   }
 
   @Override
   public <T> T retrieve(String key) throws HiveException {
-    return retrieve(key, null);
+    return (T) cache.get(key);
   }
 
   @Override
   public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
+    T value = (T) cache.get(key);
+    if (value != null || fn == null) {
+      return value;
+    }
     try {
       LOG.debug("Creating {}", key);
-      return fn.call();
+      value = fn.call();
     } catch (Exception e) {
       throw new HiveException(e);
     }
+    T previous = (T) cache.putIfAbsent(key, value);
+    return previous != null ? previous : value;
   }
 
   @Override
   public <T> Future<T> retrieveAsync(String key, Callable<T> fn) throws HiveException {
-    final T value = retrieve(key, fn);
-
-    return new Future<T>() {
-
-      @Override
-      public boolean cancel(boolean mayInterruptIfRunning) {
-        return false;
-      }
-
-      @Override
-      public boolean isCancelled() {
-        return false;
-      }
-
-      @Override
-      public boolean isDone() {
-        return true;
-      }
-
-      @Override
-      public T get() throws InterruptedException, ExecutionException {
-        return value;
-      }
-
-      @Override
-      public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
-          TimeoutException {
-        return value;
-      }
-    };
+    return staticPool.submit((Callable)() -> retrieve(key, fn));

Review comment:
       I think the old `new Future` was somewhat better, as it did not need an 
"executor" running in the background to service this method
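
One possible way to keep the caching behaviour without a background executor is to compute the value on the calling thread and wrap it in an already-completed future. The following is only a sketch under assumptions: `CompletedFutureSketch` is a hypothetical stand-in rather than the Hive class, it throws plain `Exception` instead of `HiveException`, and the null guard is an addition that is not in the diff above. `CompletableFuture.completedFuture` is the standard JDK call used in place of the hand-written anonymous `Future`.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

/** Hypothetical stand-in for the cache under review; not the actual Hive class. */
public class CompletedFutureSketch {

  private final Map<String, Object> cache = new ConcurrentHashMap<>();

  @SuppressWarnings("unchecked")
  public <T> T retrieve(String key, Callable<T> fn) throws Exception {
    T value = (T) cache.get(key);
    if (value != null || fn == null) {
      return value;
    }
    value = fn.call();
    if (value == null) {
      // Assumption added for this sketch: ConcurrentHashMap rejects null values,
      // so a null result is returned without being cached.
      return null;
    }
    // If another thread stored a value first, return that one instead.
    T previous = (T) cache.putIfAbsent(key, value);
    return previous != null ? previous : value;
  }

  public <T> Future<T> retrieveAsync(String key, Callable<T> fn) throws Exception {
    // The value is computed synchronously on the caller's thread, so the
    // returned future is already done and no thread pool is involved.
    return CompletableFuture.completedFuture(retrieve(key, fn));
  }

  public static void main(String[] args) throws Exception {
    CompletedFutureSketch cache = new CompletedFutureSketch();
    Future<Integer> future = cache.retrieveAsync("answer", () -> 42);
    System.out.println(future.isDone() + " " + future.get()); // prints "true 42"
  }
}
```

The trade-off is that `retrieveAsync` then blocks the caller while `fn` runs, which matches the behaviour of the removed anonymous `Future`.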






Issue Time Tracking
-------------------

    Worklog Id:     (was: 683200)
    Time Spent: 2h 10m  (was: 2h)

> Empty result when using offset limit with MR
> --------------------------------------------
>
>                 Key: HIVE-25582
>                 URL: https://issues.apache.org/jira/browse/HIVE-25582
>             Project: Hive
>          Issue Type: Bug
>          Components: Operators
>    Affects Versions: 4.0.0
>            Reporter: Zhihua Deng
>            Assignee: Zhihua Deng
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The _mr.ObjectCache_ caches nothing: every time the limit operator [retrieves 
> the global counter from the 
> cache|https://github.com/apache/hive/blob/7b3ecf617a6d46f48a3b6f77e0339fd4ad95a420/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java#L150-L161],
>  a new AtomicInteger is returned. As a result, _offset <= 
> currentCountForAllTasksInt_ always evaluates to false whenever _offset > 0_, 
> so the operator skips all rows.
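
The effect described above can be illustrated with a simplified model. This is a sketch under assumptions, not the `LimitOperator` logic: the `Cache` interface, the `NoOpCache` and `MapCache` classes, and the `limit.counter` key are hypothetical names chosen for illustration. It only shows that a cache which builds a fresh `AtomicInteger` on every retrieval loses all increments, while a map-backed cache keeps them.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class CounterCacheSketch {

  /** Hypothetical minimal cache contract used only in this sketch. */
  interface Cache {
    AtomicInteger retrieve(String key, Supplier<AtomicInteger> fn);
  }

  /** Models a cache that caches nothing: every call builds a new object. */
  static final class NoOpCache implements Cache {
    @Override
    public AtomicInteger retrieve(String key, Supplier<AtomicInteger> fn) {
      return fn.get();
    }
  }

  /** Models a local cache: the first object created for a key is reused. */
  static final class MapCache implements Cache {
    private final Map<String, AtomicInteger> map = new ConcurrentHashMap<>();

    @Override
    public AtomicInteger retrieve(String key, Supplier<AtomicInteger> fn) {
      return map.computeIfAbsent(key, k -> fn.get());
    }
  }

  /** Increments the shared counter once per row, then reads it back. */
  static int countAfter(Cache cache, int rows) {
    for (int i = 0; i < rows; i++) {
      cache.retrieve("limit.counter", AtomicInteger::new).incrementAndGet();
    }
    return cache.retrieve("limit.counter", AtomicInteger::new).get();
  }

  public static void main(String[] args) {
    System.out.println(countAfter(new NoOpCache(), 5)); // prints 0
    System.out.println(countAfter(new MapCache(), 5));  // prints 5
  }
}
```

With the no-op cache the counter read back is always 0, so a comparison of the form `offset <= counter` can never become true for a positive offset.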



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
