http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/processor/impl/DataBlockIteratorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/scan/processor/impl/DataBlockIteratorImpl.java b/core/src/main/java/org/carbondata/scan/processor/impl/DataBlockIteratorImpl.java new file mode 100644 index 0000000..0c61947 --- /dev/null +++ b/core/src/main/java/org/carbondata/scan/processor/impl/DataBlockIteratorImpl.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.carbondata.scan.processor.impl; + +import org.carbondata.core.datastorage.store.FileHolder; +import org.carbondata.scan.executor.infos.BlockExecutionInfo; +import org.carbondata.scan.processor.AbstractDataBlockIterator; +import org.carbondata.scan.result.Result; + +/** + * Below class will be used to process the block for detail query + */ +public class DataBlockIteratorImpl extends AbstractDataBlockIterator { + + /** + * DataBlockIteratorImpl Constructor + * + * @param blockExecutionInfo execution information + */ + public DataBlockIteratorImpl(BlockExecutionInfo blockExecutionInfo, + FileHolder fileReader, int batchSize) { + super(blockExecutionInfo, fileReader, batchSize); + } + + /** + * It scans the block and returns the result with @batchSize + * + * @return Result of @batchSize + */ + public Result next() { + this.scannerResultAggregator.collectData(scannedResult, batchSize); + Result result = this.scannerResultAggregator.getCollectedResult(); + while (result.size() < batchSize && hasNext()) { + this.scannerResultAggregator.collectData(scannedResult, batchSize-result.size()); + result.merge(this.scannerResultAggregator.getCollectedResult()); + } + return result; + } + +}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.carbondata.scan.result;

import java.math.BigDecimal;
import java.util.Map;

import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
import org.carbondata.scan.executor.infos.BlockExecutionInfo;
import org.carbondata.scan.executor.infos.KeyStructureInfo;

/**
 * Holder for the rows produced by scanning one blocklet; serves dimension keys
 * and measure values on request. This is a flyweight: the same instance is
 * reused across blocklets of a block via {@link #reset()}.
 */
public abstract class AbstractScannedResult {

  /** index of the row currently being served; -1 before the first row */
  protected int currentRow = -1;
  /** filtered row indexes (used by filter-query subclasses); set via {@link #setIndexes(int[])} */
  protected int[] rowMapping;
  /** byte size of the fixed-length (dictionary) part of the key */
  private int fixedLengthKeySize;
  /** total number of valid rows after scanning */
  private int totalNumberOfRows;
  /** number of rows handed out so far */
  private int rowCounter;
  /** dimension column chunks for this blocklet */
  private DimensionColumnDataChunk[] dataChunks;
  /** measure column chunks for this blocklet */
  private MeasureColumnDataChunk[] measureDataChunks;
  /** file block indexes of the dictionary-encoded columns selected in the query */
  private int[] dictionaryColumnBlockIndexes;
  /** file block indexes of the no-dictionary columns selected in the query */
  private int[] noDictionaryColumnBlockIndexes;
  /**
   * Per column-group key structure info, used to mask the complete
   * column-group key down to the particular column(s) selected in the query.
   */
  private Map<Integer, KeyStructureInfo> columnGroupKeyStructureInfo;

  public AbstractScannedResult(BlockExecutionInfo blockExecutionInfo) {
    this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize();
    this.noDictionaryColumnBlockIndexes = blockExecutionInfo.getNoDictionaryBlockIndexes();
    this.dictionaryColumnBlockIndexes = blockExecutionInfo.getDictionaryColumnBlockIndex();
    this.columnGroupKeyStructureInfo = blockExecutionInfo.getColumnGroupToKeyStructureInfo();
  }

  /**
   * Sets the dimension chunks from which rows will be built.
   *
   * @param dataChunks dimension chunks used in the query
   */
  public void setDimensionChunks(DimensionColumnDataChunk[] dataChunks) {
    this.dataChunks = dataChunks;
  }

  /**
   * Sets the measure column chunks.
   *
   * @param measureDataChunks measure data chunks
   */
  public void setMeasureChunks(MeasureColumnDataChunk[] measureDataChunks) {
    this.measureDataChunks = measureDataChunks;
  }

  /**
   * Returns the measure chunk for the given measure ordinal.
   *
   * @param ordinal measure ordinal
   * @return measure column chunk
   */
  public MeasureColumnDataChunk getMeasureChunk(int ordinal) {
    return measureDataChunks[ordinal];
  }

  /**
   * Builds the composite key of all dictionary dimensions in the query for
   * one row. Also advances the row counter, as serving the key consumes a row.
   *
   * @param rowId row id selected after scanning
   * @return fixed-length dictionary key bytes
   */
  protected byte[] getDictionaryKeyArray(int rowId) {
    byte[] completeKey = new byte[fixedLengthKeySize];
    int offset = 0;
    for (int blockIndex : dictionaryColumnBlockIndexes) {
      offset += dataChunks[blockIndex]
          .fillChunkData(completeKey, offset, rowId, columnGroupKeyStructureInfo.get(blockIndex));
    }
    rowCounter++;
    return completeKey;
  }

  /**
   * Advances the row/counter state without materializing a key.
   * Used when the query selects only measures.
   */
  public void incrementCounter() {
    rowCounter++;
    currentRow++;
  }

  /**
   * Returns raw dimension data for one dimension and row.
   *
   * @param dimOrdinal dimension ordinal present in the query
   * @param rowId row index
   * @return dimension data for that row
   */
  protected byte[] getDimensionData(int dimOrdinal, int rowId) {
    return dataChunks[dimOrdinal].getChunkData(rowId);
  }

  /**
   * Returns the keys of every no-dictionary dimension in the query for one row.
   *
   * @param rowId row number
   * @return one byte[] per no-dictionary dimension
   */
  protected byte[][] getNoDictionaryKeyArray(int rowId) {
    byte[][] noDictionaryKeys = new byte[noDictionaryColumnBlockIndexes.length][];
    for (int i = 0; i < noDictionaryColumnBlockIndexes.length; i++) {
      noDictionaryKeys[i] = dataChunks[noDictionaryColumnBlockIndexes[i]].getChunkData(rowId);
    }
    return noDictionaryKeys;
  }

  /**
   * Returns the complex-type keys for one row.
   * NOTE(review): currently a stub that always returns an empty array —
   * complex types are presumably handled elsewhere or not yet supported.
   *
   * @param rowId row number
   * @return complex type key array (currently always empty)
   */
  protected byte[][] getComplexTypeKeyArray(int rowId) {
    return new byte[0][];
  }

  /**
   * @return total number of valid rows after scanning
   */
  public int numberOfOutputRows() {
    return this.totalNumberOfRows;
  }

  /**
   * @return {@code true} while rows remain to be served from this result
   */
  public boolean hasNext() {
    return rowCounter < totalNumberOfRows;
  }

  /**
   * Resets the counters so this flyweight instance can serve the next
   * blocklet's result of the same block from the beginning.
   */
  public void reset() {
    rowCounter = 0;
    currentRow = -1;
  }

  /**
   * @param totalNumberOfRows total number of rows valid after scanning
   */
  public void setNumberOfRows(int totalNumberOfRows) {
    this.totalNumberOfRows = totalNumberOfRows;
  }

  /**
   * Stores the row indexes that survived filtering (from the filter bitset).
   *
   * @param indexes valid row indexes after applying the filter
   */
  public void setIndexes(int[] indexes) {
    this.rowMapping = indexes;
  }

  /**
   * Checks the null-value bitset of a measure for one row.
   *
   * @param ordinal measure ordinal
   * @param rowIndex row number to be checked
   * @return {@code true} if the measure value is null at that row
   */
  protected boolean isNullMeasureValue(int ordinal, int rowIndex) {
    return measureDataChunks[ordinal].getNullValueIndexHolder().getBitSet().get(rowIndex);
  }

  /**
   * Reads a long-typed measure value.
   *
   * @param ordinal measure ordinal
   * @param rowIndex row number of the measure value
   * @return measure value as long
   */
  protected long getLongMeasureValue(int ordinal, int rowIndex) {
    return measureDataChunks[ordinal].getMeasureDataHolder().getReadableLongValueByIndex(rowIndex);
  }

