Repository: kylin Updated Branches: refs/heads/master 4d16e4d0e -> 737ba33b6
revise hdfs filter cache Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/737ba33b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/737ba33b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/737ba33b Branch: refs/heads/master Commit: 737ba33b6be4e69ab6533c03e1bb3ba5aeb776a4 Parents: 4d16e4d Author: Hongbin Ma <mahong...@apache.org> Authored: Mon Apr 11 18:07:20 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Mon Apr 11 18:08:13 2016 +0800 ---------------------------------------------------------------------- .../cube/v2/filter/MassInValueProviderImpl.java | 57 ++++++++++---------- 1 file changed, 30 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/737ba33b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java index 16157ec..525645d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java @@ -46,7 +46,7 @@ import com.google.common.collect.Sets; public class MassInValueProviderImpl implements MassInValueProvider { public static final Logger logger = LoggerFactory.getLogger(MassInValueProviderImpl.class); - private final static Cache<String, Pair<Long, Set<ByteArray>>> hdfs_caches = CacheBuilder.newBuilder().maximumSize(10).weakValues().removalListener(new RemovalListener<Object, Object>() { + private final static Cache<String, Pair<Long, Set<ByteArray>>> hdfs_caches = CacheBuilder.newBuilder().maximumSize(3).removalListener(new RemovalListener<Object, Object>() { @Override public void onRemoval(RemovalNotification<Object, Object> notification) { logger.debug(String.valueOf(notification.getCause())); @@ -64,39 +64,42 @@ public class MassInValueProviderImpl implements MassInValueProvider { FileSystem fileSystem = null; try { - fileSystem = FileSystem.get(HBaseConfiguration.create()); - - long modificationTime = fileSystem.getFileStatus(new Path(filterResourceIdentifier)).getModificationTime(); - Pair<Long, Set<ByteArray>> cached = hdfs_caches.getIfPresent(filterResourceIdentifier); - if (cached != null && cached.getFirst().equals(modificationTime)) { - ret = cached.getSecond(); - logger.info("Load HDFS from cache using " + stopwatch.elapsedMillis() + " millis"); - return; - } + synchronized (hdfs_caches) { + + fileSystem = FileSystem.get(HBaseConfiguration.create()); + + long modificationTime = fileSystem.getFileStatus(new Path(filterResourceIdentifier)).getModificationTime(); + Pair<Long, Set<ByteArray>> cached = hdfs_caches.getIfPresent(filterResourceIdentifier); + if (cached != null && cached.getFirst().equals(modificationTime)) { + ret = cached.getSecond(); + logger.info("Load HDFS from cache using " + stopwatch.elapsedMillis() + " millis"); + return; + } - InputStream inputStream = fileSystem.open(new Path(filterResourceIdentifier)); - List<String> lines = IOUtils.readLines(inputStream); + InputStream inputStream = fileSystem.open(new Path(filterResourceIdentifier)); + List<String> lines = IOUtils.readLines(inputStream); - logger.info("Load HDFS finished after " + stopwatch.elapsedMillis() + " millis"); + logger.info("Load HDFS finished after " + stopwatch.elapsedMillis() + " millis"); - for (String line : lines) { - if (StringUtils.isEmpty(line)) { - continue; - } + for (String line : lines) { + if (StringUtils.isEmpty(line)) { + continue; + } - try { - ByteArray byteArray = ByteArray.allocate(encoding.getLengthOfEncoding()); - encoding.encode(line.getBytes(), line.getBytes().length, byteArray.array(), 0); - ret.add(byteArray); - } catch (Exception e) { - logger.warn("Error when encoding the filter line " + line); + try { + ByteArray byteArray = ByteArray.allocate(encoding.getLengthOfEncoding()); + encoding.encode(line.getBytes(), line.getBytes().length, byteArray.array(), 0); + ret.add(byteArray); + } catch (Exception e) { + logger.warn("Error when encoding the filter line " + line); + } } - } - - hdfs_caches.put(filterResourceIdentifier, Pair.newPair(modificationTime, ret)); - logger.info("Mass In values constructed after " + stopwatch.elapsedMillis() + " millis, containing " + ret.size() + " entries"); + hdfs_caches.put(filterResourceIdentifier, Pair.newPair(modificationTime, ret)); + logger.info("Mass In values constructed after " + stopwatch.elapsedMillis() + " millis, containing " + ret.size() + " entries"); + } + } catch (IOException e) { throw new RuntimeException("error when loading the mass in values", e); }