http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java b/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java deleted file mode 100644 index a702a6b..0000000 --- a/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.core.partition.impl; - -import java.util.List; - -import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.partition.Partitioner; - -/** - * Hash partitioner implementation - */ -public class HashPartitionerImpl implements Partitioner<Object[]> { - - private int numberOfBuckets; - - private Hash[] hashes; - - public HashPartitionerImpl(List<Integer> indexes, List<ColumnSchema> columnSchemas, - int numberOfBuckets) { - this.numberOfBuckets = numberOfBuckets; - hashes = new Hash[indexes.size()]; - for (int i = 0; i < indexes.size(); i++) { - switch(columnSchemas.get(i).getDataType()) { - case SHORT: - case INT: - case LONG: - hashes[i] = new IntegralHash(indexes.get(i)); - break; - case DOUBLE: - case FLOAT: - case DECIMAL: - hashes[i] = new DecimalHash(indexes.get(i)); - break; - default: - hashes[i] = new StringHash(indexes.get(i)); - } - } - } - - @Override public int getPartition(Object[] objects) { - int hashCode = 0; - for (Hash hash : hashes) { - hashCode += hash.getHash(objects); - } - return (hashCode & Integer.MAX_VALUE) % numberOfBuckets; - } - - private interface Hash { - int getHash(Object[] value); - } - - private static class IntegralHash implements Hash { - - private int index; - - private IntegralHash(int index) { - this.index = index; - } - - public int getHash(Object[] value) { - return value[index] != null ? Long.valueOf(value[index].toString()).hashCode() : 0; - } - } - - private static class DecimalHash implements Hash { - - private int index; - - private DecimalHash(int index) { - this.index = index; - } - - public int getHash(Object[] value) { - return value[index] != null ? 
Double.valueOf(value[index].toString()).hashCode() : 0; - } - } - - private static class StringHash implements Hash { - - private int index; - - private StringHash(int index) { - this.index = index; - } - - @Override public int getHash(Object[] value) { - return value[index] != null ? value[index].hashCode() : 0; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java index 10f7029..db5f068 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java @@ -21,7 +21,7 @@ package org.apache.carbondata.core.reader; import java.io.IOException; -import org.apache.carbondata.core.update.DeleteDeltaBlockDetails; +import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails; /** http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java index dac2d43..854a23b 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java @@ -19,16 +19,20 @@ package org.apache.carbondata.core.reader; -import java.io.*; +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.StringWriter; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import 
org.apache.carbondata.core.datastorage.store.impl.FileFactory; -import org.apache.carbondata.core.update.DeleteDeltaBlockDetails; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.fileoperations.AtomicFileOperations; -import org.apache.carbondata.fileoperations.AtomicFileOperationsImpl; import com.google.gson.Gson; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java index a0922ec..15c0cbc 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java @@ -24,14 +24,18 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory; -import org.apache.carbondata.core.update.DeleteDeltaBlockDetails; -import 
org.apache.carbondata.core.update.DeleteDeltaBlockletDetails; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails; +import org.apache.carbondata.core.mutate.DeleteDeltaBlockletDetails; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.commons.lang.ArrayUtils; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java index c79c15d..21eb979 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java @@ -23,12 +23,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.carbondata.common.factory.CarbonCommonFactory; -import org.apache.carbondata.core.carbon.CarbonTableIdentifier; -import org.apache.carbondata.core.carbon.ColumnIdentifier; -import org.apache.carbondata.core.carbon.path.CarbonTablePath; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.metadata.ColumnIdentifier; +import org.apache.carbondata.core.service.CarbonCommonFactory; import org.apache.carbondata.core.service.PathService; +import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.format.ColumnDictionaryChunkMeta; import org.apache.thrift.TBase; 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java index 75aecf3..fe515d9 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java @@ -25,12 +25,12 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import org.apache.carbondata.common.factory.CarbonCommonFactory; import org.apache.carbondata.core.cache.dictionary.ColumnDictionaryChunkIterator; -import org.apache.carbondata.core.carbon.CarbonTableIdentifier; -import org.apache.carbondata.core.carbon.ColumnIdentifier; -import org.apache.carbondata.core.carbon.path.CarbonTablePath; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.metadata.ColumnIdentifier; +import org.apache.carbondata.core.service.CarbonCommonFactory; import org.apache.carbondata.core.service.PathService; +import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.format.ColumnDictionaryChunk; import org.apache.thrift.TBase; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java index 81b7661..0e3a3e0 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java +++ 
b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java @@ -22,7 +22,7 @@ package org.apache.carbondata.core.reader; import java.io.DataInputStream; import java.io.IOException; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.thrift.TBase; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java index 00ae688..e7da781 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java @@ -21,18 +21,18 @@ package org.apache.carbondata.core.reader.sortindex; import java.io.IOException; import java.util.List; -import org.apache.carbondata.common.factory.CarbonCommonFactory; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.carbon.CarbonTableIdentifier; -import org.apache.carbondata.core.carbon.ColumnIdentifier; -import org.apache.carbondata.core.carbon.path.CarbonTablePath; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.metadata.ColumnIdentifier; import org.apache.carbondata.core.reader.CarbonDictionaryColumnMetaChunk; import 
org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader; import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl; import org.apache.carbondata.core.reader.ThriftReader; +import org.apache.carbondata.core.service.CarbonCommonFactory; import org.apache.carbondata.core.service.PathService; +import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.format.ColumnSortInfo; import org.apache.thrift.TBase; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/collector/ScannedResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/ScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/ScannedResultCollector.java new file mode 100644 index 0000000..7ea0625 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/ScannedResultCollector.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.scan.collector; + +import java.util.List; + +import org.apache.carbondata.core.scan.result.AbstractScannedResult; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; + +/** + * Interface which will be used to aggregate the scan result + */ +public interface ScannedResultCollector { + + /** + * Below method will be used to aggregate the scanned result + * + * @param scannedResult scanned result + * @return how many records was aggregated + */ + List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize); + + /** + * Collects data in columnar format. + * @param scannedResult + * @param columnarBatch + */ + void collectVectorBatch(AbstractScannedResult scannedResult, CarbonColumnarBatch columnarBatch); +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java new file mode 100644 index 0000000..9235c97 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.scan.collector.impl; + +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk; +import org.apache.carbondata.core.keygenerator.KeyGenException; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.scan.collector.ScannedResultCollector; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo; +import org.apache.carbondata.core.scan.executor.util.QueryUtil; +import org.apache.carbondata.core.scan.result.AbstractScannedResult; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; + +/** + * It is not a collector it is just a scanned result holder. 
+ */ +public abstract class AbstractScannedResultCollector implements ScannedResultCollector { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(AbstractScannedResultCollector.class.getName()); + + /** + * restructuring info + */ + private KeyStructureInfo restructureInfos; + + /** + * table block execution infos + */ + protected BlockExecutionInfo tableBlockExecutionInfos; + + /** + * Measure ordinals + */ + protected int[] measuresOrdinal; + + /** + * to check whether measure exists in current table block or not this to + * handle restructuring scenario + */ + protected boolean[] isMeasureExistsInCurrentBlock; + + /** + * default value of the measures in case of restructuring some measure wont + * be present in the table so in that default value will be used to + * aggregate the data for that measure columns + */ + private Object[] measureDefaultValue; + + /** + * measure datatypes. + */ + protected DataType[] measureDatatypes; + + public AbstractScannedResultCollector(BlockExecutionInfo blockExecutionInfos) { + this.tableBlockExecutionInfos = blockExecutionInfos; + restructureInfos = blockExecutionInfos.getKeyStructureInfo(); + measuresOrdinal = tableBlockExecutionInfos.getAggregatorInfo().getMeasureOrdinals(); + isMeasureExistsInCurrentBlock = tableBlockExecutionInfos.getAggregatorInfo().getMeasureExists(); + measureDefaultValue = tableBlockExecutionInfos.getAggregatorInfo().getDefaultValues(); + this.measureDatatypes = tableBlockExecutionInfos.getAggregatorInfo().getMeasureDataTypes(); + } + + protected void fillMeasureData(Object[] msrValues, int offset, + AbstractScannedResult scannedResult) { + for (short i = 0; i < measuresOrdinal.length; i++) { + // if measure exists is block then pass measure column + // data chunk to the collector + if (isMeasureExistsInCurrentBlock[i]) { + msrValues[i + offset] = getMeasureData(scannedResult.getMeasureChunk(measuresOrdinal[i]), + scannedResult.getCurrenrRowId(), measureDatatypes[i]); + } 
else { + // if not then get the default value and use that value in aggregation + msrValues[i + offset] = measureDefaultValue[i]; + } + } + } + + private Object getMeasureData(MeasureColumnDataChunk dataChunk, int index, DataType dataType) { + if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { + switch (dataType) { + case SHORT: + case INT: + case LONG: + return dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index); + case DECIMAL: + return org.apache.spark.sql.types.Decimal + .apply(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index)); + default: + return dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index); + } + } + return null; + } + + /** + * Below method will used to get the result + */ + protected void updateData(List<Object[]> listBasedResult) { + if (tableBlockExecutionInfos.isFixedKeyUpdateRequired()) { + updateKeyWithLatestBlockKeygenerator(listBasedResult); + } + } + + /** + * Below method will be used to update the fixed length key with the + * latest block key generator + * + * @return updated block + */ + private void updateKeyWithLatestBlockKeygenerator(List<Object[]> listBasedResult) { + try { + long[] data = null; + ByteArrayWrapper key = null; + for (int i = 0; i < listBasedResult.size(); i++) { + // get the key + key = (ByteArrayWrapper) listBasedResult.get(i)[0]; + // unpack the key with table block key generator + data = tableBlockExecutionInfos.getBlockKeyGenerator() + .getKeyArray(key.getDictionaryKey(), tableBlockExecutionInfos.getMaskedByteForBlock()); + // packed the key with latest block key generator + // and generate the masked key for that key + key.setDictionaryKey(QueryUtil + .getMaskedKey(restructureInfos.getKeyGenerator().generateKey(data), + restructureInfos.getMaxKey(), restructureInfos.getMaskByteRanges(), + restructureInfos.getMaskByteRanges().length)); + } + } catch (KeyGenException e) { + LOGGER.error(e); + } + } + + @Override public void 
collectVectorBatch(AbstractScannedResult scannedResult, + CarbonColumnarBatch columnarBatch) { + throw new UnsupportedOperationException("Works only for batch collectors"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java new file mode 100644 index 0000000..391f5da --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.scan.collector.impl; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.model.QueryDimension; +import org.apache.carbondata.core.scan.model.QueryMeasure; +import org.apache.carbondata.core.scan.result.AbstractScannedResult; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; + +import org.apache.commons.lang3.ArrayUtils; +/** + * It is not a collector it is just a scanned result holder. 
+ */ +public class DictionaryBasedResultCollector extends AbstractScannedResultCollector { + + public DictionaryBasedResultCollector(BlockExecutionInfo blockExecutionInfos) { + super(blockExecutionInfos); + } + + /** + * This method will add a record both key and value to list object + * it will keep track of how many record is processed, to handle limit scenario + */ + @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) { + + List<Object[]> listBasedResult = new ArrayList<>(batchSize); + boolean isMsrsPresent = measureDatatypes.length > 0; + + QueryDimension[] queryDimensions = tableBlockExecutionInfos.getQueryDimensions(); + List<Integer> dictionaryIndexes = new ArrayList<Integer>(); + for (int i = 0; i < queryDimensions.length; i++) { + if(queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || + queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY) ) { + dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal()); + } + } + int[] primitive = ArrayUtils.toPrimitive(dictionaryIndexes.toArray( + new Integer[dictionaryIndexes.size()])); + Arrays.sort(primitive); + int[] actualIndexInSurrogateKey = new int[dictionaryIndexes.size()]; + int index = 0; + for (int i = 0; i < queryDimensions.length; i++) { + if(queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || + queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY) ) { + actualIndexInSurrogateKey[index++] = Arrays.binarySearch(primitive, + queryDimensions[i].getDimension().getOrdinal()); + } + } + + QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures(); + BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache = + scannedResult.getDeleteDeltaDataCache(); + Map<Integer, GenericQueryType> comlexDimensionInfoMap = + tableBlockExecutionInfos.getComlexDimensionInfoMap(); + boolean[] dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions); + boolean[] 
directDictionaryEncodingArray = + CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions); + boolean[] implictColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions); + boolean[] complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions); + int dimSize = queryDimensions.length; + boolean isDimensionsExist = dimSize > 0; + int[] order = new int[dimSize + queryMeasures.length]; + for (int i = 0; i < dimSize; i++) { + order[i] = queryDimensions[i].getQueryOrder(); + } + for (int i = 0; i < queryMeasures.length; i++) { + order[i + dimSize] = queryMeasures[i].getQueryOrder(); + } + // scan the record and add to list + int rowCounter = 0; + int dictionaryColumnIndex = 0; + int noDictionaryColumnIndex = 0; + int complexTypeColumnIndex = 0; + int[] surrogateResult; + String[] noDictionaryKeys; + byte[][] complexTypeKeyArray; + while (scannedResult.hasNext() && rowCounter < batchSize) { + Object[] row = new Object[dimSize + queryMeasures.length]; + if (isDimensionsExist) { + surrogateResult = scannedResult.getDictionaryKeyIntegerArray(); + noDictionaryKeys = scannedResult.getNoDictionaryKeyStringArray(); + complexTypeKeyArray = scannedResult.getComplexTypeKeyArray(); + dictionaryColumnIndex = 0; + noDictionaryColumnIndex = 0; + complexTypeColumnIndex = 0; + for (int i = 0; i < dimSize; i++) { + if (!dictionaryEncodingArray[i]) { + if (implictColumnArray[i]) { + if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID + .equals(queryDimensions[i].getDimension().getColName())) { + row[order[i]] = DataTypeUtil.getDataBasedOnDataType( + scannedResult.getBlockletId() + CarbonCommonConstants.FILE_SEPARATOR + + scannedResult.getCurrenrRowId(), DataType.STRING); + } else { + row[order[i]] = DataTypeUtil + .getDataBasedOnDataType(scannedResult.getBlockletId(), DataType.STRING); + } + } else { + row[order[i]] = DataTypeUtil + .getDataBasedOnDataType(noDictionaryKeys[noDictionaryColumnIndex++], + queryDimensions[i].getDimension().getDataType()); + } + } 
else if (directDictionaryEncodingArray[i]) { + DirectDictionaryGenerator directDictionaryGenerator = + DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType()); + if (directDictionaryGenerator != null) { + row[order[i]] = directDictionaryGenerator.getValueFromSurrogate( + surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]); + } + } else if (complexDataTypeArray[i]) { + row[order[i]] = comlexDimensionInfoMap + .get(queryDimensions[i].getDimension().getOrdinal()) + .getDataBasedOnDataTypeFromSurrogates( + ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++])); + } else { + row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]; + } + } + + } else { + scannedResult.incrementCounter(); + } + if (null != deleteDeltaDataCache && deleteDeltaDataCache + .contains(scannedResult.getCurrenrRowId())) { + continue; + } + if (isMsrsPresent) { + Object[] msrValues = new Object[measureDatatypes.length]; + fillMeasureData(msrValues, 0, scannedResult); + for (int i = 0; i < msrValues.length; i++) { + row[order[i + dimSize]] = msrValues[i]; + } + } + listBasedResult.add(row); + rowCounter++; + } + return listBasedResult; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java new file mode 100644 index 0000000..401dffd --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.scan.collector.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.model.QueryDimension; +import org.apache.carbondata.core.scan.model.QueryMeasure; +import org.apache.carbondata.core.scan.result.AbstractScannedResult; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; +import org.apache.carbondata.core.scan.result.vector.MeasureDataVectorProcessor; + +/** + * It is not a collector it is just a scanned result holder. 
+ */ +public class DictionaryBasedVectorResultCollector extends AbstractScannedResultCollector { + + private ColumnVectorInfo[] dictionaryInfo; + + private ColumnVectorInfo[] noDictionaryInfo; + + private ColumnVectorInfo[] complexInfo; + + private ColumnVectorInfo[] measureInfo; + + private ColumnVectorInfo[] allColumnInfo; + + public DictionaryBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) { + super(blockExecutionInfos); + QueryDimension[] queryDimensions = tableBlockExecutionInfos.getQueryDimensions(); + QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures(); + measureInfo = new ColumnVectorInfo[queryMeasures.length]; + allColumnInfo = new ColumnVectorInfo[queryDimensions.length + queryMeasures.length]; + List<ColumnVectorInfo> dictInfoList = new ArrayList<>(); + List<ColumnVectorInfo> noDictInfoList = new ArrayList<>(); + List<ColumnVectorInfo> complexList = new ArrayList<>(); + for (int i = 0; i < queryDimensions.length; i++) { + if (!queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)) { + ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo(); + noDictInfoList.add(columnVectorInfo); + columnVectorInfo.dimension = queryDimensions[i]; + columnVectorInfo.ordinal = queryDimensions[i].getDimension().getOrdinal(); + allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo; + } else if (queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) { + ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo(); + dictInfoList.add(columnVectorInfo); + columnVectorInfo.dimension = queryDimensions[i]; + columnVectorInfo.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType()); + columnVectorInfo.ordinal = queryDimensions[i].getDimension().getOrdinal(); + allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo; + } else if (queryDimensions[i].getDimension().isComplex()) { + 
ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo(); + complexList.add(columnVectorInfo); + columnVectorInfo.dimension = queryDimensions[i]; + columnVectorInfo.ordinal = queryDimensions[i].getDimension().getOrdinal(); + columnVectorInfo.genericQueryType = + tableBlockExecutionInfos.getComlexDimensionInfoMap().get(columnVectorInfo.ordinal); + allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo; + } else { + ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo(); + dictInfoList.add(columnVectorInfo); + columnVectorInfo.dimension = queryDimensions[i]; + columnVectorInfo.ordinal = queryDimensions[i].getDimension().getOrdinal(); + allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo; + } + } + for (int i = 0; i < queryMeasures.length; i++) { + ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo(); + columnVectorInfo.measureVectorFiller = MeasureDataVectorProcessor.MeasureVectorFillerFactory + .getMeasureVectorFiller(queryMeasures[i].getMeasure().getDataType()); + columnVectorInfo.ordinal = queryMeasures[i].getMeasure().getOrdinal(); + columnVectorInfo.measure = queryMeasures[i]; + measureInfo[i] = columnVectorInfo; + allColumnInfo[queryMeasures[i].getQueryOrder()] = columnVectorInfo; + } + dictionaryInfo = dictInfoList.toArray(new ColumnVectorInfo[dictInfoList.size()]); + noDictionaryInfo = noDictInfoList.toArray(new ColumnVectorInfo[noDictInfoList.size()]); + complexInfo = complexList.toArray(new ColumnVectorInfo[complexList.size()]); + Arrays.sort(dictionaryInfo); + Arrays.sort(noDictionaryInfo); + Arrays.sort(complexInfo); + } + + @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) { + throw new UnsupportedOperationException("collectData is not supported here"); + } + + @Override public void collectVectorBatch(AbstractScannedResult scannedResult, + CarbonColumnarBatch columnarBatch) { + int rowCounter = scannedResult.getRowCounter(); + int availableRows = 
scannedResult.numberOfOutputRows() - rowCounter; + int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getActualSize(); + requiredRows = Math.min(requiredRows, availableRows); + if (requiredRows < 1) { + return; + } + for (int i = 0; i < allColumnInfo.length; i++) { + allColumnInfo[i].size = requiredRows; + allColumnInfo[i].offset = rowCounter; + allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter(); + allColumnInfo[i].vector = columnarBatch.columnVectors[i]; + } + + scannedResult.fillColumnarDictionaryBatch(dictionaryInfo); + scannedResult.fillColumnarNoDictionaryBatch(noDictionaryInfo); + scannedResult.fillColumnarMeasureBatch(measureInfo, measuresOrdinal); + scannedResult.fillColumnarComplexBatch(complexInfo); + scannedResult.setRowCounter(rowCounter + requiredRows); + columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows); + columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java new file mode 100644 index 0000000..a738402 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.scan.collector.impl; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.model.QueryMeasure; +import org.apache.carbondata.core.scan.result.AbstractScannedResult; +import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; + + +/** + * It is not a collector it is just a scanned result holder. 
+ */ +public class RawBasedResultCollector extends AbstractScannedResultCollector { + + public RawBasedResultCollector(BlockExecutionInfo blockExecutionInfos) { + super(blockExecutionInfos); + } + + /** + * This method will add a record both key and value to list object + * it will keep track of how many record is processed, to handle limit scenario + */ + @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) { + List<Object[]> listBasedResult = new ArrayList<>(batchSize); + QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures(); + ByteArrayWrapper wrapper = null; + BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache = + scannedResult.getDeleteDeltaDataCache(); + // scan the record and add to list + int rowCounter = 0; + while (scannedResult.hasNext() && rowCounter < batchSize) { + Object[] row = new Object[1 + queryMeasures.length]; + wrapper = new ByteArrayWrapper(); + wrapper.setDictionaryKey(scannedResult.getDictionaryKeyArray()); + wrapper.setNoDictionaryKeys(scannedResult.getNoDictionaryKeyArray()); + wrapper.setComplexTypesKeys(scannedResult.getComplexTypeKeyArray()); + wrapper.setImplicitColumnByteArray(scannedResult.getBlockletId() + .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + if (null != deleteDeltaDataCache && deleteDeltaDataCache + .contains(scannedResult.getCurrenrRowId())) { + continue; + } + row[0] = wrapper; + fillMeasureData(row, 1, scannedResult); + listBasedResult.add(row); + rowCounter++; + } + updateData(listBasedResult); + return listBasedResult; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java new file mode 100644 index 0000000..3061024 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.core.scan.complextypes; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; + +import org.apache.spark.sql.catalyst.util.*; +import org.apache.spark.sql.types.*; + +public class ArrayQueryType extends ComplexQueryType implements GenericQueryType { + + private GenericQueryType children; + + public ArrayQueryType(String name, String parentname, int blockIndex) { + super(name, parentname, blockIndex); + } + + @Override public void addChildren(GenericQueryType children) { + if (this.getName().equals(children.getParentname())) { + this.children = children; + } else { + this.children.addChildren(children); + } + } + + @Override public String getName() { + return name; + } + + @Override public void setName(String name) { + this.name = name; + } + + @Override public String getParentname() { + return parentname; + } + + @Override public void setParentname(String parentname) { + this.parentname = parentname; + + } + + public void parseBlocksAndReturnComplexColumnByteArray( + DimensionColumnDataChunk[] dimensionColumnDataChunks, int rowNumber, + DataOutputStream dataOutputStream) throws IOException { + byte[] input = new byte[8]; + copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, input); + ByteBuffer byteArray = ByteBuffer.wrap(input); + int dataLength = byteArray.getInt(); + dataOutputStream.writeInt(dataLength); + if (dataLength > 0) { + int columnIndex = byteArray.getInt(); + for (int i = 0; i < dataLength; i++) { + children + .parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks, columnIndex++, + dataOutputStream); + } + } + } + + @Override public int getColsCount() { + return children.getColsCount() + 1; + } + + @Override public DataType getSchemaType() { + 
return new ArrayType(null, true); + } + + @Override public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder) + throws IOException { + readBlockDataChunk(blockChunkHolder); + children.fillRequiredBlockData(blockChunkHolder); + } + + @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) { + int dataLength = surrogateData.getInt(); + if (dataLength == -1) { + return null; + } + Object[] data = new Object[dataLength]; + for (int i = 0; i < dataLength; i++) { + data[i] = children.getDataBasedOnDataTypeFromSurrogates(surrogateData); + } + return new GenericArrayData(data); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java new file mode 100644 index 0000000..75ed3ff --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.scan.complextypes; + +import java.io.IOException; + +import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; + +public class ComplexQueryType { + protected GenericQueryType children; + + protected String name; + + protected String parentname; + + protected int blockIndex; + + public ComplexQueryType(String name, String parentname, int blockIndex) { + this.name = name; + this.parentname = parentname; + this.blockIndex = blockIndex; + } + + /** + * Method will copy the block chunk holder data to the passed + * byte[], this method is also used by child + * + * @param rowNumber + * @param input + */ + protected void copyBlockDataChunk(DimensionColumnDataChunk[] dimensionColumnDataChunks, + int rowNumber, byte[] input) { + byte[] data = dimensionColumnDataChunks[blockIndex].getChunkData(rowNumber); + System.arraycopy(data, 0, input, 0, data.length); + } + + /* + * This method will read the block data chunk from the respective block + */ + protected void readBlockDataChunk(BlocksChunkHolder blockChunkHolder) throws IOException { + if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) { + blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock() + .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java new file mode 100644 index 0000000..51e0252 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.core.scan.complextypes; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.keygenerator.mdkey.Bits; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; +import org.apache.carbondata.core.util.DataTypeUtil; + +import org.apache.spark.sql.types.BooleanType$; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType$; +import org.apache.spark.sql.types.DoubleType$; +import org.apache.spark.sql.types.IntegerType$; +import org.apache.spark.sql.types.LongType$; +import org.apache.spark.sql.types.TimestampType$; + +public class PrimitiveQueryType extends ComplexQueryType implements GenericQueryType { + + private String name; + private String parentname; + + private int keySize; + + private int blockIndex; + + private Dictionary dictionary; + + private org.apache.carbondata.core.metadata.datatype.DataType dataType; + + private boolean isDirectDictionary; + + public PrimitiveQueryType(String name, String parentname, int blockIndex, + org.apache.carbondata.core.metadata.datatype.DataType dataType, int keySize, + Dictionary dictionary, boolean isDirectDictionary) { + super(name, parentname, blockIndex); + this.dataType = dataType; + this.keySize = keySize; + this.dictionary = dictionary; + this.name = name; + this.parentname = parentname; + this.blockIndex = blockIndex; + this.isDirectDictionary = isDirectDictionary; + } + + @Override public void addChildren(GenericQueryType children) { + + } + + @Override public String 
getName() { + return name; + } + + @Override public void setName(String name) { + this.name = name; + } + + @Override public String getParentname() { + return parentname; + } + + @Override public void setParentname(String parentname) { + this.parentname = parentname; + + } + + @Override public int getColsCount() { + return 1; + } + + @Override public void parseBlocksAndReturnComplexColumnByteArray( + DimensionColumnDataChunk[] dimensionDataChunks, int rowNumber, + DataOutputStream dataOutputStream) throws IOException { + byte[] currentVal = + new byte[dimensionDataChunks[blockIndex].getColumnValueSize()]; + copyBlockDataChunk(dimensionDataChunks, rowNumber, currentVal); + dataOutputStream.write(currentVal); + } + + @Override public DataType getSchemaType() { + switch (dataType) { + case INT: + return IntegerType$.MODULE$; + case DOUBLE: + return DoubleType$.MODULE$; + case LONG: + return LongType$.MODULE$; + case BOOLEAN: + return BooleanType$.MODULE$; + case TIMESTAMP: + return TimestampType$.MODULE$; + case DATE: + return DateType$.MODULE$; + default: + return IntegerType$.MODULE$; + } + } + + @Override public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder) + throws IOException { + readBlockDataChunk(blockChunkHolder); + } + + @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) { + byte[] data = new byte[keySize]; + surrogateData.get(data); + Bits bit = new Bits(new int[]{keySize * 8}); + int surrgateValue = (int)bit.getKeyArray(data, 0)[0]; + Object actualData = null; + if (isDirectDictionary) { + DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(dataType); + actualData = directDictionaryGenerator.getValueFromSurrogate(surrgateValue); + } else { + String dictionaryValueForKey = dictionary.getDictionaryValueForKey(surrgateValue); + actualData = DataTypeUtil.getDataBasedOnDataType(dictionaryValueForKey, this.dataType); + } + return actualData; 
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java new file mode 100644 index 0000000..c0b6f1b --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.core.scan.complextypes; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; + +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +public class StructQueryType extends ComplexQueryType implements GenericQueryType { + + private List<GenericQueryType> children = new ArrayList<GenericQueryType>(); + private String name; + private String parentname; + + public StructQueryType(String name, String parentname, int blockIndex) { + super(name, parentname, blockIndex); + this.name = name; + this.parentname = parentname; + } + + @Override public void addChildren(GenericQueryType newChild) { + if (this.getName().equals(newChild.getParentname())) { + this.children.add(newChild); + } else { + for (GenericQueryType child : this.children) { + child.addChildren(newChild); + } + } + + } + + @Override public String getName() { + return name; + } + + @Override public void setName(String name) { + this.name = name; + } + + @Override public String getParentname() { + return parentname; + } + + @Override public void setParentname(String parentname) { + this.parentname = parentname; + + } + + @Override public int getColsCount() { + int colsCount = 1; + for (int i = 0; i < children.size(); i++) { + colsCount += children.get(i).getColsCount(); + } + return colsCount; + } + + @Override public void parseBlocksAndReturnComplexColumnByteArray( + DimensionColumnDataChunk[] dimensionColumnDataChunks, int rowNumber, + DataOutputStream dataOutputStream) 
throws IOException { + byte[] input = new byte[8]; + copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, input); + ByteBuffer byteArray = ByteBuffer.wrap(input); + int childElement = byteArray.getInt(); + dataOutputStream.writeInt(childElement); + if (childElement > 0){ + for (int i = 0; i < childElement; i++) { + children.get(i) + .parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks, rowNumber, + dataOutputStream); + } + } + } + + @Override public DataType getSchemaType() { + StructField[] fields = new StructField[children.size()]; + for (int i = 0; i < children.size(); i++) { + fields[i] = new StructField(children.get(i).getName(), null, true, + Metadata.empty()); + } + return new StructType(fields); + } + + @Override public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder) + throws IOException { + readBlockDataChunk(blockChunkHolder); + + for (int i = 0; i < children.size(); i++) { + children.get(i).fillRequiredBlockData(blockChunkHolder); + } + } + + @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) { + int childLength = surrogateData.getInt(); + Object[] fields = new Object[childLength]; + for (int i = 0; i < childLength; i++) { + fields[i] = children.get(i).getDataBasedOnDataTypeFromSurrogates(surrogateData); + } + + return new GenericInternalRow(fields); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutor.java new file mode 100644 index 0000000..7b324a2 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
/*
 * contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.carbondata.core.scan.executor;

import java.io.IOException;

import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.core.scan.model.QueryModel;

/**
 * Interface for carbon query executors. Used to execute a query based on the
 * query model and return an iterator over the query result.
 *
 * @param <E> element type produced by the result iterator
 */
public interface QueryExecutor<E> {

  /**
   * Executes the query described by the query model passed from the driver.
   *
   * @param queryModel query details
   * @return query result iterator
   * @throws QueryExecutionException if any failure occurs while executing the query
   * @throws IOException if files cannot be read
   */
  CarbonIterator<E> execute(QueryModel queryModel)
      throws QueryExecutionException, IOException;

  /**
   * Releases any resources held by this executor; call once the result iterator
   * is no longer needed.
   *
   * @throws QueryExecutionException if the cleanup itself fails
   */
  void finish() throws QueryExecutionException;
}
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.scan.executor; + +import org.apache.carbondata.core.scan.executor.impl.DetailQueryExecutor; +import org.apache.carbondata.core.scan.executor.impl.VectorDetailQueryExecutor; +import org.apache.carbondata.core.scan.model.QueryModel; + +/** + * Factory class to get the query executor from RDD + * This will return the executor based on query type + */ +public class QueryExecutorFactory { + + public static QueryExecutor getQueryExecutor(QueryModel queryModel) { + if (queryModel.isVectorReader()) { + return new VectorDetailQueryExecutor(); + } else { + return new DetailQueryExecutor(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/executor/exception/QueryExecutionException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/exception/QueryExecutionException.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/exception/QueryExecutionException.java new file mode 100644 index 0000000..0ac1f05 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/exception/QueryExecutionException.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/*
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.Locale;

/**
 * Exception thrown when carbon query execution fails.
 */
public class QueryExecutionException extends Exception {

  /** default serial version ID */
  private static final long serialVersionUID = 1L;

  /** stored error message; stays empty when only a cause was supplied */
  private String msg = "";

  /**
   * @param msg the error message for this exception
   */
  public QueryExecutionException(String msg) {
    super(msg);
    this.msg = msg;
  }

  /**
   * @param msg the error message for this exception
   * @param t the underlying cause
   */
  public QueryExecutionException(String msg, Throwable t) {
    super(msg, t);
    this.msg = msg;
  }

  /**
   * @param t the underlying cause
   */
  public QueryExecutionException(Throwable t) {
    super(t);
  }

  /**
   * Locale-specific message lookup is not implemented; kept for interface
   * compatibility.
   *
   * @param locale target locale (ignored)
   * @return an empty string
   */
  public String getLocalizedMessage(Locale locale) {
    return "";
  }

  @Override public String getLocalizedMessage() {
    return super.getLocalizedMessage();
  }

  /**
   * Returns the stored message. The previous version returned the empty default
   * when the exception was constructed from a cause alone, hiding the failure
   * reason; fall back to {@link Exception#getMessage()} in that case.
   */
  @Override public String getMessage() {
    if (msg == null || msg.isEmpty()) {
      return super.getMessage();
    }
    return this.msg;
  }
}