  /**
   * Reads a double-typed measure value.
   *
   * @param ordinal measure ordinal
   * @param rowIndex row number
   * @return measure value as double
   */
  protected double getDoubleMeasureValue(int ordinal, int rowIndex) {
    return measureDataChunks[ordinal].getMeasureDataHolder()
        .getReadableDoubleValueByIndex(rowIndex);
  }

  /**
   * Reads a BigDecimal-typed measure value.
   *
   * @param ordinal ordinal of the measure
   * @param rowIndex row number
   * @return measure value as BigDecimal
   */
  protected BigDecimal getBigDecimalMeasureValue(int ordinal, int rowIndex) {
    return measureDataChunks[ordinal].getMeasureDataHolder()
        .getReadableBigDecimalValueByIndex(rowIndex);
  }

  /**
   * Returns the current valid row id.
   * NOTE(review): method name carries a historical typo ("Currenr");
   * kept as-is because subclasses and callers depend on it.
   *
   * @return valid row id
   */
  public abstract int getCurrenrRowId();

  /**
   * @return dictionary key array for all dictionary dimensions selected
   *         in the query, advancing to the next row
   */
  public abstract byte[] getDictionaryKeyArray();

  /**
   * Returns the dimension data for the current row.
   *
   * @param dimensionOrdinal dimension ordinal
   * @return dimension data
   */
  public abstract byte[] getDimensionKey(int dimensionOrdinal);

  /**
   * @return complex type key array for the current row
   */
  public abstract byte[][] getComplexTypeKeyArray();

  /**
   * @return no-dictionary key array for all no-dictionary dimensions
   *         selected in the query, for the current row
   */
  public abstract byte[][] getNoDictionaryKeyArray();

  /**
   * Checks whether a measure value is null for the current row.
   *
   * @param ordinal measure ordinal
   * @return {@code true} if null
   */
  public abstract boolean isNullMeasureValue(int ordinal);

  /**
   * Reads a long measure value for the current row.
   *
   * @param ordinal measure ordinal
   * @return long value of the measure
   */
  public abstract long getLongMeasureValue(int ordinal);

  /**
   * Reads a double measure value for the current row.
   *
   * @param ordinal measure ordinal
   * @return measure value
   */
  public abstract double getDoubleMeasureValue(int ordinal);

