http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
index 76ee084..4e07182 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -17,85 +17,74 @@
 package org.apache.carbondata.datamap.bloom;
 
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 import com.google.common.hash.BloomFilter;
 import com.google.common.hash.Funnels;
 
 /**
- * BloomDataMap is constructed in blocklet level. For each indexed column, a bloom filter is
- * constructed to indicate whether a value belongs to this blocklet. Bloom filter of blocklet that
- * belongs to same block will be written to one index file suffixed with .bloomindex. So the number
+ * BloomDataMap is constructed in CG level (blocklet level).
+ * For each indexed column, a bloom filter is constructed to indicate whether a value
+ * belongs to this blocklet. Bloom filter of blocklet that belongs to same block will
+ * be written to one index file suffixed with .bloomindex. So the number
  * of bloom index file will be equal to that of the blocks.
 */
@InterfaceAudience.Internal
public class BloomDataMapWriter extends DataMapWriter {
-  private String dataMapName;
-  private List<String> indexedColumns;
+  private static final LogService LOG = LogServiceFactory.getLogService(
+      BloomDataMapWriter.class.getCanonicalName());
   private int bloomFilterSize;
-  // map column name to ordinal in pages
-  private Map<String, Integer> col2Ordianl;
-  private Map<String, DataType> col2DataType;
-  private String indexShardName;
-  private int currentBlockletId;
+  protected int currentBlockletId;
   private List<String> currentDMFiles;
   private List<DataOutputStream> currentDataOutStreams;
   private List<ObjectOutputStream> currentObjectOutStreams;
-  private List<BloomFilter<byte[]>> indexBloomFilters;
-
-  @InterfaceAudience.Internal
-  public BloomDataMapWriter(AbsoluteTableIdentifier identifier, DataMapMeta dataMapMeta,
-      int bloomFilterSize, Segment segment, String writeDirectoryPath) {
-    super(identifier, segment, writeDirectoryPath);
-    dataMapName = dataMapMeta.getDataMapName();
-    indexedColumns = dataMapMeta.getIndexedColumns();
-    this.bloomFilterSize = bloomFilterSize;
-    col2Ordianl = new HashMap<String, Integer>(indexedColumns.size());
-    col2DataType = new HashMap<String, DataType>(indexedColumns.size());
+  protected List<BloomFilter<byte[]>> indexBloomFilters;
 
-    currentDMFiles = new ArrayList<String>(indexedColumns.size());
-    currentDataOutStreams = new ArrayList<DataOutputStream>(indexedColumns.size());
-    currentObjectOutStreams = new ArrayList<ObjectOutputStream>(indexedColumns.size());
+  BloomDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
+      Segment segment, String shardName, int bloomFilterSize) throws IOException {
+    super(tablePath, dataMapName, indexColumns, segment, shardName);
+    this.bloomFilterSize = bloomFilterSize;
 
-    indexBloomFilters = new ArrayList<BloomFilter<byte[]>>(indexedColumns.size());
+    currentDMFiles = new ArrayList<String>(indexColumns.size());
+    currentDataOutStreams = new ArrayList<DataOutputStream>(indexColumns.size());
+    currentObjectOutStreams = new ArrayList<ObjectOutputStream>(indexColumns.size());
+    indexBloomFilters = new ArrayList<BloomFilter<byte[]>>(indexColumns.size());
+    initDataMapFile();
+    resetBloomFilters();
   }
 
   @Override
-  public void onBlockStart(String blockId, String indexShardName) throws IOException {
-    if (this.indexShardName == null) {
-      this.indexShardName = indexShardName;
-      initDataMapFile();
-    }
+  public void onBlockStart(String blockId) throws IOException {
   }
 
   @Override
   public void onBlockEnd(String blockId) throws IOException {
-
   }
 
   @Override
   public void onBlockletStart(int blockletId) {
-    this.currentBlockletId = blockletId;
+  }
+
+  protected void resetBloomFilters() {
     indexBloomFilters.clear();
-    for (int i = 0; i < indexedColumns.size(); i++) {
+    List<CarbonColumn> indexColumns = getIndexColumns();
+    for (int i = 0; i < indexColumns.size(); i++) {
       indexBloomFilters.add(BloomFilter.create(Funnels.byteArrayFunnel(),
           bloomFilterSize, 0.00001d));
     }
@@ -103,60 +92,51 @@ public class BloomDataMapWriter extends DataMapWriter {
 
   @Override
   public void onBlockletEnd(int blockletId) {
-    try {
-      writeBloomDataMapFile();
-    } catch (Exception e) {
-      for (ObjectOutputStream objectOutputStream : currentObjectOutStreams) {
-        CarbonUtil.closeStreams(objectOutputStream);
-      }
-      for (DataOutputStream dataOutputStream : currentDataOutStreams) {
-        CarbonUtil.closeStreams(dataOutputStream);
-      }
-      throw new RuntimeException(e);
-    }
+    writeBloomDataMapFile();
+    currentBlockletId++;
   }
 
-  // notice that the input pages only contains the indexed columns
   @Override
-  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
-      throws IOException {
-    col2Ordianl.clear();
-    col2DataType.clear();
-    for (int colId = 0; colId < pages.length; colId++) {
-      String columnName = pages[colId].getColumnSpec().getFieldName().toLowerCase();
-      col2Ordianl.put(columnName, colId);
-      DataType columnType = pages[colId].getColumnSpec().getSchemaDataType();
-      col2DataType.put(columnName, columnType);
-    }
-
-    // for each row
-    for (int rowId = 0; rowId < pages[0].getPageSize(); rowId++) {
-      // for each indexed column
-      for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
-        String indexedCol = indexedColumns.get(indexColId);
+  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
+    List<CarbonColumn> indexColumns = getIndexColumns();
+    for (int rowId = 0; rowId < pageSize; rowId++) {
+      // for each indexed column, add the data to bloom filter
+      for (int i = 0; i < indexColumns.size(); i++) {
+        Object data = pages[i].getData(rowId);
+        DataType dataType = indexColumns.get(i).getDataType();
         byte[] indexValue;
-        if (DataTypes.STRING == col2DataType.get(indexedCol)
-            || DataTypes.BYTE_ARRAY == col2DataType.get(indexedCol)) {
-          byte[] originValue = (byte[]) pages[col2Ordianl.get(indexedCol)].getData(rowId);
+        if (DataTypes.STRING == dataType) {
+          indexValue = getStringData(data);
+        } else if (DataTypes.BYTE_ARRAY == dataType) {
+          byte[] originValue = (byte[]) data;
+          // String and byte array is LV encoded, L is short type
          indexValue = new byte[originValue.length - 2];
          System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 2);
        } else {
-          Object originValue = pages[col2Ordianl.get(indexedCol)].getData(rowId);
-          indexValue = CarbonUtil.getValueAsBytes(col2DataType.get(indexedCol), originValue);
+          indexValue = CarbonUtil.getValueAsBytes(dataType, data);
        }
-
-        indexBloomFilters.get(indexColId).put(indexValue);
+        indexBloomFilters.get(i).put(indexValue);
      }
    }
  }
 
+  protected byte[] getStringData(Object data) {
+    byte[] lvData = (byte[]) data;
+    byte[] indexValue = new byte[lvData.length - 2];
+    System.arraycopy(lvData, 2, indexValue, 0, lvData.length - 2);
+    return indexValue;
+  }
+
   private void initDataMapFile() throws IOException {
-    String dataMapDir = genDataMapStorePath(this.writeDirectoryPath, this.dataMapName);
-    dataMapDir = dataMapDir + CarbonCommonConstants.FILE_SEPARATOR + this.indexShardName;
-    FileFactory.mkdirs(dataMapDir, FileFactory.getFileType(dataMapDir));
-    for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
-      String dmFile = dataMapDir + CarbonCommonConstants.FILE_SEPARATOR +
-          indexedColumns.get(indexColId) + BloomCoarseGrainDataMap.BLOOM_INDEX_SUFFIX;
+    if (!FileFactory.isFileExist(dataMapPath)) {
+      if (!FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) {
+        throw new IOException("Failed to create directory " + dataMapPath);
+      }
+    }
+    List<CarbonColumn> indexColumns = getIndexColumns();
+    for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
+      String dmFile = dataMapPath + CarbonCommonConstants.FILE_SEPARATOR +
+          indexColumns.get(indexColId).getColName() + BloomCoarseGrainDataMap.BLOOM_INDEX_SUFFIX;
       DataOutputStream dataOutStream = null;
       ObjectOutputStream objectOutStream = null;
       try {
@@ -175,44 +155,45 @@ public class BloomDataMapWriter extends DataMapWriter {
    }
  }
 
-  private void writeBloomDataMapFile() throws IOException {
-    for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
-      BloomDMModel model = new BloomDMModel(this.currentBlockletId,
-          indexBloomFilters.get(indexColId));
-      // only in higher version of guava-bloom-filter, it provides readFrom/writeTo interface.
-      // In lower version, we use default java serializer to write bloomfilter.
-      this.currentObjectOutStreams.get(indexColId).writeObject(model);
-      this.currentObjectOutStreams.get(indexColId).flush();
-      this.currentDataOutStreams.get(indexColId).flush();
+  protected void writeBloomDataMapFile() {
+    List<CarbonColumn> indexColumns = getIndexColumns();
+    try {
+      for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
+        BloomDMModel model =
+            new BloomDMModel(this.currentBlockletId, indexBloomFilters.get(indexColId));
+        // only in higher version of guava-bloom-filter, it provides readFrom/writeTo interface.
+        // In lower version, we use default java serializer to write bloomfilter.
+        this.currentObjectOutStreams.get(indexColId).writeObject(model);
+        this.currentObjectOutStreams.get(indexColId).flush();
+        this.currentDataOutStreams.get(indexColId).flush();
+      }
+    } catch (Exception e) {
+      for (ObjectOutputStream objectOutputStream : currentObjectOutStreams) {
+        CarbonUtil.closeStreams(objectOutputStream);
+      }
+      for (DataOutputStream dataOutputStream : currentDataOutStreams) {
+        CarbonUtil.closeStreams(dataOutputStream);
+      }
+      throw new RuntimeException(e);
+    } finally {
+      resetBloomFilters();
    }
  }
 
   @Override
   public void finish() throws IOException {
-    for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
-      CarbonUtil.closeStreams(this.currentDataOutStreams.get(indexColId),
-          this.currentObjectOutStreams.get(indexColId));
-      commitFile(this.currentDMFiles.get(indexColId));
+    if (indexBloomFilters.size() > 0) {
+      writeBloomDataMapFile();
    }
+    releaseResouce();
  }
 
-  @Override
-  protected void commitFile(String dataMapFile) throws IOException {
-    super.commitFile(dataMapFile);
+  protected void releaseResouce() {
+    List<CarbonColumn> indexColumns = getIndexColumns();
+    for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
+      CarbonUtil.closeStreams(
+          currentDataOutStreams.get(indexColId), currentObjectOutStreams.get(indexColId));
+    }
  }
 
-  /**
-   * create and return path that will store the datamap
-   *
-   * @param dataPath patch to store the carbondata factdata
-   * @param dataMapName datamap name
-   * @return path to store the datamap
-   * @throws IOException
-   */
-  public static String genDataMapStorePath(String dataPath, String dataMapName)
-      throws IOException {
-    String dmDir = dataPath + File.separator + dataMapName;
-    FileFactory.mkdirs(dmDir, FileFactory.getFileType(dmDir));
-    return dmDir;
-  }
}
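
The bloom write path above reduces to a handful of Guava calls: create one BloomFilter per indexed column per blocklet, put each row's column value in as raw bytes, and serialize the filter with the default Java serializer at blocklet end. A minimal self-contained sketch of that sequence (class name, filter size and output file are illustrative only; the patch itself wraps the filter in a BloomDMModel before writing):

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

public class BloomIndexSketch {
  public static void main(String[] args) throws IOException {
    // one filter per indexed column per blocklet; fpp mirrors the patch's 0.00001d
    BloomFilter<byte[]> filter =
        BloomFilter.create(Funnels.byteArrayFunnel(), 32000, 0.00001d);
    // each row's column value goes in as raw bytes
    filter.put("value-1".getBytes(StandardCharsets.UTF_8));
    filter.put("value-2".getBytes(StandardCharsets.UTF_8));
    // at blocklet end the filter is serialized with the default java serializer,
    // since older guava versions lack the writeTo/readFrom interface
    try (ObjectOutputStream out =
        new ObjectOutputStream(new FileOutputStream("col1.bloomindex"))) {
      out.writeObject(filter);
    }
  }
}
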
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index 21a0b8e..0993218 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -33,15 +33,12 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -58,7 +55,6 @@ public class MinMaxDataWriter extends DataMapWriter {
 
   private Map<Integer, BlockletMinMax> blockMinMaxMap;
 
-  private String dataMapName;
   private int columnCnt;
   private DataType[] dataTypeArray;
   private String indexShardName;
@@ -71,30 +67,25 @@ public class MinMaxDataWriter extends DataMapWriter {
   */
   private Map<Integer, Integer> origin2MinMaxOrdinal = new HashMap<>();
 
-  public MinMaxDataWriter(AbsoluteTableIdentifier identifier, String dataMapName, Segment segment,
-      String dataWritePath) {
-    super(identifier, segment, dataWritePath);
-    this.dataMapName = dataMapName;
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-        identifier.getDatabaseName(), identifier.getTableName());
-    List<CarbonColumn> cols = carbonTable.getCreateOrderColumn(identifier.getTableName());
-    this.columnCnt = cols.size();
-    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(identifier.getTableName());
-    for (int i = 0; i < dimensions.size(); i++) {
-      this.origin2MinMaxOrdinal.put(dimensions.get(i).getSchemaOrdinal(),
-          dimensions.get(i).getOrdinal());
+  public MinMaxDataWriter(CarbonTable carbonTable, DataMapSchema dataMapSchema, Segment segment,
+      String shardName, List<CarbonColumn> indexColumns) {
+    super(carbonTable.getTablePath(), dataMapSchema.getDataMapName(), indexColumns, segment,
+        shardName);
+    this.columnCnt = indexColumns.size();
+    for (CarbonColumn col : indexColumns) {
+      this.origin2MinMaxOrdinal.put(col.getSchemaOrdinal(), col.getOrdinal());
    }
-    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(identifier.getTableName());
-    for (int i = 0; i < measures.size(); i++) {
-      this.origin2MinMaxOrdinal.put(measures.get(i).getSchemaOrdinal(),
-          dimensions.size() + measures.get(i).getOrdinal());
+    if (this.dataTypeArray == null) {
+      this.dataTypeArray = new DataType[this.columnCnt];
+      for (int i = 0; i < this.columnCnt; i++) {
+        this.dataTypeArray[i] = indexColumns.get(i).getDataType();
+      }
    }
  }
 
-  @Override public void onBlockStart(String blockId, String indexShardName) {
+  @Override public void onBlockStart(String blockId) {
     if (blockMinMaxMap == null) {
       blockMinMaxMap = new HashMap<>();
-      this.indexShardName = indexShardName;
    }
  }
 
@@ -111,46 +102,15 @@ public class MinMaxDataWriter extends DataMapWriter {
  }
 
   @Override
-  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) {
-    // Calculate Min and Max value within this page.
-
-    // As part of example we are extracting Min Max values Manually. The same can be done from
-    // retrieving the page statistics. For e.g.
-
-    // if (pageLevelMin == null && pageLevelMax == null) {
-    //   pageLevelMin[1] = CarbonUtil.getValueAsBytes(pages[0].getStatistics().getDataType(),
-    //       pages[0].getStatistics().getMin());
-    //   pageLevelMax[1] = CarbonUtil.getValueAsBytes(pages[0].getStatistics().getDataType(),
-    //       pages[0].getStatistics().getMax());
-    // } else {
-    //   if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMin[1], CarbonUtil
-    //       .getValueAsBytes(pages[0].getStatistics().getDataType(),
-    //           pages[0].getStatistics().getMin())) > 0) {
-    //     pageLevelMin[1] = CarbonUtil.getValueAsBytes(pages[0].getStatistics().getDataType(),
-    //         pages[0].getStatistics().getMin());
-    //   }
-    //   if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMax[1], CarbonUtil
-    //       .getValueAsBytes(pages[0].getStatistics().getDataType(),
-    //           pages[0].getStatistics().getMax())) < 0) {
-    //     pageLevelMax[1] = CarbonUtil.getValueAsBytes(pages[0].getStatistics().getDataType(),
-    //         pages[0].getStatistics().getMax());
-    //   }
-
-    if (this.dataTypeArray == null) {
-      this.dataTypeArray = new DataType[this.columnCnt];
-      for (int i = 0; i < this.columnCnt; i++) {
-        this.dataTypeArray[i] = pages[i].getDataType();
-      }
-    }
-
+  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
     // as an example, we don't use page-level min-max generated by native carbondata here, we get
     // the min-max by comparing each row
-    for (int rowId = 0; rowId < pages[0].getPageSize(); rowId++) {
+    for (int rowId = 0; rowId < pageSize; rowId++) {
       for (int colIdx = 0; colIdx < columnCnt; colIdx++) {
         Object originValue = pages[colIdx].getData(rowId);
+        DataType dataType = dataTypeArray[colIdx];
         // for string & bytes_array, data is prefixed with length, need to remove it
-        if (DataTypes.STRING == pages[colIdx].getDataType()
-            || DataTypes.BYTE_ARRAY == pages[colIdx].getDataType()) {
+        if (DataTypes.STRING == dataType || DataTypes.BYTE_ARRAY == dataType) {
           byte[] valueMin0 = (byte[]) pageLevelMin[colIdx];
           byte[] valueMax0 = (byte[]) pageLevelMax[colIdx];
           byte[] value1 = (byte[]) originValue;
@@ -164,10 +124,10 @@ public class MinMaxDataWriter extends DataMapWriter {
             pageLevelMax[colIdx] = new byte[value1.length - 2];
             System.arraycopy(value1, 2, (byte[]) pageLevelMax[colIdx], 0, value1.length - 2);
           }
-        } else if (DataTypes.INT == pages[colIdx].getDataType()) {
-          updateMinMax(colIdx, originValue, pages[colIdx].getDataType());
+        } else if (DataTypes.INT == dataType) {
+          updateMinMax(colIdx, originValue, dataType);
        } else {
-          throw new RuntimeException("Not implement yet");
+          throw new UnsupportedOperationException("Not implement yet");
        }
      }
    }
@@ -276,8 +236,7 @@ public class MinMaxDataWriter extends DataMapWriter {
   */
  public void writeMinMaxIndexFile(List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails,
      String blockId) throws IOException {
-    String dataMapDir = genDataMapStorePath(this.writeDirectoryPath, this.dataMapName);
-    String filePath = dataMapDir + File.separator + blockId + ".minmaxindex";
+    String filePath = dataMapPath + File.separator + blockId + ".minmaxindex";
    BufferedWriter brWriter = null;
    DataOutputStream dataOutStream = null;
    try {
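
The row-wise min/max tracking in onPageAdded depends on two details that are easy to miss: string and byte-array page values arrive LV-encoded (a 2-byte short length prefix followed by the content), and byte[] comparison must be unsigned lexicographic, which carbondata does via ByteUtil.UnsafeComparer. A standalone sketch of both (helper and class names are invented for illustration):

import java.nio.charset.StandardCharsets;

public class MinMaxSketch {
  // unsigned lexicographic compare, standing in for ByteUtil.UnsafeComparer
  static int compareBytes(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return a.length - b.length;
  }

  // strip the 2-byte (short) length prefix of an LV-encoded value
  static byte[] stripLV(byte[] lv) {
    byte[] value = new byte[lv.length - 2];
    System.arraycopy(lv, 2, value, 0, lv.length - 2);
    return value;
  }

  public static void main(String[] args) {
    byte[] min = null;
    byte[] max = null;
    for (String s : new String[] { "beta", "alpha", "gamma" }) {
      byte[] raw = s.getBytes(StandardCharsets.UTF_8);
      // fake an LV-encoded page value: 2 length bytes + content
      byte[] lv = new byte[raw.length + 2];
      lv[0] = (byte) (raw.length >> 8);
      lv[1] = (byte) raw.length;
      System.arraycopy(raw, 0, lv, 2, raw.length);
      byte[] value = stripLV(lv);
      if (min == null || compareBytes(value, min) < 0) {
        min = value;
      }
      if (max == null || compareBytes(value, max) > 0) {
        max = value;
      }
    }
    // prints "alpha / gamma"
    System.out.println(new String(min, StandardCharsets.UTF_8)
        + " / " + new String(max, StandardCharsets.UTF_8));
  }
}
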
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index c110887..4197b79 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
@@ -51,25 +52,25 @@ import org.apache.commons.lang3.StringUtils;
 public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
   private static final LogService LOGGER = LogServiceFactory.getLogService(
       MinMaxIndexDataMapFactory.class.getName());
+  private DataMapSchema dataMapSchema;
   private DataMapMeta dataMapMeta;
   private String dataMapName;
   private AbsoluteTableIdentifier identifier;
 
+  public MinMaxIndexDataMapFactory(CarbonTable carbonTable) {
+    super(carbonTable);
+  }
+
   // this is an example for datamap, we can choose the columns and operations that
   // will be supported by this datamap. Furthermore, we can add cache-support for this datamap.
-  @Override public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
+  @Override public void init(DataMapSchema dataMapSchema)
       throws IOException, MalformedDataMapCommandException {
+    this.dataMapSchema = dataMapSchema;
     this.identifier = carbonTable.getAbsoluteTableIdentifier();
     this.dataMapName = dataMapSchema.getDataMapName();
 
     // columns that will be indexed
     List<CarbonColumn> allColumns = carbonTable.getCreateOrderColumn(identifier.getTableName());
-    List<String> minMaxCols = (List) CollectionUtils.collect(allColumns, new Transformer() {
-      @Override public Object transform(Object o) {
-        return ((CarbonColumn) o).getColName();
-      }
-    });
-    LOGGER.info("MinMaxDataMap support index columns: " + StringUtils.join(minMaxCols, ", "));
 
     // operations that will be supported on the indexed columns
     List<ExpressionType> optOperations = new ArrayList<>();
@@ -80,17 +81,24 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
     optOperations.add(ExpressionType.LESSTHAN_EQUALTO);
     optOperations.add(ExpressionType.NOT_EQUALS);
     LOGGER.error("MinMaxDataMap support operations: " + StringUtils.join(optOperations, ", "));
-    this.dataMapMeta = new DataMapMeta(minMaxCols, optOperations);
+    this.dataMapMeta = new DataMapMeta(allColumns, optOperations);
  }
 
   /**
    * createWriter will return the MinMaxDataWriter.
    *
    * @param segment
+   * @param shardName
    * @return
   */
-  @Override public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
-    return new MinMaxDataWriter(identifier, dataMapName, segment, writeDirectoryPath);
+  @Override public DataMapWriter createWriter(Segment segment, String shardName) {
+    return new MinMaxDataWriter(carbonTable, dataMapSchema, segment, shardName,
+        dataMapMeta.getIndexedColumns());
+  }
+
+  @Override public DataMapRefresher createRefresher(Segment segment, String shardName)
+      throws IOException {
+    return null;
  }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
index ba38319..dca5c90 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
@@ -22,15 +22,19 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
 /**
  * CG level of lucene DataMap
 */
@@ -40,6 +44,11 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(LuceneCoarseGrainDataMapFactory.class.getName());
 
+  public LuceneCoarseGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
+    super(carbonTable, dataMapSchema);
+  }
+
   /**
    * Get the datamap for segmentid
   */
@@ -49,7 +58,7 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
     CoarseGrainDataMap dataMap = new LuceneCoarseGrainDataMap(analyzer);
     try {
       dataMap.init(new DataMapModel(
-          LuceneDataMapWriter.genDataMapStorePath(
+          DataMapWriter.getDefaultDataMapPath(
              tableIdentifier.getTablePath(), segment.getSegmentNo(), dataMapName)));
    } catch (MemoryException e) {
       LOGGER.error("failed to get lucene datamap , detail is {}" + e.getMessage());
@@ -69,7 +78,7 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
  }
 
   @Override
-  public DataMapLevel getDataMapType() {
+  public DataMapLevel getDataMapLevel() {
     return DataMapLevel.CG;
  }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index 1cde0c1..d52cef9 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -51,7 +52,6 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 
@@ -59,9 +59,7 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer;
 * Base implementation for CG and FG lucene DataMapFactory.
 */
@InterfaceAudience.Internal
-abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFactory<T> {
-
-  static final String TEXT_COLUMNS = "text_columns";
+abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactory<T> {
 
   /**
    * Logger
@@ -88,26 +86,17 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
   */
  AbsoluteTableIdentifier tableIdentifier = null;
 
-  /**
-   * indexed carbon columns for lucene
-   */
-  List<String> indexedCarbonColumns = null;
-
-  CarbonTable carbonTable = null;
-
-
-  @Override
-  public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
-      throws IOException, MalformedDataMapCommandException {
+  public LuceneDataMapFactoryBase(CarbonTable carbonTable, DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
+    super(carbonTable, dataMapSchema);
     Objects.requireNonNull(carbonTable.getAbsoluteTableIdentifier());
     Objects.requireNonNull(dataMapSchema);
 
-    this.carbonTable = carbonTable;
     this.tableIdentifier = carbonTable.getAbsoluteTableIdentifier();
     this.dataMapName = dataMapSchema.getDataMapName();
 
     // validate DataMapSchema and get index columns
-    List<String> indexedColumns = validateAndGetIndexedColumns(dataMapSchema, carbonTable);
+    List<CarbonColumn> indexedColumns = carbonTable.getIndexedColumns(dataMapSchema);
 
     // add optimizedOperations
     List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>();
@@ -126,55 +115,6 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
  }
 
   /**
-   * validate Lucene DataMap
-   * 1. require TEXT_COLUMNS property
-   * 2. TEXT_COLUMNS can't contains illegal argument(empty, blank)
-   * 3. TEXT_COLUMNS can't contains duplicate same columns
-   * 4. TEXT_COLUMNS should be exists in table columns
-   * 5. TEXT_COLUMNS support only String DataType columns
-   */
-  public static List<String> validateAndGetIndexedColumns(DataMapSchema dataMapSchema,
-      CarbonTable carbonTable) throws MalformedDataMapCommandException {
-    String textColumnsStr = dataMapSchema.getProperties().get(TEXT_COLUMNS);
-    if (textColumnsStr == null || StringUtils.isBlank(textColumnsStr)) {
-      throw new MalformedDataMapCommandException(
-          "Lucene DataMap require proper TEXT_COLUMNS property.");
-    }
-    String[] textColumns = textColumnsStr.split(",", -1);
-    for (int i = 0; i < textColumns.length; i++) {
-      textColumns[i] = textColumns[i].trim().toLowerCase();
-    }
-    for (int i = 0; i < textColumns.length; i++) {
-      if (textColumns[i].isEmpty()) {
-        throw new MalformedDataMapCommandException("TEXT_COLUMNS contains illegal argument.");
-      }
-      for (int j = i + 1; j < textColumns.length; j++) {
-        if (textColumns[i].equals(textColumns[j])) {
-          throw new MalformedDataMapCommandException(
-              "TEXT_COLUMNS has duplicate columns :" + textColumns[i]);
-        }
-      }
-    }
-    List<String> indexedCarbonColumns = new ArrayList<>(textColumns.length);
-    for (int i = 0; i < textColumns.length; i++) {
-      CarbonColumn column = carbonTable.getColumnByName(carbonTable.getTableName(), textColumns[i]);
-      if (null == column) {
-        throw new MalformedDataMapCommandException("TEXT_COLUMNS: " + textColumns[i]
-            + " does not exist in table. Please check create DataMap statement.");
-      } else if (column.getDataType() != DataTypes.STRING) {
-        throw new MalformedDataMapCommandException(
-            "TEXT_COLUMNS only supports String column. " + "Unsupported column: " + textColumns[i]
-                + ", DataType: " + column.getDataType());
-      } else if (column.getEncoder().contains(Encoding.DICTIONARY)) {
-        throw new MalformedDataMapCommandException(
-            "TEXT_COLUMNS cannot contain dictionary column " + column.getColName());
-      }
-      indexedCarbonColumns.add(column.getColName());
-    }
-    return indexedCarbonColumns;
-  }
-
-  /**
    * this method will delete the datamap folders during drop datamap
    * @throws MalformedDataMapCommandException
   */
@@ -206,10 +146,16 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
   * Return a new write for this datamap
   */
  @Override
-  public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
-    LOGGER.info("lucene data write to " + writeDirectoryPath);
-    return new LuceneDataMapWriter(tableIdentifier, dataMapName, segment, writeDirectoryPath, true,
-        indexedCarbonColumns);
+  public DataMapWriter createWriter(Segment segment, String shardName) {
+    LOGGER.info("lucene data write to " + shardName);
+    return new LuceneDataMapWriter(getCarbonTable().getTablePath(), dataMapName,
+        dataMapMeta.getIndexedColumns(), segment, shardName, true);
+  }
+
+  @Override
+  public DataMapRefresher createRefresher(Segment segment, String shardName) {
+    return new LuceneDataMapRefresher(getCarbonTable().getTablePath(), dataMapName,
+        segment, shardName, dataMapMeta.getIndexedColumns());
  }
 
   /**
@@ -280,7 +226,7 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
     try {
       // there can be multiple lucene datamaps present on a table, so get all datamaps and form
       // the path till the index file directories in all datamaps folders present in each segment
-      dataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
+      dataMaps = DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
    } catch (IOException ex) {
       LOGGER.error("failed to get datamaps");
    }
@@ -302,4 +248,26 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
    }
     return indexDirs.toArray(new CarbonFile[0]);
  }
+
+  /**
+   * Further validate whether it is string column and dictionary column.
+   * Currently only string and non-dictionary column is supported for Lucene DataMap
+   */
+  @Override
+  public void validate() throws MalformedDataMapCommandException {
+    super.validate();
+    List<CarbonColumn> indexColumns = getCarbonTable().getIndexedColumns(getDataMapSchema());
+
+    for (CarbonColumn column : indexColumns) {
+      if (column.getDataType() != DataTypes.STRING) {
+        throw new MalformedDataMapCommandException(String.format(
+            "Only String column is supported, column '%s' is %s type. ",
+            column.getColName(), column.getDataType()));
+      } else if (column.getEncoder().contains(Encoding.DICTIONARY)) {
+        throw new MalformedDataMapCommandException(String.format(
+            "Dictionary column is not supported, column '%s' is dictionary column",
+            column.getColName()));
+      }
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java
new file mode 100644
index 0000000..ee500ef
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
+import org.apache.lucene.codecs.lucene62.Lucene62Codec;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.DoublePoint;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FloatPoint;
+import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.document.IntRangeField;
+import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+
+public class LuceneDataMapRefresher implements DataMapRefresher {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LuceneDataMapWriter.class.getName());
+
+  private String dataMapPath;
+
+  private List<CarbonColumn> indexColumns;
+
+  private int columnsCount;
+
+  private IndexWriter indexWriter = null;
+
+  private IndexWriter pageIndexWriter = null;
+
+  private Analyzer analyzer = null;
+
+  LuceneDataMapRefresher(String tablePath, String dataMapName,
+      Segment segment, String shardName, List<CarbonColumn> indexColumns) {
+    this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName(
+        tablePath, segment.getSegmentNo(), dataMapName, shardName);
+    this.indexColumns = indexColumns;
+    this.columnsCount = indexColumns.size();
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    // get index path, put index data into segment's path
+    Path indexPath = FileFactory.getPath(dataMapPath);
+    FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+    // if index path exists, should delete it because we are
+    // rebuilding the whole datamap for all segments
+    if (fs.exists(indexPath)) {
+      fs.delete(indexPath, true);
+    }
+    if (!fs.mkdirs(indexPath)) {
+      LOGGER.error("Failed to create directory " + indexPath);
+    }
+
+    if (null == analyzer) {
+      analyzer = new StandardAnalyzer();
+    }
+
+    // create a index writer
+    Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
+
+    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
+    if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
+            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
+        .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
+      indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
+    } else {
+      indexWriterConfig
+          .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
+    }
+
+    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
+  }
+
+  private IndexWriter createPageIndexWriter() throws IOException {
+    // save index data into ram, write into disk after one page finished
+    RAMDirectory ramDir = new RAMDirectory();
+    return new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
+  }
+
+  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
+
+    Directory directory = pageIndexWriter.getDirectory();
+
+    // close ram writer
+    pageIndexWriter.close();
+
+    // add ram index data into disk
+    indexWriter.addIndexes(directory);
+
+    // delete this ram data
+    directory.close();
+  }
+
+  @Override
+  public void addRow(int blockletId, int pageId, int rowId, Object[] values) throws IOException {
+    if (rowId == 0) {
+      if (pageIndexWriter != null) {
+        addPageIndex(pageIndexWriter);
+      }
+      pageIndexWriter = createPageIndexWriter();
+    }
+
+    // create a new document
+    Document doc = new Document();
+
+    // add blocklet Id
+    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
+    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
+
+    // add page id
+    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
+    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
+
+    // add row id
+    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, rowId));
+    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
+
+    // add other fields
+    for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
+      CarbonColumn column = indexColumns.get(colIdx);
+      addField(doc, column.getColName(), column.getDataType(), values[colIdx]);
+    }
+
+    pageIndexWriter.addDocument(doc);
+  }
+
+  private boolean addField(Document doc, String fieldName, DataType type, Object value) {
+    if (type == DataTypes.STRING) {
+      doc.add(new TextField(fieldName, (String) value, Field.Store.NO));
+    } else if (type == DataTypes.BYTE) {
+      // byte type , use int range to deal with byte, lucene has no byte type
+      IntRangeField field =
+          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE });
+      field.setIntValue((int) value);
+      doc.add(field);
+    } else if (type == DataTypes.SHORT) {
+      // short type , use int range to deal with short type, lucene has no short type
+      IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE },
+          new int[] { Short.MAX_VALUE });
+      field.setShortValue((short) value);
+      doc.add(field);
+    } else if (type == DataTypes.INT) {
+      // int type , use int point to deal with int type
+      doc.add(new IntPoint(fieldName, (int) value));
+    } else if (type == DataTypes.LONG) {
+      // long type , use long point to deal with long type
+      doc.add(new LongPoint(fieldName, (long) value));
+    } else if (type == DataTypes.FLOAT) {
+      doc.add(new FloatPoint(fieldName, (float) value));
+    } else if (type == DataTypes.DOUBLE) {
+      doc.add(new DoublePoint(fieldName, (double) value));
+    } else if (type == DataTypes.DATE) {
+      // TODO: how to get data value
+    } else if (type == DataTypes.TIMESTAMP) {
+      // TODO: how to get
+    } else if (type == DataTypes.BOOLEAN) {
+      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
+      field.setIntValue((boolean) value ? 1 : 0);
+      doc.add(field);
+    } else {
+      LOGGER.error("unsupported data type " + type);
+      throw new RuntimeException("unsupported data type " + type);
+    }
+    return true;
+  }
+
+  @Override
+  public void finish() throws IOException {
+    if (indexWriter != null && pageIndexWriter != null) {
+      addPageIndex(pageIndexWriter);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (indexWriter != null) {
+      indexWriter.close();
+    }
+  }
+
+}
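
The refresher above and the writer below share one indexing pattern: documents for a page (or blocklet) are first added to an in-memory RAMDirectory through a temporary IndexWriter, then merged into the persistent index with IndexWriter.addIndexes and discarded. A minimal sketch of that pattern against a local directory (the real code targets HdfsDirectory; the path and field name here are illustrative):

import java.io.IOException;
import java.nio.file.Paths;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.RAMDirectory;

public class RamBufferSketch {
  public static void main(String[] args) throws IOException {
    StandardAnalyzer analyzer = new StandardAnalyzer();
    try (Directory diskDir = FSDirectory.open(Paths.get("/tmp/lucene-sketch"));
         IndexWriter diskWriter = new IndexWriter(diskDir, new IndexWriterConfig(analyzer))) {
      // buffer one page of documents in memory
      RAMDirectory ramDir = new RAMDirectory();
      IndexWriter ramWriter = new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
      Document doc = new Document();
      doc.add(new IntPoint("blockletId", 0));     // searchable
      doc.add(new StoredField("blockletId", 0));  // retrievable
      ramWriter.addDocument(doc);
      // flush: close the ram writer, merge its segments into the disk index
      ramWriter.close();
      diskWriter.addIndexes(ramDir);
      ramDir.close();
    }
  }
}
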
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 95823bb..12fa1ef 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -17,7 +17,6 @@
 package org.apache.carbondata.datamap.lucene;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
@@ -30,11 +29,10 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -76,12 +74,8 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
   private Analyzer analyzer = null;
 
-  private String dataMapName = null;
-
   private boolean isFineGrain = true;
 
-  private List<String> indexedCarbonColumns = null;
-
   public static final String BLOCKLETID_NAME = "blockletId";
 
   private String indexShardName = null;
@@ -90,69 +84,54 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
   public static final String ROWID_NAME = "rowId";
 
-  LuceneDataMapWriter(AbsoluteTableIdentifier identifier, String dataMapName, Segment segment,
-      String writeDirectoryPath, boolean isFineGrain, List<String> indexedCarbonColumns) {
-    super(identifier, segment, writeDirectoryPath);
-    this.dataMapName = dataMapName;
+  LuceneDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
+      Segment segment, String shardName, boolean isFineGrain) {
+    super(tablePath, dataMapName, indexColumns, segment, shardName);
     this.isFineGrain = isFineGrain;
-    this.indexedCarbonColumns = indexedCarbonColumns;
-  }
-
-  private String getIndexPath(String taskName) {
-    if (isFineGrain) {
-      return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName,
-          taskName);
-    } else {
-      // TODO: where write data in coarse grain data map
-      return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName,
-          taskName);
-    }
  }
 
   /**
    * Start of new block notification.
   */
-  public void onBlockStart(String blockId, String indexShardName) throws IOException {
-    if (this.indexShardName == null || !this.indexShardName.equals(indexShardName)) {
-      if (indexWriter != null) {
-        return;
-      }
-      // get index path, put index data into segment's path
-      String strIndexPath = getIndexPath(indexShardName);
-      Path indexPath = FileFactory.getPath(strIndexPath);
-      FileSystem fs = FileFactory.getFileSystem(indexPath);
-
-      // if index path not exists, create it
-      if (!fs.exists(indexPath)) {
-        fs.mkdirs(indexPath);
+  public void onBlockStart(String blockId) throws IOException {
+    if (indexWriter != null) {
+      return;
+    }
+    // get index path, put index data into segment's path
+    Path indexPath = FileFactory.getPath(dataMapPath);
+    FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+    // if index path not exists, create it
+    if (!fs.exists(indexPath)) {
+      if (!fs.mkdirs(indexPath)) {
+        throw new IOException("Failed to create directory " + dataMapPath);
      }
+    }
 
-      if (null == analyzer) {
-        analyzer = new StandardAnalyzer();
-      }
+    if (null == analyzer) {
+      analyzer = new StandardAnalyzer();
+    }
 
-      // the indexWriter closes the FileSystem on closing the writer, so for a new configuration
-      // and disable the cache for the index writer, it will be closed on closing the writer
-      Configuration conf = new Configuration();
-      conf.set("fs.hdfs.impl.disable.cache", "true");
-
-      // create a index writer
-      Directory indexDir = new HdfsDirectory(indexPath, conf);
-
-      IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
-      if (CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
-              CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
-          .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
-        indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
-      } else {
-        indexWriterConfig
-            .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
-      }
+    // the indexWriter closes the FileSystem on closing the writer, so for a new configuration
+    // and disable the cache for the index writer, it will be closed on closing the writer
+    Configuration conf = new Configuration();
+    conf.set("fs.hdfs.impl.disable.cache", "true");
+
+    // create a index writer
+    Directory indexDir = new HdfsDirectory(indexPath, conf);
 
-      indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
+    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
+    if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
+            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
+        .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
+      indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
+    } else {
+      indexWriterConfig
+          .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
    }
+    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
  }
 
   /**
@@ -162,18 +141,30 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
  }
 
+  private RAMDirectory ramDir;
+  private IndexWriter ramIndexWriter;
+
   /**
    * Start of new blocklet notification.
   */
-  public void onBlockletStart(int blockletId) {
-
+  public void onBlockletStart(int blockletId) throws IOException {
+    // save index data into ram, write into disk after one page finished
+    ramDir = new RAMDirectory();
+    ramIndexWriter = new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
  }
 
   /**
    * End of blocklet notification
   */
-  public void onBlockletEnd(int blockletId) {
+  public void onBlockletEnd(int blockletId) throws IOException {
+    // close ram writer
+    ramIndexWriter.close();
+
+    // add ram index data into disk
+    indexWriter.addIndexes(ramDir);
 
+    // delete this ram data
+    ramDir.close();
  }
 
   /**
@@ -182,72 +173,54 @@ public class LuceneDataMapWriter extends DataMapWriter {
   * Implementation should copy the content of `pages` as needed, because `pages` memory
   * may be freed after this method returns, if using unsafe column page.
   */
-  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) throws IOException {
-    // save index data into ram, write into disk after one page finished
-    RAMDirectory ramDir = new RAMDirectory();
-    IndexWriter ramIndexWriter = new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
-
-    int columnsCount = pages.length;
-    if (columnsCount <= 0) {
-      LOGGER.warn("empty data");
-      ramIndexWriter.close();
-      ramDir.close();
-      return;
-    }
-
-    int pageSize = pages[0].getPageSize();
+  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages)
+      throws IOException {
     for (int rowId = 0; rowId < pageSize; rowId++) {
       // create a new document
       Document doc = new Document();
 
       // add blocklet Id
-      doc.add(new IntPoint(BLOCKLETID_NAME, new int[] { blockletId }));
+      doc.add(new IntPoint(BLOCKLETID_NAME, blockletId));
       doc.add(new StoredField(BLOCKLETID_NAME, blockletId));
       //doc.add(new NumericDocValuesField(BLOCKLETID_NAME,blockletId));
 
       // add page id and row id in Fine Grain data map
       if (isFineGrain) {
         // add page Id
-        doc.add(new IntPoint(PAGEID_NAME, new int[] { pageId }));
+        doc.add(new IntPoint(PAGEID_NAME, pageId));
         doc.add(new StoredField(PAGEID_NAME, pageId));
         //doc.add(new NumericDocValuesField(PAGEID_NAME,pageId));
 
         // add row id
-        doc.add(new IntPoint(ROWID_NAME, new int[] { rowId }));
+        doc.add(new IntPoint(ROWID_NAME, rowId));
         doc.add(new StoredField(ROWID_NAME, rowId));
         //doc.add(new NumericDocValuesField(ROWID_NAME,rowId));
      }
 
       // add indexed columns value into the document
-      for (ColumnPage page : pages) {
-        if (!page.getNullBits().get(rowId)) {
-          addField(doc, page, rowId, Field.Store.NO);
+      List<CarbonColumn> indexColumns = getIndexColumns();
+      for (int i = 0; i < pages.length; i++) {
+        // add to lucene only if value is not null
+        if (!pages[i].getNullBits().get(rowId)) {
+          addField(doc, pages[i].getData(rowId), indexColumns.get(i), Field.Store.NO);
        }
      }
 
       // add this document
       ramIndexWriter.addDocument(doc);
-    }
 
-    // close ram writer
-    ramIndexWriter.close();
-
-    // add ram index data into disk
-    indexWriter.addIndexes(new Directory[] { ramDir });
-    // delete this ram data
-    ramDir.close();
  }
 
-  private boolean addField(Document doc, ColumnPage page, int rowId, Field.Store store) {
+  private boolean addField(Document doc, Object data, CarbonColumn column, Field.Store store) {
     //get field name
-    String fieldName = page.getColumnSpec().getFieldName();
+    String fieldName = column.getColName();
 
     //get field type
-    DataType type = page.getColumnSpec().getSchemaDataType();
+    DataType type = column.getDataType();
 
     if (type == DataTypes.BYTE) {
       // byte type , use int range to deal with byte, lucene has no byte type
-      byte value = page.getByte(rowId);
+      byte value = (byte) data;
       IntRangeField field =
           new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE });
       field.setIntValue(value);
@@ -259,7 +232,7 @@ public class LuceneDataMapWriter extends DataMapWriter {
      }
    } else if (type == DataTypes.SHORT) {
       // short type , use int range to deal with short type, lucene has no short type
-      short value = page.getShort(rowId);
+      short value = (short) data;
       IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE },
           new int[] { Short.MAX_VALUE });
       field.setShortValue(value);
@@ -271,8 +244,8 @@ public class LuceneDataMapWriter extends DataMapWriter {
      }
    } else if (type == DataTypes.INT) {
       // int type , use int point to deal with int type
-      int value = page.getInt(rowId);
-      doc.add(new IntPoint(fieldName, new int[] { value }));
+      int value = (int) data;
+      doc.add(new IntPoint(fieldName, value));
 
       // if need store it , add StoredField
       if (store == Field.Store.YES) {
@@ -280,27 +253,27 @@ public class LuceneDataMapWriter extends DataMapWriter {
      }
    } else if (type == DataTypes.LONG) {
       // long type , use long point to deal with long type
-      long value = page.getLong(rowId);
-      doc.add(new LongPoint(fieldName, new long[] { value }));
+      long value = (long) data;
+      doc.add(new LongPoint(fieldName, value));
 
       // if need store it , add StoredField
       if (store == Field.Store.YES) {
         doc.add(new StoredField(fieldName, value));
      }
    } else if (type == DataTypes.FLOAT) {
-      float value = page.getFloat(rowId);
-      doc.add(new FloatPoint(fieldName, new float[] { value }));
+      float value = (float) data;
+      doc.add(new FloatPoint(fieldName, value));
       if (store == Field.Store.YES) {
         doc.add(new FloatPoint(fieldName, value));
      }
    } else if (type == DataTypes.DOUBLE) {
-      double value = page.getDouble(rowId);
-      doc.add(new DoublePoint(fieldName, new double[] { value }));
+      double value = (double) data;
+      doc.add(new DoublePoint(fieldName, value));
       if (store == Field.Store.YES) {
         doc.add(new DoublePoint(fieldName, value));
      }
    } else if (type == DataTypes.STRING) {
-      byte[] value = page.getBytes(rowId);
+      byte[] value = (byte[]) data;
       // TODO: how to get string value
       String strValue = null;
       try {
@@ -310,11 +283,11 @@ public class LuceneDataMapWriter extends DataMapWriter {
      }
       doc.add(new TextField(fieldName, strValue, store));
    } else if (type == DataTypes.DATE) {
-      // TODO: how to get data value
+      throw new RuntimeException("unsupported data type " + type);
    } else if (type == DataTypes.TIMESTAMP) {
-      // TODO: how to get
+      throw new RuntimeException("unsupported data type " + type);
    } else if (type == DataTypes.BOOLEAN) {
-      boolean value = page.getBoolean(rowId);
+      boolean value = (boolean) data;
       IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
       field.setIntValue(value ? 1 : 0);
       doc.add(field);
@@ -339,23 +312,4 @@ public class LuceneDataMapWriter extends DataMapWriter {
    }
  }
 
-  /**
-   * Return store path for datamap
-   */
-  public static String genDataMapStorePath(String tablePath, String segmentId, String dataMapName) {
-    return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
-  }
-
-  /**
-   * Return store path for datamap based on the taskName,if three tasks get launched during loading,
-   * then three folders will be created based on the three task Ids and lucene index file will be
-   * written into those folders
-   *
-   * @return store path based on taskID
-   */
-  public static String genDataMapStorePathOnTaskId(String tablePath, String segmentId,
-      String dataMapName, String taskName) {
-    return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName
-        + File.separator + taskName;
-  }
}
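
Because the writer stores blockletId, pageId and rowId both as IntPoint (searchable) and StoredField (retrievable), the fine-grain datamap can query the indexed text column and map hits back to row positions. A hedged sketch of that read path (index path and column name are illustrative; the real LuceneFineGrainDataMap additionally groups hits per blocklet):

import java.io.IOException;
import java.nio.file.Paths;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.FSDirectory;

public class LuceneReadSketch {
  public static void main(String[] args) throws IOException {
    try (DirectoryReader reader =
        DirectoryReader.open(FSDirectory.open(Paths.get("/tmp/lucene-sketch")))) {
      IndexSearcher searcher = new IndexSearcher(reader);
      // find rows whose indexed text column contains the term "carbon"
      TermQuery query = new TermQuery(new Term("name", "carbon"));
      for (ScoreDoc hit : searcher.search(query, 10).scoreDocs) {
        Document doc = searcher.doc(hit.doc);
        // the stored int fields written by the datamap come back as numbers
        System.out.println("blocklet=" + doc.getField("blockletId").numericValue()
            + " page=" + doc.getField("pageId").numericValue()
            + " row=" + doc.getField("rowId").numericValue());
      }
    }
  }
}
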
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
index 1104d09..86fba32 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
@@ -87,7 +87,7 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
    */
   private Analyzer analyzer;
 
-  private String taskName;
+  private String filePath;
 
   LuceneFineGrainDataMap(Analyzer analyzer) {
     this.analyzer = analyzer;
@@ -102,7 +102,7 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
 
     LOGGER.info("Lucene index read path " + indexPath.toString());
 
-    this.taskName = indexPath.getName();
+    this.filePath = indexPath.getName();
 
     // get file system , use hdfs file system , realized in solr project
     FileSystem fs = FileFactory.getFileSystem(indexPath);
@@ -277,7 +277,7 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
       }
 
       // add a FineGrainBlocklet
-      blocklets.add(new FineGrainBlocklet(taskName, blockletId, pages));
+      blocklets.add(new FineGrainBlocklet(filePath, blockletId, pages));
     }
 
     return blocklets;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index 1dae9b5..ec9283d 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -22,13 +22,17 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
 /**
  * CG level of lucene DataMap
@@ -36,6 +40,11 @@ import org.apache.carbondata.core.memory.MemoryException;
 @InterfaceAudience.Internal
 public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<FineGrainDataMap> {
 
+  public LuceneFineGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
+    super(carbonTable, dataMapSchema);
+  }
+
   /**
    * Get the datamap for segmentid
    */
@@ -44,7 +53,7 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
     FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);
     try {
       dataMap.init(new DataMapModel(
-          LuceneDataMapWriter.genDataMapStorePath(
+          DataMapWriter.getDefaultDataMapPath(
               tableIdentifier.getTablePath(), segment.getSegmentNo(), dataMapName)));
     } catch (MemoryException e) {
       LOGGER.error("failed to get lucene datamap , detail is {}" + e.getMessage());
@@ -74,7 +83,7 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
   }
 
   @Override
-  public DataMapLevel getDataMapType() {
+  public DataMapLevel getDataMapLevel() {
     return DataMapLevel.FG;
   }
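With genDataMapStorePath gone, the factory resolves the index location through the shared DataMapWriter.getDefaultDataMapPath. Judging from the removed method's body, the default layout is <segment path>/<datamap name>. A sketch of that layout under the usual tablePath/Fact/Part0/Segment_<id> convention; the segmentPath helper below is a stand-in for CarbonTablePath.getSegmentPath, not the real implementation.

import java.io.File;

public class DataMapPathSketch {
  // stand-in for CarbonTablePath.getSegmentPath(tablePath, segmentId)
  static String segmentPath(String tablePath, String segmentId) {
    return tablePath + File.separator + "Fact" + File.separator + "Part0"
        + File.separator + "Segment_" + segmentId;
  }

  // assumed shape of the shared default datamap path
  static String defaultDataMapPath(String tablePath, String segmentId, String dataMapName) {
    return segmentPath(tablePath, segmentId) + File.separator + dataMapName;
  }

  public static void main(String[] args) {
    // prints e.g. /store/db/tbl/Fact/Part0/Segment_0/lucene_datamap
    System.out.println(defaultDataMapPath("/store/db/tbl", "0", "lucene_datamap"));
  }
}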
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneIndexRefreshBuilder.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneIndexRefreshBuilder.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneIndexRefreshBuilder.java
deleted file mode 100644
index 2d5dcf8..0000000
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneIndexRefreshBuilder.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.lucene;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
-import org.apache.lucene.codecs.lucene62.Lucene62Codec;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.DoublePoint;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.FloatPoint;
-import org.apache.lucene.document.IntPoint;
-import org.apache.lucene.document.IntRangeField;
-import org.apache.lucene.document.LongPoint;
-import org.apache.lucene.document.StoredField;
-import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMDirectory;
-import org.apache.solr.store.hdfs.HdfsDirectory;
-
-public class LuceneIndexRefreshBuilder {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(LuceneDataMapWriter.class.getName());
-
-  private String strIndexPath;
-
-  private String[] indexColumns;
-  private DataType[] dataTypes;
-
-  private int columnsCount;
-
-  private IndexWriter indexWriter = null;
-
-  private IndexWriter pageIndexWriter = null;
-
-  private Analyzer analyzer = null;
-
-  public LuceneIndexRefreshBuilder(String strIndexPath, String[] indexColumns,
-      DataType[] dataTypes) {
-    this.strIndexPath = strIndexPath;
-    this.indexColumns = indexColumns;
-    this.columnsCount = indexColumns.length;
-    this.dataTypes = dataTypes;
-  }
-
-  public void initialize() throws IOException {
-    // get index path, put index data into segment's path
-    Path indexPath = FileFactory.getPath(strIndexPath);
-    FileSystem fs = FileFactory.getFileSystem(indexPath);
-
-    // if index path not exists, create it
-    if (!fs.exists(indexPath)) {
-      fs.mkdirs(indexPath);
-    }
-
-    if (null == analyzer) {
-      analyzer = new StandardAnalyzer();
-    }
-
-    // create a index writer
-    Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
-
-    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
-    if (CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
-            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
-        .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
-      indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
-    } else {
-      indexWriterConfig
-          .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
-    }
-
-    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
-  }
-
-  private IndexWriter createPageIndexWriter() throws IOException {
-    // save index data into ram, write into disk after one page finished
-    RAMDirectory ramDir = new RAMDirectory();
-    IndexWriter ramIndexWriter = new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
-
-    return ramIndexWriter;
-  }
-
-  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
-
-    Directory directory = pageIndexWriter.getDirectory();
-
-    // close ram writer
-    pageIndexWriter.close();
-
-    // add ram index data into disk
-    indexWriter.addIndexes(new Directory[] { directory });
-
-    // delete this ram data
-    directory.close();
-  }
-
-  public void addDocument(Object[] values) throws IOException {
-
-    if (values.length != indexColumns.length + 3) {
-      throw new IOException("The column number (" + values.length + ") of the row is incorrect.");
-    }
-    int rowId = (int) values[indexColumns.length + 2];
-    if (rowId == 0) {
-      if (pageIndexWriter != null) {
-        addPageIndex(pageIndexWriter);
-      }
-      pageIndexWriter = createPageIndexWriter();
-    }
-
-    // create a new document
-    Document doc = new Document();
-
-    // add blocklet Id
-    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME,
-        new int[] { (int) values[columnsCount] }));
-    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
-
-    // add page id
-    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME,
-        new int[] { (int) values[columnsCount + 1] }));
-    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
-
-    // add row id
-    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, new int[] { rowId }));
-    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
-
-    // add other fields
-    for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
-      addField(doc, indexColumns[colIdx], dataTypes[colIdx], values[colIdx]);
-    }
-
-    pageIndexWriter.addDocument(doc);
-  }
-
-  private boolean addField(Document doc, String fieldName, DataType type, Object value) {
-    if (type == DataTypes.BYTE) {
-      // byte type , use int range to deal with byte, lucene has no byte type
-      IntRangeField field =
-          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE });
-      field.setIntValue((int) value);
-      doc.add(field);
-    } else if (type == DataTypes.SHORT) {
-      // short type , use int range to deal with short type, lucene has no short type
-      IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE },
-          new int[] { Short.MAX_VALUE });
-      field.setShortValue((short) value);
-      doc.add(field);
-    } else if (type == DataTypes.INT) {
-      // int type , use int point to deal with int type
-      doc.add(new IntPoint(fieldName, new int[] { (int) value }));
-    } else if (type == DataTypes.LONG) {
-      // long type , use long point to deal with long type
-      doc.add(new LongPoint(fieldName, new long[] { (long) value }));
-    } else if (type == DataTypes.FLOAT) {
-      doc.add(new FloatPoint(fieldName, new float[] { (float) value }));
-    } else if (type == DataTypes.DOUBLE) {
-      doc.add(new DoublePoint(fieldName, new double[] { (double) value }));
-    } else if (type == DataTypes.STRING) {
-      doc.add(new TextField(fieldName, (String) value, Field.Store.NO));
-    } else if (type == DataTypes.DATE) {
-      // TODO: how to get data value
-    } else if (type == DataTypes.TIMESTAMP) {
-      // TODO: how to get
-    } else if (type == DataTypes.BOOLEAN) {
-      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
-      field.setIntValue((boolean) value ? 1 : 0);
-      doc.add(field);
-    } else {
-      LOGGER.error("unsupport data type " + type);
-      throw new RuntimeException("unsupported data type " + type);
-    }
-    return true;
-  }
-
-  public void finish() throws IOException {
-    if (indexWriter != null && pageIndexWriter != null) {
-      addPageIndex(pageIndexWriter);
-    }
-  }
-
-  public void close() throws IOException {
-    if (indexWriter != null) {
-      indexWriter.close();
-    }
-  }
-
-}
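Although LuceneIndexRefreshBuilder is deleted above, its page-buffering idea is worth keeping in view: documents for one page are written to an in-memory RAMDirectory, and the finished page is merged into the on-disk index in a single addIndexes call, so the slow directory sees one bulk merge per page instead of one write per row. Below is that pattern in isolation, as a self-contained sketch against plain Lucene 6.x; the on-disk path and the document contents are illustrative only.

import java.io.IOException;
import java.nio.file.Paths;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.RAMDirectory;

public class PageBufferSketch {
  public static void main(String[] args) throws IOException {
    try (Directory diskDir = FSDirectory.open(Paths.get("/tmp/lucene-sketch"));
         IndexWriter diskWriter =
             new IndexWriter(diskDir, new IndexWriterConfig(new StandardAnalyzer()))) {
      // one RAM-backed writer per page
      RAMDirectory ramDir = new RAMDirectory();
      IndexWriter pageWriter =
          new IndexWriter(ramDir, new IndexWriterConfig(new StandardAnalyzer()));
      for (int rowId = 0; rowId < 100; rowId++) {
        Document doc = new Document();
        doc.add(new IntPoint("rowId", rowId));  // illustrative field
        pageWriter.addDocument(doc);
      }
      // page finished: close the RAM writer, then merge its segments to disk in one call
      pageWriter.close();
      diskWriter.addIndexes(ramDir);
      ramDir.close();  // discard the in-memory copy
    }
  }
}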
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 7c0da5e..25cc228 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -491,8 +491,8 @@ m filterExpression
         if (blocklet.getSegmentId().equals(segment.getSegmentNo())) {
           found = true;
           // Set the pruned index file to the segment for further pruning.
-          String uniqueTaskName = CarbonTablePath.getUniqueTaskName(blocklet.getTaskName());
-          segment.setFilteredIndexShardName(uniqueTaskName);
+          String shardName = CarbonTablePath.getShardName(blocklet.getFilePath());
+          segment.setFilteredIndexShardName(shardName);
         }
       }
       // Add to remove segments list if not present in pruned blocklets.
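The pruning change above switches the segment's shard filter from a unique task name to a shard name derived from the blocklet's file path via CarbonTablePath.getShardName. Purely as an illustration (this diff does not show getShardName's body, so the extraction rule here is an assumption), a stand-in that takes a file's base name without its extension might look like:

public class ShardNameSketch {
  // assumed behavior: shard name == index file base name, minus extension
  static String shardName(String filePath) {
    String name = filePath.substring(filePath.lastIndexOf('/') + 1);  // strip directories
    int dot = name.lastIndexOf('.');
    return dot < 0 ? name : name.substring(0, dot);                   // strip extension
  }

  public static void main(String[] args) {
    // hypothetical path; prints "0_batchno0-0-1551"
    System.out.println(shardName("/store/tbl/Fact/Part0/Segment_0/0_batchno0-0-1551.carbonindex"));
  }
}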
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala
index 540c8ca..d111594 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala
@@ -45,7 +45,7 @@ class LuceneTestCase extends QueryTest with BeforeAndAfterAll {
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='country')
+         | DMProperties('INDEX_COLUMNS'='country')
        """.stripMargin)
     checkExistence(sql("show datamap on table datamap_main"), true, "lucene_datamap")
     sql("drop datamap if exists lucene_datamap on table datamap_main")
@@ -64,12 +64,11 @@ class LuceneTestCase extends QueryTest with BeforeAndAfterAll {
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='id')
+         | DMProperties('INDEX_COLUMNS'='id')
        """.stripMargin)
     }
     assert(exception_otherdataType.getMessage
-      .contains("TEXT_COLUMNS only supports String column. Unsupported column: id, " +
-        "DataType: INT"))
+      .contains("Only String column is supported, column 'id' is INT type."))
   }
 
   //Create Lucene DataMap With DMProperties on MainTable and Load Data and Query
@@ -84,7 +83,7 @@ class LuceneTestCase extends QueryTest with BeforeAndAfterAll {
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
         | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='country')
+         | DMProperties('INDEX_COLUMNS'='country')
        """.stripMargin)
     sql(s"LOAD DATA INPATH '$csvPath' INTO TABLE datamap_main")
@@ -105,7 +104,7 @@ class LuceneTestCase extends QueryTest with BeforeAndAfterAll {
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='country,name,serialname')
+         | DMProperties('INDEX_COLUMNS'='country,name,serialname')
        """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE datamap_main")
     checkAnswer(sql("SELECT * FROM datamap_main WHERE TEXT_MATCH('country:ch*')"),
@@ -137,7 +136,7 @@ class LuceneTestCase extends QueryTest with BeforeAndAfterAll {
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='country,name')
+         | DMProperties('INDEX_COLUMNS'='country,name')
        """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE datamap_main")
     sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE datamap_main")
@@ -163,7 +162,7 @@ class LuceneTestCase extends QueryTest with BeforeAndAfterAll {
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='country')
+         | DMProperties('INDEX_COLUMNS'='country')
        """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE datamap_main OPTIONS('header'='false'," +
         s"'BAD_RECORDS_LOGGER_ENABLE'='FALSE','BAD_RECORDS_ACTION'='FORCE','SINGLE_PASS'='TRUE')")
@@ -183,7 +182,7 @@ class LuceneTestCase extends QueryTest with BeforeAndAfterAll {
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='country,name')
+         | DMProperties('INDEX_COLUMNS'='country,name')
        """.stripMargin)
     sql("insert into datamap_main select 1,'abc','aa'")
     sql("insert into datamap_main select 2,'def','ab'")
@@ -210,11 +209,11 @@ class LuceneTestCase extends QueryTest with BeforeAndAfterAll {
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='country')
+         | DMProperties('INDEX_COLUMNS'='country')
        """.stripMargin)
     }
     assert(exception_dicitionaryinclude.getMessage
-      .contains("TEXT_COLUMNS cannot contain dictionary column country"))
+      .contains("Dictionary column is not supported, column 'country' is dictionary column"))
     sql("drop datamap if exists lucene_datamap on table datamap_main")
   }
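The two reworded assertions above pin down the validation rule behind the new INDEX_COLUMNS property for lucene datamaps: every indexed column must be a String column and must not be a dictionary column. A hypothetical check expressing that rule, grounded only in the error messages shown in this diff; the class, field, and method names below are stand-ins, not CarbonData's actual validation code.

import java.util.List;

public class IndexColumnValidationSketch {
  static class Column {  // stand-in for CarbonColumn
    final String name; final String dataType; final boolean dictionary;
    Column(String name, String dataType, boolean dictionary) {
      this.name = name; this.dataType = dataType; this.dictionary = dictionary;
    }
  }

  // reject anything that is not a plain String column
  static void validate(List<Column> indexColumns) {
    for (Column col : indexColumns) {
      if (!"STRING".equals(col.dataType)) {
        throw new IllegalArgumentException(
            "Only String column is supported, column '" + col.name
                + "' is " + col.dataType + " type.");
      }
      if (col.dictionary) {
        throw new IllegalArgumentException(
            "Dictionary column is not supported, column '" + col.name
                + "' is dictionary column");
      }
    }
  }
}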
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
index 245d147..5e2534c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
@@ -55,7 +55,7 @@ class LuceneCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
       s"""
          | CREATE DATAMAP dm ON TABLE datamap_test
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='name,city')
+         | DMProperties('INDEX_COLUMNS'='name,city')
        """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")