http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java new file mode 100644 index 0000000..7875f92 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.carbondata.core.scan.result.iterator;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.BatchResult;

/**
 * In case of detail query we cannot keep all the records in memory, so for
 * executing that query we return an iterator over the blocks; every call to
 * {@link #next()} executes the next block and returns its result.
 *
 * <p>While the caller consumes one batch, the following batch is already being
 * computed asynchronously on the executor service passed to the constructor.
 */
public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<BatchResult> {

  /**
   * Serializes the data-block-iterator updates performed by the tasks submitted in
   * {@link #execute()}.
   */
  private final Object lock = new Object();

  /**
   * Computation of the batch that the next call to {@link #next()} returns;
   * {@code null} until the first call.
   */
  private Future<BatchResult> future;

  public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
      ExecutorService execService) {
    super(infos, queryModel, execService);
  }

  /**
   * Returns the next batch, waiting for its asynchronous computation if needed, and
   * schedules computation of the following batch while more data remains. When no more
   * data remains, the file reader is finished.
   *
   * @return the next batch of rows
   * @throws RuntimeException wrapping any failure while computing the batch; the file
   *     reader is finished before it is thrown
   */
  @Override public BatchResult next() {
    BatchResult result;
    long startTime = System.currentTimeMillis();
    try {
      if (future == null) {
        future = execute();
      }
      result = future.get();
      nextBatch = false;
      if (hasNext()) {
        // prefetch: start computing the following batch while the caller consumes this one
        nextBatch = true;
        future = execute();
      } else {
        fileReader.finish();
      }
      totalScanTime += System.currentTimeMillis() - startTime;
    } catch (Exception ex) {
      if (ex instanceof InterruptedException) {
        // the interrupt status is cleared when InterruptedException is thrown; restore it
        Thread.currentThread().interrupt();
      }
      RuntimeException failure = new RuntimeException(ex);
      try {
        fileReader.finish();
      } catch (Exception finishEx) {
        // keep the original failure as the primary error; record the cleanup failure
        // instead of silently discarding it (the former throw-in-finally dropped it)
        failure.addSuppressed(finishEx);
      }
      throw failure;
    }
    return result;
  }

  /**
   * Submits a task that, while holding {@link #lock}, refreshes the data block iterator via
   * {@code updateDataBlockIterator()} and reads one batch of rows from it. The returned
   * batch has no rows set when no data block iterator is available.
   *
   * @return the pending batch computation
   */
  private Future<BatchResult> execute() {
    return execService.submit(new Callable<BatchResult>() {
      @Override public BatchResult call() {
        BatchResult batchResult = new BatchResult();
        synchronized (lock) {
          updateDataBlockIterator();
          if (dataBlockIterator != null) {
            batchResult.setRows(dataBlockIterator.next());
          }
        }
        return batchResult;
      }
    });
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java new file mode 100644 index 0000000..2f6a8b7 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.scan.result.iterator; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.keygenerator.KeyGenException; +import org.apache.carbondata.core.scan.result.BatchResult; +import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; + +/** + * This is a wrapper iterator over the detail raw query iterator. 
+ * This iterator will handle the processing of the raw rows. + * This will handle the batch results and will iterate on the batches and give single row. + */ +public class RawResultIterator extends CarbonIterator<Object[]> { + + private final SegmentProperties sourceSegProperties; + + private final SegmentProperties destinationSegProperties; + /** + * Iterator of the Batch raw result. + */ + private CarbonIterator<BatchResult> detailRawQueryResultIterator; + + /** + * Counter to maintain the row counter. + */ + private int counter = 0; + + private Object[] currentConveretedRawRow = null; + + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(RawResultIterator.class.getName()); + + /** + * batch of the result. + */ + private BatchResult batch; + + public RawResultIterator(CarbonIterator<BatchResult> detailRawQueryResultIterator, + SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) { + this.detailRawQueryResultIterator = detailRawQueryResultIterator; + this.sourceSegProperties = sourceSegProperties; + this.destinationSegProperties = destinationSegProperties; + } + + @Override public boolean hasNext() { + + if (null == batch || checkIfBatchIsProcessedCompletely(batch)) { + if (detailRawQueryResultIterator.hasNext()) { + batch = null; + batch = detailRawQueryResultIterator.next(); + counter = 0; // batch changed so reset the counter. 
+ } else { + return false; + } + } + + if (!checkIfBatchIsProcessedCompletely(batch)) { + return true; + } else { + return false; + } + } + + @Override public Object[] next() { + if (null == batch) { // for 1st time + batch = detailRawQueryResultIterator.next(); + } + if (!checkIfBatchIsProcessedCompletely(batch)) { + try { + if(null != currentConveretedRawRow){ + counter++; + Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow; + currentConveretedRawRow = null; + return currentConveretedRawRowTemp; + } + return convertRow(batch.getRawRow(counter++)); + } catch (KeyGenException e) { + LOGGER.error(e.getMessage()); + return null; + } + } else { // completed one batch. + batch = null; + batch = detailRawQueryResultIterator.next(); + counter = 0; + } + try { + if(null != currentConveretedRawRow){ + counter++; + Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow; + currentConveretedRawRow = null; + return currentConveretedRawRowTemp; + } + + return convertRow(batch.getRawRow(counter++)); + } catch (KeyGenException e) { + LOGGER.error(e.getMessage()); + return null; + } + + } + + /** + * for fetching the row with out incrementing counter. 
+ * @return + */ + public Object[] fetchConverted() throws KeyGenException { + if(null != currentConveretedRawRow){ + return currentConveretedRawRow; + } + if(hasNext()) + { + Object[] rawRow = batch.getRawRow(counter); + currentConveretedRawRow = convertRow(rawRow); + return currentConveretedRawRow; + } + else + { + return null; + } + } + + private Object[] convertRow(Object[] rawRow) throws KeyGenException { + byte[] dims = ((ByteArrayWrapper) rawRow[0]).getDictionaryKey(); + long[] keyArray = sourceSegProperties.getDimensionKeyGenerator().getKeyArray(dims); + byte[] covertedBytes = + destinationSegProperties.getDimensionKeyGenerator().generateKey(keyArray); + ((ByteArrayWrapper) rawRow[0]).setDictionaryKey(covertedBytes); + return rawRow; + } + + /** + * To check if the batch is processed completely + * @param batch + * @return + */ + private boolean checkIfBatchIsProcessedCompletely(BatchResult batch){ + if(counter < batch.getSize()) + { + return false; + } + else{ + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorChunkRowIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorChunkRowIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorChunkRowIterator.java new file mode 100644 index 0000000..b412655 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorChunkRowIterator.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.scan.result.iterator; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; + +/** + * Iterator over row result + */ +public class VectorChunkRowIterator extends CarbonIterator<Object[]> { + + /** + * iterator over chunk result + */ + private AbstractDetailQueryResultIterator iterator; + + /** + * currect chunk + */ + private CarbonColumnarBatch columnarBatch; + + private int batchSize; + + private int currentIndex; + + public VectorChunkRowIterator(AbstractDetailQueryResultIterator iterator, + CarbonColumnarBatch columnarBatch) { + this.iterator = iterator; + this.columnarBatch = columnarBatch; + if (iterator.hasNext()) { + iterator.processNextBatch(columnarBatch); + batchSize = columnarBatch.getActualSize(); + currentIndex = 0; + } + } + + /** + * Returns {@code true} if the iteration has more elements. (In other words, + * returns {@code true} if {@link #next} would return an element rather than + * throwing an exception.) 
+ * + * @return {@code true} if the iteration has more elements + */ + @Override public boolean hasNext() { + if (currentIndex < batchSize) { + return true; + } else { + while (iterator.hasNext()) { + columnarBatch.reset(); + iterator.processNextBatch(columnarBatch); + batchSize = columnarBatch.getActualSize(); + currentIndex = 0; + if (currentIndex < batchSize) { + return true; + } + } + } + return false; + } + + /** + * Returns the next element in the iteration. + * + * @return the next element in the iteration + */ + @Override public Object[] next() { + Object[] row = new Object[columnarBatch.columnVectors.length]; + for (int i = 0; i < row.length; i++) { + row[i] = columnarBatch.columnVectors[i].getData(currentIndex); + } + currentIndex++; + return row; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java new file mode 100644 index 0000000..00116c5 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.carbondata.core.scan.result.iterator;

import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;

/**
 * Detail query iterator that reads data in vector (columnar batch) format.
 * Row-at-a-time access through {@link #next()} is not supported; callers fill a
 * {@link CarbonColumnarBatch} with {@link #processNextBatch(CarbonColumnarBatch)} instead.
 */
public class VectorDetailQueryResultIterator extends AbstractDetailQueryResultIterator<Object> {

  /**
   * Serializes data-block-iterator updates and batch filling across callers.
   */
  private final Object lock = new Object();

  public VectorDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
      ExecutorService execService) {
    super(infos, queryModel, execService);
  }

  /**
   * Not supported by the vector reader.
   *
   * @throws UnsupportedOperationException always
   */
  @Override public Object next() {
    throw new UnsupportedOperationException("call processNextBatch instaed");
  }

  /**
   * Refreshes the data block iterator and lets it fill {@code columnarBatch}. The batch is
   * left untouched when no data block iterator is available.
   *
   * @param columnarBatch batch to fill
   */
  public void processNextBatch(CarbonColumnarBatch columnarBatch) {
    synchronized (lock) {
      updateDataBlockIterator();
      if (dataBlockIterator == null) {
        // no block to read from
        return;
      }
      dataBlockIterator.processNextBatch(columnarBatch);
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
// ----------------------------------------------------------------------
// diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
// new file mode 100644
// index 0000000..7d29b0f
// --- /dev/null
// +++
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.scan.result.vector; + +import org.apache.spark.sql.types.Decimal; + +public interface CarbonColumnVector { + + void putShort(int rowId, short value); + + void putInt(int rowId, int value); + + void putLong(int rowId, long value); + + void putDecimal(int rowId, Decimal value, int precision); + + void putDouble(int rowId, double value); + + void putBytes(int rowId, byte[] value); + + void putBytes(int rowId, int offset, int length, byte[] value); + + void putNull(int rowId); + + boolean isNull(int rowId); + + void putObject(int rowId, Object obj); + + Object getData(int rowId); + + void reset(); + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java new 
file mode 100644 index 0000000..faeffde --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.scan.result.vector; + +public class CarbonColumnarBatch { + + public CarbonColumnVector[] columnVectors; + + private int batchSize; + + private int actualSize; + + private int rowCounter; + + public CarbonColumnarBatch(CarbonColumnVector[] columnVectors, int batchSize) { + this.columnVectors = columnVectors; + this.batchSize = batchSize; + } + + public int getBatchSize() { + return batchSize; + } + + public int getActualSize() { + return actualSize; + } + + public void setActualSize(int actualSize) { + this.actualSize = actualSize; + } + + public void reset() { + actualSize = 0; + rowCounter = 0; + for (int i = 0; i < columnVectors.length; i++) { + columnVectors[i].reset(); + } + } + + public int getRowCounter() { + return rowCounter; + } + + public void setRowCounter(int rowCounter) { + this.rowCounter = rowCounter; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java new file mode 100644 index 0000000..852abe9 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.scan.result.vector; + +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.model.QueryDimension; +import org.apache.carbondata.core.scan.model.QueryMeasure; + +public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> { + public int offset; + public int size; + public CarbonColumnVector vector; + public int vectorOffset; + public QueryDimension dimension; + public QueryMeasure measure; + public int ordinal; + public DirectDictionaryGenerator directDictionaryGenerator; + public MeasureDataVectorProcessor.MeasureVectorFiller measureVectorFiller; + public GenericQueryType genericQueryType; + + @Override public int compareTo(ColumnVectorInfo o) { + return ordinal - o.ordinal; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java new file mode 100644 index 0000000..3c25efc --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.scan.result.vector; + +import java.math.BigDecimal; +import java.util.BitSet; + +import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk; +import org.apache.carbondata.core.metadata.DataType; + +import org.apache.spark.sql.types.Decimal; + +public class MeasureDataVectorProcessor { + + public interface MeasureVectorFiller { + + void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info); + + void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk, + ColumnVectorInfo info); + } + + public static class IntegralMeasureVectorFiller implements MeasureVectorFiller { + + @Override + public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) { + int offset = info.offset; + int len = offset + info.size; + int vectorOffset = info.vectorOffset; + CarbonColumnVector vector = info.vector; + BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet(); + for (int i = offset; i < len; i++) { + if (nullBitSet.get(i)) { + vector.putNull(vectorOffset); + } else { + vector.putInt(vectorOffset, + (int)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i)); + } + vectorOffset++; + } + } + + @Override + public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk, + ColumnVectorInfo info) { + int offset = info.offset; + int len = offset + info.size; + int vectorOffset = info.vectorOffset; + CarbonColumnVector vector = info.vector; + BitSet nullBitSet = 
dataChunk.getNullValueIndexHolder().getBitSet(); + for (int i = offset; i < len; i++) { + int currentRow = rowMapping[i]; + if (nullBitSet.get(currentRow)) { + vector.putNull(vectorOffset); + } else { + vector.putInt(vectorOffset, + (int)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow)); + } + vectorOffset++; + } + } + } + + public static class ShortMeasureVectorFiller implements MeasureVectorFiller { + + @Override + public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) { + int offset = info.offset; + int len = offset + info.size; + int vectorOffset = info.vectorOffset; + CarbonColumnVector vector = info.vector; + BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet(); + for (int i = offset; i < len; i++) { + if (nullBitSet.get(i)) { + vector.putNull(vectorOffset); + } else { + vector.putShort(vectorOffset, + (short) dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i)); + } + vectorOffset++; + } + } + + @Override + public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk, + ColumnVectorInfo info) { + int offset = info.offset; + int len = offset + info.size; + int vectorOffset = info.vectorOffset; + CarbonColumnVector vector = info.vector; + BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet(); + for (int i = offset; i < len; i++) { + int currentRow = rowMapping[i]; + if (nullBitSet.get(currentRow)) { + vector.putNull(vectorOffset); + } else { + vector.putShort(vectorOffset, + (short) dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow)); + } + vectorOffset++; + } + } + } + + public static class LongMeasureVectorFiller implements MeasureVectorFiller { + + @Override + public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) { + int offset = info.offset; + int len = offset + info.size; + int vectorOffset = info.vectorOffset; + CarbonColumnVector vector = info.vector; + BitSet nullBitSet = 
dataChunk.getNullValueIndexHolder().getBitSet(); + for (int i = offset; i < len; i++) { + if (nullBitSet.get(i)) { + vector.putNull(vectorOffset); + } else { + vector.putLong(vectorOffset, + dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i)); + } + vectorOffset++; + } + } + + @Override + public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk, + ColumnVectorInfo info) { + int offset = info.offset; + int len = offset + info.size; + int vectorOffset = info.vectorOffset; + CarbonColumnVector vector = info.vector; + BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet(); + for (int i = offset; i < len; i++) { + int currentRow = rowMapping[i]; + if (nullBitSet.get(currentRow)) { + vector.putNull(vectorOffset); + } else { + vector.putLong(vectorOffset, + dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow)); + } + vectorOffset++; + } + } + } + + public static class DecimalMeasureVectorFiller implements MeasureVectorFiller { + + @Override + public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) { + int offset = info.offset; + int len = offset + info.size; + int vectorOffset = info.vectorOffset; + CarbonColumnVector vector = info.vector; + int precision = info.measure.getMeasure().getPrecision(); + BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet(); + for (int i = offset; i < len; i++) { + if (nullBitSet.get(i)) { + vector.putNull(vectorOffset); + } else { + BigDecimal decimal = + dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(i); + Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(decimal); + vector.putDecimal(vectorOffset, toDecimal, precision); + } + vectorOffset++; + } + } + + @Override + public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk, + ColumnVectorInfo info) { + int offset = info.offset; + int len = offset + info.size; + int vectorOffset = info.vectorOffset; + 
CarbonColumnVector vector = info.vector; + int precision = info.measure.getMeasure().getPrecision(); + BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet(); + for (int i = offset; i < len; i++) { + int currentRow = rowMapping[i]; + if (nullBitSet.get(currentRow)) { + vector.putNull(vectorOffset); + } else { + BigDecimal decimal = + dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(currentRow); + Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(decimal); + vector.putDecimal(vectorOffset, toDecimal, precision); + } + vectorOffset++; + } + } + } + + public static class DefaultMeasureVectorFiller implements MeasureVectorFiller { + + @Override + public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) { + int offset = info.offset; + int len = offset + info.size; + int vectorOffset = info.vectorOffset; + CarbonColumnVector vector = info.vector; + BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet(); + for (int i = offset; i < len; i++) { + if (nullBitSet.get(i)) { + vector.putNull(vectorOffset); + } else { + vector.putDouble(vectorOffset, + dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(i)); + } + vectorOffset++; + } + } + + @Override + public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk, + ColumnVectorInfo info) { + int offset = info.offset; + int len = offset + info.size; + int vectorOffset = info.vectorOffset; + CarbonColumnVector vector = info.vector; + BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet(); + for (int i = offset; i < len; i++) { + int currentRow = rowMapping[i]; + if (nullBitSet.get(currentRow)) { + vector.putNull(vectorOffset); + } else { + vector.putDouble(vectorOffset, + dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(currentRow)); + } + vectorOffset++; + } + } + } + + public static class MeasureVectorFillerFactory { + + public static MeasureVectorFiller 
getMeasureVectorFiller(DataType dataType) { + switch (dataType) { + case SHORT: + return new ShortMeasureVectorFiller(); + case INT: + return new IntegralMeasureVectorFiller(); + case LONG: + return new LongMeasureVectorFiller(); + case DECIMAL: + return new DecimalMeasureVectorFiller(); + default: + return new DefaultMeasureVectorFiller(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java new file mode 100644 index 0000000..7ae1a41 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.scan.result.vector.impl; + +import java.util.Arrays; +import java.util.BitSet; + +import org.apache.carbondata.core.metadata.DataType; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; + +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * On-heap {@link CarbonColumnVector} that keeps the values of one column in a single backing + * array selected from the column {@link DataType} in the constructor. Null rows are tracked + * separately in a {@link BitSet}. Types without a dedicated array (SHORT included) are kept + * boxed in {@code data}. + */ +public class CarbonColumnVectorImpl implements CarbonColumnVector { + + private Object[] data; + + private int[] ints; + + private long[] longs; + + private Decimal[] decimals; + + private byte[][] bytes; + + private double[] doubles; + + private BitSet nullBytes; + + private DataType dataType; + + /** + * @param batchSize number of rows the vector can hold + * @param dataType column type; decides which backing array is allocated + */ + public CarbonColumnVectorImpl(int batchSize, DataType dataType) { + nullBytes = new BitSet(batchSize); + this.dataType = dataType; + switch (dataType) { + case INT: + ints = new int[batchSize]; + break; + case LONG: + longs = new long[batchSize]; + break; + case DOUBLE: + doubles = new double[batchSize]; + break; + case STRING: + bytes = new byte[batchSize][]; + break; + case DECIMAL: + decimals = new Decimal[batchSize]; + break; + default: + // every other type, SHORT included, is stored boxed in the generic array + data = new Object[batchSize]; + } + } + + /** + * Stores a short value boxed in {@code data}. Valid for vectors whose type falls into the + * constructor's default branch (e.g. SHORT); {@link #getData(int)} returns it as a Short. + */ + @Override public void putShort(int rowId, short value) { + data[rowId] = value; + } + + @Override public void putInt(int rowId, int value) { + ints[rowId] = value; + } + + @Override public void putLong(int rowId, long value) { + longs[rowId] = value; + } + + /** + * Stores the decimal; {@code precision} is not used by this implementation. + */ + @Override public void putDecimal(int rowId, Decimal value, int precision) { + decimals[rowId] = value; + } + + @Override public void putDouble(int rowId, double value) { + doubles[rowId] = value; + } + + @Override public void putBytes(int rowId, byte[] value) { + bytes[rowId] = value; + } + + /** + * Stores a copy of {@code value[offset, offset + length)} for the row (STRING vectors only, + * as only they allocate the byte array storage). + */ + @Override public void putBytes(int rowId, int offset, int length, byte[] value) { + bytes[rowId] = Arrays.copyOfRange(value, offset, offset + length); + } + + @Override public void putNull(int rowId) { + nullBytes.set(rowId); + } + + @Override public boolean isNull(int rowId) { + return nullBytes.get(rowId); + } + + @Override public void putObject(int rowId, Object obj) { + data[rowId] = obj;
+ } + + /** + * Returns the value of the row, or {@code null} if the row was marked null. STRING values + * are wrapped as {@link UTF8String}; primitive values are boxed. + */ + @Override public Object getData(int rowId) { + if (nullBytes.get(rowId)) { + return null; + } + switch (dataType) { + case INT: + return ints[rowId]; + case LONG: + return longs[rowId]; + case DOUBLE: + return doubles[rowId]; + case STRING: + return UTF8String.fromBytes(bytes[rowId]); + case DECIMAL: + return decimals[rowId]; + default: + return data[rowId]; + } + } + + /** + * Clears the null markers and the backing array so the vector can be reused for a new batch. + */ + @Override public void reset() { + nullBytes.clear(); + switch (dataType) { + case INT: + Arrays.fill(ints, 0); + break; + case LONG: + Arrays.fill(longs, 0); + break; + case DOUBLE: + Arrays.fill(doubles, 0); + break; + case STRING: + Arrays.fill(bytes, null); + break; + case DECIMAL: + Arrays.fill(decimals, null); + break; + default: + Arrays.fill(data, null); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java new file mode 100644 index 0000000..32b3ac2 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.scan.scanner; + +import java.io.IOException; + +import org.apache.carbondata.core.update.data.BlockletDeleteDeltaCacheLoader; +import org.apache.carbondata.core.update.data.DeleteDeltaCacheLoaderIntf; +import org.apache.carbondata.core.stats.QueryStatistic; +import org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsModel; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; +import org.apache.carbondata.core.scan.result.AbstractScannedResult; + +/** + * Base class for blocklet scanners. The default {@link #scanBlocklet} reads every selected + * dimension and measure chunk of the blocklet into {@link #scannedResult}. Subclasses are + * expected to create the concrete result object and to set {@link #queryStatisticsModel}. + */ +public abstract class AbstractBlockletScanner implements BlockletScanner { + + /** + * result object that is reset and refilled for every scanned blocklet + */ + protected AbstractScannedResult scannedResult; + + /** + * execution info of the block the scanned blocklets belong to + */ + protected BlockExecutionInfo blockExecutionInfo; + + public QueryStatisticsModel queryStatisticsModel; + + public AbstractBlockletScanner(BlockExecutionInfo blockExecutionInfo) { + this.blockExecutionInfo = blockExecutionInfo; + } + + @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder) + throws IOException, FilterUnsupportedException { + fillKeyValue(blocksChunkHolder); + return scannedResult; + } + + /** + * Records the blocklet in the query statistics, then loads the selected dimension and measure + * chunks and the delete delta cache of the blocklet into {@link #scannedResult}. + * + * @param blocksChunkHolder holder of the blocklet to read + * @throws IOException propagated from reading the chunks or the delete delta files + */ + protected void fillKeyValue(BlocksChunkHolder blocksChunkHolder) throws IOException { + // without a filter every visited blocklet is counted both as seen and as actually scanned + incrementCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM); + incrementCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM); +
+ scannedResult.reset(); + scannedResult.setNumberOfRows(blocksChunkHolder.getDataBlock().nodeSize()); + String blockletId = blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + + blocksChunkHolder.getDataBlock().nodeNumber(); + scannedResult.setBlockletId(blockletId); + scannedResult.setDimensionChunks(blocksChunkHolder.getDataBlock().getDimensionChunks( + blocksChunkHolder.getFileReader(), blockExecutionInfo.getAllSelectedDimensionBlocksIndexes())); + scannedResult.setMeasureChunks(blocksChunkHolder.getDataBlock().getMeasureChunks( + blocksChunkHolder.getFileReader(), blockExecutionInfo.getAllSelectedMeasureBlocksIndexes())); + // load the delete delta of this blocklet and hand its cache to the result + DeleteDeltaCacheLoaderIntf deltaLoader = new BlockletDeleteDeltaCacheLoader( + scannedResult.getBlockletId(), blocksChunkHolder.getDataBlock(), + blockExecutionInfo.getAbsoluteTableIdentifier()); + deltaLoader.loadDeleteDeltaFileDataToCache(); + scannedResult.setBlockletDeleteDeltaCache( + blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache()); + } + + /** + * Adds one to the count statistic registered under {@code statisticName} and records it. + */ + private void incrementCountStatistic(String statisticName) { + QueryStatistic statistic = + queryStatisticsModel.getStatisticsTypeAndObjMap().get(statisticName); + statistic.addCountStatistic(statisticName, statistic.getCount() + 1); + queryStatisticsModel.getRecorder().recordStatistics(statistic); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java new file mode 100644 index 0000000..9484318 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.scan.scanner; + +import java.io.IOException; + +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; +import org.apache.carbondata.core.scan.result.AbstractScannedResult; + +/** + * Scans a single blocklet of a block. Implementations either apply a filter (filter query) + * or read the requested columns as they are (non-filter query). + */ +public interface BlockletScanner { + + /** + * Scans the blocklet held by the given chunk holder. + * + * @param blocksChunkHolder holder of the blocklet data to scan + * @return the scanned result for the blocklet + * @throws IOException on an I/O failure while reading the blocklet + * @throws FilterUnsupportedException if the query filter is not supported + */ + AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException, + FilterUnsupportedException; +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java new file mode 100644 index 0000000..48c9af2 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.scan.scanner.impl; + +import java.io.IOException; +import java.util.BitSet; + +import org.apache.carbondata.core.update.data.BlockletDeleteDeltaCacheLoader; +import org.apache.carbondata.core.update.data.DeleteDeltaCacheLoaderIntf; +import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; +import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk; +import org.apache.carbondata.core.stats.QueryStatistic; +import org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsModel; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastorage.FileHolder; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; +import org.apache.carbondata.core.scan.result.AbstractScannedResult; +import org.apache.carbondata.core.scan.result.impl.FilterQueryScannedResult; +import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner; + +/** + * Scanner used for filter query processing. It first applies the filter and then reads the + * blocks that are still required, returning the scanned result. + */ +public
class FilterScanner extends AbstractBlockletScanner { + + /** + * filter tree + */ + private FilterExecuter filterExecuter; + /** + * whether min/max pruning runs before the actual filter. As the node finder only returns + * tentative blocks, checking the filter against a blocklet's min/max values first lets a + * blocklet whose range cannot match be skipped without evaluating the filter on its data. + * This is most useful for sparse data where rows repeat. + */ + private boolean isMinMaxEnabled; + + /** + * @param blockExecutionInfo execution info of the block to scan + * @param queryStatisticsModel model used to record query statistics + */ + public FilterScanner(BlockExecutionInfo blockExecutionInfo, + QueryStatisticsModel queryStatisticsModel) { + super(blockExecutionInfo); + scannedResult = new FilterQueryScannedResult(blockExecutionInfo); + // to check whether min max is enabled or not + String minMaxEnableValue = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED, + CarbonCommonConstants.MIN_MAX_DEFAULT_VALUE); + if (null != minMaxEnableValue) { + isMinMaxEnabled = Boolean.parseBoolean(minMaxEnableValue); + } + // get the filter tree + this.filterExecuter = blockExecutionInfo.getFilterExecuterTree(); + // store the model in the inherited field (as NonFilterScanner does) rather than in a + // private field of the same name, which would hide it and leave the inherited one null + super.queryStatisticsModel = queryStatisticsModel; + } + + /** + * Applies the filter to the blocklet and returns the (possibly empty) scanned result. + * + * @param blocksChunkHolder block chunk holder which holds the data + * @throws IOException propagated from reading chunks or delete delta files + * @throws FilterUnsupportedException if the filter cannot be applied + */ + @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder) + throws IOException, FilterUnsupportedException { + fillScannedResult(blocksChunkHolder); + return scannedResult; + } + + /** + * This method will process the data in below order + * 1. if min max is enabled, check whether the filter can match within the min max range of + * the blocklet; if not, return an empty result + * 2.
If the filter falls in the min max range, apply the filter on the actual + * data and get the filtered row indexes + * 3. if no row index is selected, return the empty result + * 4. otherwise collect the dimension and measure chunks: chunks already read into the chunk + * holder while applying the filter are reused, and the chunks of the projected blocks are + * read through the data block + * 5. Set the chunks and the filtered row indexes to the result + * + * @param blocksChunkHolder + * @throws FilterUnsupportedException + */ + private void fillScannedResult(BlocksChunkHolder blocksChunkHolder) + throws FilterUnsupportedException, IOException { + scannedResult.reset(); + scannedResult.setBlockletId( + blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder + .getDataBlock().nodeNumber()); + // apply min max + if (isMinMaxEnabled) { + BitSet bitSet = this.filterExecuter + .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(), + blocksChunkHolder.getDataBlock().getColumnsMinValue()); + if (bitSet.isEmpty()) { + scannedResult.setNumberOfRows(0); + scannedResult.setIndexes(new int[0]); + CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(), + blocksChunkHolder.getMeasureDataChunk()); + return; + } + } + // apply filter on actual data + BitSet bitSet = this.filterExecuter.applyFilter(blocksChunkHolder); + // if indexes is empty then return with empty result + if (bitSet.isEmpty()) { + scannedResult.setNumberOfRows(0); + scannedResult.setIndexes(new int[0]); + return; + } + // valid scanned blocklet + QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM); + validScannedBlockletStatistic + .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, + validScannedBlockletStatistic.getCount() + 1); +
queryStatisticsModel.getRecorder().recordStatistics(validScannedBlockletStatistic); + // get the row indexes from the bit set + int[] indexes = new int[bitSet.cardinality()]; + int index = 0; + for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) { + indexes[index++] = i; + } + // loading delete data cache in blockexecutioninfo instance + DeleteDeltaCacheLoaderIntf deleteCacheLoader = + new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(), + blocksChunkHolder.getDataBlock(), blockExecutionInfo.getAbsoluteTableIdentifier()); + deleteCacheLoader.loadDeleteDeltaFileDataToCache(); + scannedResult + .setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache()); + FileHolder fileReader = blocksChunkHolder.getFileReader(); + int[][] allSelectedDimensionBlocksIndexes = + blockExecutionInfo.getAllSelectedDimensionBlocksIndexes(); + DimensionColumnDataChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock() + .getDimensionChunks(fileReader, allSelectedDimensionBlocksIndexes); + + DimensionColumnDataChunk[] dimensionColumnDataChunk = + new DimensionColumnDataChunk[blockExecutionInfo.getTotalNumberDimensionBlock()]; + // reuse the dimension chunks already read into the chunk holder while applying the filter + for (int i = 0; i < dimensionColumnDataChunk.length; i++) { + if (null != blocksChunkHolder.getDimensionDataChunk()[i]) { + dimensionColumnDataChunk[i] = blocksChunkHolder.getDimensionDataChunk()[i]; + } + } + // then copy in the chunks of the projected dimension block ranges read above + for (int i = 0; i < allSelectedDimensionBlocksIndexes.length; i++) { + System.arraycopy(projectionListDimensionChunk, allSelectedDimensionBlocksIndexes[i][0], + dimensionColumnDataChunk, allSelectedDimensionBlocksIndexes[i][0], + allSelectedDimensionBlocksIndexes[i][1] + 1 - allSelectedDimensionBlocksIndexes[i][0]); + } + MeasureColumnDataChunk[] measureColumnDataChunk = + new MeasureColumnDataChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()]; + int[][] allSelectedMeasureBlocksIndexes = +
blockExecutionInfo.getAllSelectedMeasureBlocksIndexes(); + MeasureColumnDataChunk[] projectionListMeasureChunk = blocksChunkHolder.getDataBlock() + .getMeasureChunks(fileReader, allSelectedMeasureBlocksIndexes); + // reuse the measure chunks already read into the chunk holder while applying the filter + for (int i = 0; i < measureColumnDataChunk.length; i++) { + if (null != blocksChunkHolder.getMeasureDataChunk()[i]) { + measureColumnDataChunk[i] = blocksChunkHolder.getMeasureDataChunk()[i]; + } + } + // then copy in the chunks of the projected measure block ranges read above + for (int i = 0; i < allSelectedMeasureBlocksIndexes.length; i++) { + System.arraycopy(projectionListMeasureChunk, allSelectedMeasureBlocksIndexes[i][0], + measureColumnDataChunk, allSelectedMeasureBlocksIndexes[i][0], + allSelectedMeasureBlocksIndexes[i][1] + 1 - allSelectedMeasureBlocksIndexes[i][0]); + } + scannedResult.setDimensionChunks(dimensionColumnDataChunk); + scannedResult.setIndexes(indexes); + scannedResult.setMeasureChunks(measureColumnDataChunk); + scannedResult.setNumberOfRows(indexes.length); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java new file mode 100644 index 0000000..f07c2be --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.scan.scanner.impl; + +import org.apache.carbondata.core.stats.QueryStatisticsModel; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult; +import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner; + +/** + * Scanner for queries without a filter: every block requested by the query is read by the + * inherited scan logic and handed to a {@link NonFilterQueryScannedResult}. + */ +public class NonFilterScanner extends AbstractBlockletScanner { + + /** + * @param executionInfo execution info of the block to scan + * @param statisticsModel model used to record query statistics + */ + public NonFilterScanner(BlockExecutionInfo executionInfo, + QueryStatisticsModel statisticsModel) { + super(executionInfo); + // no filter to evaluate, so the plain non-filter result type is sufficient + this.scannedResult = new NonFilterQueryScannedResult(executionInfo); + this.queryStatisticsModel = statisticsModel; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java new file mode 100644 index 0000000..68eb946 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java @@ -0,0 +1,221 @@ +/* + * Licensed to the
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.scan.wrappers; + +import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; + +/** + * Holds the dimension column data of one row while a query executes; it can be used as a key + * for aggregation. Equality, hashing and ordering are based on the dictionary key, the no + * dictionary keys and the complex type keys ({@code implicitColumnByteArray} is not part of + * them). A key array that was never set ({@code null}) is treated exactly like an empty one. + */ +public class ByteArrayWrapper implements Comparable<ByteArrayWrapper> { + + private static final byte[] EMPTY_KEY = new byte[0]; + + private static final byte[][] EMPTY_KEYS = new byte[0][]; + + /** + * to store key which is generated using + * key generator + */ + protected byte[] dictionaryKey; + + /** + * to store complex type column data + */ + protected byte[][] complexTypesKeys; + + /** + * to store no dictionary column data + */ + protected byte[][] noDictionaryKeys; + + /** + * contains value of implicit columns in byte array format + */ + protected byte[] implicitColumnByteArray; + + public ByteArrayWrapper() { + } + + /** + * @return the dictionaryKey + */ + public byte[] getDictionaryKey() { + return dictionaryKey; + } + + /** + * @param dictionaryKey the dictionaryKey to set + */ + public void setDictionaryKey(byte[] dictionaryKey) { + this.dictionaryKey = dictionaryKey; + } + + /** + * @param noDictionaryKeys the noDictionaryKeys to set + */ + public void setNoDictionaryKeys(byte[][] noDictionaryKeys) { + this.noDictionaryKeys = noDictionaryKeys; + } +
+ /** + * to get the no dictionary column data + * + * @param index of the no dictionary key + * @return no dictionary key for the index + */ + public byte[] getNoDictionaryKeyByIndex(int index) { + return this.noDictionaryKeys[index]; + } + + /** + * to get the complex type column data + * + * @param index of the complex type key + * @return complex type key for the index + */ + public byte[] getComplexTypeByIndex(int index) { + return this.complexTypesKeys[index]; + } + + /** + * Hashes the dictionary key, then every no dictionary key, then every complex type key, + * folding all bytes with the usual 31-multiplier scheme. + */ + @Override public int hashCode() { + int result = hash(1, orEmpty(dictionaryKey)); + for (byte[] noDictionaryKey : orEmpty(noDictionaryKeys)) { + result = hash(result, noDictionaryKey); + } + for (byte[] complexTypeKey : orEmpty(complexTypesKeys)) { + result = hash(result, complexTypeKey); + } + return result; + } + + /** + * Two wrappers are equal when their no dictionary keys, complex type keys and dictionary key + * are all equal (compared in that order). + * + * @param other object + */ + @Override public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof ByteArrayWrapper)) { + return false; + } + ByteArrayWrapper that = (ByteArrayWrapper) other; + return keysEqual(orEmpty(noDictionaryKeys), orEmpty(that.noDictionaryKeys)) + && keysEqual(orEmpty(complexTypesKeys), orEmpty(that.complexTypesKeys)) + && UnsafeComparer.INSTANCE.equals(orEmpty(dictionaryKey), orEmpty(that.dictionaryKey)); + } +
+ /** + * Orders by dictionary key, then no dictionary keys, then complex type keys. Key lists are + * compared element by element; when one list is a prefix of the other, the shorter sorts first. + * + * @param other ArrayWrapper Object + */ + @Override public int compareTo(ByteArrayWrapper other) { + int compareTo = + UnsafeComparer.INSTANCE.compareTo(orEmpty(dictionaryKey), orEmpty(other.dictionaryKey)); + if (compareTo != 0) { + return compareTo; + } + compareTo = compareKeys(orEmpty(noDictionaryKeys), orEmpty(other.noDictionaryKeys)); + if (compareTo != 0) { + return compareTo; + } + return compareKeys(orEmpty(complexTypesKeys), orEmpty(other.complexTypesKeys)); + } + + /** + * @return the complexTypesKeys + */ + public byte[][] getComplexTypesKeys() { + return complexTypesKeys; + } + + /** + * @param complexTypesKeys the complexTypesKeys to set + */ + public void setComplexTypesKeys(byte[][] complexTypesKeys) { + this.complexTypesKeys = complexTypesKeys; + } + + /** + * @return the implicit column value in byte array format + */ + public byte[] getImplicitColumnByteArray() { + return implicitColumnByteArray; + } + + /** + * @param implicitColumnByteArray the implicit column value in byte array format + */ + public void setImplicitColumnByteArray(byte[] implicitColumnByteArray) { + this.implicitColumnByteArray = implicitColumnByteArray; + } +
+ /** + * Folds the bytes of {@code bytes} into {@code seed} using the 31-multiplier hash scheme. + */ + private static int hash(int seed, byte[] bytes) { + int result = seed; + for (byte b : bytes) { + result = 31 * result + b; + } + return result; + } + + /** + * Element-wise equality of two key lists; lists of different length are never equal. + */ + private static boolean keysEqual(byte[][] keys, byte[][] otherKeys) { + if (keys.length != otherKeys.length) { + return false; + } + for (int i = 0; i < keys.length; i++) { + if (!UnsafeComparer.INSTANCE.equals(keys[i], otherKeys[i])) { + return false; + } + } + return true; + } + + /** + * Compares two key lists element by element, then by length, so lists of different size + * never index past the end of the shorter one. + */ + private static int compareKeys(byte[][] keys, byte[][] otherKeys) { + int commonLength = Math.min(keys.length, otherKeys.length); + for (int i = 0; i < commonLength; i++) { + int compareTo = UnsafeComparer.INSTANCE.compareTo(keys[i], otherKeys[i]); + if (compareTo != 0) { + return compareTo; + } + } + return Integer.compare(keys.length, otherKeys.length); + } + + /** + * Treats an unset (null) key as an empty key. + */ + private static byte[] orEmpty(byte[] key) { + return key == null ? EMPTY_KEY : key; + } + + /** + * Treats an unset (null) key list as an empty list. + */ + private static byte[][] orEmpty(byte[][] keys) { + return keys == null ? EMPTY_KEYS : keys; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/service/CarbonCommonFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/service/CarbonCommonFactory.java b/core/src/main/java/org/apache/carbondata/core/service/CarbonCommonFactory.java new file mode 100644 index 0000000..3c33867 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/service/CarbonCommonFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.carbondata.core.service; + +import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator; +import org.apache.carbondata.core.service.impl.DictionaryFactory; +import org.apache.carbondata.core.service.impl.PathFactory; +import org.apache.carbondata.core.service.ColumnUniqueIdService; +import org.apache.carbondata.core.service.DictionaryService; +import org.apache.carbondata.core.service.PathService; + +/** + * Static access point for the core services (column unique id, dictionary and path). Each + * getter simply returns the instance exposed by the implementation's {@code getInstance()}. + */ +public class CarbonCommonFactory { + + /** + * @return the column unique id service, from {@code ColumnUniqueIdGenerator.getInstance()} + */ + public static ColumnUniqueIdService getColumnUniqueIdGenerator() { + return ColumnUniqueIdGenerator.getInstance(); + } + + /** + * @return the dictionary service, from {@code DictionaryFactory.getInstance()} + */ + public static DictionaryService getDictionaryService() { + return DictionaryFactory.getInstance(); + } + + /** + * @return the path service, from {@code PathFactory.getInstance()} + */ + public static PathService getPathService() { + return PathFactory.getInstance(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java b/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java new file mode 100644 index 0000000..b79f9db --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.service.impl; + +import java.util.UUID; + +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.service.ColumnUniqueIdService; + +/** + * {@link ColumnUniqueIdService} that generates a random UUID for every column. + */ +public class ColumnUniqueIdGenerator implements ColumnUniqueIdService { + + // final: created once during class initialization and never replaced, which also + // guarantees every thread sees the fully constructed instance + private static final ColumnUniqueIdService INSTANCE = new ColumnUniqueIdGenerator(); + + /** + * Returns a new random (type 4) UUID string. Neither argument is used, so repeated calls + * for the same column yield different ids. + * + * @param databaseName ignored + * @param columnSchema ignored + * @return a random UUID in its canonical string form + */ + @Override public String generateUniqueId(String databaseName, ColumnSchema columnSchema) { + return UUID.randomUUID().toString(); + } + + /** + * @return the single shared instance of this generator + */ + public static ColumnUniqueIdService getInstance() { + return INSTANCE; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/service/impl/DictionaryFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/service/impl/DictionaryFactory.java b/core/src/main/java/org/apache/carbondata/core/service/impl/DictionaryFactory.java new file mode 100644 index 0000000..d9a6d19 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/service/impl/DictionaryFactory.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.service.impl; + +import org.apache.carbondata.core.CarbonTableIdentifier; +import org.apache.carbondata.core.ColumnIdentifier; +import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader; +import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl; +import org.apache.carbondata.core.reader.CarbonDictionaryReader; +import org.apache.carbondata.core.reader.CarbonDictionaryReaderImpl; +import org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReader; +import org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReaderImpl; +import org.apache.carbondata.core.service.DictionaryService; +import org.apache.carbondata.core.writer.CarbonDictionaryWriter; +import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl; +import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter; +import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl; + +/** + * service to get dictionary reader and writer + */ +public class DictionaryFactory implements DictionaryService { + + private static DictionaryService dictService = new DictionaryFactory(); + + /** + * get dictionary writer + * + * @param carbonTableIdentifier + * @param columnIdentifier + * @param carbonStorePath + * @return + */ + @Override public CarbonDictionaryWriter getDictionaryWriter( + 
CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier, + String carbonStorePath) { + return new CarbonDictionaryWriterImpl(carbonStorePath, carbonTableIdentifier, columnIdentifier); + } + + /** + * get dictionary sort index writer + * + * @param carbonTableIdentifier + * @param columnIdentifier + * @param carbonStorePath + * @return + */ + @Override public CarbonDictionarySortIndexWriter getDictionarySortIndexWriter( + CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier, + String carbonStorePath) { + return new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, columnIdentifier, + carbonStorePath); + } + + /** + * get dictionary metadata reader + * + * @param carbonTableIdentifier + * @param columnIdentifier + * @param carbonStorePath + * @return + */ + @Override public CarbonDictionaryMetadataReader getDictionaryMetadataReader( + CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier, + String carbonStorePath) { + return new CarbonDictionaryMetadataReaderImpl(carbonStorePath, carbonTableIdentifier, + columnIdentifier); + } + + /** + * get dictionary reader + * + * @param carbonTableIdentifier + * @param columnIdentifier + * @param carbonStorePath + * @return + */ + @Override public CarbonDictionaryReader getDictionaryReader( + CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier, + String carbonStorePath) { + return new CarbonDictionaryReaderImpl(carbonStorePath, carbonTableIdentifier, columnIdentifier); + } + + /** + * get dictionary sort index reader + * + * @param carbonTableIdentifier + * @param columnIdentifier + * @param carbonStorePath + * @return + */ + @Override public CarbonDictionarySortIndexReader getDictionarySortIndexReader( + CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier, + String carbonStorePath) { + return new CarbonDictionarySortIndexReaderImpl(carbonTableIdentifier, columnIdentifier, + carbonStorePath); + 
} + + public static DictionaryService getInstance() { + return dictService; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/service/impl/PathFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/service/impl/PathFactory.java b/core/src/main/java/org/apache/carbondata/core/service/impl/PathFactory.java new file mode 100644 index 0000000..9cd4b41 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/service/impl/PathFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.service.impl; + +import org.apache.carbondata.core.CarbonTableIdentifier; +import org.apache.carbondata.core.path.CarbonStorePath; +import org.apache.carbondata.core.path.CarbonTablePath; +import org.apache.carbondata.core.service.PathService; + +/** + * Create helper to get path details + */ +public class PathFactory implements PathService { + + private static PathService pathService = new PathFactory(); + + /** + * Return store path related to tables + */ + @Override public CarbonTablePath getCarbonTablePath( + String storeLocation, CarbonTableIdentifier tableIdentifier) { + return CarbonStorePath.getCarbonTablePath(storeLocation, tableIdentifier); + } + + public static PathService getInstance() { + return pathService; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java index 26a4778..317d44a 100644 --- a/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java @@ -22,8 +22,6 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; -import org.apache.carbondata.common.iudprocessor.iuddata.BlockMappingVO; -import org.apache.carbondata.common.iudprocessor.iuddata.RowCountDetailsVO; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.AbsoluteTableIdentifier; @@ -31,10 +29,12 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.path.CarbonStorePath; import org.apache.carbondata.core.path.CarbonTablePath; import 
org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile; -import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory; +import org.apache.carbondata.core.datastorage.filesystem.CarbonFile; +import org.apache.carbondata.core.datastorage.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastorage.impl.FileFactory; import org.apache.carbondata.core.load.LoadMetadataDetails; +import org.apache.carbondata.core.update.data.BlockMappingVO; +import org.apache.carbondata.core.update.data.RowCountDetailsVO; import org.apache.carbondata.core.updatestatus.SegmentStatusManager; import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.CarbonProperties; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java b/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java deleted file mode 100644 index ad744c4..0000000 --- a/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.core.update; - -import java.io.Serializable; - -/** - * VO class for storing details related to Update operation. - */ -public class UpdateVO implements Serializable { - private static final long serialVersionUID = 1L; - - private Long factTimestamp; - - private Long updateDeltaStartTimestamp; - - private String segmentId; - - public Long getLatestUpdateTimestamp() { - return latestUpdateTimestamp; - } - - public void setLatestUpdateTimestamp(Long latestUpdateTimestamp) { - this.latestUpdateTimestamp = latestUpdateTimestamp; - } - - private Long latestUpdateTimestamp; - - public Long getFactTimestamp() { - return factTimestamp; - } - - public void setFactTimestamp(Long factTimestamp) { - this.factTimestamp = factTimestamp; - } - - public Long getUpdateDeltaStartTimestamp() { - return updateDeltaStartTimestamp; - } - - public void setUpdateDeltaStartTimestamp(Long updateDeltaStartTimestamp) { - this.updateDeltaStartTimestamp = updateDeltaStartTimestamp; - } - - @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - UpdateVO updateVO = (UpdateVO) o; - if (factTimestamp != null ? - !factTimestamp.equals(updateVO.factTimestamp) : - updateVO.factTimestamp != null) { - return false; - } - if (updateDeltaStartTimestamp != null ? - !updateDeltaStartTimestamp.equals(updateVO.updateDeltaStartTimestamp) : - updateVO.updateDeltaStartTimestamp != null) { - return false; - } - return latestUpdateTimestamp != null ? 
- latestUpdateTimestamp.equals(updateVO.latestUpdateTimestamp) : - updateVO.latestUpdateTimestamp == null; - - } - - @Override public int hashCode() { - int result = factTimestamp != null ? factTimestamp.hashCode() : 0; - result = 31 * result + (updateDeltaStartTimestamp != null ? - updateDeltaStartTimestamp.hashCode() : - 0); - result = 31 * result + (latestUpdateTimestamp != null ? latestUpdateTimestamp.hashCode() : 0); - return result; - } - - /** - * This will return the update timestamp if its present or it will return the fact timestamp. - * @return - */ - public Long getCreatedOrUpdatedTimeStamp() { - if (null == latestUpdateTimestamp) { - return factTimestamp; - } - return latestUpdateTimestamp; - } - - public String getSegmentId() { - return segmentId; - } - - public void setSegmentId(String segmentId) { - this.segmentId = segmentId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/update/data/BlockMappingVO.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/update/data/BlockMappingVO.java b/core/src/main/java/org/apache/carbondata/core/update/data/BlockMappingVO.java new file mode 100644 index 0000000..244c883 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/update/data/BlockMappingVO.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.update.data; + +import java.util.Map; + +/** + * VO class to store the details of segment and block count , block and its row count. + */ +public class BlockMappingVO { + + private Map<String, Long> blockRowCountMapping ; + + private Map<String, Long> segmentNumberOfBlockMapping ; + + private Map<String, RowCountDetailsVO> completeBlockRowDetailVO; + + public void setCompleteBlockRowDetailVO(Map<String, RowCountDetailsVO> completeBlockRowDetailVO) { + this.completeBlockRowDetailVO = completeBlockRowDetailVO; + } + + public Map<String, RowCountDetailsVO> getCompleteBlockRowDetailVO() { + return completeBlockRowDetailVO; + } + + public Map<String, Long> getBlockRowCountMapping() { + return blockRowCountMapping; + } + + public Map<String, Long> getSegmentNumberOfBlockMapping() { + return segmentNumberOfBlockMapping; + } + + public BlockMappingVO(Map<String, Long> blockRowCountMapping, + Map<String, Long> segmentNumberOfBlockMapping) { + this.blockRowCountMapping = blockRowCountMapping; + this.segmentNumberOfBlockMapping = segmentNumberOfBlockMapping; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/update/data/BlockletDeleteDeltaCacheLoader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/update/data/BlockletDeleteDeltaCacheLoader.java b/core/src/main/java/org/apache/carbondata/core/update/data/BlockletDeleteDeltaCacheLoader.java new file mode 100644 
index 0000000..2015663 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/update/data/BlockletDeleteDeltaCacheLoader.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.update.data; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.AbsoluteTableIdentifier; +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager; + +/** + * This class is responsible for loading delete delta file cache based on + * blocklet id of a particular block + */ +public class BlockletDeleteDeltaCacheLoader implements DeleteDeltaCacheLoaderIntf { + private String blockletID; + private DataRefNode blockletNode; + private AbsoluteTableIdentifier absoluteIdentifier; + private static final LogService LOGGER = + LogServiceFactory.getLogService(BlockletDeleteDeltaCacheLoader.class.getName()); + + public BlockletDeleteDeltaCacheLoader(String blockletID, + DataRefNode blockletNode, AbsoluteTableIdentifier absoluteIdentifier) { + this.blockletID = blockletID; + this.blockletNode = blockletNode; + 
this.absoluteIdentifier= absoluteIdentifier; + } + + /** + * This method will load the delete delta cache based on blocklet id of particular block with + * the help of SegmentUpdateStatusManager. + */ + public void loadDeleteDeltaFileDataToCache() { + SegmentUpdateStatusManager segmentUpdateStatusManager = + new SegmentUpdateStatusManager(absoluteIdentifier); + int[] deleteDeltaFileData = null; + BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache = null; + if (null == blockletNode.getDeleteDeltaDataCache()) { + try { + deleteDeltaFileData = segmentUpdateStatusManager.getDeleteDeltaDataFromAllFiles(blockletID); + deleteDeltaDataCache = new BlockletLevelDeleteDeltaDataCache(deleteDeltaFileData, + segmentUpdateStatusManager.getTimestampForRefreshCache(blockletID, null)); + } catch (Exception e) { + LOGGER.debug("Unable to retrieve delete delta files"); + } + } else { + deleteDeltaDataCache = blockletNode.getDeleteDeltaDataCache(); + // if already cache is present then validate the cache using timestamp + String cacheTimeStamp = segmentUpdateStatusManager + .getTimestampForRefreshCache(blockletID, deleteDeltaDataCache.getCacheTimeStamp()); + if (null != cacheTimeStamp) { + try { + deleteDeltaFileData = + segmentUpdateStatusManager.getDeleteDeltaDataFromAllFiles(blockletID); + deleteDeltaDataCache = new BlockletLevelDeleteDeltaDataCache(deleteDeltaFileData, + segmentUpdateStatusManager.getTimestampForRefreshCache(blockletID, cacheTimeStamp)); + } catch (Exception e) { + LOGGER.debug("Unable to retrieve delete delta files"); + } + } + } + blockletNode.setDeleteDeltaDataCache(deleteDeltaDataCache); + } +}