Repository: kylin
Updated Branches:
  refs/heads/master 4d16e4d0e -> 737ba33b6


revise hdfs filter cache


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/737ba33b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/737ba33b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/737ba33b

Branch: refs/heads/master
Commit: 737ba33b6be4e69ab6533c03e1bb3ba5aeb776a4
Parents: 4d16e4d
Author: Hongbin Ma <mahong...@apache.org>
Authored: Mon Apr 11 18:07:20 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Mon Apr 11 18:08:13 2016 +0800

----------------------------------------------------------------------
 .../cube/v2/filter/MassInValueProviderImpl.java | 57 ++++++++++----------
 1 file changed, 30 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/737ba33b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
index 16157ec..525645d 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java
@@ -46,7 +46,7 @@ import com.google.common.collect.Sets;
 public class MassInValueProviderImpl implements MassInValueProvider {
     public static final Logger logger = 
LoggerFactory.getLogger(MassInValueProviderImpl.class);
 
-    private final static Cache<String, Pair<Long, Set<ByteArray>>> hdfs_caches 
= CacheBuilder.newBuilder().maximumSize(10).weakValues().removalListener(new 
RemovalListener<Object, Object>() {
+    private final static Cache<String, Pair<Long, Set<ByteArray>>> hdfs_caches 
= CacheBuilder.newBuilder().maximumSize(3).removalListener(new 
RemovalListener<Object, Object>() {
         @Override
         public void onRemoval(RemovalNotification<Object, Object> 
notification) {
             logger.debug(String.valueOf(notification.getCause()));
@@ -64,39 +64,42 @@ public class MassInValueProviderImpl implements 
MassInValueProvider {
 
             FileSystem fileSystem = null;
             try {
-                fileSystem = FileSystem.get(HBaseConfiguration.create());
-
-                long modificationTime = fileSystem.getFileStatus(new 
Path(filterResourceIdentifier)).getModificationTime();
-                Pair<Long, Set<ByteArray>> cached = 
hdfs_caches.getIfPresent(filterResourceIdentifier);
-                if (cached != null && 
cached.getFirst().equals(modificationTime)) {
-                    ret = cached.getSecond();
-                    logger.info("Load HDFS from cache using " + 
stopwatch.elapsedMillis() + " millis");
-                    return;
-                }
+                synchronized (hdfs_caches) {
+                    
+                    fileSystem = FileSystem.get(HBaseConfiguration.create());
+
+                    long modificationTime = fileSystem.getFileStatus(new 
Path(filterResourceIdentifier)).getModificationTime();
+                    Pair<Long, Set<ByteArray>> cached = 
hdfs_caches.getIfPresent(filterResourceIdentifier);
+                    if (cached != null && 
cached.getFirst().equals(modificationTime)) {
+                        ret = cached.getSecond();
+                        logger.info("Load HDFS from cache using " + 
stopwatch.elapsedMillis() + " millis");
+                        return;
+                    }
 
-                InputStream inputStream = fileSystem.open(new 
Path(filterResourceIdentifier));
-                List<String> lines = IOUtils.readLines(inputStream);
+                    InputStream inputStream = fileSystem.open(new 
Path(filterResourceIdentifier));
+                    List<String> lines = IOUtils.readLines(inputStream);
 
-                logger.info("Load HDFS finished after " + 
stopwatch.elapsedMillis() + " millis");
+                    logger.info("Load HDFS finished after " + 
stopwatch.elapsedMillis() + " millis");
 
-                for (String line : lines) {
-                    if (StringUtils.isEmpty(line)) {
-                        continue;
-                    }
+                    for (String line : lines) {
+                        if (StringUtils.isEmpty(line)) {
+                            continue;
+                        }
 
-                    try {
-                        ByteArray byteArray = 
ByteArray.allocate(encoding.getLengthOfEncoding());
-                        encoding.encode(line.getBytes(), 
line.getBytes().length, byteArray.array(), 0);
-                        ret.add(byteArray);
-                    } catch (Exception e) {
-                        logger.warn("Error when encoding the filter line " + 
line);
+                        try {
+                            ByteArray byteArray = 
ByteArray.allocate(encoding.getLengthOfEncoding());
+                            encoding.encode(line.getBytes(), 
line.getBytes().length, byteArray.array(), 0);
+                            ret.add(byteArray);
+                        } catch (Exception e) {
+                            logger.warn("Error when encoding the filter line " 
+ line);
+                        }
                     }
-                }
-
-                hdfs_caches.put(filterResourceIdentifier, 
Pair.newPair(modificationTime, ret));
 
-                logger.info("Mass In values constructed after " + 
stopwatch.elapsedMillis() + " millis, containing " + ret.size() + " entries");
+                    hdfs_caches.put(filterResourceIdentifier, 
Pair.newPair(modificationTime, ret));
 
+                    logger.info("Mass In values constructed after " + 
stopwatch.elapsedMillis() + " millis, containing " + ret.size() + " entries");
+                }
+                
             } catch (IOException e) {
                 throw new RuntimeException("error when loading the mass in 
values", e);
             }

Reply via email to