Repository: kylin Updated Branches: refs/heads/yang-m1 36c76427a -> 7036e4b8f
add cache to hdfs filter Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7036e4b8 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7036e4b8 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7036e4b8 Branch: refs/heads/yang-m1 Commit: 7036e4b8fb594af75ee7127ae8cbbe46ade32514 Parents: 36c7642 Author: Hongbin Ma <mahong...@apache.org> Authored: Mon Apr 11 17:02:52 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Mon Apr 11 17:02:52 2016 +0800 ---------------------------------------------------------------------- .../kylin/common/util/CacheBuilderTest.java | 43 ++++++++++++++++++++ .../cube/v2/filter/MassInValueProviderImpl.java | 40 ++++++++++++++++-- 2 files changed, 79 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/7036e4b8/core-common/src/test/java/org/apache/kylin/common/util/CacheBuilderTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/CacheBuilderTest.java b/core-common/src/test/java/org/apache/kylin/common/util/CacheBuilderTest.java new file mode 100644 index 0000000..e30f0b3 --- /dev/null +++ b/core-common/src/test/java/org/apache/kylin/common/util/CacheBuilderTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common.util; + +import org.junit.Test; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +public class CacheBuilderTest { + @Test + public void foo() { + Cache<Object, Object> build = CacheBuilder.newBuilder().maximumSize(1).weakValues().removalListener(new RemovalListener<Object, Object>() { + @Override + public void onRemoval(RemovalNotification<Object, Object> notification) { + System.out.println(notification.getCause()); + } + }).build(); + + build.put(1, 1); + build.put(1, 2); + build.put(2, 2); + + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7036e4b8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java index 83a6671..16157ec 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/filter/MassInValueProviderImpl.java @@ -24,10 +24,12 @@ import java.util.List; import java.util.Set; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.dimension.DimensionEncoding; import org.apache.kylin.metadata.filter.UDF.MassInValueProvider; import org.apache.kylin.metadata.filter.function.Functions; @@ -35,11 +37,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Stopwatch; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.collect.Sets; public class MassInValueProviderImpl implements MassInValueProvider { public static final Logger logger = LoggerFactory.getLogger(MassInValueProviderImpl.class); + private final static Cache<String, Pair<Long, Set<ByteArray>>> hdfs_caches = CacheBuilder.newBuilder().maximumSize(10).weakValues().removalListener(new RemovalListener<Object, Object>() { + @Override + public void onRemoval(RemovalNotification<Object, Object> notification) { + logger.debug(String.valueOf(notification.getCause())); + } + }).build(); + private Set<ByteArray> ret = Sets.newHashSet(); public MassInValueProviderImpl(Functions.FilterTableType filterTableType, String filterResourceIdentifier, DimensionEncoding encoding) { @@ -49,20 +62,39 @@ public class MassInValueProviderImpl implements MassInValueProvider { logger.info("Start to load HDFS filter table from " + filterResourceIdentifier); Stopwatch stopwatch = new Stopwatch().start(); - FileSystem fileSystem; + FileSystem fileSystem = null; try { fileSystem = FileSystem.get(HBaseConfiguration.create()); + + long modificationTime = fileSystem.getFileStatus(new Path(filterResourceIdentifier)).getModificationTime(); + Pair<Long, Set<ByteArray>> cached = hdfs_caches.getIfPresent(filterResourceIdentifier); + if (cached != null && cached.getFirst().equals(modificationTime)) { + ret = cached.getSecond(); + logger.info("Load HDFS from cache using " + stopwatch.elapsedMillis() + " millis"); + return; + } + InputStream inputStream = fileSystem.open(new Path(filterResourceIdentifier)); List<String> lines = IOUtils.readLines(inputStream); logger.info("Load HDFS finished after " + stopwatch.elapsedMillis() + " millis"); for (String line : lines) { - ByteArray byteArray = ByteArray.allocate(encoding.getLengthOfEncoding()); - encoding.encode(line.getBytes(), line.getBytes().length, byteArray.array(), 0); - ret.add(byteArray); + if (StringUtils.isEmpty(line)) { + continue; + } + + try { + ByteArray byteArray = ByteArray.allocate(encoding.getLengthOfEncoding()); + encoding.encode(line.getBytes(), line.getBytes().length, byteArray.array(), 0); + ret.add(byteArray); + } catch (Exception e) { + logger.warn("Error when encoding the filter line " + line); + } } + hdfs_caches.put(filterResourceIdentifier, Pair.newPair(modificationTime, ret)); + logger.info("Mass In values constructed after " + stopwatch.elapsedMillis() + " millis, containing " + ret.size() + " entries"); } catch (IOException e) {