Repository: kylin
Updated Branches:
  refs/heads/yang-m1 36c76427a -> 7036e4b8f


add cache to hdfs filter


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7036e4b8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7036e4b8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7036e4b8

Branch: refs/heads/yang-m1
Commit: 7036e4b8fb594af75ee7127ae8cbbe46ade32514
Parents: 36c7642
Author: Hongbin Ma <mahong...@apache.org>
Authored: Mon Apr 11 17:02:52 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Mon Apr 11 17:02:52 2016 +0800

----------------------------------------------------------------------
 .../kylin/common/util/CacheBuilderTest.java     | 43 ++++++++++++++++++++
 .../cube/v2/filter/MassInValueProviderImpl.java | 40 ++++++++++++++++--
 2 files changed, 79 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/7036e4b8/core-common/src/test/java/org/apache/kylin/common/util/CacheBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/CacheBuilderTest.java b/core-common/src/test/java/org/apache/kylin/common/util/CacheBuilderTest.java
new file mode 100644
index 0000000..e30f0b3
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/util/CacheBuilderTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+public class CacheBuilderTest {
+
+    /**
+     * Pins which {@link RemovalCause}s a size-bounded, weak-valued Guava cache reports:
+     * overwriting an existing key yields {@code REPLACED}, and inserting a second key past
+     * {@code maximumSize(1)} evicts the older key with {@code SIZE}.
+     */
+    @Test
+    public void foo() {
+        // Guava invokes removal listeners synchronously during cache maintenance on the
+        // writing thread by default, so each cause is recorded by the time put() returns.
+        final List<RemovalCause> causes = new ArrayList<RemovalCause>();
+        Cache<Object, Object> build = CacheBuilder.newBuilder()
+                .maximumSize(1)
+                .weakValues()
+                .removalListener(new RemovalListener<Object, Object>() {
+                    @Override
+                    public void onRemoval(RemovalNotification<Object, Object> notification) {
+                        causes.add(notification.getCause());
+                    }
+                }).build();
+
+        // Small boxed Integers come from the Integer cache and stay strongly reachable,
+        // so no COLLECTED notification can interleave with the expected sequence.
+        build.put(1, 1);
+        build.put(1, 2); // same key -> REPLACED
+        build.put(2, 2); // exceeds maximumSize(1) -> key 1 evicted with SIZE
+
+        assertEquals(Arrays.asList(RemovalCause.REPLACED, RemovalCause.SIZE), causes);
+        assertEquals(1, build.size());
+        assertEquals(Integer.valueOf(2), build.getIfPresent(2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7036e4b8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
index 83a6671..16157ec 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
@@ -24,10 +24,12 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.metadata.filter.UDF.MassInValueProvider;
 import org.apache.kylin.metadata.filter.function.Functions;
@@ -35,11 +37,22 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Stopwatch;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.Sets;
 
 public class MassInValueProviderImpl implements MassInValueProvider {
     public static final Logger logger = LoggerFactory.getLogger(MassInValueProviderImpl.class);
 
+    /**
+     * Process-wide cache of loaded HDFS filter tables, keyed by the HDFS path of the filter file.
+     * Each value pairs the file's modification time (compared on lookup to detect that the file
+     * changed since it was cached) with the set of encoded filter values read from it.
+     *
+     * Values are held with soft references. weakValues() made the cache ineffective: the cached
+     * value is the Pair created at put time, which nothing else references, so every entry became
+     * collectable at the very next GC. Soft values keep entries until the JVM is under memory
+     * pressure, on top of the maximumSize(10) bound.
+     *
+     * NOTE(review): a cached set is handed out as-is to every provider that hits the cache, so it
+     * must never be mutated after being cached.
+     */
+    private final static Cache<String, Pair<Long, Set<ByteArray>>> hdfs_caches = CacheBuilder.newBuilder()
+            .maximumSize(10)
+            .softValues()
+            .removalListener(new RemovalListener<Object, Object>() {
+                @Override
+                public void onRemoval(RemovalNotification<Object, Object> notification) {
+                    logger.debug(String.valueOf(notification.getCause()));
+                }
+            }).build();
+
+    /** Encoded filter values; when served from hdfs_caches this is the cached (shared) set. */
     private Set<ByteArray> ret = Sets.newHashSet();
 
     public MassInValueProviderImpl(Functions.FilterTableType filterTableType, 
String filterResourceIdentifier, DimensionEncoding encoding) {
@@ -49,20 +62,39 @@ public class MassInValueProviderImpl implements 
MassInValueProvider {
             logger.info("Start to load HDFS filter table from " + 
filterResourceIdentifier);
             Stopwatch stopwatch = new Stopwatch().start();
 
-            FileSystem fileSystem;
+            FileSystem fileSystem = null;
             try {
                 fileSystem = FileSystem.get(HBaseConfiguration.create());
+
+                long modificationTime = fileSystem.getFileStatus(new 
Path(filterResourceIdentifier)).getModificationTime();
+                Pair<Long, Set<ByteArray>> cached = 
hdfs_caches.getIfPresent(filterResourceIdentifier);
+                if (cached != null && 
cached.getFirst().equals(modificationTime)) {
+                    ret = cached.getSecond();
+                    logger.info("Load HDFS from cache using " + 
stopwatch.elapsedMillis() + " millis");
+                    return;
+                }
+
                 InputStream inputStream = fileSystem.open(new 
Path(filterResourceIdentifier));
                 List<String> lines = IOUtils.readLines(inputStream);
 
                 logger.info("Load HDFS finished after " + 
stopwatch.elapsedMillis() + " millis");
 
                 for (String line : lines) {
-                    ByteArray byteArray = 
ByteArray.allocate(encoding.getLengthOfEncoding());
-                    encoding.encode(line.getBytes(), line.getBytes().length, 
byteArray.array(), 0);
-                    ret.add(byteArray);
+                    if (StringUtils.isEmpty(line)) {
+                        continue;
+                    }
+
+                    try {
+                        ByteArray byteArray = 
ByteArray.allocate(encoding.getLengthOfEncoding());
+                        encoding.encode(line.getBytes(), 
line.getBytes().length, byteArray.array(), 0);
+                        ret.add(byteArray);
+                    } catch (Exception e) {
+                        logger.warn("Error when encoding the filter line " + 
line);
+                    }
                 }
 
+                hdfs_caches.put(filterResourceIdentifier, 
Pair.newPair(modificationTime, ret));
+
                 logger.info("Mass In values constructed after " + 
stopwatch.elapsedMillis() + " millis, containing " + ret.size() + " entries");
 
             } catch (IOException e) {

Reply via email to