  /**
   * Reads a BigDecimal measure value for the current row.
   *
   * @param ordinal measure ordinal
   * @return measure value
   */
  public abstract BigDecimal getBigDecimalMeasureValue(int ordinal);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/BatchRawResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/scan/result/BatchRawResult.java b/core/src/main/java/org/carbondata/scan/result/BatchRawResult.java new file mode 100644 index 0000000..c13b0f7 --- /dev/null +++ b/core/src/main/java/org/carbondata/scan/result/BatchRawResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.carbondata.scan.result; + +/** + * Below class holds the query result of batches. + */ +public class BatchRawResult extends BatchResult { + + /** + * This method will return one row at a time based on the counter given. + * @param counter + * @return + */ + public Object[] getRawRow(int counter) { + return rows[counter]; + } + + /** + * For getting the total size. 
+ * @return + */ + public int getSize() { + return rows.length; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/BatchResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/scan/result/BatchResult.java b/core/src/main/java/org/carbondata/scan/result/BatchResult.java new file mode 100644 index 0000000..dc14060 --- /dev/null +++ b/core/src/main/java/org/carbondata/scan/result/BatchResult.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.carbondata.scan.result;

import java.util.NoSuchElementException;

import org.carbondata.core.iterator.CarbonIterator;

/**
 * Holds one batch of query result rows and iterates over them.
 * Not thread-safe: the iteration counter is plain mutable state.
 */
public class BatchResult extends CarbonIterator<Object[]> {

  /** the result rows of this batch */
  protected Object[][] rows;

  /** position of the next row to be returned by {@link #next()} */
  protected int counter;

  public BatchResult() {
    this.rows = new Object[0][];
  }

  /**
   * Returns the backing row array.
   *
   * @return all rows of this batch
   */
  public Object[][] getRows() {
    return rows;
  }

  /**
   * Replaces the rows held by this batch.
   *
   * @param rows new rows (not copied)
   */
  public void setRows(Object[][] rows) {
    this.rows = rows;
  }

  /**
   * Returns {@code true} if the iteration has more elements.
   *
   * @return {@code true} if the iteration has more elements
   */
  @Override public boolean hasNext() {
    return counter < rows.length;
  }

  /**
   * Returns the next element in the iteration.
   *
   * @return the next row
   * @throws NoSuchElementException if no rows remain
   */
  @Override public Object[] next() {
    if (counter >= rows.length) {
      throw new NoSuchElementException();
    }
    return rows[counter++];
  }
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.carbondata.scan.result; + +import org.carbondata.scan.wrappers.ByteArrayWrapper; + +public class ListBasedResultWrapper { + + private ByteArrayWrapper key; + + private Object[] value; + + /** + * @return the key + */ + public ByteArrayWrapper getKey() { + return key; + } + + /** + * @param key the key to set + */ + public void setKey(ByteArrayWrapper key) { + this.key = key; + } + + /** + * @return the value + */ + public Object[] getValue() { + return value; + } + + /** + * @param value the value to set + */ + public void setValue(Object[] value) { + this.value = value; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/Result.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/scan/result/Result.java b/core/src/main/java/org/carbondata/scan/result/Result.java new file mode 100644 index 0000000..98466bb --- /dev/null +++ b/core/src/main/java/org/carbondata/scan/result/Result.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.carbondata.scan.result;

import org.carbondata.scan.wrappers.ByteArrayWrapper;

/**
 * Result interface for storing and iterating over a scanned result.
 *
 * @param <K> type holding the collected result (e.g. a list of row wrappers)
 * @param <V> type of a single measure value
 */
public interface Result<K, V> {
  /**
   * Below method will be used to
   * add the scanned result
   *
   * @param result scanned result to add
   */
  void addScannedResult(K result);

  /**
   * Returns {@code true} if the iteration has more elements.
   *
   * @return {@code true} if the iteration has more elements
   */
  boolean hasNext();

  /**
   * Below method will return the result key
   * (dimension key of the current row)
   *
   * @return key
   */
  ByteArrayWrapper getKey();

  /**
   * Below code will return the result value
   * (measure values of the current row)
   *
   * @return value
   */
  V[] getValue();

  /**
   * Merges another result into this one.
   *
   * @param otherResult result to be merged
   */
  void merge(Result<K, V> otherResult);

  /**
   * Below method will be used to get the result
   *
   * @return the collected result
   */
  K getResult();

  /**
   * @return size of the result
   */
  int size();
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.carbondata.scan.result.impl;

import java.math.BigDecimal;

import org.carbondata.scan.executor.infos.BlockExecutionInfo;
import org.carbondata.scan.result.AbstractScannedResult;

/**
 * Scanned result for filter queries: every access is routed through the
 * filtered row-index mapping, so only rows that passed the filter are served.
 */
public class FilterQueryScannedResult extends AbstractScannedResult {

  public FilterQueryScannedResult(BlockExecutionInfo tableBlockExecutionInfos) {
    super(tableBlockExecutionInfos);
  }

  /** Maps the logical current row to its physical row id via the filter mapping. */
  private int mappedRow() {
    return rowMapping[currentRow];
  }

  /**
   * Advances to the next filtered row and returns its dictionary key array.
   *
   * @return dictionary key array for all dictionary dimensions in the query
   */
  @Override public byte[] getDictionaryKeyArray() {
    ++currentRow;
    return getDictionaryKeyArray(mappedRow());
  }

  /**
   * @return complex type key array for the current filtered row
   */
  @Override public byte[][] getComplexTypeKeyArray() {
    return getComplexTypeKeyArray(mappedRow());
  }

  /**
   * @return no-dictionary key array for all no-dictionary dimensions
   *         of the current filtered row
   */
  @Override public byte[][] getNoDictionaryKeyArray() {
    return getNoDictionaryKeyArray(mappedRow());
  }

  /**
   * @return physical row id of the current filtered row
   */
  @Override public int getCurrenrRowId() {
    return mappedRow();
  }

  /**
   * @param dimensionOrdinal dimension ordinal
   * @return dimension data of the current filtered row
   */
  @Override public byte[] getDimensionKey(int dimensionOrdinal) {
    return getDimensionData(dimensionOrdinal, mappedRow());
  }

  /**
   * @param ordinal measure ordinal
   * @return whether the measure is null at the current filtered row
   */
  @Override public boolean isNullMeasureValue(int ordinal) {
    return isNullMeasureValue(ordinal, mappedRow());
  }

  /**
   * @param ordinal measure ordinal
   * @return long measure value at the current filtered row
   */
  @Override public long getLongMeasureValue(int ordinal) {
    return getLongMeasureValue(ordinal, mappedRow());
  }

  /**
   * @param ordinal measure ordinal
   * @return double measure value at the current filtered row
   */
  @Override public double getDoubleMeasureValue(int ordinal) {
    return getDoubleMeasureValue(ordinal, mappedRow());
  }

  /**
   * @param ordinal measure ordinal
   * @return BigDecimal measure value at the current filtered row
   */
  @Override public BigDecimal getBigDecimalMeasureValue(int ordinal) {
    return getBigDecimalMeasureValue(ordinal, mappedRow());
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.carbondata.scan.result.impl;

import java.util.ArrayList;
import java.util.List;

import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.scan.result.ListBasedResultWrapper;
import org.carbondata.scan.result.Result;
import org.carbondata.scan.wrappers.ByteArrayWrapper;

/**
 * Below class is a holder over list based result wrapper.
 *
 * IMPORTANT: iteration is driven by {@link #hasNext()}, which ADVANCES the
 * internal counters as a side effect. Callers must call hasNext() exactly
 * once per record; calling it twice without reading skips a record.
 */
public class ListBasedResult implements Result<List<ListBasedResultWrapper>, Object> {

  /**
   * current result list (the sub-list the counters currently point into)
   */
  private List<ListBasedResultWrapper> currentRowPointer;

  /**
   * all result list , this is required because if we merger all the scanned
   * result from all the blocks in one list, that list creation will take more
   * time as every time list will create a big array and then it will do copy
   * the older element to new array, and creation of big array will also be a
   * problem if memory is fragmented then jvm in to do defragmentation to
   * create a big space, but if divide the data in multiple list than it avoid
   * copy and defragmentation
   */
  private List<List<ListBasedResultWrapper>> allRowsResult;

  /**
   * counter to check how many result processed; starts at -1 so the first
   * hasNext() call moves it to record 0
   */
  private int totalRecordCounter = -1;

  /**
   * number of records
   */
  private int totalNumberOfRecords;

  /**
   * current counter of the record in list; -1 until the first hasNext() call
   */
  private int listRecordCounter = -1;

  /**
   * current list counter (index of the next sub-list to switch to)
   */
  private int currentListCounter;

  public ListBasedResult() {
    currentRowPointer =
        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    allRowsResult =
        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
  }

  /**
   * below method will be used to add the scan result
   *
   * NOTE(review): totalNumberOfRecords is ASSIGNED (not accumulated) here,
   * while merge() accumulates — this assumes addScannedResult is called at
   * most once per instance before any merge; verify against callers.
   */
  @Override public void addScannedResult(List<ListBasedResultWrapper> listBasedResult) {
    this.currentRowPointer = listBasedResult;
    totalNumberOfRecords = listBasedResult.size();
    allRowsResult.add(listBasedResult);
  }

  /**
   * Method to check more result is present
   * or not
   *
   * SIDE EFFECT: advances totalRecordCounter/listRecordCounter and, when the
   * current sub-list is exhausted, switches currentRowPointer to the next
   * sub-list. getKey()/getValue() read the position established here.
   */
  @Override public boolean hasNext() {
    if (allRowsResult.size() == 0) {
      return false;
    }
    // As we are storing data in list of list, below code is to check whether
    // any more result is present
    // in the result.
    // first it will check list counter is zero if it is zero
    // than it will check list counter to check how many list has been processed
    // if more list are present and all the list of current list is processed
    // than it will take a new list from all row result list
    totalRecordCounter++;
    listRecordCounter++;
    if (listRecordCounter == 0 || (listRecordCounter >= currentRowPointer.size()
        && currentListCounter < allRowsResult.size())) {
      listRecordCounter = 0;
      currentRowPointer = allRowsResult.get(currentListCounter);
      currentListCounter++;
    }
    return totalRecordCounter < totalNumberOfRecords;
  }

  /**
   * @return key of the record the last hasNext() call advanced to
   */
  @Override public ByteArrayWrapper getKey() {
    return currentRowPointer.get(listRecordCounter).getKey();
  }

  /**
   * @return will return the value of the record the last hasNext() call
   *         advanced to
   */
  @Override public Object[] getValue() {
    return currentRowPointer.get(listRecordCounter).getValue();
  }

  /***
   * below method will be used to merge the
   * scanned result
   *
   * @param otherResult return to be merged
   */
  @Override public void merge(Result<List<ListBasedResultWrapper>, Object> otherResult) {
    if (otherResult.size() > 0) {
      totalNumberOfRecords += otherResult.size();
      this.allRowsResult.add(otherResult.getResult());
    }
  }

  /**
   * Return the size of the result
   */
  @Override public int size() {
    return totalNumberOfRecords;
  }

  /**
   * @return the complete result
   *
   * NOTE(review): returns only the CURRENT sub-list, not the concatenation of
   * allRowsResult — matches how merge() consumes it (one sub-list at a time).
   */
  @Override public List<ListBasedResultWrapper> getResult() {
    return currentRowPointer;
  }
}

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/* NOTE(review): license header added above for consistency — the original
 * NonFilterQueryScannedResult.java was the only file in this patch without one. */
package org.carbondata.scan.result.impl;

import java.math.BigDecimal;

import org.carbondata.scan.executor.infos.BlockExecutionInfo;
import org.carbondata.scan.result.AbstractScannedResult;

/**
 * Result provide class for non filter query
 * In case of no filter query we need to return
 * complete data, so rows are addressed directly by currentRow
 * (no row-index mapping is applied).
 */
public class NonFilterQueryScannedResult extends AbstractScannedResult {

  public NonFilterQueryScannedResult(BlockExecutionInfo blockExecutionInfo) {
    super(blockExecutionInfo);
  }

  /**
   * Advances to the next row and returns its dictionary key array.
   *
   * @return dictionary key array for all the dictionary dimension selected in
   * query
   */
  @Override public byte[] getDictionaryKeyArray() {
    ++currentRow;
    return getDictionaryKeyArray(currentRow);
  }

  /**
   * Below method will be used to get the complex type key array
   * for the current row.
   *
   * @return complex type key array
   */
  @Override public byte[][] getComplexTypeKeyArray() {
    return getComplexTypeKeyArray(currentRow);
  }

  /**
   * Below method will be used to get the no dictionary key array for all the
   * no dictionary dimension selected in query, for the current row.
   *
   * @return no dictionary key array for all the no dictionary dimension
   */
  @Override public byte[][] getNoDictionaryKeyArray() {
    return getNoDictionaryKeyArray(currentRow);
  }

  /**
   * will return the current valid row id
   *
   * @return valid row id
   */
  @Override public int getCurrenrRowId() {
    return currentRow;
  }

  /**
   * Return the dimension data based on dimension ordinal,
   * for the current row.
   *
   * @param dimensionOrdinal dimension ordinal
   * @return dimension data
   */
  @Override public byte[] getDimensionKey(int dimensionOrdinal) {
    return getDimensionData(dimensionOrdinal, currentRow);
  }

  /**
   * Below method will be used to to check whether measure value is null or
   * not for the current row.
   *
   * @param ordinal measure ordinal
   * @return is null or not
   */
  @Override public boolean isNullMeasureValue(int ordinal) {
    return isNullMeasureValue(ordinal, currentRow);
  }

  /**
   * Below method will be used to get the measure value for measure of long
   * data type, for the current row.
   *
   * @param ordinal measure ordinal
   * @return long value of measure
   */
  @Override public long getLongMeasureValue(int ordinal) {
    return getLongMeasureValue(ordinal, currentRow);
  }

  /**
   * Below method will be used to get the value of measure of double type,
   * for the current row.
   *
   * @param ordinal measure ordinal
   * @return measure value
   */
  @Override public double getDoubleMeasureValue(int ordinal) {
    return getDoubleMeasureValue(ordinal, currentRow);
  }

  /**
   * Below method will be used to get the data of big decimal type of a
   * measure, for the current row.
   *
   * @param ordinal measure ordinal
   * @return measure value
   */
  @Override public BigDecimal getBigDecimalMeasureValue(int ordinal) {
    return getBigDecimalMeasureValue(ordinal, currentRow);
  }
}
a/core/src/main/java/org/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java new file mode 100644 index 0000000..2356a9f --- /dev/null +++ b/core/src/main/java/org/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.carbondata.scan.result.iterator; + +import java.util.List; + +import org.carbondata.common.logging.LogService; +import org.carbondata.common.logging.LogServiceFactory; +import org.carbondata.core.carbon.datastore.DataRefNode; +import org.carbondata.core.carbon.datastore.DataRefNodeFinder; +import org.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder; +import org.carbondata.core.constants.CarbonCommonConstants; +import org.carbondata.core.datastorage.store.FileHolder; +import org.carbondata.core.datastorage.store.impl.FileFactory; +import org.carbondata.core.iterator.CarbonIterator; +import org.carbondata.core.util.CarbonProperties; +import org.carbondata.scan.executor.infos.BlockExecutionInfo; +import org.carbondata.scan.model.QueryModel; +import org.carbondata.scan.processor.AbstractDataBlockIterator; +import org.carbondata.scan.processor.impl.DataBlockIteratorImpl; + +/** + * In case of detail query we cannot keep all the records in memory so for + * executing that query are returning a iterator over block and every time next + * call will come it will execute the block and return the result + */ +public abstract class AbstractDetailQueryResultIterator extends CarbonIterator { + + /** + * LOGGER. 
+ */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(AbstractDetailQueryResultIterator.class.getName()); + + /** + * execution info of the block + */ + protected List<BlockExecutionInfo> blockExecutionInfos; + + /** + * number of cores which can be used + */ + private int batchSize; + + /** + * file reader which will be used to execute the query + */ + protected FileHolder fileReader; + + protected AbstractDataBlockIterator dataBlockIterator; + + protected boolean nextBatch = false; + + public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel) { + String batchSizeString = + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE); + if (null != batchSizeString) { + try { + batchSize = Integer.parseInt(batchSizeString); + } catch (NumberFormatException ne) { + LOGGER.error("Invalid inmemory records size. Using default value"); + batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT; + } + } else { + batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT; + } + + this.blockExecutionInfos = infos; + this.fileReader = FileFactory.getFileHolder( + FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getStorePath())); + intialiseInfos(); + } + + private void intialiseInfos() { + for (BlockExecutionInfo blockInfo : blockExecutionInfos) { + DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize()); + DataRefNode startDataBlock = finder + .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey()); + DataRefNode endDataBlock = finder + .findLastDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getEndKey()); + long numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1; + blockInfo.setFirstDataBlock(startDataBlock); + blockInfo.setNumberOfBlockToScan(numberOfBlockToScan); + } + } + + @Override public boolean hasNext() { + if ((dataBlockIterator 
!= null && dataBlockIterator.hasNext()) || nextBatch) { + return true; + } else { + dataBlockIterator = getDataBlockIterator(); + while (dataBlockIterator != null) { + if (dataBlockIterator.hasNext()) { + return true; + } + dataBlockIterator = getDataBlockIterator(); + } + return false; + } + } + + private DataBlockIteratorImpl getDataBlockIterator() { + if(blockExecutionInfos.size() > 0) { + BlockExecutionInfo executionInfo = blockExecutionInfos.get(0); + blockExecutionInfos.remove(executionInfo); + return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize); + } + return null; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/iterator/ChunkRowIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/scan/result/iterator/ChunkRowIterator.java b/core/src/main/java/org/carbondata/scan/result/iterator/ChunkRowIterator.java new file mode 100644 index 0000000..5cc4f1e --- /dev/null +++ b/core/src/main/java/org/carbondata/scan/result/iterator/ChunkRowIterator.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.carbondata.scan.result.iterator; + +import org.carbondata.core.iterator.CarbonIterator; +import org.carbondata.scan.result.BatchResult; + +/** + * Iterator over row result + */ +public class ChunkRowIterator extends CarbonIterator<Object[]> { + + /** + * iterator over chunk result + */ + private CarbonIterator<BatchResult> iterator; + + /** + * currect chunk + */ + private BatchResult currentchunk; + + public ChunkRowIterator(CarbonIterator<BatchResult> iterator) { + this.iterator = iterator; + if (iterator.hasNext()) { + currentchunk = iterator.next(); + } + } + + /** + * Returns {@code true} if the iteration has more elements. (In other words, + * returns {@code true} if {@link #next} would return an element rather than + * throwing an exception.) + * + * @return {@code true} if the iteration has more elements + */ + @Override public boolean hasNext() { + if (null != currentchunk) { + if ((currentchunk.hasNext())) { + return true; + } else if (!currentchunk.hasNext()) { + while (iterator.hasNext()) { + currentchunk = iterator.next(); + if (currentchunk != null && currentchunk.hasNext()) { + return true; + } + } + } + } + return false; + } + + /** + * Returns the next element in the iteration. 
+ * + * @return the next element in the iteration + */ + @Override public Object[] next() { + return currentchunk.next(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/iterator/DetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/carbondata/scan/result/iterator/DetailQueryResultIterator.java new file mode 100644 index 0000000..4eb50ac --- /dev/null +++ b/core/src/main/java/org/carbondata/scan/result/iterator/DetailQueryResultIterator.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.carbondata.scan.result.iterator; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.carbondata.scan.executor.exception.QueryExecutionException; +import org.carbondata.scan.executor.infos.BlockExecutionInfo; +import org.carbondata.scan.model.QueryModel; +import org.carbondata.scan.result.BatchResult; +import org.carbondata.scan.result.ListBasedResultWrapper; +import org.carbondata.scan.result.preparator.QueryResultPreparator; + +/** + * In case of detail query we cannot keep all the records in memory so for + * executing that query are returning a iterator over block and every time next + * call will come it will execute the block and return the result + */ +public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator { + + /** + * to prepare the result + */ + private QueryResultPreparator<List<ListBasedResultWrapper>, Object> queryResultPreparator; + + private ExecutorService execService = Executors.newFixedThreadPool(1); + + private Future<BatchResult> future; + + public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, + QueryResultPreparator queryResultPreparator) { + super(infos, queryModel); + this.queryResultPreparator = queryResultPreparator; + } + + @Override public BatchResult next() { + BatchResult result; + try { + if (future == null) { + future = execute(); + } + result = future.get(); + nextBatch = false; + if (hasNext()) { + nextBatch = true; + future = execute(); + } else { + fileReader.finish(); + } + } catch (Exception ex) { + fileReader.finish(); + throw new RuntimeException(ex.getCause().getMessage()); + } + return result; + } + + private Future<BatchResult> execute() { + return execService.submit(new Callable<BatchResult>() { + @Override public BatchResult call() throws QueryExecutionException { + return 
queryResultPreparator.prepareQueryResult(dataBlockIterator.next()); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/iterator/RawResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/carbondata/scan/result/iterator/RawResultIterator.java new file mode 100644 index 0000000..8c028b2 --- /dev/null +++ b/core/src/main/java/org/carbondata/scan/result/iterator/RawResultIterator.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.carbondata.scan.result.iterator; + +import org.carbondata.common.logging.LogService; +import org.carbondata.common.logging.LogServiceFactory; +import org.carbondata.core.carbon.datastore.block.SegmentProperties; +import org.carbondata.core.iterator.CarbonIterator; +import org.carbondata.core.keygenerator.KeyGenException; +import org.carbondata.scan.result.BatchRawResult; +import org.carbondata.scan.wrappers.ByteArrayWrapper; + +/** + * This is a wrapper iterator over the detail raw query iterator. 
 * This iterator will handle the processing of the raw rows.
 * This will handle the batch results and will iterate on the batches and give single row.
 */
public class RawResultIterator extends CarbonIterator<Object[]> {

  // key generator schema of the segment the rows were read from
  private final SegmentProperties sourceSegProperties;

  // key generator schema the dictionary keys are re-encoded into
  private final SegmentProperties destinationSegProperties;
  /**
   * Iterator of the Batch raw result.
   */
  private CarbonIterator<BatchRawResult> detailRawQueryResultIterator;

  /**
   * Counter to maintain the row counter.
   */
  private int counter = 0;

  // row pre-converted by fetchConverted() but not yet handed out by next();
  // next() returns and clears this before converting a fresh row
  private Object[] currentConveretedRawRow = null;

  /**
   * LOGGER
   */
  private static final LogService LOGGER =
      LogServiceFactory.getLogService(RawResultIterator.class.getName());

  /**
   * batch of the result.
   */
  private BatchRawResult batch;

  public RawResultIterator(CarbonIterator<BatchRawResult> detailRawQueryResultIterator,
      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
    this.detailRawQueryResultIterator = detailRawQueryResultIterator;
    this.sourceSegProperties = sourceSegProperties;
    this.destinationSegProperties = destinationSegProperties;
  }

  // Advances to the next non-empty batch when the current one is fully
  // consumed; true while any unconsumed row remains.
  @Override public boolean hasNext() {

    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
      if (detailRawQueryResultIterator.hasNext()) {
        batch = detailRawQueryResultIterator.next();
        counter = 0; // batch changed so reset the counter.
      } else {
        return false;
      }
    }

    if (!checkIfBatchIsProcessedCompletely(batch)) {
      return true;
    } else {
      return false;
    }
  }

  // Returns the next converted row. If fetchConverted() already converted the
  // row at the current counter, that cached row is returned (counter is then
  // advanced past it); otherwise the row is converted on the fly.
  // NOTE(review): the "completed one batch" path calls
  // detailRawQueryResultIterator.next() without a hasNext() guard — callers
  // appear expected to call hasNext() first; confirm before relying on it.
  @Override public Object[] next() {
    if (null == batch) { // for 1st time
      batch = detailRawQueryResultIterator.next();
    }
    if (!checkIfBatchIsProcessedCompletely(batch)) {
      try {
        if(null != currentConveretedRawRow){
          counter++;
          Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
          currentConveretedRawRow = null;
          return currentConveretedRawRowTemp;
        }
        return convertRow(batch.getRawRow(counter++));
      } catch (KeyGenException e) {
        // key conversion failed: log and return null rather than propagate
        LOGGER.error(e.getMessage());
        return null;
      }
    } else { // completed one batch.
      batch = detailRawQueryResultIterator.next();
      counter = 0;
    }
    try {
      if(null != currentConveretedRawRow){
        counter++;
        Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
        currentConveretedRawRow = null;
        return currentConveretedRawRowTemp;
      }

      return convertRow(batch.getRawRow(counter++));
    } catch (KeyGenException e) {
      LOGGER.error(e.getMessage());
      return null;
    }

  }

  /**
   * for fetching the row with out incrementing counter.
   * Peeks at (and converts) the current row without consuming it; the
   * converted row is cached so the following next() returns it.
   * @return the converted current row, or null when exhausted
   */
  public Object[] fetchConverted() throws KeyGenException {
    if(null != currentConveretedRawRow){
      return currentConveretedRawRow;
    }
    if(hasNext())
    {
      Object[] rawRow = batch.getRawRow(counter);
      currentConveretedRawRow = convertRow(rawRow);;
      return currentConveretedRawRow;
    }
    else
    {
      return null;
    }
  }

  // Re-encodes the row's dictionary key from the source segment's key
  // generator into the destination segment's key generator, in place.
  private Object[] convertRow(Object[] rawRow) throws KeyGenException {
    byte[] dims = ((ByteArrayWrapper) rawRow[0]).getDictionaryKey();
    long[] keyArray = sourceSegProperties.getDimensionKeyGenerator().getKeyArray(dims);
    byte[] covertedBytes =
        destinationSegProperties.getDimensionKeyGenerator().generateKey(keyArray);
    ((ByteArrayWrapper) rawRow[0]).setDictionaryKey(covertedBytes);
    return rawRow;
  }

  /**
   * To check if the batch is processed completely
   * @param batch
   * @return true when every row of the batch has been consumed
   */
  private boolean checkIfBatchIsProcessedCompletely(BatchRawResult batch){
    if(counter < batch.getSize())
    {
      return false;
    }
    else{
      return true;
    }
  }
}

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.carbondata.scan.result.preparator;

import org.carbondata.scan.result.BatchResult;
import org.carbondata.scan.result.Result;

/**
 * Converts a scanned {@link Result} into a consumer-facing
 * {@link BatchResult}.
 *
 * @param <K> key type of the scanned result
 * @param <V> value type of the scanned result
 */
public interface QueryResultPreparator<K, V> {

  public BatchResult prepareQueryResult(Result<K, V> scannedResult);

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.carbondata.scan.result.preparator.impl;

import java.util.List;

import org.carbondata.core.carbon.metadata.encoder.Encoding;
import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.carbondata.core.util.CarbonUtil;
import org.carbondata.scan.executor.impl.QueryExecutorProperties;
import org.carbondata.scan.model.QueryDimension;
import org.carbondata.scan.model.QueryModel;
import org.carbondata.scan.result.BatchResult;
import org.carbondata.scan.result.preparator.QueryResultPreparator;
import org.carbondata.scan.util.DataTypeUtil;

/**
 * Shared plumbing for query result preparators: dimension value decoding,
 * row/column transposition and empty-result construction.
 */
public abstract class AbstractQueryResultPreparator<K, V> implements QueryResultPreparator<K, V> {

  /**
   * query properties
   */
  protected QueryExecutorProperties queryExecuterProperties;

  /**
   * query model
   */
  protected QueryModel queryModel;

  public AbstractQueryResultPreparator(QueryExecutorProperties executerProperties,
      QueryModel queryModel) {
    this.queryExecuterProperties = executerProperties;
    this.queryModel = queryModel;
  }

  /**
   * Fills one output row's dimension slots from the column-major converted
   * result. Per dimension: non-dictionary values are copied through,
   * direct-dictionary surrogates are decoded by the generator, and plain
   * dictionary surrogates are looked up in the column dictionary (via the
   * sorted index when the dimension is in the sort columns).
   *
   * @param convertedResult column-major data, [dimension][row]
   * @param queryDimensions dimensions selected in the query
   * @param dimensionCount  number of dimensions to fill
   * @param row             output row; written at each dimension's query order
   * @param rowIndex        row being filled
   */
  protected void fillDimensionData(Object[][] convertedResult, List<QueryDimension> queryDimensions,
      int dimensionCount, Object[] row, int rowIndex) {
    QueryDimension queryDimension;
    for (int i = 0; i < dimensionCount; i++) {
      queryDimension = queryDimensions.get(i);
      if (!CarbonUtil
          .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)) {
        row[queryDimension.getQueryOrder()] = convertedResult[i][rowIndex];
      } else if (CarbonUtil
          .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DIRECT_DICTIONARY)) {
        DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
            .getDirectDictionaryGenerator(queryDimension.getDimension().getDataType());
        row[queryDimension.getQueryOrder()] = directDictionaryGenerator
            .getValueFromSurrogate((Integer) convertedResult[i][rowIndex]);
      } else {
        // plain dictionary: sortDimIndexes[i] == 1 means the surrogate is a
        // sorted-index position rather than a direct dictionary key
        if (queryExecuterProperties.sortDimIndexes[i] == 1) {
          row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType(
              queryExecuterProperties.columnToDictionayMapping
                  .get(queryDimension.getDimension().getColumnId())
                  .getDictionaryValueFromSortedIndex((Integer) convertedResult[i][rowIndex]),
              queryDimension.getDimension().getDataType());
        } else {
          row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType(
              queryExecuterProperties.columnToDictionayMapping
                  .get(queryDimension.getDimension().getColumnId())
                  .getDictionaryValueForKey((Integer) convertedResult[i][rowIndex]),
              queryDimension.getDimension().getDataType());
        }
      }
    }
  }

  /**
   * Transposes a [row][column] matrix into [column][row] (or vice versa).
   * An empty input is returned as-is.
   */
  protected Object[][] encodeToRows(Object[][] data) {
    if (data.length == 0) {
      return data;
    }
    Object[][] rData = new Object[data[0].length][data.length];
    int len = data.length;
    for (int i = 0; i < rData.length; i++) {
      for (int j = 0; j < len; j++) {
        rData[i][j] = data[j][i];
      }
    }
    return rData;
  }

  /**
   * Builds a BatchResult of {@code size} rows with one (null) column each,
   * used when the query selects no columns but rows were matched.
   */
  protected BatchResult getEmptyChunkResult(int size) {
    Object[][] row = new Object[size][1];
    BatchResult chunkResult = new BatchResult();
    chunkResult.setRows(row);
    return chunkResult;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/preparator/impl/DetailQueryResultPreparatorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/scan/result/preparator/impl/DetailQueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/scan/result/preparator/impl/DetailQueryResultPreparatorImpl.java new file mode 100644 index 0000000..17735bc --- /dev/null +++ b/core/src/main/java/org/carbondata/scan/result/preparator/impl/DetailQueryResultPreparatorImpl.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.carbondata.scan.result.preparator.impl; + +import java.nio.charset.Charset; +import java.util.List; + +import org.carbondata.common.logging.LogService; +import org.carbondata.common.logging.LogServiceFactory; +import org.carbondata.core.carbon.metadata.encoder.Encoding; +import org.carbondata.core.constants.CarbonCommonConstants; +import org.carbondata.core.util.CarbonUtil; +import org.carbondata.scan.executor.impl.QueryExecutorProperties; +import org.carbondata.scan.model.QueryDimension; +import org.carbondata.scan.model.QueryMeasure; +import org.carbondata.scan.model.QueryModel; +import org.carbondata.scan.result.BatchResult; +import org.carbondata.scan.result.ListBasedResultWrapper; +import org.carbondata.scan.result.Result; +import org.carbondata.scan.util.DataTypeUtil; +import org.carbondata.scan.wrappers.ByteArrayWrapper; + +/** + * Below class will be used to get the result by converting to actual data + * Actual data conversion can be converting the surrogate key to actual data + * + * @TODO there are many things in class which is very confusing, need to check + * why it was handled like that and how we can handle that in a better + * way.Need to revisit this class. IF aggregation is push down to spark + * layer and if we can process the data in byte array format then this + * class wont be useful so in future we can delete this class. 
+ * @TODO need to expose one interface which will return the result based on required type + * for example its implementation case return converted result or directly result with out + * converting to actual value + */ +public class DetailQueryResultPreparatorImpl + extends AbstractQueryResultPreparator<List<ListBasedResultWrapper>, Object> { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(DetailQueryResultPreparatorImpl.class.getName()); + + public DetailQueryResultPreparatorImpl(QueryExecutorProperties executerProperties, + QueryModel queryModel) { + super(executerProperties, queryModel); + } + + @Override public BatchResult prepareQueryResult( + Result<List<ListBasedResultWrapper>, Object> scannedResult) { + if ((null == scannedResult || scannedResult.size() < 1)) { + return new BatchResult(); + } + List<QueryDimension> queryDimension = queryModel.getQueryDimension(); + int dimensionCount = queryDimension.size(); + int totalNumberOfColumn = dimensionCount + queryExecuterProperties.measureDataTypes.length; + Object[][] resultData = new Object[scannedResult.size()][totalNumberOfColumn]; + if (!queryExecuterProperties.isFunctionQuery && totalNumberOfColumn == 0 + && scannedResult.size() > 0) { + return getEmptyChunkResult(scannedResult.size()); + } + int currentRow = 0; + long[] surrogateResult = null; + int noDictionaryColumnIndex = 0; + ByteArrayWrapper key = null; + Object[] value = null; + while (scannedResult.hasNext()) { + key = scannedResult.getKey(); + value = scannedResult.getValue(); + if (key != null) { + surrogateResult = queryExecuterProperties.keyStructureInfo.getKeyGenerator() + .getKeyArray(key.getDictionaryKey(), + queryExecuterProperties.keyStructureInfo.getMaskedBytes()); + for (int i = 0; i < dimensionCount; i++) { + if (!CarbonUtil.hasEncoding(queryDimension.get(i).getDimension().getEncoder(), + Encoding.DICTIONARY)) { + resultData[currentRow][i] = DataTypeUtil.getDataBasedOnDataType( + new 
String(key.getNoDictionaryKeyByIndex(noDictionaryColumnIndex++), + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)), + queryDimension.get(i).getDimension().getDataType()); + } else { + resultData[currentRow][i] = + (int) surrogateResult[queryDimension.get(i).getDimension().getKeyOrdinal()]; + } + } + } + if (value != null) { + System.arraycopy(value, 0, resultData[currentRow], dimensionCount, + queryExecuterProperties.measureDataTypes.length); + } + currentRow++; + noDictionaryColumnIndex = 0; + } + if (resultData.length > 0) { + resultData = encodeToRows(resultData); + } + return getResult(queryModel, resultData); + } + + private BatchResult getResult(QueryModel queryModel, Object[][] convertedResult) { + + int rowSize = convertedResult[0].length; + Object[][] rows = new Object[rowSize][]; + List<QueryDimension> queryDimensions = queryModel.getQueryDimension(); + int dimensionCount = queryDimensions.size(); + int msrCount = queryExecuterProperties.measureDataTypes.length; + Object[] row; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + row = new Object[dimensionCount + msrCount]; + fillDimensionData(convertedResult, queryDimensions, dimensionCount, row, rowIndex); + + QueryMeasure msr; + for (int i = 0; i < queryModel.getQueryMeasures().size(); i++) { + msr = queryModel.getQueryMeasures().get(i); + row[msr.getQueryOrder()] = convertedResult[dimensionCount + i][rowIndex]; + } + rows[rowIndex] = row; + } + LOGGER.info( + "###########################################------ Total Number of records" + rowSize); + BatchResult chunkResult = new BatchResult(); + chunkResult.setRows(rows); + return chunkResult; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/preparator/impl/RawQueryResultPreparatorImpl.java ---------------------------------------------------------------------- diff --git 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.carbondata.scan.result.preparator.impl;

import java.util.List;

import org.carbondata.common.logging.LogService;
import org.carbondata.common.logging.LogServiceFactory;
import org.carbondata.core.carbon.metadata.encoder.Encoding;
import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.carbondata.scan.executor.impl.QueryExecutorProperties;
import org.carbondata.scan.model.QueryDimension;
import org.carbondata.scan.model.QueryMeasure;
import org.carbondata.scan.model.QueryModel;
import org.carbondata.scan.model.QuerySchemaInfo;
import org.carbondata.scan.result.BatchRawResult;
import org.carbondata.scan.result.BatchResult;
import org.carbondata.scan.result.ListBasedResultWrapper;
import org.carbondata.scan.result.Result;
import org.carbondata.scan.util.DataTypeUtil;
import org.carbondata.scan.wrappers.ByteArrayWrapper;

/**
 * Prepares query results without decoding dictionary-encoded dimensions:
 * dictionary keys are either passed through as raw bytes (raw detail query)
 * or converted to surrogate/actual values for no-dictionary and
 * direct-dictionary columns only.
 */
public class RawQueryResultPreparatorImpl
    extends AbstractQueryResultPreparator<List<ListBasedResultWrapper>, Object> {

  private static final LogService LOGGER =
      LogServiceFactory.getLogService(RawQueryResultPreparatorImpl.class.getName());

  /**
   * Key generator, masked byte indexes and projection-order mappings used to
   * translate a scanned row into the user's requested column order.
   */
  private QuerySchemaInfo querySchemaInfo;

  /**
   * Builds the schema info (key generator, masked bytes, dimension/measure
   * arrays and the forward/reverse projection order) from the query model.
   *
   * @param executerProperties executor level properties (key structure, data types)
   * @param queryModel         the query being executed
   */
  public RawQueryResultPreparatorImpl(QueryExecutorProperties executerProperties,
      QueryModel queryModel) {
    super(executerProperties, queryModel);
    querySchemaInfo = new QuerySchemaInfo();
    querySchemaInfo.setKeyGenerator(queryExecuterProperties.keyStructureInfo.getKeyGenerator());
    querySchemaInfo.setMaskedByteIndexes(queryExecuterProperties.keyStructureInfo.getMaskedBytes());
    querySchemaInfo.setQueryDimensions(queryModel.getQueryDimension()
        .toArray(new QueryDimension[queryModel.getQueryDimension().size()]));
    querySchemaInfo.setQueryMeasures(queryModel.getQueryMeasures()
        .toArray(new QueryMeasure[queryModel.getQueryMeasures().size()]));
    int msrSize = queryExecuterProperties.measureDataTypes.length;
    int dimensionCount = queryModel.getQueryDimension().size();
    // queryOrder maps user projection position -> internal position,
    // queryReverseOrder maps internal position -> user projection position
    int[] queryOrder = new int[dimensionCount + msrSize];
    int[] queryReverseOrder = new int[dimensionCount + msrSize];
    for (int i = 0; i < dimensionCount; i++) {
      queryOrder[queryModel.getQueryDimension().get(i).getQueryOrder()] = i;
      queryReverseOrder[i] = queryModel.getQueryDimension().get(i).getQueryOrder();
    }
    for (int i = 0; i < msrSize; i++) {
      queryOrder[queryModel.getQueryMeasures().get(i).getQueryOrder()] = i + dimensionCount;
      queryReverseOrder[i + dimensionCount] = queryModel.getQueryMeasures().get(i).getQueryOrder();
    }
    querySchemaInfo.setQueryOrder(queryOrder);
    querySchemaInfo.setQueryReverseOrder(queryReverseOrder);
  }

  /**
   * Converts the scanned result into a {@link BatchRawResult}.
   *
   * For a raw-bytes detail query each output row is
   * [ByteArrayWrapper key, measure values...]; otherwise dimensions are
   * materialised per their encoding (no-dictionary, direct-dictionary or
   * surrogate key) and measures appended in projection order.
   *
   * @param scannedResult scanner output; may be null or empty
   * @return batch result, empty when there is nothing to prepare
   */
  @Override public BatchResult prepareQueryResult(
      Result<List<ListBasedResultWrapper>, Object> scannedResult) {
    if ((null == scannedResult || scannedResult.size() < 1)) {
      return new BatchRawResult();
    }
    QueryDimension[] queryDimensions = querySchemaInfo.getQueryDimensions();
    int msrSize = queryExecuterProperties.measureDataTypes.length;
    int dimSize = queryDimensions.length;
    int[] order = querySchemaInfo.getQueryReverseOrder();
    Object[][] resultData = new Object[scannedResult.size()][];
    Object[] value;
    Object[] row;
    int counter = 0;
    if (queryModel.isRawBytesDetailQuery()) {
      while (scannedResult.hasNext()) {
        value = scannedResult.getValue();
        row = new Object[msrSize + 1];
        row[0] = scannedResult.getKey();
        if (value != null) {
          assert (value.length == msrSize);
          System.arraycopy(value, 0, row, 1, msrSize);
        }
        resultData[counter] = row;
        counter++;
      }
    } else {
      while (scannedResult.hasNext()) {
        value = scannedResult.getValue();
        row = new Object[msrSize + dimSize];
        ByteArrayWrapper key = scannedResult.getKey();
        if (key != null) {
          long[] surrogateResult = querySchemaInfo.getKeyGenerator()
              .getKeyArray(key.getDictionaryKey(), querySchemaInfo.getMaskedByteIndexes());
          int noDictionaryColumnIndex = 0;
          for (int i = 0; i < dimSize; i++) {
            if (!queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)) {
              // no-dictionary column: bytes carry the actual value
              row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
                  new String(key.getNoDictionaryKeyByIndex(noDictionaryColumnIndex++)),
                  queryDimensions[i].getDimension().getDataType());
            } else if (queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
              // direct dictionary: surrogate is convertible without a lookup
              DirectDictionaryGenerator directDictionaryGenerator =
                  DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
                      queryDimensions[i].getDimension().getDataType());
              if (directDictionaryGenerator != null) {
                row[order[i]] = directDictionaryGenerator.getValueFromSurrogate(
                    (int) surrogateResult[queryDimensions[i].getDimension().getKeyOrdinal()]);
              }
            } else {
              // plain dictionary column: keep the surrogate key (raw result)
              row[order[i]] = 
                  (int) surrogateResult[queryDimensions[i].getDimension().getKeyOrdinal()];
            }
          }
        }
        // Guard against a null measure array, consistent with the raw-bytes
        // branch above (previously this would throw a NullPointerException).
        if (value != null) {
          for (int i = 0; i < msrSize; i++) {
            row[order[i + queryDimensions.length]] = value[i];
          }
        }
        resultData[counter] = row;
        counter++;
      }
    }

    LOGGER.info("Total number of records prepared: " + scannedResult.size());
    BatchRawResult result = new BatchRawResult();
    result.setRows(resultData);
    return result;
  }

}
+ */ +package org.carbondata.scan.scanner; + +import org.carbondata.scan.executor.exception.QueryExecutionException; +import org.carbondata.scan.executor.infos.BlockExecutionInfo; +import org.carbondata.scan.processor.BlocksChunkHolder; +import org.carbondata.scan.result.AbstractScannedResult; + +/** + * Blocklet scanner class to process the block + */ +public abstract class AbstractBlockletScanner implements BlockletScanner { + + /** + * scanner result + */ + protected AbstractScannedResult scannedResult; + + /** + * block execution info + */ + protected BlockExecutionInfo blockExecutionInfo; + + public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) { + this.blockExecutionInfo = tableBlockExecutionInfos; + } + + @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder) + throws QueryExecutionException { + fillKeyValue(blocksChunkHolder); + return scannedResult; + } + + protected void fillKeyValue(BlocksChunkHolder blocksChunkHolder) { + scannedResult.reset(); + scannedResult.setMeasureChunks(blocksChunkHolder.getDataBlock() + .getMeasureChunks(blocksChunkHolder.getFileReader(), + blockExecutionInfo.getAllSelectedMeasureBlocksIndexes())); + scannedResult.setNumberOfRows(blocksChunkHolder.getDataBlock().nodeSize()); + + scannedResult.setDimensionChunks(blocksChunkHolder.getDataBlock() + .getDimensionChunks(blocksChunkHolder.getFileReader(), + blockExecutionInfo.getAllSelectedDimensionBlocksIndexes())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/scanner/BlockletScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/scan/scanner/BlockletScanner.java b/core/src/main/java/org/carbondata/scan/scanner/BlockletScanner.java new file mode 100644 index 0000000..f1a0646 --- /dev/null +++ b/core/src/main/java/org/carbondata/scan/scanner/BlockletScanner.java @@ -0,0 +1,41 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.carbondata.scan.scanner; + +import org.carbondata.scan.executor.exception.QueryExecutionException; +import org.carbondata.scan.processor.BlocksChunkHolder; +import org.carbondata.scan.result.AbstractScannedResult; + +/** + * Interface for processing the block + * Processing can be filter based processing or non filter based processing + */ +public interface BlockletScanner { + + /** + * Below method will used to process the block data and get the scanned result + * + * @param blocksChunkHolder block chunk which holds the block data + * @return scannerResult + * result after processing + * @throws QueryExecutionException + */ + AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder) + throws QueryExecutionException; +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/scanner/impl/FilterScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/carbondata/scan/scanner/impl/FilterScanner.java new file mode 100644 index 0000000..830146d --- 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.carbondata.scan.scanner.impl;

import java.util.BitSet;

import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.core.datastorage.store.FileHolder;
import org.carbondata.core.util.CarbonProperties;
import org.carbondata.scan.executor.exception.QueryExecutionException;
import org.carbondata.scan.executor.infos.BlockExecutionInfo;
import org.carbondata.scan.expression.exception.FilterUnsupportedException;
import org.carbondata.scan.filter.executer.FilterExecuter;
import org.carbondata.scan.processor.BlocksChunkHolder;
import org.carbondata.scan.result.AbstractScannedResult;
import org.carbondata.scan.result.impl.FilterQueryScannedResult;
import org.carbondata.scan.scanner.AbstractBlockletScanner;

/**
 * Blocklet scanner for filter queries: first applies min/max pruning and the
 * filter tree, then reads only the remaining required blocks and returns the
 * scanned result restricted to the filtered row indexes.
 */
public class FilterScanner extends AbstractBlockletScanner {

  /**
   * executable filter tree built from the query predicate
   */
  private FilterExecuter filterExecuter;

  /**
   * this will be used to apply min max
   * this will be useful for dimension column which is on the right side
   * as node finder will always give tentative blocks, if column data stored individually
   * and data is in sorted order then we can check whether filter is in the range of min max or not
   * if it present then only we can apply filter on complete data.
   * this will be very useful in case of sparse data when rows are
   * repeating.
   */
  private boolean isMinMaxEnabled;

  public FilterScanner(BlockExecutionInfo blockExecutionInfo) {
    super(blockExecutionInfo);
    scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
    // to check whether min max is enabled or not; a default value is supplied,
    // so the null check below is purely defensive
    String minMaxEnableValue = CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED,
            CarbonCommonConstants.MIN_MAX_DEFAULT_VALUE);
    if (null != minMaxEnableValue) {
      isMinMaxEnabled = Boolean.parseBoolean(minMaxEnableValue);
    }
    // get the filter tree
    this.filterExecuter = blockExecutionInfo.getFilterExecuterTree();
  }

  /**
   * Below method will be used to process the block
   *
   * @param blocksChunkHolder block chunk holder which holds the data
   * @throws QueryExecutionException if filter application fails
   */
  @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
      throws QueryExecutionException {
    try {
      fillScannedResult(blocksChunkHolder);
    } catch (FilterUnsupportedException e) {
      // NOTE(review): only the message is propagated here, which loses the
      // original stack trace; if QueryExecutionException offers a
      // (String, Throwable) constructor, pass 'e' as the cause — confirm.
      throw new QueryExecutionException(e.getMessage());
    }
    return scannedResult;
  }

  /**
   * This method will process the data in below order
   * 1. first apply min max on the filter tree and check whether any of the filter
   * is fall on the range of min max, if not then return empty result
   * 2. If filter falls on min max range then apply filter on actual
   * data and get the filtered row index
   * 3. if row index is empty then return the empty result
   * 4. if row indexes is not empty then read only those blocks(measure or dimension)
   * which was present in the query but not present in the filter, as while applying filter
   * some of the blocks where already read and present in chunk holder so not need to
   * read those blocks again, this is to avoid reading of same blocks which was already read
   * 5. Set the blocks and filter indexes to result
   *
   * @param blocksChunkHolder holder carrying block data and the file reader
   * @throws FilterUnsupportedException if the filter cannot be evaluated
   */
  private void fillScannedResult(BlocksChunkHolder blocksChunkHolder)
      throws FilterUnsupportedException {

    scannedResult.reset();
    // apply min max pruning first: if no filter value can fall inside the
    // blocklet's min/max range, skip it entirely
    if (isMinMaxEnabled) {
      BitSet bitSet = this.filterExecuter
          .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
              blocksChunkHolder.getDataBlock().getColumnsMinValue());
      if (bitSet.isEmpty()) {
        setEmptyResult();
        return;
      }
    }
    // apply filter on actual data
    BitSet bitSet = this.filterExecuter.applyFilter(blocksChunkHolder);
    // if no row survived the filter return with empty result
    if (bitSet.isEmpty()) {
      setEmptyResult();
      return;
    }
    // collect the surviving row indexes from the bit set
    int[] indexes = new int[bitSet.cardinality()];
    int index = 0;
    for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
      indexes[index++] = i;
    }

    scannedResult.setDimensionChunks(readRequiredDimensionChunks(blocksChunkHolder));
    scannedResult.setIndexes(indexes);
    scannedResult.setMeasureChunks(readRequiredMeasureChunks(blocksChunkHolder));
    scannedResult.setNumberOfRows(indexes.length);
  }

  /**
   * Marks the scanned result as containing no rows (min/max pruned the
   * blocklet or the filter matched nothing).
   */
  private void setEmptyResult() {
    scannedResult.setNumberOfRows(0);
    scannedResult.setIndexes(new int[0]);
  }

  /**
   * Returns all selected dimension chunks, reading from file only those not
   * already loaded into the chunk holder during filter evaluation.
   */
  private DimensionColumnDataChunk[] readRequiredDimensionChunks(
      BlocksChunkHolder blocksChunkHolder) {
    FileHolder fileReader = blocksChunkHolder.getFileReader();
    int[] allSelectedDimensionBlocksIndexes =
        blockExecutionInfo.getAllSelectedDimensionBlocksIndexes();
    DimensionColumnDataChunk[] dimensionColumnDataChunk =
        new DimensionColumnDataChunk[blockExecutionInfo.getTotalNumberDimensionBlock()];
    for (int i = 0; i < allSelectedDimensionBlocksIndexes.length; i++) {
      if (null == blocksChunkHolder.getDimensionDataChunk()[allSelectedDimensionBlocksIndexes[i]]) {
        dimensionColumnDataChunk[allSelectedDimensionBlocksIndexes[i]] =
            blocksChunkHolder.getDataBlock()
                .getDimensionChunk(fileReader, allSelectedDimensionBlocksIndexes[i]);
      } else {
        dimensionColumnDataChunk[allSelectedDimensionBlocksIndexes[i]] =
            blocksChunkHolder.getDimensionDataChunk()[allSelectedDimensionBlocksIndexes[i]];
      }
    }
    return dimensionColumnDataChunk;
  }

  /**
   * Returns all selected measure chunks, reading from file only those not
   * already loaded into the chunk holder during filter evaluation.
   */
  private MeasureColumnDataChunk[] readRequiredMeasureChunks(BlocksChunkHolder blocksChunkHolder) {
    FileHolder fileReader = blocksChunkHolder.getFileReader();
    MeasureColumnDataChunk[] measureColumnDataChunk =
        new MeasureColumnDataChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()];
    int[] allSelectedMeasureBlocksIndexes = blockExecutionInfo.getAllSelectedMeasureBlocksIndexes();
    for (int i = 0; i < allSelectedMeasureBlocksIndexes.length; i++) {
      if (null == blocksChunkHolder.getMeasureDataChunk()[allSelectedMeasureBlocksIndexes[i]]) {
        measureColumnDataChunk[allSelectedMeasureBlocksIndexes[i]] =
            blocksChunkHolder.getDataBlock()
                .getMeasureChunk(fileReader, allSelectedMeasureBlocksIndexes[i]);
      } else {
        measureColumnDataChunk[allSelectedMeasureBlocksIndexes[i]] =
            blocksChunkHolder.getMeasureDataChunk()[allSelectedMeasureBlocksIndexes[i]];
      }
    }
    return measureColumnDataChunk;
  }
}
b/core/src/main/java/org/carbondata/scan/scanner/impl/NonFilterScanner.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.carbondata.scan.scanner.impl; + +import org.carbondata.scan.executor.infos.BlockExecutionInfo; +import org.carbondata.scan.result.impl.NonFilterQueryScannedResult; +import org.carbondata.scan.scanner.AbstractBlockletScanner; + +/** + * Non filter processor which will be used for non filter query + * In case of non filter query we just need to read all the blocks requested in the + * query and pass it to scanned result + */ +public class NonFilterScanner extends AbstractBlockletScanner { + + public NonFilterScanner(BlockExecutionInfo blockExecutionInfo) { + super(blockExecutionInfo); + // as its a non filter query creating a non filter query scanned result object + scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo); + } +}