zratkai commented on code in PR #5771:
URL: https://github.com/apache/hive/pull/5771#discussion_r2049184852


##########
ql/src/java/org/apache/hadoop/hive/ql/metadata/client/HiveMetaStoreClientWithLocalCache.java:
##########
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.metadata.client;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Weigher;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.client.HiveMetaStoreClientUtils;
+import org.apache.hadoop.hive.metastore.client.NoopHiveMetaStoreClientDelegator;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.metadata.client.MetaStoreClientCacheUtils.CacheKey;
+import org.apache.hadoop.hive.ql.metadata.client.MetaStoreClientCacheUtils.CacheWrapper;
+import org.apache.hadoop.hive.ql.metadata.client.MetaStoreClientCacheUtils.KeyType;
+import org.apache.hadoop.hive.ql.metadata.client.MetaStoreClientCacheUtils.PartitionNamesWrapper;
+import org.apache.hadoop.hive.ql.metadata.client.MetaStoreClientCacheUtils.PartitionSpecsWrapper;
+import org.apache.hadoop.hive.ql.metadata.client.MetaStoreClientCacheUtils.PartitionsWrapper;
+import org.apache.hadoop.hive.ql.metadata.client.MetaStoreClientCacheUtils.Supplier;
+import org.apache.hadoop.hive.ql.metadata.client.MetaStoreClientCacheUtils.TableWatermark;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator.ObjectEstimator;
+import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator;
+import org.apache.thrift.TException;
+import org.checkerframework.checker.index.qual.NonNegative;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.convertToGetPartitionsByNamesRequest;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.parseDbName;
+
+/**
+ * This class introduces a caching layer in HS2 for metadata for some selected query APIs. It extends
+ * HiveMetaStoreClient, and overrides some of its methods to add this feature.
+ * Its design is simple, relying on snapshot information being queried to cache and invalidate the metadata.
+ * It helps to reduce the time spent in compilation by using HS2 memory more effectively, and it allows to
+ * improve HMS throughput for multi-tenant workloads by reducing the number of calls it needs to serve.
+ */
+public class HiveMetaStoreClientWithLocalCache extends NoopHiveMetaStoreClientDelegator
+    implements IMetaStoreClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreClientWithLocalCache.class);
+
+  private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);
+
+  private static Cache<CacheKey, Object> mscLocalCache = null;
+  private static long maxSize;
+  private static boolean recordStats;
+  private static HashMap<Class<?>, ObjectEstimator> sizeEstimator = null;
+  private static String cacheObjName = null;
+
+  private final Configuration conf;
+
+  public HiveMetaStoreClientWithLocalCache(Configuration conf, IMetaStoreClient delegate)
+      throws MetaException {
+    super(delegate);
+    this.conf = conf;
+  }
+
+  public static synchronized void init(Configuration conf) {
+    // init cache only once
+    if (!INITIALIZED.get()) {
+      LOG.info("Initializing local cache in HiveMetaStoreClient...");
+      maxSize = HiveConf.getSizeVar(conf, HiveConf.ConfVars.MSC_CACHE_MAX_SIZE);
+      recordStats = HiveConf.getBoolVar(conf, HiveConf.ConfVars.MSC_CACHE_RECORD_STATS);
+      initSizeEstimator();
+      initCache();
+      LOG.info("Local cache initialized in HiveMetaStoreClient: {}", mscLocalCache);
+      INITIALIZED.set(true);
+    }
+  }
+
+  private static void initSizeEstimator() {
+    sizeEstimator = new HashMap<>();
+    IncrementalObjectSizeEstimator.createEstimators(CacheKey.class, sizeEstimator);
+    for (KeyType e : KeyType.values()) {
+      for (Class<?> c : e.keyClasses) {
+        IncrementalObjectSizeEstimator.createEstimators(c, sizeEstimator);
+      }
+      IncrementalObjectSizeEstimator.createEstimators(e.valueClass, sizeEstimator);
+    }
+  }
+
+  /**
+   * Initializes the cache
+   */
+  private static void initCache() {
+    int initSize = 100;
+    Weigher<CacheKey, Object> weigher = new Weigher<CacheKey, Object>() {
+      @Override
+      public @NonNegative int weigh(@NonNull CacheKey key, @NonNull Object val) {
+        return MetaStoreClientCacheUtils.getWeight(key, val, sizeEstimator);
+      }
+    };
+    Caffeine<CacheKey, Object> cacheBuilder = Caffeine.newBuilder()
+        .initialCapacity(initSize)
+        .maximumWeight(maxSize)
+        .weigher(weigher)
+        .removalListener((key, val, cause) -> {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Caffeine - ({}, {}) was removed ({})", key, val, cause);
+          }});
+    if (recordStats) {
+      cacheBuilder.recordStats();
+    }
+    mscLocalCache = cacheBuilder.build();
+    cacheObjName = cacheBuilder.toString();
+  }
+
+  /**
+   * Checks if cache is enabled and initialized
+   *
+   * @return boolean
+   */
+  private boolean isCacheEnabledAndInitialized() {
+    // Do not use the cache if session level query cache is also disabled
+    // Both caches can be used only at compilation time because execution may change
+    // DB objects (Tables, Partition metadata objects) and cache entries may already invalid
+    SessionState sessionState = SessionState.get();
+    if (sessionState == null || sessionState.getQueryCache(MetaStoreClientCacheUtils.getQueryId()) == null) {
+      return false;
+    }
+
+    return INITIALIZED.get();
+  }
+
+  // SG:FIXME, capabilities in CacheKey?
+
+  @Override
+  public Table getTable(String dbname, String name) throws TException {
+    GetTableRequest req = new GetTableRequest(dbname, name);
+    req.setCatName(getDefaultCatalog(conf));
+    return getTable(req);
+  }
+
+  @Override
+  public Table getTable(String dbname, String name, boolean getColumnStats, String engine) throws TException {
+    GetTableRequest req = new GetTableRequest(dbname, name);
+    req.setCatName(getDefaultCatalog(conf));
+    req.setGetColumnStats(getColumnStats);
+    if (getColumnStats) {
+      req.setEngine(engine);
+    }
+    return getTable(req);
+  }
+
+  @Override
+  public Table getTable(String catName, String dbName, String tableName) throws TException {
+    GetTableRequest req = new GetTableRequest(dbName, tableName);
+    req.setCatName(catName);
+    return getTable(req);
+  }
+
+  @Override
+  public Table getTable(String catName, String dbName, String tableName, String validWriteIdList)
+      throws TException {
+    GetTableRequest req = new GetTableRequest(dbName, tableName);
+    req.setCatName(catName);
+    req.setValidWriteIdList(validWriteIdList);
+    return getTable(req);
+  }
+
+  @Override
+  public Table getTable(String catName, String dbName, String tableName, String validWriteIdList,
+      boolean getColumnStats, String engine) throws TException {
+    GetTableRequest req = new GetTableRequest(dbName, tableName);
+    req.setCatName(catName);
+    req.setValidWriteIdList(validWriteIdList);
+    req.setGetColumnStats(getColumnStats);
+    if (getColumnStats) {
+      req.setEngine(engine);
+    }
+    return getTable(req);
+  }
+
+  @Override
+  public Table getTable(GetTableRequest req) throws TException {
+    if (isCacheEnabledAndInitialized()) {
+      // table should be transactional to get responses from the cache
+      TableWatermark watermark = new TableWatermark(req.getValidWriteIdList(), req.getId());
+      if (watermark.isValid()) {
+        CacheKey cacheKey = new CacheKey(KeyType.TABLE, req);
+        Table r = (Table) mscLocalCache.getIfPresent(cacheKey);
+        if (r == null) {
+          r = getDelegate().getTable(req);

Review Comment:
   Please use this.delegate instead of getDelegate().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to