http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java deleted file mode 100644 index ef169af..0000000 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java +++ /dev/null @@ -1,981 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.indexstore.blockletindex; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.BitSet; -import java.util.Comparator; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.cache.Cacheable; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datamap.dev.DataMapModel; -import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap; -import org.apache.carbondata.core.datastore.IndexKey; -import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.datastore.block.TableBlockInfo; -import org.apache.carbondata.core.indexstore.BlockMetaInfo; -import org.apache.carbondata.core.indexstore.Blocklet; -import org.apache.carbondata.core.indexstore.BlockletDetailInfo; -import org.apache.carbondata.core.indexstore.ExtendedBlocklet; -import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore; -import org.apache.carbondata.core.indexstore.row.DataMapRow; -import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; -import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; -import org.apache.carbondata.core.keygenerator.KeyGenException; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; -import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; -import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex; -import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; -import org.apache.carbondata.core.metadata.datatype.DataType; -import 
org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; -import org.apache.carbondata.core.scan.filter.FilterUtil; -import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; -import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor; -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.util.ByteUtil; -import org.apache.carbondata.core.util.DataFileFooterConverter; -import org.apache.carbondata.core.util.DataTypeUtil; - -import org.apache.commons.lang3.StringUtils; -import org.xerial.snappy.Snappy; - -/** - * Datamap implementation for blocklet. - */ -public class BlockletIndexDataMap extends AbstractCoarseGrainIndexDataMap implements Cacheable { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(BlockletIndexDataMap.class.getName()); - - private static int KEY_INDEX = 0; - - private static int MIN_VALUES_INDEX = 1; - - private static int MAX_VALUES_INDEX = 2; - - private static int ROW_COUNT_INDEX = 3; - - private static int FILE_PATH_INDEX = 4; - - private static int PAGE_COUNT_INDEX = 5; - - private static int VERSION_INDEX = 6; - - private static int SCHEMA_UPADATED_TIME_INDEX = 7; - - private static int BLOCK_INFO_INDEX = 8; - - private static int BLOCK_FOOTER_OFFSET = 9; - - private static int LOCATIONS = 10; - - private static int BLOCKLET_ID_INDEX = 11; - - private static int BLOCK_LENGTH = 12; - - private static int TASK_MIN_VALUES_INDEX = 0; - - private static int TASK_MAX_VALUES_INDEX = 1; - - private static int SCHEMA = 2; - - private static int PARTITION_INFO = 3; - - private UnsafeMemoryDMStore unsafeMemoryDMStore; - - private UnsafeMemoryDMStore unsafeMemorySummaryDMStore; - - private SegmentProperties segmentProperties; - - private int[] columnCardinality; - - private boolean isPartitionedSegment; - - @Override - public void init(DataMapModel dataMapModel) throws IOException, MemoryException { - long startTime = System.currentTimeMillis(); - assert (dataMapModel instanceof BlockletDataMapModel); - BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel; - DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); - List<DataFileFooter> indexInfo = fileFooterConverter - .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData()); - isPartitionedSegment = blockletDataMapInfo.isPartitionedSegment(); - DataMapRowImpl summaryRow = null; - byte[] schemaBinary = null; - // below 2 variables will be used for fetching the relative blocklet id. 
Relative blocklet ID - // is id assigned to a blocklet within a part file - String tempFilePath = null; - int relativeBlockletId = 0; - for (DataFileFooter fileFooter : indexInfo) { - if (segmentProperties == null) { - List<ColumnSchema> columnInTable = fileFooter.getColumnInTable(); - schemaBinary = convertSchemaToBinary(columnInTable); - columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); - segmentProperties = new SegmentProperties(columnInTable, columnCardinality); - createSchema(segmentProperties); - createSummarySchema(segmentProperties, blockletDataMapInfo.getPartitions(), schemaBinary); - } - TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo(); - BlockMetaInfo blockMetaInfo = - blockletDataMapInfo.getBlockMetaInfoMap().get(blockInfo.getFilePath()); - // Here it loads info about all blocklets of index - // Only add if the file exists physically. There are scenarios which index file exists inside - // merge index but related carbondata files are deleted. In that case we first check whether - // the file exists physically or not - if (blockMetaInfo != null) { - if (fileFooter.getBlockletList() == null) { - // This is old store scenario, here blocklet information is not available in index file so - // load only block info - summaryRow = - loadToUnsafeBlock(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow, - blockMetaInfo); - } else { - // blocklet ID will start from 0 again only when part file path is changed - if (null == tempFilePath || !tempFilePath.equals(blockInfo.getFilePath())) { - tempFilePath = blockInfo.getFilePath(); - relativeBlockletId = 0; - } - summaryRow = - loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow, - blockMetaInfo, relativeBlockletId); - // this is done because relative blocklet id need to be incremented based on the - // total number of blocklets - relativeBlockletId += fileFooter.getBlockletList().size(); - } - } - } - if (unsafeMemoryDMStore != null) { - unsafeMemoryDMStore.finishWriting(); - } - if (null != unsafeMemorySummaryDMStore) { - addTaskSummaryRowToUnsafeMemoryStore( - summaryRow, - blockletDataMapInfo.getPartitions(), - schemaBinary); - unsafeMemorySummaryDMStore.finishWriting(); - } - LOGGER.info( - "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + "is " + ( - System.currentTimeMillis() - startTime)); - } - - private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter, - SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow, - BlockMetaInfo blockMetaInfo, int relativeBlockletId) { - int[] minMaxLen = segmentProperties.getColumnsValueSize(); - List<BlockletInfo> blockletList = fileFooter.getBlockletList(); - CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema(); - // Add one row to maintain task level min max for segment pruning - if (!blockletList.isEmpty() && summaryRow == null) { - summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema()); - } - for (int index = 0; index < blockletList.size(); index++) { - DataMapRow row = new DataMapRowImpl(schema); - int ordinal = 0; - int taskMinMaxOrdinal = 0; - BlockletInfo blockletInfo = blockletList.get(index); - - // add start key as index key - row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++); - - BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex(); - byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen); - row.setRow(addMinMax(minMaxLen, 
schema[ordinal], minValues), ordinal); - // compute and set task level min values - addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues, - TASK_MIN_VALUES_INDEX, true); - ordinal++; - taskMinMaxOrdinal++; - byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen); - row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal); - // compute and set task level max values - addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues, - TASK_MAX_VALUES_INDEX, false); - ordinal++; - - row.setInt(blockletInfo.getNumberOfRows(), ordinal++); - - // add file path - byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS); - row.setByteArray(filePathBytes, ordinal++); - - // add pages - row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++); - - // add version number - row.setShort(fileFooter.getVersionId().number(), ordinal++); - - // add schema updated time - row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++); - - // add blocklet info - byte[] serializedData; - try { - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - DataOutput dataOutput = new DataOutputStream(stream); - blockletInfo.write(dataOutput); - serializedData = stream.toByteArray(); - row.setByteArray(serializedData, ordinal++); - // Add block footer offset, it is used if we need to read footer of block - row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++); - setLocations(blockMetaInfo.getLocationInfo(), row, ordinal); - ordinal++; - // for relative blockelt id i.e blocklet id that belongs to a particular part file - row.setShort((short) relativeBlockletId++, ordinal++); - // Store block size - row.setLong(blockMetaInfo.getSize(), ordinal); - unsafeMemoryDMStore.addIndexRowToUnsafe(row); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - return summaryRow; - } - - private void setLocations(String[] locations, DataMapRow row, int ordinal) - throws UnsupportedEncodingException { - // Add location info - String locationStr = StringUtils.join(locations, ','); - row.setByteArray(locationStr.getBytes(CarbonCommonConstants.DEFAULT_CHARSET), ordinal); - } - - /** - * Load information for the block.It is the case can happen only for old stores - * where blocklet information is not available in index file. So load only block information - * and read blocklet information in executor. 
- */ - private DataMapRowImpl loadToUnsafeBlock(DataFileFooter fileFooter, - SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow, - BlockMetaInfo blockMetaInfo) { - int[] minMaxLen = segmentProperties.getColumnsValueSize(); - BlockletIndex blockletIndex = fileFooter.getBlockletIndex(); - CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema(); - // Add one row to maintain task level min max for segment pruning - if (summaryRow == null) { - summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema()); - } - DataMapRow row = new DataMapRowImpl(schema); - int ordinal = 0; - int taskMinMaxOrdinal = 0; - // add start key as index key - row.setByteArray(blockletIndex.getBtreeIndex().getStartKey(), ordinal++); - - BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex(); - byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen); - row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal); - // compute and set task level min values - addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues, - TASK_MIN_VALUES_INDEX, true); - ordinal++; - taskMinMaxOrdinal++; - byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen); - row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal); - // compute and set task level max values - addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues, - TASK_MAX_VALUES_INDEX, false); - ordinal++; - - row.setInt((int)fileFooter.getNumberOfRows(), ordinal++); - - // add file path - byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS); - row.setByteArray(filePathBytes, ordinal++); - - // add pages - row.setShort((short) 0, ordinal++); - - // add version number - row.setShort(fileFooter.getVersionId().number(), ordinal++); - - // add schema updated time - row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++); - - // add blocklet info - row.setByteArray(new byte[0], ordinal++); - - row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++); - try { - setLocations(blockMetaInfo.getLocationInfo(), row, ordinal); - ordinal++; - // for relative blocklet id. Value is -1 because in case of old store blocklet info will - // not be present in the index file and in that case we will not knwo the total number of - // blocklets - row.setShort((short) -1, ordinal++); - - // store block size - row.setLong(blockMetaInfo.getSize(), ordinal); - unsafeMemoryDMStore.addIndexRowToUnsafe(row); - } catch (Exception e) { - throw new RuntimeException(e); - } - - return summaryRow; - } - - private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow, - List<String> partitions, byte[] schemaBinary) throws IOException { - // write the task summary info to unsafe memory store - if (null != summaryRow) { - // Add column schema , it is useful to generate segment properties in executor. - // So we no need to read footer again there. 
- if (schemaBinary != null) { - summaryRow.setByteArray(schemaBinary, SCHEMA); - } - if (partitions != null && partitions.size() > 0) { - CarbonRowSchema[] minSchemas = - ((CarbonRowSchema.StructCarbonRowSchema) unsafeMemorySummaryDMStore - .getSchema()[PARTITION_INFO]).getChildSchemas(); - DataMapRow partitionRow = new DataMapRowImpl(minSchemas); - for (int i = 0; i < partitions.size(); i++) { - partitionRow - .setByteArray(partitions.get(i).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS), - i); - } - summaryRow.setRow(partitionRow, PARTITION_INFO); - } - try { - unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - /** - * Fill the measures min values with minimum , this is needed for backward version compatability - * as older versions don't store min values for measures - */ - private byte[][] updateMinValues(byte[][] minValues, int[] minMaxLen) { - byte[][] updatedValues = minValues; - if (minValues.length < minMaxLen.length) { - updatedValues = new byte[minMaxLen.length][]; - System.arraycopy(minValues, 0, updatedValues, 0, minValues.length); - List<CarbonMeasure> measures = segmentProperties.getMeasures(); - ByteBuffer buffer = ByteBuffer.allocate(8); - for (int i = 0; i < measures.size(); i++) { - buffer.rewind(); - DataType dataType = measures.get(i).getDataType(); - if (dataType == DataTypes.BYTE) { - buffer.putLong(Byte.MIN_VALUE); - updatedValues[minValues.length + i] = buffer.array().clone(); - } else if (dataType == DataTypes.SHORT) { - buffer.putLong(Short.MIN_VALUE); - updatedValues[minValues.length + i] = buffer.array().clone(); - } else if (dataType == DataTypes.INT) { - buffer.putLong(Integer.MIN_VALUE); - updatedValues[minValues.length + i] = buffer.array().clone(); - } else if (dataType == DataTypes.LONG) { - buffer.putLong(Long.MIN_VALUE); - updatedValues[minValues.length + i] = buffer.array().clone(); - } else if (DataTypes.isDecimal(dataType)) { - updatedValues[minValues.length + i] = - DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MIN_VALUE)); - } else { - buffer.putDouble(Double.MIN_VALUE); - updatedValues[minValues.length + i] = buffer.array().clone(); - } - } - } - return updatedValues; - } - - /** - * Fill the measures max values with maximum , this is needed for backward version compatability - * as older versions don't store max values for measures - */ - private byte[][] updateMaxValues(byte[][] maxValues, int[] minMaxLen) { - byte[][] updatedValues = maxValues; - if (maxValues.length < minMaxLen.length) { - updatedValues = new byte[minMaxLen.length][]; - System.arraycopy(maxValues, 0, updatedValues, 0, maxValues.length); - List<CarbonMeasure> measures = segmentProperties.getMeasures(); - ByteBuffer buffer = ByteBuffer.allocate(8); - for (int i = 0; i < measures.size(); i++) { - buffer.rewind(); - DataType dataType = measures.get(i).getDataType(); - if (dataType == DataTypes.BYTE) { - buffer.putLong(Byte.MAX_VALUE); - updatedValues[maxValues.length + i] = buffer.array().clone(); - } else if (dataType == DataTypes.SHORT) { - buffer.putLong(Short.MAX_VALUE); - updatedValues[maxValues.length + i] = buffer.array().clone(); - } else if (dataType == DataTypes.INT) { - buffer.putLong(Integer.MAX_VALUE); - updatedValues[maxValues.length + i] = buffer.array().clone(); - } else if (dataType == DataTypes.LONG) { - buffer.putLong(Long.MAX_VALUE); - updatedValues[maxValues.length + i] = buffer.array().clone(); - } else if (DataTypes.isDecimal(dataType)) { - 
updatedValues[maxValues.length + i] = - DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MAX_VALUE)); - } else { - buffer.putDouble(Double.MAX_VALUE); - updatedValues[maxValues.length + i] = buffer.array().clone(); - } - } - } - return updatedValues; - } - - private DataMapRow addMinMax(int[] minMaxLen, CarbonRowSchema carbonRowSchema, - byte[][] minValues) { - CarbonRowSchema[] minSchemas = - ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas(); - DataMapRow minRow = new DataMapRowImpl(minSchemas); - int minOrdinal = 0; - // min value adding - for (int i = 0; i < minMaxLen.length; i++) { - minRow.setByteArray(minValues[i], minOrdinal++); - } - return minRow; - } - - /** - * This method will compute min/max values at task level - * - * @param taskMinMaxRow - * @param minMaxLen - * @param carbonRowSchema - * @param minMaxValue - * @param ordinal - * @param isMinValueComparison - */ - private void addTaskMinMaxValues(DataMapRow taskMinMaxRow, int[] minMaxLen, - CarbonRowSchema carbonRowSchema, byte[][] minMaxValue, int ordinal, - boolean isMinValueComparison) { - DataMapRow row = taskMinMaxRow.getRow(ordinal); - byte[][] updatedMinMaxValues = minMaxValue; - if (null == row) { - CarbonRowSchema[] minSchemas = - ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas(); - row = new DataMapRowImpl(minSchemas); - } else { - byte[][] existingMinMaxValues = getMinMaxValue(taskMinMaxRow, ordinal); - // Compare and update min max values - for (int i = 0; i < minMaxLen.length; i++) { - int compare = - ByteUtil.UnsafeComparer.INSTANCE.compareTo(existingMinMaxValues[i], minMaxValue[i]); - if (isMinValueComparison) { - if (compare < 0) { - updatedMinMaxValues[i] = existingMinMaxValues[i]; - } - } else if (compare > 0) { - updatedMinMaxValues[i] = existingMinMaxValues[i]; - } - } - } - int minMaxOrdinal = 0; - // min/max value adding - for (int i = 0; i < minMaxLen.length; i++) { - row.setByteArray(updatedMinMaxValues[i], minMaxOrdinal++); - } - taskMinMaxRow.setRow(row, ordinal); - } - - private void createSchema(SegmentProperties segmentProperties) throws MemoryException { - List<CarbonRowSchema> indexSchemas = new ArrayList<>(); - - // Index key - indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); - getMinMaxSchema(segmentProperties, indexSchemas); - - // for number of rows. - indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT)); - - // for table block path - indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); - - // for number of pages. - indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT)); - - // for version number. - indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT)); - - // for schema updated time. - indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG)); - - //for blocklet info - indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); - - // for block footer offset. - indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG)); - - // for locations - indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); - - // for relative blocklet id i.e. blocklet id that belongs to a particular part file. - indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT)); - - // for storing block length. 
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG)); - - unsafeMemoryDMStore = - new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()])); - } - - /** - * Creates the schema to store summary information or the information which can be stored only - * once per datamap. It stores datamap level max/min of each column and partition information of - * datamap - * @param segmentProperties - * @param partitions - * @throws MemoryException - */ - private void createSummarySchema(SegmentProperties segmentProperties, List<String> partitions, - byte[] schemaBinary) - throws MemoryException { - List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>(); - getMinMaxSchema(segmentProperties, taskMinMaxSchemas); - // for storing column schema - taskMinMaxSchemas.add( - new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, schemaBinary.length)); - if (partitions != null && partitions.size() > 0) { - CarbonRowSchema[] mapSchemas = new CarbonRowSchema[partitions.size()]; - for (int i = 0; i < mapSchemas.length; i++) { - mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY); - } - CarbonRowSchema mapSchema = - new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(), - mapSchemas); - taskMinMaxSchemas.add(mapSchema); - } - unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore( - taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()])); - } - - private void getMinMaxSchema(SegmentProperties segmentProperties, - List<CarbonRowSchema> minMaxSchemas) { - // Index key - int[] minMaxLen = segmentProperties.getColumnsValueSize(); - // do it 2 times, one for min and one for max. - for (int k = 0; k < 2; k++) { - CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length]; - for (int i = 0; i < minMaxLen.length; i++) { - if (minMaxLen[i] <= 0) { - mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY); - } else { - mapSchemas[i] = - new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]); - } - } - CarbonRowSchema mapSchema = - new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(), - mapSchemas); - minMaxSchemas.add(mapSchema); - } - } - - @Override - public boolean isScanRequired(FilterResolverIntf filterExp) { - FilterExecuter filterExecuter = - FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); - for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) { - DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i); - boolean isScanRequired = FilterExpressionProcessor.isScanRequired( - filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX), - getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX)); - if (isScanRequired) { - return true; - } - } - return false; - } - - private List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) { - if (unsafeMemoryDMStore.getRowCount() == 0) { - return new ArrayList<>(); - } - // getting the start and end index key based on filter for hitting the - // selected block reference nodes based on filter resolver tree. 
- if (LOGGER.isDebugEnabled()) { - LOGGER.debug("preparing the start and end key for finding" - + "start and end block as per filter resolver"); - } - List<Blocklet> blocklets = new ArrayList<>(); - Comparator<DataMapRow> comparator = - new BlockletDMComparator(segmentProperties.getColumnsValueSize(), - segmentProperties.getNumberOfSortColumns(), - segmentProperties.getNumberOfNoDictSortColumns()); - List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2); - FilterUtil - .traverseResolverTreeAndGetStartAndEndKey(segmentProperties, filterExp, listOfStartEndKeys); - // reading the first value from list which has start key - IndexKey searchStartKey = listOfStartEndKeys.get(0); - // reading the last value from list which has end key - IndexKey searchEndKey = listOfStartEndKeys.get(1); - if (null == searchStartKey && null == searchEndKey) { - try { - // TODO need to handle for no dictionary dimensions - searchStartKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties); - // TODO need to handle for no dictionary dimensions - searchEndKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties); - } catch (KeyGenException e) { - return null; - } - } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Successfully retrieved the start and end key" + "Dictionary Start Key: " + Arrays - .toString(searchStartKey.getDictionaryKeys()) + "No Dictionary Start Key " + Arrays - .toString(searchStartKey.getNoDictionaryKeys()) + "Dictionary End Key: " + Arrays - .toString(searchEndKey.getDictionaryKeys()) + "No Dictionary End Key " + Arrays - .toString(searchEndKey.getNoDictionaryKeys())); - } - if (filterExp == null) { - int rowCount = unsafeMemoryDMStore.getRowCount(); - for (int i = 0; i < rowCount; i++) { - DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow(); - blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX))); - } - } else { - int startIndex = findStartIndex(convertToRow(searchStartKey), comparator); - int endIndex = findEndIndex(convertToRow(searchEndKey), comparator); - FilterExecuter filterExecuter = - FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); - while (startIndex <= endIndex) { - DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow(); - int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX); - String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX), - CarbonCommonConstants.DEFAULT_CHARSET_CLASS); - boolean isValid = - addBlockBasedOnMinMaxValue(filterExecuter, getMinMaxValue(safeRow, MAX_VALUES_INDEX), - getMinMaxValue(safeRow, MIN_VALUES_INDEX), filePath, blockletId); - if (isValid) { - blocklets.add(createBlocklet(safeRow, blockletId)); - } - startIndex++; - } - } - return blocklets; - } - - @Override - public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, - List<String> partitions) { - if (unsafeMemoryDMStore.getRowCount() == 0) { - return new ArrayList<>(); - } - // First get the partitions which are stored inside datamap. - List<String> storedPartitions = getPartitions(); - // if it has partitioned datamap but there is no partitioned information stored, it means - // partitions are dropped so return empty list. - if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) { - return new ArrayList<>(); - } - if (storedPartitions != null && storedPartitions.size() > 0) { - // Check the exact match of partition information inside the stored partitions. 
- boolean found = false; - if (partitions != null && partitions.size() > 0) { - found = partitions.containsAll(storedPartitions); - } - if (!found) { - return new ArrayList<>(); - } - } - // Prune with filters if the partitions are existed in this datamap - return prune(filterExp, segmentProperties); - } - - /** - * select the blocks based on column min and max value - * - * @param filterExecuter - * @param maxValue - * @param minValue - * @param filePath - * @param blockletId - * @return - */ - private boolean addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter, byte[][] maxValue, - byte[][] minValue, String filePath, int blockletId) { - BitSet bitSet = null; - if (filterExecuter instanceof ImplicitColumnFilterExecutor) { - String uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 1); - // this case will come in case of old store where index file does not contain the - // blocklet information - if (blockletId != -1) { - uniqueBlockPath = uniqueBlockPath + CarbonCommonConstants.FILE_SEPARATOR + blockletId; - } - bitSet = ((ImplicitColumnFilterExecutor) filterExecuter) - .isFilterValuesPresentInBlockOrBlocklet(maxValue, minValue, uniqueBlockPath); - } else { - bitSet = filterExecuter.isScanRequired(maxValue, minValue); - } - if (!bitSet.isEmpty()) { - return true; - } else { - return false; - } - } - - public ExtendedBlocklet getDetailedBlocklet(String blockletId) { - int index = Integer.parseInt(blockletId); - DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow(); - return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)); - } - - private byte[][] getMinMaxValue(DataMapRow row, int index) { - DataMapRow minMaxRow = row.getRow(index); - byte[][] minMax = new byte[minMaxRow.getColumnCount()][]; - for (int i = 0; i < minMax.length; i++) { - minMax[i] = minMaxRow.getByteArray(i); - } - return minMax; - } - - private ExtendedBlocklet createBlocklet(DataMapRow row, int blockletId) { - ExtendedBlocklet blocklet = new ExtendedBlocklet( - new String(row.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS), - blockletId + ""); - BlockletDetailInfo detailInfo = new BlockletDetailInfo(); - detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX)); - detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX)); - detailInfo.setVersionNumber(row.getShort(VERSION_INDEX)); - detailInfo.setBlockletId((short) blockletId); - detailInfo.setDimLens(columnCardinality); - detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX)); - byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX); - BlockletInfo blockletInfo = null; - try { - if (byteArray.length > 0) { - blockletInfo = new BlockletInfo(); - ByteArrayInputStream stream = new ByteArrayInputStream(byteArray); - DataInputStream inputStream = new DataInputStream(stream); - blockletInfo.readFields(inputStream); - inputStream.close(); - } - blocklet.setLocation( - new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET) - .split(",")); - } catch (IOException e) { - throw new RuntimeException(e); - } - detailInfo.setBlockletInfo(blockletInfo); - blocklet.setDetailInfo(detailInfo); - detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET)); - detailInfo.setColumnSchemaBinary(getColumnSchemaBinary()); - detailInfo.setBlockSize(row.getLong(BLOCK_LENGTH)); - return blocklet; - } - - /** - * Binary search used to get the first tentative index row based on - * search key - * - * @param key search key - * @return first tentative block - */ - private 
int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) { - int childNodeIndex; - int low = 0; - int high = unsafeMemoryDMStore.getRowCount() - 1; - int mid = 0; - int compareRes = -1; - // - while (low <= high) { - mid = (low + high) >>> 1; - // compare the entries - compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid)); - if (compareRes < 0) { - high = mid - 1; - } else if (compareRes > 0) { - low = mid + 1; - } else { - // if key is matched then get the first entry - int currentPos = mid; - while (currentPos - 1 >= 0 - && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) { - currentPos--; - } - mid = currentPos; - break; - } - } - // if compare result is less than zero then we - // and mid is more than 0 then we need to previous block as duplicates - // record can be present - if (compareRes < 0) { - if (mid > 0) { - mid--; - } - childNodeIndex = mid; - } else { - childNodeIndex = mid; - } - // get the leaf child - return childNodeIndex; - } - - /** - * Binary search used to get the last tentative block based on - * search key - * - * @param key search key - * @return first tentative block - */ - private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) { - int childNodeIndex; - int low = 0; - int high = unsafeMemoryDMStore.getRowCount() - 1; - int mid = 0; - int compareRes = -1; - // - while (low <= high) { - mid = (low + high) >>> 1; - // compare the entries - compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid)); - if (compareRes < 0) { - high = mid - 1; - } else if (compareRes > 0) { - low = mid + 1; - } else { - int currentPos = mid; - // if key is matched then get the first entry - while (currentPos + 1 < unsafeMemoryDMStore.getRowCount() - && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) { - currentPos++; - } - mid = currentPos; - break; - } - } - // if compare result is less than zero then we - // and mid is more than 0 then we need to previous block as duplicates - // record can be present - if (compareRes < 0) { - if (mid > 0) { - mid--; - } - childNodeIndex = mid; - } else { - childNodeIndex = mid; - } - return childNodeIndex; - } - - private DataMapRow convertToRow(IndexKey key) { - ByteBuffer buffer = - ByteBuffer.allocate(key.getDictionaryKeys().length + key.getNoDictionaryKeys().length + 8); - buffer.putInt(key.getDictionaryKeys().length); - buffer.putInt(key.getNoDictionaryKeys().length); - buffer.put(key.getDictionaryKeys()); - buffer.put(key.getNoDictionaryKeys()); - DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema()); - dataMapRow.setByteArray(buffer.array(), 0); - return dataMapRow; - } - - private List<String> getPartitions() { - DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0); - if (unsafeRow.getColumnCount() > PARTITION_INFO) { - List<String> partitions = new ArrayList<>(); - DataMapRow row = unsafeRow.getRow(PARTITION_INFO); - for (int i = 0; i < row.getColumnCount(); i++) { - partitions.add( - new String(row.getByteArray(i), CarbonCommonConstants.DEFAULT_CHARSET_CLASS)); - } - return partitions; - } - return null; - } - - private byte[] getColumnSchemaBinary() { - DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0); - return unsafeRow.getByteArray(SCHEMA); - } - - /** - * Convert schema to binary - */ - private byte[] convertSchemaToBinary(List<ColumnSchema> columnSchemas) throws IOException { - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - 
DataOutput dataOutput = new DataOutputStream(stream); - dataOutput.writeShort(columnSchemas.size()); - for (ColumnSchema columnSchema : columnSchemas) { - if (columnSchema.getColumnReferenceId() == null) { - columnSchema.setColumnReferenceId(columnSchema.getColumnUniqueId()); - } - columnSchema.write(dataOutput); - } - byte[] byteArray = stream.toByteArray(); - // Compress with snappy to reduce the size of schema - return Snappy.rawCompress(byteArray, byteArray.length); - } - - @Override - public void clear() { - if (unsafeMemoryDMStore != null) { - unsafeMemoryDMStore.freeMemory(); - unsafeMemoryDMStore = null; - segmentProperties = null; - } - // clear task min/max unsafe memory - if (null != unsafeMemorySummaryDMStore) { - unsafeMemorySummaryDMStore.freeMemory(); - unsafeMemorySummaryDMStore = null; - } - } - - @Override - public long getFileTimeStamp() { - return 0; - } - - @Override - public int getAccessCount() { - return 0; - } - - @Override - public long getMemorySize() { - long memoryUsed = 0L; - if (unsafeMemoryDMStore != null) { - memoryUsed += unsafeMemoryDMStore.getMemoryUsed(); - } - if (null != unsafeMemorySummaryDMStore) { - memoryUsed += unsafeMemorySummaryDMStore.getMemoryUsed(); - } - return memoryUsed; - } - - public SegmentProperties getSegmentProperties() { - return segmentProperties; - } - -}
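
For readers following the removal above: the core pruning support the deleted BlockletIndexDataMap provided is the task-level min/max merge in addTaskMinMaxValues(), which folds every blocklet's per-column min/max byte arrays into one summary row that isScanRequired() later checks against the filter. The standalone sketch below is not part of the patch; the unsigned byte comparison merely stands in for ByteUtil.UnsafeComparer, and the class and method names are illustrative only.

import java.nio.charset.StandardCharsets;

/**
 * Minimal sketch of the task-level min/max merge performed by the deleted
 * BlockletIndexDataMap.addTaskMinMaxValues(). Column values are compared as
 * unsigned byte arrays; the summary keeps the smallest min and largest max
 * seen across all blocklets of a task.
 */
public class TaskMinMaxSketch {

  // Unsigned lexicographic comparison, the contract UnsafeComparer follows.
  static int compareUnsigned(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (diff != 0) {
        return diff;
      }
    }
    return a.length - b.length;
  }

  // Merge one blocklet's per-column values into the running task-level values.
  static byte[][] merge(byte[][] current, byte[][] incoming, boolean isMin) {
    if (current == null) {
      return incoming.clone();  // first blocklet seeds the summary row
    }
    byte[][] merged = new byte[current.length][];
    for (int i = 0; i < current.length; i++) {
      int cmp = compareUnsigned(current[i], incoming[i]);
      // keep the existing value when it is already smaller (min) or larger (max)
      boolean keepExisting = isMin ? cmp < 0 : cmp > 0;
      merged[i] = keepExisting ? current[i] : incoming[i];
    }
    return merged;
  }

  public static void main(String[] args) {
    byte[][] taskMin = null;
    byte[][] taskMax = null;
    byte[][][] blockletMins = {
        { "banana".getBytes(StandardCharsets.UTF_8) },
        { "apple".getBytes(StandardCharsets.UTF_8) }
    };
    byte[][][] blockletMaxs = {
        { "mango".getBytes(StandardCharsets.UTF_8) },
        { "peach".getBytes(StandardCharsets.UTF_8) }
    };
    for (int i = 0; i < blockletMins.length; i++) {
      taskMin = merge(taskMin, blockletMins[i], true);
      taskMax = merge(taskMax, blockletMaxs[i], false);
    }
    System.out.println("task min: " + new String(taskMin[0], StandardCharsets.UTF_8)); // apple
    System.out.println("task max: " + new String(taskMax[0], StandardCharsets.UTF_8)); // peach
  }
}

The keep-the-smaller / keep-the-larger rule shown here is exactly what isScanRequired() evaluates against the filter's value range when deciding whether a task's blocklets can be skipped during segment pruning.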
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java deleted file mode 100644 index a2c65ba..0000000 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.indexstore.blockletindex; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.carbondata.core.cache.Cache; -import org.apache.carbondata.core.cache.CacheProvider; -import org.apache.carbondata.core.cache.CacheType; -import org.apache.carbondata.core.datamap.DataMapDistributable; -import org.apache.carbondata.core.datamap.DataMapMeta; -import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter; -import org.apache.carbondata.core.datamap.dev.IndexDataMap; -import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap; -import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMapFactory; -import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.indexstore.Blocklet; -import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; -import org.apache.carbondata.core.indexstore.ExtendedBlocklet; -import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; -import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.events.Event; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; - -/** - * Table map for blocklet - */ -public class BlockletIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory - implements BlockletDetailsFetcher, - SegmentPropertiesFetcher { - - private static final String NAME = "clustered.btree.blocklet"; - - public static final DataMapSchema DATA_MAP_SCHEMA = - new 
DataMapSchema(NAME, BlockletIndexDataMapFactory.class.getName()); - - private AbsoluteTableIdentifier identifier; - - // segmentId -> list of index file - private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>(); - - private Cache<TableBlockIndexUniqueIdentifier, AbstractCoarseGrainIndexDataMap> cache; - - @Override - public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) { - this.identifier = identifier; - cache = CacheProvider.getInstance() - .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP); - } - - @Override - public AbstractDataMapWriter createWriter(String segmentId, String dataWriterPath) { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public List<AbstractCoarseGrainIndexDataMap> getDataMaps(String segmentId) throws IOException { - List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = - getTableBlockIndexUniqueIdentifiers(segmentId); - return cache.getAll(tableBlockIndexUniqueIdentifiers); - } - - private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers( - String segmentId) throws IOException { - List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = - segmentMap.get(segmentId); - if (tableBlockIndexUniqueIdentifiers == null) { - tableBlockIndexUniqueIdentifiers = new ArrayList<>(); - String path = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId); - List<String> indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path); - for (int i = 0; i < indexFiles.size(); i++) { - tableBlockIndexUniqueIdentifiers.add( - new TableBlockIndexUniqueIdentifier(identifier, segmentId, indexFiles.get(i))); - } - segmentMap.put(segmentId, tableBlockIndexUniqueIdentifiers); - } - return tableBlockIndexUniqueIdentifiers; - } - - /** - * Get the blocklet detail information based on blockletid, blockid and segmentid. This method is - * exclusively for BlockletIndexDataMapFactory as detail information is only available in this - * default datamap. 
- */ - @Override - public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, String segmentId) - throws IOException { - List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>(); - // If it is already detailed blocklet then type cast and return same - if (blocklets.size() > 0 && blocklets.get(0) instanceof ExtendedBlocklet) { - for (Blocklet blocklet : blocklets) { - detailedBlocklets.add((ExtendedBlocklet) blocklet); - } - return detailedBlocklets; - } - List<TableBlockIndexUniqueIdentifier> identifiers = - getTableBlockIndexUniqueIdentifiers(segmentId); - // Retrieve each blocklets detail information from blocklet datamap - for (Blocklet blocklet : blocklets) { - detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet)); - } - return detailedBlocklets; - } - - @Override - public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId) - throws IOException { - if (blocklet instanceof ExtendedBlocklet) { - return (ExtendedBlocklet) blocklet; - } - List<TableBlockIndexUniqueIdentifier> identifiers = - getTableBlockIndexUniqueIdentifiers(segmentId); - return getExtendedBlocklet(identifiers, blocklet); - } - - private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers, - Blocklet blocklet) throws IOException { - String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId()); - for (TableBlockIndexUniqueIdentifier identifier : identifiers) { - if (identifier.getCarbonIndexFileName().equals(carbonIndexFileName)) { - IndexDataMap indexDataMap = cache.get(identifier); - return ((BlockletIndexDataMap) indexDataMap).getDetailedBlocklet(blocklet.getBlockletId()); - } - } - throw new IOException("Blocklet with blockid " + blocklet.getBlockletId() + " not found "); - } - - - - @Override - public List<DataMapDistributable> toDistributable(String segmentId) { - CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentId); - List<DataMapDistributable> distributables = new ArrayList<>(); - for (int i = 0; i < carbonIndexFiles.length; i++) { - Path path = new Path(carbonIndexFiles[i].getPath()); - try { - FileSystem fs = path.getFileSystem(FileFactory.getConfiguration()); - RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path); - LocatedFileStatus fileStatus = iter.next(); - String[] location = fileStatus.getBlockLocations()[0].getHosts(); - BlockletDataMapDistributable distributable = - new BlockletDataMapDistributable(path.getName()); - distributable.setLocations(location); - distributables.add(distributable); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return distributables; - } - - @Override public void fireEvent(Event event) { - - } - - @Override - public void clear(String segmentId) { - List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId); - if (blockIndexes != null) { - for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) { - IndexDataMap indexDataMap = cache.getIfPresent(blockIndex); - if (indexDataMap != null) { - cache.invalidate(blockIndex); - indexDataMap.clear(); - } - } - } - } - - @Override - public void clear() { - for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) { - clear(segmentId); - } - } - - @Override - public List<AbstractCoarseGrainIndexDataMap> getDataMaps(DataMapDistributable distributable) - throws IOException { - BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable; - 
List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>(); - if (mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { - identifiers.add(new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(), - mapDistributable.getFilePath())); - } else if (mapDistributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { - SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); - List<String> indexFiles = fileStore.getIndexFilesFromMergeFile( - CarbonTablePath.getSegmentPath(identifier.getTablePath(), mapDistributable.getSegmentId()) - + "/" + mapDistributable.getFilePath()); - for (String indexFile : indexFiles) { - identifiers.add( - new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(), - indexFile)); - } - } - List<AbstractCoarseGrainIndexDataMap> dataMaps; - try { - dataMaps = cache.getAll(identifiers); - } catch (IOException e) { - throw new RuntimeException(e); - } - return dataMaps; - } - - @Override - public DataMapMeta getMeta() { - // TODO: pass SORT_COLUMNS into this class - return null; - } - - @Override public SegmentProperties getSegmentProperties(String segmentId) throws IOException { - List<AbstractCoarseGrainIndexDataMap> dataMaps = getDataMaps(segmentId); - assert (dataMaps.size() > 0); - AbstractCoarseGrainIndexDataMap coarseGrainDataMap = dataMaps.get(0); - assert (coarseGrainDataMap instanceof BlockletIndexDataMap); - BlockletIndexDataMap dataMap = (BlockletIndexDataMap) coarseGrainDataMap; - return dataMap.getSegmentProperties(); - } - - @Override public List<Blocklet> getAllBlocklets(String segmentId, List<String> partitions) - throws IOException { - List<Blocklet> blocklets = new ArrayList<>(); - List<AbstractCoarseGrainIndexDataMap> dataMaps = getDataMaps(segmentId); - for (AbstractCoarseGrainIndexDataMap dataMap : dataMaps) { - blocklets.addAll(dataMap.prune(null, getSegmentProperties(segmentId), partitions)); - } - return blocklets; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 2393c54..ff93fd7 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -776,7 +776,7 @@ public class CarbonTable implements Serializable { } /** - * whether this table has aggregation IndexDataMap or not + * whether this table has aggregation DataMap or not */ public boolean hasAggregationDataMap() { List<DataMapSchema> dataMapSchemaList = tableInfo.getDataMapSchemaList(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java new file mode 100644 index 0000000..fa142aa --- /dev/null +++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java @@ -0,0 +1,59 @@ +package 
org.apache.carbondata.core.indexstore.blockletindex; + +import java.lang.reflect.Method; +import java.util.BitSet; + +import org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCacheTest; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl; +import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo; +import org.apache.carbondata.core.util.ByteUtil; + +import mockit.Mock; +import mockit.MockUp; +import org.junit.Before; +import org.junit.Test; + +public class TestBlockletDataMap extends AbstractDictionaryCacheTest { + + ImplicitIncludeFilterExecutorImpl implicitIncludeFilterExecutor; + @Before public void setUp() throws Exception { + CarbonImplicitDimension carbonImplicitDimension = + new CarbonImplicitDimension(0, CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_POSITIONID); + DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = new DimColumnResolvedFilterInfo(); + dimColumnEvaluatorInfo.setColumnIndex(0); + dimColumnEvaluatorInfo.setRowIndex(0); + dimColumnEvaluatorInfo.setDimension(carbonImplicitDimension); + dimColumnEvaluatorInfo.setDimensionExistsInCurrentSilce(false); + implicitIncludeFilterExecutor = + new ImplicitIncludeFilterExecutorImpl(dimColumnEvaluatorInfo); + } + + @Test public void testaddBlockBasedOnMinMaxValue() throws Exception { + + new MockUp<ImplicitIncludeFilterExecutorImpl>() { + @Mock BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, byte[][] minValue, + String uniqueBlockPath) { + BitSet bitSet = new BitSet(1); + bitSet.set(8); + return bitSet; + } + }; + + BlockletDataMap blockletDataMap = new BlockletDataMap(); + Method method = BlockletDataMap.class + .getDeclaredMethod("addBlockBasedOnMinMaxValue", FilterExecuter.class, byte[][].class, + byte[][].class, String.class, int.class); + method.setAccessible(true); + + byte[][] minValue = { ByteUtil.toBytes("sfds") }; + byte[][] maxValue = { ByteUtil.toBytes("resa") }; + Object result = method + .invoke(blockletDataMap, implicitIncludeFilterExecutor, minValue, maxValue, + "/opt/store/default/carbon_table/Fact/Part0/Segment_0/part-0-0_batchno0-0-1514989110586.carbondata", + 0); + assert ((boolean) result); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java deleted file mode 100644 index 16048db..0000000 --- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java +++ /dev/null @@ -1,59 +0,0 @@ -package org.apache.carbondata.core.indexstore.blockletindex; - -import java.lang.reflect.Method; -import java.util.BitSet; - -import org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCacheTest; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension; -import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; -import 
org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl; -import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo; -import org.apache.carbondata.core.util.ByteUtil; - -import mockit.Mock; -import mockit.MockUp; -import org.junit.Before; -import org.junit.Test; - -public class TestBlockletIndexDataMap extends AbstractDictionaryCacheTest { - - ImplicitIncludeFilterExecutorImpl implicitIncludeFilterExecutor; - @Before public void setUp() throws Exception { - CarbonImplicitDimension carbonImplicitDimension = - new CarbonImplicitDimension(0, CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_POSITIONID); - DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = new DimColumnResolvedFilterInfo(); - dimColumnEvaluatorInfo.setColumnIndex(0); - dimColumnEvaluatorInfo.setRowIndex(0); - dimColumnEvaluatorInfo.setDimension(carbonImplicitDimension); - dimColumnEvaluatorInfo.setDimensionExistsInCurrentSilce(false); - implicitIncludeFilterExecutor = - new ImplicitIncludeFilterExecutorImpl(dimColumnEvaluatorInfo); - } - - @Test public void testaddBlockBasedOnMinMaxValue() throws Exception { - - new MockUp<ImplicitIncludeFilterExecutorImpl>() { - @Mock BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, byte[][] minValue, - String uniqueBlockPath) { - BitSet bitSet = new BitSet(1); - bitSet.set(8); - return bitSet; - } - }; - - BlockletIndexDataMap blockletDataMap = new BlockletIndexDataMap(); - Method method = BlockletIndexDataMap.class - .getDeclaredMethod("addBlockBasedOnMinMaxValue", FilterExecuter.class, byte[][].class, - byte[][].class, String.class, int.class); - method.setAccessible(true); - - byte[][] minValue = { ByteUtil.toBytes("sfds") }; - byte[][] maxValue = { ByteUtil.toBytes("resa") }; - Object result = method - .invoke(blockletDataMap, implicitIncludeFilterExecutor, minValue, maxValue, - "/opt/store/default/carbon_table/Fact/Part0/Segment_0/part-0-0_batchno0-0-1514989110586.carbondata", - 0); - assert ((boolean) result); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java index fe0bbcf..b18a8fe 100644 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java @@ -28,18 +28,17 @@ import java.util.Map; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter; +import org.apache.carbondata.core.datamap.dev.DataMapWriter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; import com.google.gson.Gson; -public class MinMaxDataWriter extends 
AbstractDataMapWriter { +public class MinMaxDataWriter extends DataMapWriter { private static final LogService LOGGER = LogServiceFactory.getLogService(TableInfo.class.getName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java index 216000b..5f933b3 100644 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java @@ -28,7 +28,7 @@ import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datamap.dev.DataMapModel; -import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap; +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; @@ -47,7 +47,7 @@ import com.google.gson.Gson; /** * Datamap implementation for min max blocklet. */ -public class MinMaxIndexDataMap extends AbstractCoarseGrainIndexDataMap { +public class MinMaxIndexDataMap extends CoarseGrainDataMap { private static final LogService LOGGER = LogServiceFactory.getLogService(MinMaxIndexDataMap.class.getName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java index 5f714a1..84a7c45 100644 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java @@ -25,10 +25,10 @@ import java.util.List; import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapMeta; -import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter; import org.apache.carbondata.core.datamap.dev.DataMapModel; -import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap; -import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMapFactory; +import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; @@ -38,7 +38,7 @@ import 
org.apache.carbondata.events.Event; /** * Min Max DataMap Factory */ -public class MinMaxIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory { +public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory { private AbsoluteTableIdentifier identifier; @@ -52,7 +52,7 @@ public class MinMaxIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFa * @param segmentId * @return */ - @Override public AbstractDataMapWriter createWriter(String segmentId, String dataWritePath) { + @Override public DataMapWriter createWriter(String segmentId, String dataWritePath) { return new MinMaxDataWriter(identifier, segmentId, dataWritePath); } @@ -63,9 +63,9 @@ public class MinMaxIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFa * @return * @throws IOException */ - @Override public List<AbstractCoarseGrainIndexDataMap> getDataMaps(String segmentId) + @Override public List<CoarseGrainDataMap> getDataMaps(String segmentId) throws IOException { - List<AbstractCoarseGrainIndexDataMap> dataMapList = new ArrayList<>(); + List<CoarseGrainDataMap> dataMapList = new ArrayList<>(); // Form a dataMap of Type MinMaxIndexDataMap. MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap(); try { @@ -100,7 +100,7 @@ public class MinMaxIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFa @Override public void clear() { } - @Override public List<AbstractCoarseGrainIndexDataMap> getDataMaps(DataMapDistributable distributable) + @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable) throws IOException { return null; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 98c3398..039bae2 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -34,15 +34,15 @@ import java.util.Set; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.DataMapChooser; +import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.DataMapStoreManager; -import org.apache.carbondata.core.datamap.DataMapType; import org.apache.carbondata.core.datamap.TableDataMap; import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.exception.InvalidConfigurationException; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; -import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexDataMapFactory; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.schema.PartitionInfo; @@ -728,11 +728,11 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { DataMapJob dataMapJob = getDataMapJob(job.getConfiguration()); List<String> partitionsToPrune = getPartitionsToPrune(job.getConfiguration()); 
List<ExtendedBlocklet> prunedBlocklets; - if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapType.FG) { + if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) { DistributableDataMapFormat datamapDstr = new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper, segmentIds, partitionsToPrune, - BlockletIndexDataMapFactory.class.getName()); + BlockletDataMapFactory.class.getName()); prunedBlocklets = dataMapJob.execute(datamapDstr, resolver); // Apply expression on the blocklets. prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets); http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala new file mode 100644 index 0000000..b81cce4 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.spark.testsuite.datamap + +import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta} +import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapWriter} +import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory} +import org.apache.carbondata.core.datastore.FileReader +import org.apache.carbondata.core.datastore.block.SegmentProperties +import org.apache.carbondata.core.datastore.compression.SnappyCompressor +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.datastore.page.ColumnPage +import org.apache.carbondata.core.indexstore.Blocklet +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression +import org.apache.carbondata.core.scan.filter.intf.ExpressionType +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf +import org.apache.carbondata.core.util.ByteUtil +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.Event +import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest + +class CGDataMapFactory extends CoarseGrainDataMapFactory { + var identifier: AbsoluteTableIdentifier = _ + var dataMapSchema: DataMapSchema = _ + + /** + * Initialization of Datamap factory with the identifier and datamap name + */ + override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { + this.identifier = identifier + this.dataMapSchema = dataMapSchema + } + + /** + * Return a new write for this datamap + */ + override def createWriter(segmentId: String, dataWritePath: String): DataMapWriter = { + new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema) + } + + /** + * Get the datamap for segmentid + */ + override def getDataMaps(segmentId: String) = { + val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + + val files = file.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") + }) + files.map {f => + val dataMap: CoarseGrainDataMap = new CGDataMap() + dataMap.init(new DataMapModel(f.getCanonicalPath)) + dataMap + }.toList.asJava + } + + + /** + * Get datamaps for distributable object. + */ + override def getDataMaps( + distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = { + val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable] + val dataMap: CoarseGrainDataMap = new CGDataMap() + dataMap.init(new DataMapModel(mapDistributable.getFilePath)) + Seq(dataMap).asJava + } + + /** + * + * @param event + */ + override def fireEvent(event: Event): Unit = { + ??? 
+ } + + /** + * Get all distributable objects of a segmentid + * + * @return + */ + override def toDistributable(segmentId: String) = { + val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + + val files = file.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") + }) + files.map { f => + val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath) + d + }.toList.asJava + } + + + /** + * Clears datamap of the segment + */ + override def clear(segmentId: String): Unit = { + + } + + /** + * Clear all datamaps from memory + */ + override def clear(): Unit = { + + } + + /** + * Return metadata of this datamap + */ + override def getMeta: DataMapMeta = { + new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava, + List(ExpressionType.EQUALS, ExpressionType.IN).asJava) + } +} + +class CGDataMap extends CoarseGrainDataMap { + + var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _ + var FileReader: FileReader = _ + var filePath: String = _ + val compressor = new SnappyCompressor + + /** + * It is called to load the data map to memory or to initialize it. + */ + override def init(dataMapModel: DataMapModel): Unit = { + this.filePath = dataMapModel.getFilePath + val size = FileFactory.getCarbonFile(filePath).getSize + FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath)) + val footerLen = FileReader.readInt(filePath, size-4) + val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen) + val in = new ByteArrayInputStream(compressor.unCompressByte(bytes)) + val obj = new ObjectInputStream(in) + maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]] + } + + /** + * Prune the datamap with filter expression. It returns the list of + * blocklets where these filters can exist. + * + * @param filterExp + * @return + */ + override def prune( + filterExp: FilterResolverIntf, + segmentProperties: SegmentProperties, + partitions: java.util.List[String]): java.util.List[Blocklet] = { + val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]() + val expression = filterExp.getFilterExpression + getEqualToExpression(expression, buffer) + val value = buffer.map { f => + f.getChildren.get(1).evaluate(null).getString + } + val meta = findMeta(value(0).getBytes) + meta.map { f=> + new Blocklet(f._1, f._2+"") + }.asJava + } + + + private def findMeta(value: Array[Byte]) = { + val tuples = maxMin.filter { f => + ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 && + ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0 + } + tuples + } + + private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = { + if (expression.isInstanceOf[EqualToExpression]) { + buffer += expression + } else { + if (expression.getChildren != null) { + expression.getChildren.asScala.map { f => + if (f.isInstanceOf[EqualToExpression]) { + buffer += f + } + getEqualToExpression(f, buffer) + } + } + } + } + + /** + * Clear complete index table and release memory. + */ + override def clear() = { + ??? + } + + override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ??? 
+} + +class CGDataMapWriter(identifier: AbsoluteTableIdentifier, + segmentId: String, + dataWritePath: String, + dataMapSchema: DataMapSchema) + extends DataMapWriter(identifier, segmentId, dataWritePath) { + + var currentBlockId: String = null + val cgwritepath = dataWritePath + "/" + + dataMapSchema.getDataMapName + System.nanoTime() + ".datamap" + lazy val stream: DataOutputStream = FileFactory + .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath)) + val blockletList = new ArrayBuffer[Array[Byte]]() + val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]() + val compressor = new SnappyCompressor + + /** + * Start of new block notification. + * + * @param blockId file name of the carbondata file + */ + override def onBlockStart(blockId: String): Unit = { + currentBlockId = blockId + } + + /** + * End of block notification + */ + override def onBlockEnd(blockId: String): Unit = { + + } + + /** + * Start of new blocklet notification. + * + * @param blockletId sequence number of blocklet in the block + */ + override def onBlockletStart(blockletId: Int): Unit = { + + } + + /** + * End of blocklet notification + * + * @param blockletId sequence number of blocklet in the block + */ + override def onBlockletEnd(blockletId: Int): Unit = { + val sorted = blockletList + .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0) + maxMin += + ((currentBlockId+"", blockletId, (sorted.last, sorted.head))) + blockletList.clear() + } + + /** + * Add the column pages row to the datamap, order of pages is same as `indexColumns` in + * DataMapMeta returned in DataMapFactory. + * + * Implementation should copy the content of `pages` as needed, because `pages` memory + * may be freed after this method returns, if using unsafe column page. + */ + override def onPageAdded(blockletId: Int, + pageId: Int, + pages: Array[ColumnPage]): Unit = { + val size = pages(0).getPageSize + val list = new ArrayBuffer[Array[Byte]]() + var i = 0 + while (i < size) { + val bytes = pages(0).getBytes(i) + val newBytes = new Array[Byte](bytes.length - 2) + System.arraycopy(bytes, 2, newBytes, 0, newBytes.length) + list += newBytes + i = i + 1 + } + // Sort based on the column data in order to create index. + val sorted = list + .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0) + blockletList += sorted.head + blockletList += sorted.last + } + + + /** + * This is called during closing of writer.So after this call no more data will be sent to this + * class. 
+ */ + override def finish(): Unit = { + val out = new ByteOutputStream() + val outStream = new ObjectOutputStream(out) + outStream.writeObject(maxMin) + outStream.close() + val bytes = compressor.compressByte(out.getBytes) + stream.write(bytes) + stream.writeInt(bytes.length) + stream.close() + commitFile(cgwritepath) + } + + +} + +class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll { + + val file2 = resourcesPath + "/compaction/fil2.csv" + override protected def beforeAll(): Unit = { + //n should be about 5000000 of reset if size is default 1024 + val n = 150000 + CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n) + sql("DROP TABLE IF EXISTS normal_test") + sql( + """ + | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") + } + + test("test cg datamap") { + sql("DROP TABLE IF EXISTS datamap_test_cg") + sql( + """ + | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg") + // register datamap writer + sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')") + checkAnswer(sql("select * from datamap_test_cg where name='n502670'"), + sql("select * from normal_test where name='n502670'")) + } + + test("test cg datamap with 2 datamaps ") { + sql("DROP TABLE IF EXISTS datamap_test") + sql( + """ + | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") + // register datamap writer + sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')") + sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"), + sql("select * from normal_test where name='n502670' and city='c2670'")) + } + + override protected def afterAll(): Unit = { + CompactionSupportGlobalSortBigFileTest.deleteFile(file2) + sql("DROP TABLE IF EXISTS normal_test") + sql("DROP TABLE IF EXISTS datamap_test_cg") + } +}
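A note on the index file layout used by the CG datamap example above: CGDataMapWriter.finish() serializes the per-blocklet max/min list, Snappy-compresses it, writes the compressed bytes, and then writes their length as a trailing int; CGDataMap.init() reverses this by reading the last 4 bytes as the footer length and then reading that many bytes immediately before it. The sketch below shows the same payload-plus-length-trailer layout in isolation; it uses plain java.io rather than the CarbonData FileFactory/FileReader API, and all names and paths in it are hypothetical.

// Minimal sketch of the "payload + 4-byte length trailer" layout that
// CGDataMapWriter.finish() writes and CGDataMap.init() reads back.
// Plain java.io only; object and method names here are illustrative.
import java.io.{DataOutputStream, FileOutputStream, RandomAccessFile}

object FooterLayoutSketch {

  // Append the serialized index bytes, then an int holding their length,
  // so a reader can locate the payload by seeking from the end of the file.
  def writeIndex(path: String, indexBytes: Array[Byte]): Unit = {
    val out = new DataOutputStream(new FileOutputStream(path))
    try {
      out.write(indexBytes)
      out.writeInt(indexBytes.length)
    } finally {
      out.close()
    }
  }

  // Read the trailing int, then the payload immediately before it, mirroring
  // the size-4 / size-footerLen-4 arithmetic in CGDataMap.init().
  def readIndex(path: String): Array[Byte] = {
    val file = new RandomAccessFile(path, "r")
    try {
      val size = file.length()
      file.seek(size - 4)
      val footerLen = file.readInt()
      val payload = new Array[Byte](footerLen)
      file.seek(size - footerLen - 4)
      file.readFully(payload)
      payload
    } finally {
      file.close()
    }
  }
}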
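The reflection test at the top of this change exercises BlockletDataMap.addBlockBasedOnMinMaxValue with mocked min/max values, and CGDataMap.findMeta() applies the same idea directly: a block or blocklet needs to be scanned only if the filter value falls inside its [min, max] range under unsigned lexicographic byte order (ByteUtil.UnsafeComparer in CarbonData). Below is a minimal standalone sketch of that containment check, not the CarbonData implementation; compareBytes stands in for ByteUtil.UnsafeComparer and is illustrative only.

// Minimal sketch of min/max pruning: a blocklet may contain the filter value
// only if min <= value <= max byte-wise; otherwise it can be skipped.
object MinMaxPruneSketch {

  // Unsigned lexicographic comparison of two byte arrays.
  private def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    val len = math.min(a.length, b.length)
    var i = 0
    var cmp = 0
    while (i < len && cmp == 0) {
      cmp = (a(i) & 0xff) - (b(i) & 0xff)
      i += 1
    }
    if (cmp != 0) cmp else a.length - b.length
  }

  // True when the blocklet's [min, max] range can contain the filter value,
  // i.e. the blocklet cannot be pruned for an equality filter on this value.
  def mayContain(value: Array[Byte], min: Array[Byte], max: Array[Byte]): Boolean =
    compareBytes(value, min) >= 0 && compareBytes(value, max) <= 0
}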