KYLIN-1528 Create a branch for v1.5 with HBase 1.x API
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fb740ec8 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fb740ec8 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fb740ec8 Branch: refs/heads/1.5.x-CDH5.7 Commit: fb740ec889257329028eec2e4c94d69ff32e3258 Parents: 08807fa Author: shaofengshi <shaofeng...@apache.org> Authored: Wed Mar 23 17:07:05 2016 +0800 Committer: lidongsjtu <lid...@apache.org> Committed: Tue May 24 14:09:28 2016 +0800 ---------------------------------------------------------------------- .../kylin/job/hadoop/invertedindex/IITest.java | 10 ++- .../apache/kylin/common/KylinConfigBase.java | 5 +- examples/test_case_data/sandbox/hbase-site.xml | 19 +--- .../test_case_data/sandbox/kylin_job_conf.xml | 86 +++++++++--------- examples/test_case_data/sandbox/mapred-site.xml | 23 +++-- .../kylin/provision/BuildCubeWithEngine.java | 54 +++++------ .../kylin/provision/BuildIIWithStream.java | 13 +-- .../hbase/ii/ITInvertedIndexHBaseTest.java | 9 +- pom.xml | 18 ++-- .../kylin/rest/security/AclHBaseStorage.java | 4 +- .../rest/security/MockAclHBaseStorage.java | 8 +- .../apache/kylin/rest/security/MockHTable.java | 95 ++++---------------- .../rest/security/RealAclHBaseStorage.java | 9 +- .../apache/kylin/rest/service/AclService.java | 25 +++--- .../apache/kylin/rest/service/CubeService.java | 38 +++----- .../apache/kylin/rest/service/QueryService.java | 21 +++-- .../apache/kylin/rest/service/UserService.java | 16 ++-- .../kylin/storage/hbase/HBaseConnection.java | 36 ++++---- .../kylin/storage/hbase/HBaseResourceStore.java | 29 +++--- .../storage/hbase/cube/SimpleHBaseStore.java | 20 ++--- .../hbase/cube/v1/CubeSegmentTupleIterator.java | 11 +-- .../storage/hbase/cube/v1/CubeStorageQuery.java | 4 +- .../hbase/cube/v1/HBaseClientKVIterator.java | 11 +-- .../hbase/cube/v1/RegionScannerAdapter.java | 11 ++- .../cube/v1/SerializedHBaseTupleIterator.java | 4 +- .../observer/AggregateRegionObserver.java | 4 +- .../observer/AggregationScanner.java | 15 +++- .../observer/ObserverAggregationCache.java | 15 ++-- .../coprocessor/observer/ObserverEnabler.java | 4 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 11 +-- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 9 +- .../coprocessor/endpoint/CubeVisitService.java | 4 +- .../storage/hbase/ii/IICreateHTableJob.java | 13 +-- .../hbase/ii/InvertedIndexStorageQuery.java | 6 +- .../endpoint/EndpointTupleIterator.java | 13 +-- .../ii/coprocessor/endpoint/IIEndpoint.java | 4 +- .../storage/hbase/steps/CubeHTableUtil.java | 16 ++-- .../storage/hbase/steps/DeprecatedGCStep.java | 24 ++--- .../storage/hbase/steps/HBaseCuboidWriter.java | 7 +- .../hbase/steps/HBaseStreamingOutput.java | 9 +- .../kylin/storage/hbase/steps/MergeGCStep.java | 24 ++--- .../storage/hbase/util/CleanHtableCLI.java | 13 +-- .../storage/hbase/util/CubeMigrationCLI.java | 38 ++++---- .../hbase/util/CubeMigrationCheckCLI.java | 17 ++-- .../hbase/util/DeployCoprocessorCLI.java | 23 ++--- .../hbase/util/ExtendCubeToHybridCLI.java | 8 +- .../hbase/util/GridTableHBaseBenchmark.java | 34 +++---- .../kylin/storage/hbase/util/HBaseClean.java | 18 ++-- .../hbase/util/HBaseRegionSizeCalculator.java | 42 ++++----- .../kylin/storage/hbase/util/HBaseUsage.java | 10 ++- .../storage/hbase/util/HbaseStreamingInput.java | 31 +++---- .../hbase/util/HtableAlterMetadataCLI.java | 9 +- .../storage/hbase/util/OrphanHBaseCleanJob.java | 18 ++-- .../kylin/storage/hbase/util/PingHBaseCLI.java | 15 ++-- .../kylin/storage/hbase/util/RowCounterCLI.java | 11 +-- .../storage/hbase/util/StorageCleanupJob.java | 16 ++-- .../storage/hbase/util/UpdateHTableHostCLI.java | 17 ++-- .../observer/AggregateRegionObserverTest.java | 31 +++---- .../v1/filter/TestFuzzyRowFilterV2EndToEnd.java | 5 +- 59 files changed, 530 insertions(+), 583 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java index da25143..5d2cfc4 100644 --- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java +++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java @@ -38,6 +38,7 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.kylin.common.util.FIFOIterable; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.common.util.Pair; @@ -222,6 +223,11 @@ public class IITest extends LocalFileMetadataTestCase { } @Override + public int getBatch() { + return -1; + } + + @Override public boolean nextRaw(List<Cell> result) throws IOException { if (iiRowIterator.hasNext()) { IIRow iiRow = iiRowIterator.next(); @@ -233,7 +239,7 @@ public class IITest extends LocalFileMetadataTestCase { } @Override - public boolean nextRaw(List<Cell> result, int limit) throws IOException { + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { throw new NotImplementedException(); } @@ -243,7 +249,7 @@ public class IITest extends LocalFileMetadataTestCase { } @Override - public boolean next(List<Cell> result, int limit) throws IOException { + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { throw new NotImplementedException(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 1062749..9090068 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -645,5 +645,8 @@ abstract public class KylinConfigBase implements Serializable { public int getCubeStatsHLLPrecision() { return Integer.parseInt(getOptional("kylin.job.cubing.inmem.sampling.hll.precision", "14")); } - + + public String getPatchedFuzzyRowFilterVersion() { + return this.getOptional("kylin.hbase.filter.fuzzy.row.filter.version", "1.1.3"); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/examples/test_case_data/sandbox/hbase-site.xml ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/hbase-site.xml b/examples/test_case_data/sandbox/hbase-site.xml index 46d5345..734908e 100644 --- a/examples/test_case_data/sandbox/hbase-site.xml +++ b/examples/test_case_data/sandbox/hbase-site.xml @@ -190,22 +190,5 @@ <name>zookeeper.znode.parent</name> <value>/hbase-unsecure</value> </property> - <property> - <name>hbase.client.pause</name> - <value>100</value> - <description>General client pause value. Used mostly as value to wait - before running a retry of a failed get, region lookup, etc. - See hbase.client.retries.number for description of how we backoff from - this initial pause amount and how this pause works w/ retries.</description> - </property> - <property> - <name>hbase.client.retries.number</name> - <value>5</value> - <description>Maximum retries. Used as maximum for all retryable - operations such as the getting of a cell's value, starting a row update, - etc. Retry interval is a rough function based on hbase.client.pause. At - first we retry at this interval but then with backoff, we pretty quickly reach - retrying every ten seconds. See HConstants#RETRY_BACKOFF for how the backup - ramps up. Change this setting and hbase.client.pause to suit your workload.</description> - </property> + </configuration> http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/examples/test_case_data/sandbox/kylin_job_conf.xml ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin_job_conf.xml b/examples/test_case_data/sandbox/kylin_job_conf.xml index bd947af..6082fa9 100644 --- a/examples/test_case_data/sandbox/kylin_job_conf.xml +++ b/examples/test_case_data/sandbox/kylin_job_conf.xml @@ -1,20 +1,18 @@ <?xml version="1.0"?> <!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 +http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. See accompanying LICENSE file. --> + <configuration> <property> @@ -26,44 +24,41 @@ </description> </property> - <property> - <name>mapreduce.map.maxattempts</name> - <value>2</value> - </property> + <!-- uncomment the following 5 properties to enable lzo compressing - <!-- - <property> - <name>mapred.compress.map.output</name> - <value>true</value> - <description>Compress map outputs</description> - </property> + <property> + <name>mapred.compress.map.output</name> + <value>true</value> + <description>Compress map outputs</description> + </property> - <property> - <name>mapred.map.output.compression.codec</name> - <value>org.apache.hadoop.io.compress.SnappyCodec</value> - <description>The compression codec to use for map outputs - </description> - </property> + <property> + <name>mapred.map.output.compression.codec</name> + <value>com.hadoop.compression.lzo.LzoCodec</value> + <description>The compression codec to use for map outputs + </description> + </property> - <property> - <name>mapred.output.compress</name> - <value>true</value> - <description>Compress the output of a MapReduce job</description> - </property> + <property> + <name>mapred.output.compress</name> + <value>true</value> + <description>Compress the output of a MapReduce job</description> + </property> - <property> - <name>mapred.output.compression.codec</name> - <value>org.apache.hadoop.io.compress.SnappyCodec</value> - <description>The compression codec to use for job outputs - </description> - </property> + <property> + <name>mapred.output.compression.codec</name> + <value>com.hadoop.compression.lzo.LzoCodec</value> + <description>The compression codec to use for job outputs + </description> + </property> - <property> - <name>mapred.output.compression.type</name> - <value>BLOCK</value> - <description>The compression type to use for job outputs</description> - </property> ---> + <property> + <name>mapred.output.compression.type</name> + <value>BLOCK</value> + <description>The compression type to use for job outputs</description> + </property> + + !--> <property> <name>mapreduce.job.max.split.locations</name> @@ -76,5 +71,4 @@ <value>2</value> <description>Block replication</description> </property> - </configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/examples/test_case_data/sandbox/mapred-site.xml ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/mapred-site.xml b/examples/test_case_data/sandbox/mapred-site.xml index 18f6feb..ff1c7eb 100644 --- a/examples/test_case_data/sandbox/mapred-site.xml +++ b/examples/test_case_data/sandbox/mapred-site.xml @@ -18,7 +18,7 @@ <property> <name>io.sort.mb</name> - <value>128</value> + <value>64</value> </property> <property> @@ -28,12 +28,12 @@ <property> <name>mapred.job.map.memory.mb</name> - <value>512</value> + <value>250</value> </property> <property> <name>mapred.job.reduce.memory.mb</name> - <value>512</value> + <value>250</value> </property> <property> @@ -58,7 +58,7 @@ <property> <name>mapreduce.application.classpath</name> - <value>/tmp/kylin/*,$HADOOP_CONF_DIR,/usr/hdp/${hdp.version}/hbase/lib/hbase-common.jar,/usr/hdp/current/hive-client/conf/,$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/usr/hdp/${hdp.version}/hadoop/lib/snappy-java-1.0.4.1.jar:/etc/hadoop/conf/secure</value> + <value>/tmp/kylin/*,$HADOOP_CONF_DIR,/usr/hdp/${hdp.version}/hbase/lib/hbase-common.jar,/usr/hdp/current/hive-client/conf/,$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure</value> </property> <property> @@ -81,10 +81,9 @@ <value>false</value> </property> - <!--the default value on hdp is 0.05, however for test environments we need to be conservative on resource --> <property> <name>mapreduce.job.reduce.slowstart.completedmaps</name> - <value>1</value> + <value>0.05</value> </property> <property> @@ -114,7 +113,7 @@ <property> <name>mapreduce.map.java.opts</name> - <value>-Xmx512m</value> + <value>-Xmx200m</value> </property> <property> @@ -124,7 +123,7 @@ <property> <name>mapreduce.map.memory.mb</name> - <value>512</value> + <value>250</value> </property> <property> @@ -169,7 +168,7 @@ <property> <name>mapreduce.reduce.memory.mb</name> - <value>512</value> + <value>250</value> </property> <property> @@ -219,7 +218,7 @@ <property> <name>mapreduce.task.io.sort.mb</name> - <value>128</value> + <value>64</value> </property> <property> @@ -234,7 +233,7 @@ <property> <name>yarn.app.mapreduce.am.command-opts</name> - <value>-Xmx512m</value> + <value>-Xmx200m</value> </property> <property> @@ -244,7 +243,7 @@ <property> <name>yarn.app.mapreduce.am.resource.mb</name> - <value>512</value> + <value>250</value> </property> <property> http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index 07de5d7..a83a754 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -37,8 +37,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; @@ -60,6 +59,7 @@ import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.metadata.model.IEngineAware; import org.apache.kylin.metadata.model.IStorageAware; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator; import org.apache.kylin.storage.hbase.util.StorageCleanupJob; import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; @@ -404,34 +404,34 @@ public class BuildCubeWithEngine { } private void checkHFilesInHBase(CubeSegment segment) throws IOException { - Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration()); + Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); String tableName = segment.getStorageLocationIdentifier(); - try (HTable table = new HTable(conf, tableName)) { - HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table); - Map<byte[], Long> sizeMap = cal.getRegionSizeMap(); - long totalSize = 0; - for (Long size : sizeMap.values()) { - totalSize += size; - } - if (totalSize == 0) { - return; - } - Map<byte[], Pair<Integer, Integer>> countMap = cal.getRegionHFileCountMap(); - // check if there's region contains more than one hfile, which means the hfile config take effects - boolean hasMultiHFileRegions = false; - for (Pair<Integer, Integer> count : countMap.values()) { - // check if hfile count is greater than store count - if (count.getSecond() > count.getFirst()) { - hasMultiHFileRegions = true; - break; - } - } - if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() == 0 && hasMultiHFileRegions) { - throw new IOException("hfile size set to 0, but found region contains more than one hfiles"); - } else if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() > 0 && !hasMultiHFileRegions) { - throw new IOException("hfile size set greater than 0, but all regions still has only one hfile"); + + HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn); + Map<byte[], Long> sizeMap = cal.getRegionSizeMap(); + long totalSize = 0; + for (Long size : sizeMap.values()) { + totalSize += size; + } + if (totalSize == 0) { + return; + } + + Map<byte[], Pair<Integer, Integer>> countMap = cal.getRegionHFileCountMap(); + // check if there's region contains more than one hfile, which means the hfile config take effects + boolean hasMultiHFileRegions = false; + for (Pair<Integer, Integer> count : countMap.values()) { + // check if hfile count is greater than store count + if (count.getSecond() > count.getFirst()) { + hasMultiHFileRegions = true; + break; } } + if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() == 0 && hasMultiHFileRegions) { + throw new IOException("hfile size set to 0, but found region contains more than one hfiles"); + } else if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() > 0 && !hasMultiHFileRegions) { + throw new IOException("hfile size set greater than 0, but all regions still has only one hfile"); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java index c0f6f63..a74ecb9 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java @@ -45,7 +45,8 @@ import java.util.TimeZone; import java.util.UUID; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.util.ToolRunner; @@ -208,7 +209,7 @@ public class BuildIIWithStream { } } final IISegment segment = createSegment(iiName); - final HTableInterface htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(segment.getStorageLocationIdentifier()); + final Table htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(TableName.valueOf(segment.getStorageLocationIdentifier())); String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() }; ToolRunner.run(new IICreateHTableJob(), args); @@ -246,7 +247,7 @@ public class BuildIIWithStream { } } - private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) throws IOException { + private void build(SliceBuilder sliceBuilder, StreamingBatch batch, Table htable) throws IOException { final Slice slice = sliceBuilder.buildSlice(batch); try { loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo())); @@ -255,17 +256,17 @@ public class BuildIIWithStream { } } - private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException { + private void loadToHBase(Table hTable, Slice slice, IIKeyValueCodec codec) throws IOException { List<Put> data = Lists.newArrayList(); for (IIRow row : codec.encodeKeyValue(slice)) { final byte[] key = row.getKey().get(); final byte[] value = row.getValue().get(); Put put = new Put(key); - put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value); + put.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value); final ImmutableBytesWritable dictionary = row.getDictionary(); final byte[] dictBytes = dictionary.get(); if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) { - put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes); + put.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes); } else { throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java index b5be703..5a591a1 100644 --- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java @@ -20,9 +20,7 @@ package org.apache.kylin.storage.hbase.ii; import java.util.List; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; @@ -49,7 +47,7 @@ public class ITInvertedIndexHBaseTest extends HBaseMetadataTestCase { IIInstance ii; IISegment seg; - HConnection hconn; + Connection hconn; TableRecordInfo info; @@ -60,8 +58,7 @@ public class ITInvertedIndexHBaseTest extends HBaseMetadataTestCase { this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join"); this.seg = ii.getFirstSegment(); - Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); - hconn = HConnectionManager.createConnection(hconf); + hconn = HBaseConnection.get(getTestConfig().getStorageUrl()); this.info = new TableRecordInfo(seg); } http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 12d9eb2..df29c0e 100644 --- a/pom.xml +++ b/pom.xml @@ -47,12 +47,12 @@ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <!-- Hadoop versions --> - <hadoop2.version>2.6.0</hadoop2.version> - <yarn.version>2.6.0</yarn.version> + <hadoop2.version>2.7.1</hadoop2.version> + <yarn.version>2.7.1</yarn.version> <zookeeper.version>3.4.6</zookeeper.version> - <hive.version>0.14.0</hive.version> - <hive-hcatalog.version>0.14.0</hive-hcatalog.version> - <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version> + <hive.version>1.2.1</hive.version> + <hive-hcatalog.version>1.2.1</hive-hcatalog.version> + <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version> <kafka.version>0.8.1</kafka.version> <!-- Dependency versions --> @@ -64,6 +64,7 @@ <!-- Commons --> <commons-cli.version>1.2</commons-cli.version> + <commons-codec.version>1.4</commons-codec.version> <commons-lang.version>2.6</commons-lang.version> <commons-lang3.version>3.1</commons-lang3.version> <commons-collections.version>3.2.1</commons-collections.version> @@ -103,7 +104,7 @@ <calcite.version>1.6.0</calcite.version> <!-- Curator.version Version --> - <curator.version>2.6.0</curator.version> + <curator.version>2.7.1</curator.version> <!-- Sonar --> <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin> @@ -324,6 +325,11 @@ <version>${commons-cli.version}</version> </dependency> <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>${commons-codec.version}</version> + </dependency> + <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>${commons-lang.version}</version> http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/server/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java b/server/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java index 2220113..0e71cd5 100644 --- a/server/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java +++ b/server/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java @@ -20,7 +20,7 @@ package org.apache.kylin.rest.security; import java.io.IOException; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; /** */ @@ -37,6 +37,6 @@ public interface AclHBaseStorage { String prepareHBaseTable(Class clazz) throws IOException; - HTableInterface getTable(String tableName) throws IOException; + Table getTable(String tableName) throws IOException; } http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java b/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java index 193f5a8..933c49d 100644 --- a/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java +++ b/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java @@ -21,7 +21,7 @@ package org.apache.kylin.rest.security; import java.io.IOException; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.rest.service.AclService; import org.apache.kylin.rest.service.UserService; @@ -29,8 +29,8 @@ import org.apache.kylin.rest.service.UserService; */ public class MockAclHBaseStorage implements AclHBaseStorage { - private HTableInterface mockedAclTable; - private HTableInterface mockedUserTable; + private Table mockedAclTable; + private Table mockedUserTable; private static final String aclTableName = "MOCK-ACL-TABLE"; private static final String userTableName = "MOCK-USER-TABLE"; @@ -49,7 +49,7 @@ public class MockAclHBaseStorage implements AclHBaseStorage { } @Override - public HTableInterface getTable(String tableName) throws IOException { + public Table getTable(String tableName) throws IOException { if (StringUtils.equals(tableName, aclTableName)) { return mockedAclTable; } else if (StringUtils.equals(tableName, userTableName)) { http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java b/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java index d0aa0ed..972eea9 100644 --- a/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java +++ b/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java @@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; @@ -91,7 +91,7 @@ import com.google.protobuf.ServiceException; * <li>remove some methods for loading data, checking values ...</li> * </ul> */ -public class MockHTable implements HTableInterface { +public class MockHTable implements Table { private final String tableName; private final List<String> columnFamilies = new ArrayList<>(); @@ -114,14 +114,6 @@ public class MockHTable implements HTableInterface { this.columnFamilies.add(columnFamily); } - /** - * {@inheritDoc} - */ - @Override - public byte[] getTableName() { - return tableName.getBytes(); - } - @Override public TableName getName() { return null; @@ -200,8 +192,8 @@ public class MockHTable implements HTableInterface { } @Override - public Boolean[] exists(List<Get> gets) throws IOException { - return new Boolean[0]; + public boolean[] existsAll(List<Get> list) throws IOException { + return new boolean[0]; } /** @@ -306,15 +298,6 @@ public class MockHTable implements HTableInterface { * {@inheritDoc} */ @Override - public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { - // FIXME: implement - return null; - } - - /** - * {@inheritDoc} - */ - @Override public ResultScanner getScanner(Scan scan) throws IOException { final List<Result> ret = new ArrayList<Result>(); byte[] st = scan.getStartRow(); @@ -446,7 +429,7 @@ public class MockHTable implements HTableInterface { */ } if (filter.hasFilterRow() && !filteredOnRowKey) { - filter.filterRow(nkvs); + filter.filterRow(); } if (filter.filterRow() || filteredOnRowKey) { nkvs.clear(); @@ -535,6 +518,11 @@ public class MockHTable implements HTableInterface { return false; } + @Override + public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Put put) throws IOException { + return false; + } + /** * {@inheritDoc} */ @@ -555,7 +543,7 @@ public class MockHTable implements HTableInterface { continue; } for (KeyValue kv : delete.getFamilyMap().get(family)) { - if (kv.isDeleteFamily()) { + if (kv.isDelete()) { data.get(row).get(kv.getFamily()).clear(); } else { data.get(row).get(kv.getFamily()).remove(kv.getQualifier()); @@ -592,6 +580,11 @@ public class MockHTable implements HTableInterface { return false; } + @Override + public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Delete delete) throws IOException { + return false; + } + /** * {@inheritDoc} */ @@ -605,7 +598,7 @@ public class MockHTable implements HTableInterface { */ @Override public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException { - return incrementColumnValue(row, family, qualifier, amount, true); + return incrementColumnValue(row, family, qualifier, amount, null); } @Override @@ -617,37 +610,6 @@ public class MockHTable implements HTableInterface { * {@inheritDoc} */ @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) throws IOException { - if (check(row, family, qualifier, null)) { - Put put = new Put(row); - put.add(family, qualifier, Bytes.toBytes(amount)); - put(put); - return amount; - } - long newValue = Bytes.toLong(data.get(row).get(family).get(qualifier).lastEntry().getValue()) + amount; - data.get(row).get(family).get(qualifier).put(System.currentTimeMillis(), Bytes.toBytes(newValue)); - return newValue; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isAutoFlush() { - return true; - } - - /** - * {@inheritDoc} - */ - @Override - public void flushCommits() throws IOException { - } - - /** - * {@inheritDoc} - */ - @Override public void close() throws IOException { } @@ -673,29 +635,6 @@ public class MockHTable implements HTableInterface { * {@inheritDoc} */ @Override - public void setAutoFlush(boolean autoFlush) { - throw new NotImplementedException(); - - } - - /** - * {@inheritDoc} - */ - @Override - public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { - throw new NotImplementedException(); - - } - - @Override - public void setAutoFlushTo(boolean autoFlush) { - throw new NotImplementedException(); - } - - /** - * {@inheritDoc} - */ - @Override public long getWriteBufferSize() { throw new NotImplementedException(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java b/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java index 5a48e83..d40bdf3 100644 --- a/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java +++ b/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java @@ -21,7 +21,8 @@ package org.apache.kylin.rest.security; import java.io.IOException; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.rest.service.AclService; import org.apache.kylin.rest.service.UserService; @@ -56,11 +57,11 @@ public class RealAclHBaseStorage implements AclHBaseStorage { } @Override - public HTableInterface getTable(String tableName) throws IOException { + public Table getTable(String tableName) throws IOException { if (StringUtils.equals(tableName, aclTableName)) { - return HBaseConnection.get(hbaseUrl).getTable(aclTableName); + return HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName)); } else if (StringUtils.equals(tableName, userTableName)) { - return HBaseConnection.get(hbaseUrl).getTable(userTableName); + return HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); } else { throw new IllegalStateException("getTable failed" + tableName); } http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/server/src/main/java/org/apache/kylin/rest/service/AclService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/AclService.java b/server/src/main/java/org/apache/kylin/rest/service/AclService.java index 58e093c..a03ff5e 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/AclService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/AclService.java @@ -33,7 +33,7 @@ import javax.annotation.PostConstruct; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -124,7 +124,7 @@ public class AclService implements MutableAclService { @Override public List<ObjectIdentity> findChildren(ObjectIdentity parentIdentity) { List<ObjectIdentity> oids = new ArrayList<ObjectIdentity>(); - HTableInterface htable = null; + Table htable = null; try { htable = aclHBaseStorage.getTable(aclTableName); @@ -173,7 +173,7 @@ public class AclService implements MutableAclService { @Override public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids) throws NotFoundException { Map<ObjectIdentity, Acl> aclMaps = new HashMap<ObjectIdentity, Acl>(); - HTableInterface htable = null; + Table htable = null; Result result = null; try { htable = aclHBaseStorage.getTable(aclTableName); @@ -225,17 +225,16 @@ public class AclService implements MutableAclService { Authentication auth = SecurityContextHolder.getContext().getAuthentication(); PrincipalSid sid = new PrincipalSid(auth); - HTableInterface htable = null; + Table htable = null; try { htable = aclHBaseStorage.getTable(aclTableName); Put put = new Put(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier()))); - put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType())); - put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid))); - put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true)); + put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType())); + put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid))); + put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true)); htable.put(put); - htable.flushCommits(); logger.debug("ACL of " + objectIdentity + " created successfully."); } catch (IOException e) { @@ -249,7 +248,7 @@ public class AclService implements MutableAclService { @Override public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) throws ChildrenExistException { - HTableInterface htable = null; + Table htable = null; try { htable = aclHBaseStorage.getTable(aclTableName); @@ -265,7 +264,6 @@ public class AclService implements MutableAclService { } htable.delete(delete); - htable.flushCommits(); logger.debug("ACL of " + objectIdentity + " deleted successfully."); } catch (IOException e) { @@ -283,7 +281,7 @@ public class AclService implements MutableAclService { throw e; } - HTableInterface htable = null; + Table htable = null; try { htable = aclHBaseStorage.getTable(aclTableName); @@ -294,17 +292,16 @@ public class AclService implements MutableAclService { Put put = new Put(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier()))); if (null != acl.getParentAcl()) { - put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity()))); + put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity()))); } for (AccessControlEntry ace : acl.getEntries()) { AceInfo aceInfo = new AceInfo(ace); - put.add(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo)); + put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo)); } if (!put.isEmpty()) { htable.put(put); - htable.flushCommits(); logger.debug("ACL of " + acl.getObjectIdentity() + " updated successfully."); } http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/server/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java index 8ed0802..6c47fde 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -20,10 +20,7 @@ package org.apache.kylin.rest.service; import java.io.IOException; import java.util.*; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.mapred.Merger; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -431,35 +428,24 @@ public class CubeService extends BasicService { if (htableInfoCache.containsKey(tableName)) { return htableInfoCache.get(tableName); } - - Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); - HTable table = null; + Connection conn = HBaseConnection.get(this.getConfig().getStorageUrl()); HBaseResponse hr = null; long tableSize = 0; int regionCount = 0; - try { - table = new HTable(hconf, tableName); + HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn); + Map<byte[], Long> sizeMap = cal.getRegionSizeMap(); - HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table); - Map<byte[], Long> sizeMap = cal.getRegionSizeMap(); - - for (long s : sizeMap.values()) { - tableSize += s; - } - - regionCount = sizeMap.size(); - - // Set response. - hr = new HBaseResponse(); - hr.setTableSize(tableSize); - hr.setRegionCount(regionCount); - } finally { - if (null != table) { - table.close(); - } + for (long s : sizeMap.values()) { + tableSize += s; } + regionCount = sizeMap.size(); + + // Set response. + hr = new HBaseResponse(); + hr.setTableSize(tableSize); + hr.setRegionCount(regionCount); htableInfoCache.put(tableName, hr); return hr; http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/server/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java index 08b338c..6691784 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -44,8 +44,9 @@ import javax.sql.DataSource; import org.apache.calcite.avatica.ColumnMetaData.Rep; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.kylin.common.KylinConfig; @@ -130,14 +131,13 @@ public class QueryService extends BasicService { Query[] queryArray = new Query[queries.size()]; byte[] bytes = querySerializer.serialize(queries.toArray(queryArray)); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Put put = new Put(Bytes.toBytes(creator)); - put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); + put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); htable.put(put); - htable.flushCommits(); } finally { IOUtils.closeQuietly(htable); } @@ -163,14 +163,13 @@ public class QueryService extends BasicService { Query[] queryArray = new Query[queries.size()]; byte[] bytes = querySerializer.serialize(queries.toArray(queryArray)); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Put put = new Put(Bytes.toBytes(creator)); - put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); + put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); htable.put(put); - htable.flushCommits(); } finally { IOUtils.closeQuietly(htable); } @@ -182,9 +181,9 @@ public class QueryService extends BasicService { } List<Query> queries = new ArrayList<Query>(); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Get get = new Get(Bytes.toBytes(creator)); get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY)); Result result = htable.get(get); http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/server/src/main/java/org/apache/kylin/rest/service/UserService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/UserService.java b/server/src/main/java/org/apache/kylin/rest/service/UserService.java index e4e2de3..624c49c 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/UserService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/UserService.java @@ -29,7 +29,7 @@ import javax.annotation.PostConstruct; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -69,7 +69,7 @@ public class UserService implements UserManager { @Override public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException { - HTableInterface htable = null; + Table htable = null; try { htable = aclHBaseStorage.getTable(userTableName); @@ -100,16 +100,15 @@ public class UserService implements UserManager { @Override public void updateUser(UserDetails user) { - HTableInterface htable = null; + Table htable = null; try { byte[] userAuthorities = serialize(user.getAuthorities()); htable = aclHBaseStorage.getTable(userTableName); Put put = new Put(Bytes.toBytes(user.getUsername())); - put.add(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), userAuthorities); + put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), userAuthorities); htable.put(put); - htable.flushCommits(); } catch (IOException e) { throw new RuntimeException(e.getMessage(), e); } finally { @@ -119,14 +118,13 @@ public class UserService implements UserManager { @Override public void deleteUser(String username) { - HTableInterface htable = null; + Table htable = null; try { htable = aclHBaseStorage.getTable(userTableName); Delete delete = new Delete(Bytes.toBytes(username)); htable.delete(delete); - htable.flushCommits(); } catch (IOException e) { throw new RuntimeException(e.getMessage(), e); } finally { @@ -141,7 +139,7 @@ public class UserService implements UserManager { @Override public boolean userExists(String username) { - HTableInterface htable = null; + Table htable = null; try { htable = aclHBaseStorage.getTable(userTableName); @@ -161,7 +159,7 @@ public class UserService implements UserManager { s.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN)); List<String> authorities = new ArrayList<String>(); - HTableInterface htable = null; + Table htable = null; ResultScanner scanner = null; try { htable = aclHBaseStorage.getTable(userTableName); http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java index e2f4f3a..a7ffa0d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java @@ -32,9 +32,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.StorageException; import org.apache.kylin.engine.mr.HadoopUtil; @@ -52,13 +52,13 @@ public class HBaseConnection { private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class); private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>(); - private static final Map<String, HConnection> ConnPool = new ConcurrentHashMap<String, HConnection>(); + private static final Map<String, Connection> ConnPool = new ConcurrentHashMap<String, Connection>(); static { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - for (HConnection conn : ConnPool.values()) { + for (Connection conn : ConnPool.values()) { try { conn.close(); } catch (IOException e) { @@ -124,9 +124,9 @@ public class HBaseConnection { // ============================================================================ - // returned HConnection can be shared by multiple threads and does not require close() + // returned Connection can be shared by multiple threads and does not require close() @SuppressWarnings("resource") - public static HConnection get(String url) { + public static Connection get(String url) { // find configuration Configuration conf = ConfigCache.get(url); if (conf == null) { @@ -134,13 +134,13 @@ public class HBaseConnection { ConfigCache.put(url, conf); } - HConnection connection = ConnPool.get(url); + Connection connection = ConnPool.get(url); try { while (true) { // I don't use DCL since recreate a connection is not a big issue. if (connection == null || connection.isClosed()) { logger.info("connection is null or closed, creating a new one"); - connection = HConnectionManager.createConnection(conf); + connection = ConnectionFactory.createConnection(conf); ConnPool.put(url, connection); } @@ -159,8 +159,8 @@ public class HBaseConnection { return connection; } - public static boolean tableExists(HConnection conn, String tableName) throws IOException { - HBaseAdmin hbase = new HBaseAdmin(conn); + public static boolean tableExists(Connection conn, String tableName) throws IOException { + Admin hbase = conn.getAdmin(); try { return hbase.tableExists(TableName.valueOf(tableName)); } finally { @@ -180,8 +180,8 @@ public class HBaseConnection { deleteTable(HBaseConnection.get(hbaseUrl), tableName); } - public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException { - HBaseAdmin hbase = new HBaseAdmin(conn); + public static void createHTableIfNeeded(Connection conn, String tableName, String... families) throws IOException { + Admin hbase = conn.getAdmin(); try { if (tableExists(conn, tableName)) { @@ -210,8 +210,8 @@ public class HBaseConnection { } } - public static void deleteTable(HConnection conn, String tableName) throws IOException { - HBaseAdmin hbase = new HBaseAdmin(conn); + public static void deleteTable(Connection conn, String tableName) throws IOException { + Admin hbase = conn.getAdmin(); try { if (!tableExists(conn, tableName)) { @@ -221,10 +221,10 @@ public class HBaseConnection { logger.debug("delete HTable '" + tableName + "'"); - if (hbase.isTableEnabled(tableName)) { - hbase.disableTable(tableName); + if (hbase.isTableEnabled(TableName.valueOf(tableName))) { + hbase.disableTable(TableName.valueOf(tableName)); } - hbase.deleteTable(tableName); + hbase.deleteTable(TableName.valueOf(tableName)); logger.debug("HTable '" + tableName + "' deleted"); } finally { http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 2262482..07a5586 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -31,10 +31,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -69,7 +70,7 @@ public class HBaseResourceStore extends ResourceStore { final String tableNameBase; final String hbaseUrl; - private HConnection getConnection() throws IOException { + private Connection getConnection() throws IOException { return HBaseConnection.get(hbaseUrl); } @@ -120,7 +121,7 @@ public class HBaseResourceStore extends ResourceStore { byte[] endRow = Bytes.toBytes(lookForPrefix); endRow[endRow.length - 1]++; - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); Scan scan = new Scan(startRow, endRow); if ((filter != null && filter instanceof KeyOnlyFilter) == false) { scan.addColumn(B_FAMILY, B_COLUMN_TS); @@ -238,13 +239,12 @@ public class HBaseResourceStore extends ResourceStore { IOUtils.copy(content, bout); bout.close(); - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { byte[] row = Bytes.toBytes(resPath); Put put = buildPut(resPath, ts, row, bout.toByteArray(), table); table.put(put); - table.flushCommits(); } finally { IOUtils.closeQuietly(table); } @@ -252,7 +252,7 @@ public class HBaseResourceStore extends ResourceStore { @Override protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { byte[] row = Bytes.toBytes(resPath); byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS); @@ -265,8 +265,6 @@ public class HBaseResourceStore extends ResourceStore { throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real); } - table.flushCommits(); - return newTS; } finally { IOUtils.closeQuietly(table); @@ -275,11 +273,10 @@ public class HBaseResourceStore extends ResourceStore { @Override protected void deleteResourceImpl(String resPath) throws IOException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { Delete del = new Delete(Bytes.toBytes(resPath)); table.delete(del); - table.flushCommits(); } finally { IOUtils.closeQuietly(table); } @@ -304,7 +301,7 @@ public class HBaseResourceStore extends ResourceStore { get.addColumn(B_FAMILY, B_COLUMN_TS); } - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { Result result = table.get(get); boolean exists = result != null && (!result.isEmpty() || (result.getExists() != null && result.getExists())); @@ -314,7 +311,7 @@ public class HBaseResourceStore extends ResourceStore { } } - private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException { + private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException { Path redirectPath = bigCellHDFSPath(resPath); Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); @@ -340,7 +337,7 @@ public class HBaseResourceStore extends ResourceStore { return redirectPath; } - private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException { + private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException { int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize(); if (content.length > kvSizeLimit) { writeLargeCellToHdfs(resPath, content, table); @@ -348,8 +345,8 @@ public class HBaseResourceStore extends ResourceStore { } Put put = new Put(row); - put.add(B_FAMILY, B_COLUMN, content); - put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); + put.addColumn(B_FAMILY, B_COLUMN, content); + put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); return put; } http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java index bbdb542..81f205e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java @@ -26,12 +26,13 @@ import java.util.NoSuchElementException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.kv.RowConstants; @@ -86,14 +87,13 @@ public class SimpleHBaseStore implements IGTStore { } private class Writer implements IGTWriter { - final HTableInterface table; + final BufferedMutator table; final ByteBuffer rowkey = ByteBuffer.allocate(50); final ByteBuffer value = ByteBuffer.allocate(50); Writer() throws IOException { - HConnection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); - table = conn.getTable(htableName); - table.setAutoFlush(false, true); + Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + table = conn.getBufferedMutator(htableName); } @Override @@ -113,24 +113,24 @@ public class SimpleHBaseStore implements IGTStore { Put put = new Put(rowkey); put.addImmutable(CF_B, ByteBuffer.wrap(COL_B), HConstants.LATEST_TIMESTAMP, value); - table.put(put); + table.mutate(put); } @Override public void close() throws IOException { - table.flushCommits(); + table.flush(); table.close(); } } class Reader implements IGTScanner { - final HTableInterface table; + final Table table; final ResultScanner scanner; int count = 0; Reader() throws IOException { - HConnection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); table = conn.getTable(htableName); Scan scan = new Scan(); http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java index 9eb05d2..7104da8 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java @@ -25,8 +25,9 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Set; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -70,7 +71,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { protected final List<RowValueDecoder> rowValueDecoders; private final StorageContext context; private final String tableName; - private final HTableInterface table; + private final Table table; protected CubeTupleConverter tupleConverter; protected final Iterator<HBaseKeyRange> rangeIterator; @@ -88,7 +89,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { private int advMeasureRowsRemaining; private int advMeasureRowIndex; - public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, // + public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, Connection conn, // Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, // List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) { this.cubeSeg = cubeSeg; @@ -108,7 +109,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { this.rangeIterator = keyRanges.iterator(); try { - this.table = conn.getTable(tableName); + this.table = conn.getTable(TableName.valueOf(tableName)); } catch (Throwable t) { throw new StorageException("Error when open connection to table " + tableName, t); } http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java index 47e94de..7211ec1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java @@ -33,7 +33,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Pair; @@ -147,7 +147,7 @@ public class CubeStorageQuery implements IStorageQuery { setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial setLimit(filter, context); - HConnection conn = HBaseConnection.get(context.getConnUrl()); + Connection conn = HBaseConnection.get(context.getConnUrl()); // notice we're passing filterD down to storage instead of flatFilter return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo); http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/HBaseClientKVIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/HBaseClientKVIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/HBaseClientKVIterator.java index 8aace22..aef3963 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/HBaseClientKVIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/HBaseClientKVIterator.java @@ -24,8 +24,9 @@ import java.util.Iterator; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -40,14 +41,14 @@ public class HBaseClientKVIterator implements Iterable<IIRow>, Closeable { byte[] family; - HTableInterface table; + Table table; ResultScanner scanner; Iterator<Result> iterator; - public HBaseClientKVIterator(HConnection hconn, String tableName, byte[] family) throws IOException { + public HBaseClientKVIterator(Connection hconn, String tableName, byte[] family) throws IOException { this.family = family; - this.table = hconn.getTable(tableName); + this.table = hconn.getTable(TableName.valueOf(tableName)); this.scanner = table.getScanner(family); this.iterator = scanner.iterator(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java index 6342c5c..ae442fe 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java @@ -23,9 +23,11 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; /** * @author yangli9 @@ -50,7 +52,7 @@ public class RegionScannerAdapter implements RegionScanner { } @Override - public boolean next(List<Cell> result, int limit) throws IOException { + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { return next(result); } @@ -60,7 +62,7 @@ public class RegionScannerAdapter implements RegionScanner { } @Override - public boolean nextRaw(List<Cell> result, int limit) throws IOException { + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { return next(result); } @@ -94,4 +96,9 @@ public class RegionScannerAdapter implements RegionScanner { return Long.MAX_VALUE; } + @Override + public int getBatch() { + return -1; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java index e8dd5b9..d033c77 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.metadata.filter.TupleFilter; @@ -57,7 +57,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator { private int scanCount; private ITuple next; - public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, // + public SerializedHBaseTupleIterator(Connection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, // Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, // StorageContext context, TupleInfo returnTupleInfo) { http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java index c7b650a..8dba1b1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; @@ -99,7 +99,7 @@ public class AggregateRegionObserver extends BaseRegionObserver { // start/end region operation & sync on scanner is suggested by the // javadoc of RegionScanner.nextRaw() // FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb - HRegion region = ctxt.getEnvironment().getRegion(); + Region region = ctxt.getEnvironment().getRegion(); region.startRegionOperation(); try { synchronized (innerScanner) { http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java index d8b61b3..635a32f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java @@ -24,7 +24,9 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; @@ -116,8 +118,8 @@ public class AggregationScanner implements RegionScanner { } @Override - public boolean next(List<Cell> result, int limit) throws IOException { - return outerScanner.next(result, limit); + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { + return outerScanner.next(result, scannerContext); } @Override @@ -126,8 +128,8 @@ public class AggregationScanner implements RegionScanner { } @Override - public boolean nextRaw(List<Cell> result, int limit) throws IOException { - return outerScanner.nextRaw(result, limit); + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { + return outerScanner.nextRaw(result, scannerContext); } @Override @@ -160,6 +162,11 @@ public class AggregationScanner implements RegionScanner { return outerScanner.getMvccReadPoint(); } + @Override + public int getBatch() { + return outerScanner.getBatch(); + } + private static class Stats { long inputRows = 0; long inputBytes = 0; http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java index 8404262..1809a44 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java @@ -24,12 +24,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey; import org.apache.kylin.storage.hbase.common.coprocessor.AggregationCache; @@ -112,7 +110,7 @@ public class ObserverAggregationCache extends AggregationCache { } @Override - public boolean next(List<Cell> result, int limit) throws IOException { + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { return next(result); } @@ -122,7 +120,7 @@ public class ObserverAggregationCache extends AggregationCache { } @Override - public boolean nextRaw(List<Cell> result, int limit) throws IOException { + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { return next(result); } @@ -161,6 +159,11 @@ public class ObserverAggregationCache extends AggregationCache { // AggregateRegionObserver.LOG.info("Kylin Scanner getMvccReadPoint()"); return Long.MAX_VALUE; } + + @Override + public int getBatch() { + return innerScanner.getBatch(); + } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/fb740ec8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java index f0e9bed..c69fd8b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java @@ -23,7 +23,7 @@ import java.util.Collection; import java.util.Map; import java.util.Set; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -60,7 +60,7 @@ public class ObserverEnabler { static final Map<String, Boolean> CUBE_OVERRIDES = Maps.newConcurrentMap(); public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, // - Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException { + Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, Table table, Scan scan) throws IOException { if (context.isCoprocessorEnabled() == false) { return table.getScanner(scan);