http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java deleted file mode 100644 index 545d058..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.ii; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.kylin.engine.mr.KylinMapper; -import org.apache.kylin.invertedindex.model.IIDesc; - -/** - * @author yangli9 - */ -public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> { - - long timestamp; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.bindCurrentConfiguration(context.getConfiguration()); - - timestamp = System.currentTimeMillis(); - } - - @Override - protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException { - - ByteBuffer buffer = ByteBuffer.wrap(value.get(), value.getOffset(), value.getLength()); - int totalLength = value.getLength(); - int valueLength = buffer.getInt(); - int dictionaryLength = totalLength - valueLength - 4; - KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), // - IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, // - IIDesc.HBASE_QUALIFIER_BYTES, 0, IIDesc.HBASE_QUALIFIER_BYTES.length, // - timestamp, Type.Put, // - buffer.array(), buffer.position(), valueLength); - - // write value - context.write(key, kv); - - kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), // - IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, // - IIDesc.HBASE_DICTIONARY_BYTES, 0, IIDesc.HBASE_DICTIONARY_BYTES.length, // - timestamp, Type.Put, // - buffer.array(), buffer.position() + valueLength, dictionaryLength); - - // write dictionary - context.write(key, kv); - - } - -}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java deleted file mode 100644 index 8099276..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.ii; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; -import org.apache.hadoop.hbase.security.User; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.engine.mr.common.AbstractHadoopJob; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.invertedindex.model.IIKeyValueCodec; -import org.apache.kylin.metadata.realization.IRealizationConstants; -import org.apache.kylin.storage.hbase.util.IIDeployCoprocessorCLI; - -/** - * @author George Song (ysong1) - */ -public class IICreateHTableJob extends AbstractHadoopJob { - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - HBaseAdmin admin = null; - try { - options.addOption(OPTION_II_NAME); - options.addOption(OPTION_HTABLE_NAME); - parseOptions(options, args); - - String tableName = getOptionValue(OPTION_HTABLE_NAME); - String iiName = getOptionValue(OPTION_II_NAME); - - KylinConfig config = KylinConfig.getInstanceFromEnv(); - IIManager iiManager = IIManager.getInstance(config); - IIInstance ii = iiManager.getII(iiName); - int sharding = ii.getDescriptor().getSharding(); - - Configuration conf = HBaseConfiguration.create(getConf()); - // check if the table already exists - admin = new HBaseAdmin(conf); - if (admin.tableExists(tableName)) { - if (admin.isTableEnabled(tableName)) { - logger.info("Table " + tableName + " already exists and is enabled, no need to create."); - return 0; - } else { - logger.error("Table " + tableName + " is disabled, couldn't append data"); - return 1; - } - } - - // table doesn't exist, need to create - - HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName)); - HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY); - cf.setMaxVersions(1); - - String hbaseDefaultCC = config.getHbaseDefaultCompressionCodec().toLowerCase(); - - switch (hbaseDefaultCC) { - case "snappy": { - logger.info("hbase will use snappy to compress data"); - cf.setCompressionType(Compression.Algorithm.SNAPPY); - break; - } - case "lzo": { - logger.info("hbase will use lzo to compress data"); - cf.setCompressionType(Compression.Algorithm.LZO); - break; - } - case "gz": - case "gzip": { - logger.info("hbase will use gzip to compress data"); - cf.setCompressionType(Compression.Algorithm.GZ); - break; - } - case "lz4": { - logger.info("hbase will use lz4 to compress data"); - cf.setCompressionType(Compression.Algorithm.LZ4); - break; - } - default: { - logger.info("hbase will not user any compression codec to compress data"); - } - } - - cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); - tableDesc.addFamily(cf); - tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix()); - tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis())); - tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName()); - - if (User.isHBaseSecurityEnabled(conf)) { - // add coprocessor for bulk load - tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); - } - - IIDeployCoprocessorCLI.deployCoprocessor(tableDesc); - - // create table - byte[][] splitKeys = getSplits(sharding); - if (splitKeys.length == 0) - splitKeys = null; - admin.createTable(tableDesc, splitKeys); - if (splitKeys != null) { - for (int i = 0; i < splitKeys.length; i++) { - logger.info("split key " + i + ": " + BytesUtil.toHex(splitKeys[i])); - } - } - logger.info("create hbase table " + tableName + " done."); - - return 0; - } catch (Exception e) { - printUsage(options); - throw e; - } finally { - if (admin != null) - admin.close(); - } - } - - //one region for one shard - private byte[][] getSplits(int shard) { - byte[][] result = new byte[shard - 1][]; - for (int i = 1; i < shard; ++i) { - byte[] split = new byte[IIKeyValueCodec.SHARD_LEN]; - BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN); - result[i - 1] = split; - } - return result; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java deleted file mode 100644 index fef9662..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.ii; - -import java.util.ArrayList; - -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.metadata.realization.SQLDigest; -import org.apache.kylin.metadata.tuple.ITupleIterator; -import org.apache.kylin.metadata.tuple.TupleInfo; -import org.apache.kylin.storage.IStorageQuery; -import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.storage.hbase.HBaseConnection; -import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointTupleIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author yangli9 - */ -public class InvertedIndexStorageQuery implements IStorageQuery { - - private static Logger logger = LoggerFactory.getLogger(InvertedIndexStorageQuery.class); - - private IISegment seg; - private String uuid; - private EndpointTupleIterator dataIterator; - - public InvertedIndexStorageQuery(IIInstance ii) { - this.seg = ii.getFirstSegment(); - this.uuid = ii.getUuid(); - } - - @Override - public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { - String tableName = seg.getStorageLocationIdentifier(); - - //HConnection is cached, so need not be closed - HConnection conn = HBaseConnection.get(context.getConnUrl()); - try { - dataIterator = new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn, returnTupleInfo); - return dataIterator; - } catch (Throwable e) { - logger.error("Error when connecting to II htable " + tableName, e); - throw new IllegalStateException("Error when connecting to II htable " + tableName, e); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluator.java deleted file mode 100644 index 1f024fe..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluator.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint; - -import java.util.List; - -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.metadata.filter.CompareTupleFilter; -import org.apache.kylin.metadata.filter.ConstantTupleFilter; -import org.apache.kylin.metadata.filter.LogicalTupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.model.TblColRef; - -import it.uniroma3.mat.extendedset.intset.ConciseSet; - -/** - * @author yangli9 - * - * Evaluate a group of records against a filter in batch. - */ -public class BitMapFilterEvaluator { - - /** Provides bitmaps for a record group ranging [0..N-1], where N is the size of the group */ - public static interface BitMapProvider { - - /** return records whose specified column having specified value */ - ConciseSet getBitMap(TblColRef col, Integer startId, Integer endId); - - /** return the size of the group */ - int getRecordCount(); - - /** return the max value ID of a column according to dictionary */ - int getMaxValueId(TblColRef col); - } - - BitMapProvider provider; - - public BitMapFilterEvaluator(BitMapProvider bitMapProvider) { - this.provider = bitMapProvider; - } - - /** - * @param filter - * @return a set of records that match the filter; or null if filter is null or unable to evaluate - */ - public ConciseSet evaluate(TupleFilter filter) { - if (filter == null) - return null; - - if (filter instanceof LogicalTupleFilter) - return evalLogical((LogicalTupleFilter) filter); - - if (filter instanceof CompareTupleFilter) - return evalCompare((CompareTupleFilter) filter); - - if (filter instanceof ConstantTupleFilter) { - if (!filter.evaluate(null, null)) { - return new ConciseSet(); - } - } - - return null; // unable to evaluate - } - - private ConciseSet evalCompare(CompareTupleFilter filter) { - switch (filter.getOperator()) { - case ISNULL: - return evalCompareIsNull(filter); - case ISNOTNULL: - return evalCompareIsNotNull(filter); - case EQ: - return evalCompareEqual(filter); - case NEQ: - return evalCompareNotEqual(filter); - case IN: - return evalCompareIn(filter); - case NOTIN: - return evalCompareNotIn(filter); - case LT: - return evalCompareLT(filter); - case LTE: - return evalCompareLTE(filter); - case GT: - return evalCompareGT(filter); - case GTE: - return evalCompareGTE(filter); - default: - throw new IllegalStateException("Unsupported operator " + filter.getOperator()); - } - } - - private ConciseSet evalCompareLT(CompareTupleFilter filter) { - int id = Dictionary.stringToDictId((String) filter.getFirstValue()); - return collectRange(filter.getColumn(), null, id - 1); - } - - private ConciseSet evalCompareLTE(CompareTupleFilter filter) { - int id = Dictionary.stringToDictId((String) filter.getFirstValue()); - return collectRange(filter.getColumn(), null, id); - } - - private ConciseSet evalCompareGT(CompareTupleFilter filter) { - int id = Dictionary.stringToDictId((String) filter.getFirstValue()); - return collectRange(filter.getColumn(), id + 1, null); - } - - private ConciseSet evalCompareGTE(CompareTupleFilter filter) { - int id = Dictionary.stringToDictId((String) filter.getFirstValue()); - return collectRange(filter.getColumn(), id, null); - } - - private ConciseSet collectRange(TblColRef column, Integer startId, Integer endId) { - return provider.getBitMap(column, startId, endId); - } - - private ConciseSet evalCompareEqual(CompareTupleFilter filter) { - int id = Dictionary.stringToDictId((String) filter.getFirstValue()); - ConciseSet bitMap = provider.getBitMap(filter.getColumn(), id, id); - if (bitMap == null) - return null; - return bitMap.clone(); // NOTE the clone() to void messing provider's cache - } - - private ConciseSet evalCompareNotEqual(CompareTupleFilter filter) { - ConciseSet set = evalCompareEqual(filter); - not(set); - dropNull(set, filter); - return set; - } - - private ConciseSet evalCompareIn(CompareTupleFilter filter) { - ConciseSet set = new ConciseSet(); - for (Object value : filter.getValues()) { - int id = Dictionary.stringToDictId((String) value); - ConciseSet bitMap = provider.getBitMap(filter.getColumn(), id, id); - if (bitMap == null) - return null; - set.addAll(bitMap); - } - return set; - } - - private ConciseSet evalCompareNotIn(CompareTupleFilter filter) { - ConciseSet set = evalCompareIn(filter); - not(set); - dropNull(set, filter); - return set; - } - - private void dropNull(ConciseSet set, CompareTupleFilter filter) { - if (set == null) - return; - - ConciseSet nullSet = evalCompareIsNull(filter); - set.removeAll(nullSet); - } - - private ConciseSet evalCompareIsNull(CompareTupleFilter filter) { - ConciseSet bitMap = provider.getBitMap(filter.getColumn(), null, null); - if (bitMap == null) - return null; - return bitMap.clone(); // NOTE the clone() to void messing provider's cache - } - - private ConciseSet evalCompareIsNotNull(CompareTupleFilter filter) { - ConciseSet set = evalCompareIsNull(filter); - not(set); - return set; - } - - private ConciseSet evalLogical(LogicalTupleFilter filter) { - List<? extends TupleFilter> children = filter.getChildren(); - - switch (filter.getOperator()) { - case AND: - return evalLogicalAnd(children); - case OR: - return evalLogicalOr(children); - case NOT: - return evalLogicalNot(children); - default: - throw new IllegalStateException("Unsupported operator " + filter.getOperator()); - } - } - - private ConciseSet evalLogicalAnd(List<? extends TupleFilter> children) { - ConciseSet set = new ConciseSet(); - not(set); - - for (TupleFilter c : children) { - ConciseSet t = evaluate(c); - if (t == null) - continue; // because it's AND - - set.retainAll(t); - } - return set; - } - - private ConciseSet evalLogicalOr(List<? extends TupleFilter> children) { - ConciseSet set = new ConciseSet(); - - for (TupleFilter c : children) { - ConciseSet t = evaluate(c); - if (t == null) - return null; // because it's OR - - set.addAll(t); - } - return set; - } - - private ConciseSet evalLogicalNot(List<? extends TupleFilter> children) { - ConciseSet set = evaluate(children.get(0)); - not(set); - return set; - } - - private void not(ConciseSet set) { - if (set == null) - return; - - set.add(provider.getRecordCount()); - set.complement(); - } - - public static void main(String[] args) { - ConciseSet s = new ConciseSet(); - s.add(5); - s.complement(); - System.out.println(s); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/ClearTextDictionary.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/ClearTextDictionary.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/ClearTextDictionary.java deleted file mode 100644 index 9039165..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/ClearTextDictionary.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint; - -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.dimension.DimensionEncoding; -import org.apache.kylin.dimension.FixedLenDimEnc; -import org.apache.kylin.dimension.IDimensionEncodingMap; -import org.apache.kylin.invertedindex.index.TableRecordInfo; -import org.apache.kylin.invertedindex.index.TableRecordInfoDigest; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; - -import com.google.common.collect.Maps; - -/** - */ -public class ClearTextDictionary implements IDimensionEncodingMap { - - private final Map<TblColRef, DimensionEncoding> encMap; - - public ClearTextDictionary(TableRecordInfoDigest digest, CoprocessorRowType coprocessorRowType) { - encMap = Maps.newHashMap(); - for (Entry<TblColRef, Integer> entry : coprocessorRowType.columnIdxMap.entrySet()) { - encMap.put(entry.getKey(), new FixedLenDimEnc(digest.length(entry.getValue()))); - } - } - - public ClearTextDictionary(TableRecordInfo tableRecordInfo) { - encMap = Maps.newHashMap(); - TableRecordInfoDigest digest = tableRecordInfo.getDigest(); - for (int i = 0; i < tableRecordInfo.getColumns().size(); i++) { - encMap.put(tableRecordInfo.getColumns().get(i), new FixedLenDimEnc(digest.length(i))); - } - } - - @Override - public DimensionEncoding get(TblColRef col) { - return encMap.get(col); - } - - @Override - public Dictionary<String> getDictionary(TblColRef col) { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationCache.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationCache.java deleted file mode 100644 index affb284..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationCache.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint; - -import java.util.Map; -import java.util.Set; - -import org.apache.kylin.measure.MeasureAggregator; -import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey; -import org.apache.kylin.storage.hbase.common.coprocessor.AggregationCache; - -/** - */ -public class EndpointAggregationCache extends AggregationCache { - - private EndpointAggregators aggregators; - - public EndpointAggregationCache(EndpointAggregators aggregators) { - this.aggregators = aggregators; - } - - @Override - public MeasureAggregator[] createBuffer() { - return this.aggregators.createBuffer(); - } - - public Set<Map.Entry<AggrKey, MeasureAggregator[]>> getAllEntries() { - return aggBufMap.entrySet(); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java deleted file mode 100644 index e481272..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint; - -import java.nio.ByteBuffer; -import java.util.List; - -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.kylin.common.util.BytesSerializer; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.invertedindex.index.RawTableRecord; -import org.apache.kylin.invertedindex.index.TableRecordInfo; -import org.apache.kylin.invertedindex.index.TableRecordInfoDigest; -import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec; -import org.apache.kylin.measure.MeasureAggregator; -import org.apache.kylin.measure.hllc.HLLCMeasureType; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.datatype.LongMutable; -import org.apache.kylin.metadata.model.FunctionDesc; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * @author honma - */ -@SuppressWarnings({ "rawtypes", "unchecked" }) -public class EndpointAggregators { - - private enum MetricType { - Count, DimensionAsMetric, DistinctCount, Normal - } - - private final static class MetricInfo { - private MetricType type; - private int refIndex = -1; - private int precision = -1; - - public MetricInfo(MetricType type, int refIndex, int precision) { - this.type = type; - this.refIndex = refIndex; - this.precision = precision; - } - - public MetricInfo(MetricType type, int refIndex) { - this.type = type; - this.refIndex = refIndex; - } - - public MetricInfo(MetricType type) { - this.type = type; - } - - } - - private static MetricInfo generateMetricInfo(TableRecordInfo tableInfo, FunctionDesc functionDesc) { - if (functionDesc.isCount()) { - return new MetricInfo(MetricType.Count); - } else if (functionDesc.isDimensionAsMetric()) { - return new MetricInfo(MetricType.DimensionAsMetric); - } else { - int index = tableInfo.findFactTableColumn(functionDesc.getParameter().getValue()); - Preconditions.checkState(index >= 0, "Column " + functionDesc.getParameter().getValue() + " is not found in II"); - if (HLLCMeasureType.isCountDistinct(functionDesc)) { - return new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision()); - } else { - return new MetricInfo(MetricType.Normal, index); - } - } - } - - public static EndpointAggregators fromFunctions(TableRecordInfo tableInfo, List<FunctionDesc> metrics) { - final int metricSize = metrics.size(); - String[] funcNames = new String[metricSize]; - String[] dataTypes = new String[metricSize]; - MetricInfo[] metricInfos = new MetricInfo[metricSize]; - for (int i = 0; i < metricSize; i++) { - FunctionDesc functionDesc = metrics.get(i); - - //TODO: what if funcionDesc's type is different from tablDesc? cause scale difference - funcNames[i] = functionDesc.getExpression(); - dataTypes[i] = functionDesc.getReturnType(); - metricInfos[i] = generateMetricInfo(tableInfo, functionDesc); - } - - return new EndpointAggregators(funcNames, dataTypes, metricInfos, tableInfo.getDigest()); - } - - final String[] funcNames; - final String[] dataTypes; - final MetricInfo[] metricInfos; - - final transient TableRecordInfoDigest tableRecordInfoDigest; - final transient RawTableRecord rawTableRecord; - final transient ImmutableBytesWritable byteBuffer; - final transient HyperLogLogPlusCounter[] hllcs; - final transient FixedLenMeasureCodec[] measureSerializers; - final transient Object[] metricValues; - - final LongMutable ONE = new LongMutable(1); - - private EndpointAggregators(String[] funcNames, String[] dataTypes, MetricInfo[] metricInfos, TableRecordInfoDigest tableInfo) { - this.funcNames = funcNames; - this.dataTypes = dataTypes; - this.metricInfos = metricInfos; - this.tableRecordInfoDigest = tableInfo; - this.rawTableRecord = tableInfo.createTableRecordBytes(); - this.byteBuffer = new ImmutableBytesWritable(); - - this.hllcs = new HyperLogLogPlusCounter[this.metricInfos.length]; - this.metricValues = new Object[funcNames.length]; - this.measureSerializers = new FixedLenMeasureCodec[funcNames.length]; - for (int i = 0; i < this.measureSerializers.length; ++i) { - this.measureSerializers[i] = FixedLenMeasureCodec.get(DataType.getType(dataTypes[i])); - } - } - - public TableRecordInfoDigest getTableRecordInfoDigest() { - return tableRecordInfoDigest; - } - - public boolean isEmpty() { - return !((funcNames != null) && (funcNames.length != 0)); - } - - public MeasureAggregator[] createBuffer() { - MeasureAggregator[] aggrs = new MeasureAggregator[funcNames.length]; - for (int i = 0; i < aggrs.length; i++) { - if (metricInfos[i].type == MetricType.DistinctCount) { - aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getType(dataTypes[i])); - } else { - //all other fixed length measures can be aggregated as long - aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getType("long")); - } - } - return aggrs; - } - - /** - * this method is heavily called at coprocessor side, - * Make sure as little object creation as possible - */ - public void aggregate(MeasureAggregator[] measureAggrs, byte[] row) { - - rawTableRecord.setBytes(row, 0, row.length); - - for (int metricIndex = 0; metricIndex < metricInfos.length; ++metricIndex) { - final MetricInfo metricInfo = metricInfos[metricIndex]; - if (metricInfo.type == MetricType.Count) { - measureAggrs[metricIndex].aggregate(ONE); - continue; - } - - if (metricInfo.type == MetricType.DimensionAsMetric) { - continue; - } - - MeasureAggregator aggregator = measureAggrs[metricIndex]; - FixedLenMeasureCodec measureSerializer = measureSerializers[metricIndex]; - - //get the raw bytes - rawTableRecord.getValueBytes(metricInfo.refIndex, byteBuffer); - - if (metricInfo.type == MetricType.Normal) { - aggregator.aggregate(measureSerializer.read(byteBuffer.get(), byteBuffer.getOffset())); - } else if (metricInfo.type == MetricType.DistinctCount) { - //TODO: for unified dictionary, this is okay. but if different data blocks uses different dictionary, we'll have to aggregate original data - HyperLogLogPlusCounter hllc = hllcs[metricIndex]; - if (hllc == null) { - int precision = metricInfo.precision; - hllc = new HyperLogLogPlusCounter(precision); - } - hllc.clear(); - hllc.add(byteBuffer.get(), byteBuffer.getOffset(), byteBuffer.getLength()); - aggregator.aggregate(hllc); - } - } - } - - /** - * @param aggrs - * @param buffer byte buffer to get the metric data - * @return length of metric data - */ - public int serializeMetricValues(MeasureAggregator[] aggrs, byte[] buffer, int offset) { - for (int i = 0; i < funcNames.length; i++) { - metricValues[i] = aggrs[i].getState(); - } - - int metricBytesOffset = offset; - int length = 0; - for (int i = 0; i < measureSerializers.length; i++) { - measureSerializers[i].write(metricValues[i], buffer, metricBytesOffset); - metricBytesOffset += measureSerializers[i].getLength(); - length += measureSerializers[i].getLength(); - } - return length; - } - - public List<Object> deserializeMetricValues(ByteBuffer buffer) { - List<Object> ret = Lists.newArrayList(); - for (int i = 0; i < measureSerializers.length; i++) { - measureSerializers[i].read(buffer); - Object valueString = measureSerializers[i].getValue(); - ret.add(valueString); - } - return ret; - } - - public static byte[] serialize(EndpointAggregators o) { - ByteBuffer buf = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); - serializer.serialize(o, buf); - byte[] result = new byte[buf.position()]; - System.arraycopy(buf.array(), 0, result, 0, buf.position()); - return result; - } - - public static EndpointAggregators deserialize(byte[] bytes) { - return serializer.deserialize(ByteBuffer.wrap(bytes)); - } - - private static final BytesSerializer<EndpointAggregators> serializer = new BytesSerializer<EndpointAggregators>() { - - @Override - public void serialize(EndpointAggregators value, ByteBuffer out) { - BytesUtil.writeAsciiStringArray(value.funcNames, out); - BytesUtil.writeAsciiStringArray(value.dataTypes, out); - - BytesUtil.writeVInt(value.metricInfos.length, out); - for (int i = 0; i < value.metricInfos.length; ++i) { - MetricInfo metricInfo = value.metricInfos[i]; - BytesUtil.writeAsciiString(metricInfo.type.toString(), out); - BytesUtil.writeVInt(metricInfo.refIndex, out); - BytesUtil.writeVInt(metricInfo.precision, out); - } - - BytesUtil.writeByteArray(TableRecordInfoDigest.serialize(value.tableRecordInfoDigest), out); - } - - @Override - public EndpointAggregators deserialize(ByteBuffer in) { - - String[] funcNames = BytesUtil.readAsciiStringArray(in); - String[] dataTypes = BytesUtil.readAsciiStringArray(in); - - int metricInfoLength = BytesUtil.readVInt(in); - MetricInfo[] infos = new MetricInfo[metricInfoLength]; - for (int i = 0; i < infos.length; ++i) { - MetricType type = MetricType.valueOf(BytesUtil.readAsciiString(in)); - int refIndex = BytesUtil.readVInt(in); - int presision = BytesUtil.readVInt(in); - infos[i] = new MetricInfo(type, refIndex, presision); - } - - byte[] temp = BytesUtil.readByteArray(in); - TableRecordInfoDigest tableInfo = TableRecordInfoDigest.deserialize(temp); - - return new EndpointAggregators(funcNames, dataTypes, infos, tableInfo); - } - - }; - - public int getMeasureSerializeLength() { - int length = 0; - for (int i = 0; i < this.measureSerializers.length; ++i) { - length += this.measureSerializers[i].getLength(); - } - return length; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointEnabler.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointEnabler.java deleted file mode 100644 index 2ae7f35..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointEnabler.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author honma - */ -public class EndpointEnabler { - - private static final Logger logger = LoggerFactory.getLogger(EndpointEnabler.class); - - static final String FORCE_COPROCESSOR = "forceEndpoint"; - - public static boolean isCoprocessorBeneficial() { - return Boolean.parseBoolean(getForceCoprocessor()); - } - - public static void forceCoprocessorOn() { - System.setProperty(FORCE_COPROCESSOR, "true"); - } - - public static void forceCoprocessorOff() { - System.setProperty(FORCE_COPROCESSOR, "false"); - } - - public static String getForceCoprocessor() { - return System.getProperty(FORCE_COPROCESSOR); - } - - public static void forceCoprocessorUnset() { - System.clearProperty(FORCE_COPROCESSOR); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleConverter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleConverter.java deleted file mode 100644 index 3fdd5b0..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleConverter.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint; - -import java.util.List; - -import org.apache.kylin.invertedindex.index.TableRecord; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.ITuple; -import org.apache.kylin.metadata.tuple.Tuple; -import org.apache.kylin.metadata.tuple.TupleInfo; - -public class EndpointTupleConverter { - - final TupleInfo tupleInfo; - final List<TblColRef> columns; - final int[] columnTupleIdx; - final int[] aggrTupleIdx; - - public EndpointTupleConverter(List<TblColRef> columns, List<FunctionDesc> aggrMeasures, TupleInfo returnTupleInfo) { - this.tupleInfo = returnTupleInfo; - this.columns = columns; - this.columnTupleIdx = new int[columns.size()]; - this.aggrTupleIdx = new int[aggrMeasures.size()]; - - for (int i = 0; i < columns.size(); i++) { - TblColRef col = columns.get(i); - columnTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1; - } - - for (int i = 0; i < aggrMeasures.size(); i++) { - FunctionDesc measure = aggrMeasures.get(i); - int tupleIdx; - if (measure.isDimensionAsMetric()) { - // for dimension playing as metrics, the measure is just a placeholder, the real value comes from columns - tupleIdx = -1; - } else if (measure.needRewrite()) { - // a rewrite metrics is identified by its rewrite field name - String rewriteFieldName = measure.getRewriteFieldName(); - tupleIdx = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1; - } else { - // a non-rewrite metrics (i.e. sum) is like a dimension column - TblColRef col = measure.getParameter().getColRefs().get(0); - tupleIdx = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1; - } - aggrTupleIdx[i] = tupleIdx; - } - } - - public ITuple makeTuple(TableRecord tableRecord, List<Object> measureValues, Tuple tuple) { - // dimensions and metrics from II table record - for (int i = 0; i < columnTupleIdx.length; i++) { - int tupleIdx = columnTupleIdx[i]; - if (tupleIdx >= 0) { - String value = tableRecord.getValueString(i); - tuple.setDimensionValue(tupleIdx, value); - } - } - - // additional aggregations calculated inside end point (like cube measures) - if (measureValues != null) { - for (int i = 0; i < aggrTupleIdx.length; ++i) { - int tupleIdx = aggrTupleIdx[i]; - if (tupleIdx >= 0) { - Object value = measureValues.get(i); - if (value instanceof String) { - String dataType = tuple.getDataTypeName(tupleIdx); - value = Tuple.convertOptiqCellValue((String) value, dataType); - } - tuple.setMeasureValue(tupleIdx, value); - } - } - } - return tuple; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java deleted file mode 100644 index e197e3e..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java +++ /dev/null @@ -1,364 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import javax.annotation.Nullable; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.SerializationUtils; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; -import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.kylin.common.util.CompressionUtils; -import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.common.util.RangeUtil; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.invertedindex.index.TableRecord; -import org.apache.kylin.invertedindex.index.TableRecordInfo; -import org.apache.kylin.measure.hllc.HLLCMeasureType; -import org.apache.kylin.metadata.filter.ConstantTupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.ITuple; -import org.apache.kylin.metadata.tuple.ITupleIterator; -import org.apache.kylin.metadata.tuple.Tuple; -import org.apache.kylin.metadata.tuple.TupleInfo; -import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.storage.cache.TsConditionExtractor; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; -import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator; -import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Collections2; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Range; -import com.google.common.collect.Ranges; -import com.google.common.collect.Sets; -import com.google.protobuf.HBaseZeroCopyByteString; - -/** - */ -public class EndpointTupleIterator implements ITupleIterator { - - private final static Logger logger = LoggerFactory.getLogger(EndpointTupleIterator.class); - - private final IISegment seg; - - private final String factTableName; - private final List<TblColRef> columns; - private final TupleInfo tupleInfo; - private final TableRecordInfo tableRecordInfo; - private final EndpointTupleConverter tupleConverter; - - private final CoprocessorRowType pushedDownRowType; - private final CoprocessorFilter pushedDownFilter; - private final CoprocessorProjector pushedDownProjector; - private final EndpointAggregators pushedDownAggregators; - private final Range<Long> tsRange;//timestamp column condition's interval - - private Iterator<List<IIProtos.IIResponseInternal.IIRow>> regionResponsesIterator = null; - private ITupleIterator tupleIterator = null; - private HTableInterface table = null; - - private TblColRef partitionCol; - private long lastDataTime = -1; - private int rowsInAllMetric = 0; - - public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, HConnection conn, TupleInfo returnTupleInfo) throws Throwable { - - String tableName = segment.getStorageLocationIdentifier(); - table = conn.getTable(tableName); - factTableName = segment.getIIDesc().getFactTableName(); - - if (rootFilter == null) { - rootFilter = ConstantTupleFilter.TRUE; - } - - if (groupBy == null) { - groupBy = Sets.newHashSet(); - } - - if (measures == null) { - measures = Lists.newArrayList(); - } - - //this method will change measures - rewriteMeasureParameters(measures, segment.getColumns()); - - this.seg = segment; - this.columns = segment.getColumns(); - - this.tupleInfo = returnTupleInfo; - this.tupleConverter = new EndpointTupleConverter(columns, measures, returnTupleInfo); - this.tableRecordInfo = new TableRecordInfo(this.seg); - - this.pushedDownRowType = CoprocessorRowType.fromTableRecordInfo(tableRecordInfo, this.columns); - this.pushedDownFilter = CoprocessorFilter.fromFilter(new ClearTextDictionary(this.tableRecordInfo), rootFilter, FilterDecorator.FilterConstantsTreatment.AS_IT_IS); - - for (TblColRef column : this.pushedDownFilter.getInevaluableColumns()) { - groupBy.add(column); - } - - this.pushedDownProjector = CoprocessorProjector.makeForEndpoint(tableRecordInfo, groupBy); - this.pushedDownAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, measures); - - int tsCol = this.tableRecordInfo.getTimestampColumn(); - this.partitionCol = this.columns.get(tsCol); - this.tsRange = TsConditionExtractor.extractTsCondition(this.partitionCol, rootFilter); - - if (this.tsRange == null) { - logger.info("TsRange conflict for endpoint, return empty directly"); - this.tupleIterator = ITupleIterator.EMPTY_TUPLE_ITERATOR; - } else { - logger.info("The tsRange being pushed is " + RangeUtil.formatTsRange(tsRange)); - } - - IIProtos.IIRequest endpointRequest = prepareRequest(); - Collection<IIProtos.IIResponse> compressedShardResults = getResults(endpointRequest, table); - - //decompress - Collection<IIProtos.IIResponseInternal> shardResults = new ArrayList<>(); - for (IIProtos.IIResponse input : compressedShardResults) { - byte[] compressed = HBaseZeroCopyByteString.zeroCopyGetBytes(input.getBlob()); - try { - byte[] decompressed = CompressionUtils.decompress(compressed); - shardResults.add(IIProtos.IIResponseInternal.parseFrom(decompressed)); - } catch (Exception e) { - throw new RuntimeException("decompress endpoint response error"); - } - } - - this.lastDataTime = Collections.min(Collections2.transform(shardResults, new Function<IIProtos.IIResponseInternal, Long>() { - @Nullable - @Override - public Long apply(IIProtos.IIResponseInternal input) { - - IIProtos.IIResponseInternal.Stats status = input.getStats(); - logger.info("Endpoints all returned, stats from shard {}: start moment:{}, finish moment: {}, elapsed ms: {}, scanned slices: {}, latest slice time is {}", // - new Object[] { String.valueOf(status.getMyShard()), // - DateFormat.formatToTimeStr(status.getServiceStartTime()), // - DateFormat.formatToTimeStr(status.getServiceEndTime()), // - String.valueOf(status.getServiceEndTime() - status.getServiceStartTime()), // - String.valueOf(status.getScannedSlices()), DateFormat.formatToTimeStr(status.getLatestDataTime()) }); - - return status.getLatestDataTime(); - } - })); - - this.regionResponsesIterator = Collections2.transform(shardResults, new Function<IIProtos.IIResponseInternal, List<IIProtos.IIResponseInternal.IIRow>>() { - @Nullable - @Override - public List<IIProtos.IIResponseInternal.IIRow> apply(@Nullable IIProtos.IIResponseInternal input) { - return input.getRowsList(); - } - }).iterator(); - - if (this.regionResponsesIterator.hasNext()) { - this.tupleIterator = new SingleRegionTupleIterator(this.regionResponsesIterator.next()); - } else { - this.tupleIterator = ITupleIterator.EMPTY_TUPLE_ITERATOR; - } - } - - /** - * measure comes from query engine, does not contain enough information - */ - private void rewriteMeasureParameters(List<FunctionDesc> measures, List<TblColRef> columns) { - for (FunctionDesc functionDesc : measures) { - if (functionDesc.isCount()) { - functionDesc.setReturnType("bigint"); - } else { - boolean updated = false; - for (TblColRef column : columns) { - if (column.isSameAs(factTableName, functionDesc.getParameter().getValue())) { - if (HLLCMeasureType.isCountDistinct(functionDesc)) { - //TODO: default precision might need be configurable - String iiDefaultHLLC = "hllc10"; - functionDesc.setReturnType(iiDefaultHLLC); - } else { - functionDesc.setReturnType(column.getColumnDesc().getType().toString()); - } - functionDesc.getParameter().setColRefs(ImmutableList.of(column)); - updated = true; - break; - } - } - if (!updated) { - throw new RuntimeException("Func " + functionDesc + " is not related to any column in fact table " + factTableName); - } - } - } - } - - @Override - public boolean hasNext() { - while (!this.tupleIterator.hasNext()) { - if (this.regionResponsesIterator.hasNext()) { - this.tupleIterator = new SingleRegionTupleIterator(this.regionResponsesIterator.next()); - } else { - return false; - } - } - return true; - } - - @Override - public ITuple next() { - rowsInAllMetric++; - - if (!hasNext()) { - throw new IllegalStateException("No more ITuple in EndpointTupleIterator"); - } - - ITuple tuple = this.tupleIterator.next(); - return tuple; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - - } - - @Override - public void close() { - IOUtils.closeQuietly(table); - logger.info("Closed after " + rowsInAllMetric + " rows are fetched"); - } - - /** - * tells storage layer cache what time period of data should not be cached. - * for static storage like cube, it will return null - * for dynamic storage like ii, it will for example exclude the last two minutes for possible data latency - * @return - */ - public Range<Long> getCacheExcludedPeriod() { - Preconditions.checkArgument(lastDataTime != -1, "lastDataTime is not set yet"); - return Ranges.greaterThan(lastDataTime); - } - - private IIProtos.IIRequest prepareRequest() throws IOException { - IIProtos.IIRequest.Builder builder = IIProtos.IIRequest.newBuilder(); - - if (this.tsRange != null) { - byte[] tsRangeBytes = SerializationUtils.serialize(this.tsRange); - builder.setTsRange(HBaseZeroCopyByteString.wrap(tsRangeBytes)); - } - - builder.setType(HBaseZeroCopyByteString.wrap(CoprocessorRowType.serialize(pushedDownRowType))) // - .setFilter(HBaseZeroCopyByteString.wrap(CoprocessorFilter.serialize(pushedDownFilter))) // - .setProjector(HBaseZeroCopyByteString.wrap(CoprocessorProjector.serialize(pushedDownProjector))) // - .setAggregator(HBaseZeroCopyByteString.wrap(EndpointAggregators.serialize(pushedDownAggregators))); - - IIProtos.IIRequest request = builder.build(); - - return request; - } - - private Collection<IIProtos.IIResponse> getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable { - Map<byte[], IIProtos.IIResponse> results = table.coprocessorService(IIProtos.RowsService.class, null, null, new Batch.Call<IIProtos.RowsService, IIProtos.IIResponse>() { - public IIProtos.IIResponse call(IIProtos.RowsService rowsService) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<IIProtos.IIResponse> rpcCallback = new BlockingRpcCallback<>(); - rowsService.getRows(controller, request, rpcCallback); - IIProtos.IIResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - - return response; - } - }); - - return results.values(); - } - - /** - * Internal class to handle iterators for a single region's returned rows - */ - class SingleRegionTupleIterator implements ITupleIterator { - private List<IIProtos.IIResponseInternal.IIRow> rows; - private int index = 0; - - //not thread safe! - private TableRecord tableRecord; - private List<Object> measureValues; - private Tuple tuple; - - public SingleRegionTupleIterator(List<IIProtos.IIResponseInternal.IIRow> rows) { - this.rows = rows; - this.index = 0; - this.tableRecord = tableRecordInfo.createTableRecord(); - this.tuple = new Tuple(tupleInfo); - } - - @Override - public boolean hasNext() { - return index < rows.size(); - } - - @Override - public ITuple next() { - if (!hasNext()) { - throw new IllegalStateException("No more Tuple in the SingleRegionTupleIterator"); - } - - IIProtos.IIResponseInternal.IIRow currentRow = rows.get(index); - byte[] columnsBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getColumns()); - this.tableRecord.setBytes(columnsBytes, 0, columnsBytes.length); - if (currentRow.hasMeasures()) { - ByteBuffer buffer = currentRow.getMeasures().asReadOnlyByteBuffer(); - this.measureValues = pushedDownAggregators.deserializeMetricValues(buffer); - } - - index++; - - return tupleConverter.makeTuple(this.tableRecord, this.measureValues, this.tuple); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public void close() { - } - - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/HbaseServerKVIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/HbaseServerKVIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/HbaseServerKVIterator.java deleted file mode 100644 index a1d0e35..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/HbaseServerKVIterator.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.kylin.invertedindex.model.IIRow; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - */ -public class HbaseServerKVIterator implements Iterable<IIRow>, Closeable { - - private RegionScanner innerScanner; - private Logger logger = LoggerFactory.getLogger(HbaseServerKVIterator.class); - - public HbaseServerKVIterator(RegionScanner innerScanner) { - this.innerScanner = innerScanner; - } - - @Override - public void close() throws IOException { - IOUtils.closeQuietly(this.innerScanner); - } - - private static class IIRowIterator implements Iterator<IIRow> { - - private final RegionScanner regionScanner; - private final IIRow row = new IIRow(); - List<Cell> results = Lists.newArrayList(); - - private boolean hasMore; - - IIRowIterator(RegionScanner innerScanner) { - this.regionScanner = innerScanner; - try { - hasMore = regionScanner.nextRaw(results); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean hasNext() { - return !results.isEmpty(); - } - - @Override - public IIRow next() { - if (results.size() < 1) { - throw new NoSuchElementException(); - } - for (Cell c : results) { - row.updateWith(c); - } - results.clear(); - try { - if (hasMore) { - hasMore = regionScanner.nextRaw(results); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - return row; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - - @Override - public Iterator<IIRow> iterator() { - return new IIRowIterator(innerScanner); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java deleted file mode 100644 index ef7de3a..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Iterator; -import java.util.Map; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.lang.SerializationUtils; -import org.apache.hadoop.hbase.Coprocessor; -import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.CoprocessorException; -import org.apache.hadoop.hbase.coprocessor.CoprocessorService; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.kylin.common.util.Array; -import org.apache.kylin.common.util.BytesSerializer; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.CompressionUtils; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.cube.kv.RowKeyColumnIO; -import org.apache.kylin.dict.TrieDictionary; -import org.apache.kylin.dimension.FixedLenDimEnc; -import org.apache.kylin.invertedindex.index.RawTableRecord; -import org.apache.kylin.invertedindex.index.Slice; -import org.apache.kylin.invertedindex.index.TableRecordInfoDigest; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.invertedindex.model.IIKeyValueCodec; -import org.apache.kylin.measure.MeasureAggregator; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; -import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator; -import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Range; -import com.google.protobuf.HBaseZeroCopyByteString; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; - -import it.uniroma3.mat.extendedset.intset.ConciseSet; - -/** - */ -public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, CoprocessorService { - - private static final Logger logger = LoggerFactory.getLogger(IIEndpoint.class); - private static final int MEMORY_LIMIT = 500 * 1024 * 1024; - - private RegionCoprocessorEnvironment env; - private long serviceStartTime; - private int shard; - - public IIEndpoint() { - } - - private Scan prepareScan(IIProtos.IIRequest request, HRegion region) throws IOException { - Scan scan = new Scan(); - - scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES); - scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES); - - if (request.hasTsRange()) { - Range<Long> tsRange = (Range<Long>) SerializationUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getTsRange())); - byte[] regionStartKey = region.getStartKey(); - if (!ArrayUtils.isEmpty(regionStartKey)) { - shard = BytesUtil.readUnsigned(regionStartKey, 0, IIKeyValueCodec.SHARD_LEN); - } else { - shard = 0; - } - logger.info("Start key of the region is: " + BytesUtil.toReadableText(regionStartKey) + ", making shard to be :" + shard); - - if (tsRange.hasLowerBound()) { - //differentiate GT and GTE seems not very beneficial - Preconditions.checkArgument(shard != -1, "Shard is -1!"); - long tsStart = tsRange.lowerEndpoint(); - logger.info("ts start is " + tsStart); - - byte[] idealStartKey = new byte[IIKeyValueCodec.SHARD_LEN + IIKeyValueCodec.TIMEPART_LEN]; - BytesUtil.writeUnsigned(shard, idealStartKey, 0, IIKeyValueCodec.SHARD_LEN); - BytesUtil.writeLong(tsStart, idealStartKey, IIKeyValueCodec.SHARD_LEN, IIKeyValueCodec.TIMEPART_LEN); - logger.info("ideaStartKey is(readable) :" + BytesUtil.toReadableText(idealStartKey)); - Result result = region.getClosestRowBefore(idealStartKey, IIDesc.HBASE_FAMILY_BYTES); - if (result != null) { - byte[] actualStartKey = Arrays.copyOf(result.getRow(), IIKeyValueCodec.SHARD_LEN + IIKeyValueCodec.TIMEPART_LEN); - scan.setStartRow(actualStartKey); - logger.info("The start key is set to " + BytesUtil.toReadableText(actualStartKey)); - } else { - logger.info("There is no key before ideaStartKey so ignore tsStart"); - } - } - - if (tsRange.hasUpperBound()) { - //differentiate LT and LTE seems not very beneficial - Preconditions.checkArgument(shard != -1, "Shard is -1"); - long tsEnd = tsRange.upperEndpoint(); - logger.info("ts end is " + tsEnd); - - byte[] actualEndKey = new byte[IIKeyValueCodec.SHARD_LEN + IIKeyValueCodec.TIMEPART_LEN]; - BytesUtil.writeUnsigned(shard, actualEndKey, 0, IIKeyValueCodec.SHARD_LEN); - BytesUtil.writeLong(tsEnd + 1, actualEndKey, IIKeyValueCodec.SHARD_LEN, IIKeyValueCodec.TIMEPART_LEN);//notice +1 here - scan.setStopRow(actualEndKey); - logger.info("The stop key is set to " + BytesUtil.toReadableText(actualEndKey)); - } - } - - return scan; - } - - @Override - public void getRows(RpcController controller, IIProtos.IIRequest request, RpcCallback<IIProtos.IIResponse> done) { - - this.serviceStartTime = System.currentTimeMillis(); - - RegionScanner innerScanner = null; - HRegion region = null; - - try { - region = env.getRegion(); - region.startRegionOperation(); - - innerScanner = region.getScanner(prepareScan(request, region)); - - CoprocessorRowType type = CoprocessorRowType.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getType())); - CoprocessorProjector projector = CoprocessorProjector.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getProjector())); - EndpointAggregators aggregators = EndpointAggregators.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getAggregator())); - CoprocessorFilter filter = CoprocessorFilter.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getFilter())); - - //compression - IIProtos.IIResponseInternal response = getResponse(innerScanner, type, projector, aggregators, filter); - byte[] compressed = CompressionUtils.compress(response.toByteArray()); - IIProtos.IIResponse compressedR = IIProtos.IIResponse.newBuilder().setBlob(HBaseZeroCopyByteString.wrap(compressed)).build(); - - done.run(compressedR); - } catch (IOException ioe) { - logger.error(ioe.toString()); - ResponseConverter.setControllerException(controller, ioe); - } finally { - IOUtils.closeQuietly(innerScanner); - if (region != null) { - try { - region.closeRegionOperation(); - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - } - } - - public IIProtos.IIResponseInternal getResponse(RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, EndpointAggregators aggregators, CoprocessorFilter filter) { - - TableRecordInfoDigest tableRecordInfoDigest = aggregators.getTableRecordInfoDigest(); - - IIProtos.IIResponseInternal response; - - synchronized (innerScanner) { - IIKeyValueCodec codec = new IIKeyValueCodec(tableRecordInfoDigest); - //TODO pass projector to codec to skip loading columns - Iterable<Slice> slices = codec.decodeKeyValue(new HbaseServerKVIterator(innerScanner)); - - response = getResponseInternal(slices, tableRecordInfoDigest, filter, type, projector, aggregators); - } - return response; - } - - private IIProtos.IIResponseInternal getResponseInternal(Iterable<Slice> slices, TableRecordInfoDigest recordInfo, CoprocessorFilter filter, CoprocessorRowType type, CoprocessorProjector projector, EndpointAggregators aggregators) { - boolean needAgg = projector.hasGroupby() || !aggregators.isEmpty(); - - //for needAgg use - EndpointAggregationCache aggCache = new EndpointAggregationCache(aggregators); - //for no needAgg use - final int byteFormLen = recordInfo.getByteFormLen(); - int totalByteFormLen = 0; - - IIProtos.IIResponseInternal.Builder responseBuilder = IIProtos.IIResponseInternal.newBuilder(); - ClearTextDictionary clearTextDictionary = new ClearTextDictionary(recordInfo, type); - RowKeyColumnIO rowKeyColumnIO = new RowKeyColumnIO(clearTextDictionary); - - byte[] recordBuffer = new byte[recordInfo.getByteFormLen()]; - byte[] buffer = new byte[BytesSerializer.SERIALIZE_BUFFER_SIZE]; - - int iteratedSliceCount = 0; - long latestSliceTs = Long.MIN_VALUE; - for (Slice slice : slices) { - latestSliceTs = slice.getTimestamp(); - iteratedSliceCount++; - - //dictionaries for fact table columns can not be determined while streaming. - //a piece of dict coincide with each Slice, we call it "local dict" - final Dictionary<?>[] localDictionaries = slice.getLocalDictionaries(); - CoprocessorFilter newFilter; - final boolean emptyDictionary = Array.isEmpty(localDictionaries); - if (emptyDictionary) { - newFilter = filter; - } else { - for (Dictionary<?> localDictionary : localDictionaries) { - if (localDictionary instanceof TrieDictionary) { - ((TrieDictionary) localDictionary).enableIdToValueBytesCache(); - } - } - newFilter = CoprocessorFilter.fromFilter(new LocalDictionary(localDictionaries, type, slice.getInfo()), filter.getFilter(), FilterDecorator.FilterConstantsTreatment.REPLACE_WITH_LOCAL_DICT); - } - - ConciseSet result = null; - if (filter != null) { - result = new BitMapFilterEvaluator(new SliceBitMapProvider(slice, type)).evaluate(newFilter.getFilter()); - } - - Iterator<RawTableRecord> iterator = slice.iterateWithBitmap(result); - - TblColRef[] columns = type.columns; - int[] finalColumnLength = new int[columns.length]; - for (int i = 0; i < columns.length; ++i) { - finalColumnLength[i] = rowKeyColumnIO.getColumnLength(columns[i]); - } - - while (iterator.hasNext()) { - final RawTableRecord rawTableRecord = iterator.next(); - decodeWithDictionary(recordBuffer, rawTableRecord, localDictionaries, recordInfo, rowKeyColumnIO, finalColumnLength); - - if (needAgg) { - //if has group by, group them first, and extract entries later - AggrKey aggKey = projector.getAggrKey(recordBuffer); - MeasureAggregator[] bufs = aggCache.getBuffer(aggKey); - aggregators.aggregate(bufs, recordBuffer); - aggCache.checkMemoryUsage(); - } else { - //otherwise directly extract entry and put into response - if (totalByteFormLen >= MEMORY_LIMIT) { - throw new RuntimeException("the query has exceeded the memory limit, please check the query"); - } - IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(recordBuffer)); - responseBuilder.addRows(rowBuilder.build()); - totalByteFormLen += byteFormLen; - } - } - } - - logger.info("Iterated Slices count: " + iteratedSliceCount); - - if (needAgg) { - int offset = 0; - int measureLength = aggregators.getMeasureSerializeLength(); - for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) { - AggrKey aggrKey = entry.getKey(); - IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(aggrKey.get(), aggrKey.offset(), aggrKey.length())); - if (offset + measureLength > buffer.length) { - buffer = new byte[BytesSerializer.SERIALIZE_BUFFER_SIZE]; - offset = 0; - } - int length = aggregators.serializeMetricValues(entry.getValue(), buffer, offset); - rowBuilder.setMeasures(HBaseZeroCopyByteString.wrap(buffer, offset, length)); - offset += length; - responseBuilder.addRows(rowBuilder.build()); - } - } - - responseBuilder.setStats(IIProtos.IIResponseInternal.Stats.newBuilder().setLatestDataTime(latestSliceTs).setServiceStartTime(this.serviceStartTime).setServiceEndTime(System.currentTimeMillis()).setScannedSlices(iteratedSliceCount)); - return responseBuilder.build(); - } - - private void decodeWithDictionary(byte[] recordBuffer, RawTableRecord encodedRecord, Dictionary<?>[] localDictionaries, TableRecordInfoDigest digest, RowKeyColumnIO rowKeyColumnIO, int[] finalColumnLengths) { - final boolean[] isMetric = digest.isMetrics(); - final boolean emptyDictionary = Array.isEmpty(localDictionaries); - for (int i = 0; i < finalColumnLengths.length; i++) { - if (isMetric[i]) { - writeColumnWithoutDictionary(encodedRecord.getBytes(), encodedRecord.offset(i), encodedRecord.length(i), recordBuffer, digest.offset(i), finalColumnLengths[i]); - } else { - if (emptyDictionary) { - writeColumnWithoutDictionary(encodedRecord.getBytes(), encodedRecord.offset(i), encodedRecord.length(i), recordBuffer, digest.offset(i), finalColumnLengths[i]); - } else { - final Dictionary<?> localDictionary = localDictionaries[i]; - final byte[] valueBytesFromId = localDictionary.getValueBytesFromId(encodedRecord.getValueID(i)); - writeColumnWithoutDictionary(valueBytesFromId, 0, valueBytesFromId.length, recordBuffer, digest.offset(i), finalColumnLengths[i]); - } - } - } - } - - private void writeColumnWithoutDictionary(byte[] src, int srcOffset, int srcLength, byte[] dst, int dstOffset, int dstLength) { - if (srcLength >= dstLength) { - System.arraycopy(src, srcOffset, dst, dstOffset, dstLength); - } else { - System.arraycopy(src, srcOffset, dst, dstOffset, srcLength); - Arrays.fill(dst, dstOffset + srcLength, dstOffset + dstLength, FixedLenDimEnc.ROWKEY_PLACE_HOLDER_BYTE); - } - } - - @Override - public void start(CoprocessorEnvironment env) throws IOException { - if (env instanceof RegionCoprocessorEnvironment) { - this.env = (RegionCoprocessorEnvironment) env; - } else { - throw new CoprocessorException("Must be loaded on a table region!"); - } - } - - @Override - public void stop(CoprocessorEnvironment env) throws IOException { - } - - @Override - public Service getService() { - return this; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIResponseAdapter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIResponseAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIResponseAdapter.java deleted file mode 100644 index e62f41f..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIResponseAdapter.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint; - -/** - */ -public class IIResponseAdapter { -}