http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java index b196d0d..266c107 100644 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java @@ -17,7 +17,6 @@ package org.apache.carbondata.datamap.examples; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -25,90 +24,94 @@ import java.util.List; import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapMeta; -import org.apache.carbondata.core.datamap.dev.DataMap; -import org.apache.carbondata.core.datamap.dev.DataMapFactory; -import org.apache.carbondata.core.datamap.dev.DataMapWriter; -import org.apache.carbondata.core.events.ChangeEvent; -import org.apache.carbondata.core.indexstore.schema.FilterType; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter; +import org.apache.carbondata.core.datamap.dev.DataMapModel; +import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap; +import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; - +import org.apache.carbondata.core.scan.filter.intf.ExpressionType; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.events.Event; /** * Min Max DataMap Factory */ -public class MinMaxDataMapFactory implements DataMapFactory { +public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory { private AbsoluteTableIdentifier identifier; - @Override - public void init(AbsoluteTableIdentifier identifier, String dataMapName) { + @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) { this.identifier = identifier; } /** * createWriter will return the MinMaxDataWriter. - * @param segmentId + * + * @param segment * @return */ - @Override - public DataMapWriter createWriter(String segmentId) { - return new MinMaxDataWriter(); + @Override public AbstractDataMapWriter createWriter(Segment segment) { + return new MinMaxDataWriter(identifier, segment.getSegmentNo(), + CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo())); } /** * getDataMaps Factory method Initializes the Min Max Data Map and returns. - * @param segmentId + * + * @param segment * @return * @throws IOException */ - @Override public List<DataMap> getDataMaps(String segmentId) throws IOException { - List<DataMap> dataMapList = new ArrayList<>(); + @Override public List<AbstractCoarseGrainDataMap> getDataMaps(Segment segment) + throws IOException { + List<AbstractCoarseGrainDataMap> dataMapList = new ArrayList<>(); // Form a dataMap of Type MinMaxDataMap. 
MinMaxDataMap dataMap = new MinMaxDataMap(); try { - dataMap.init(identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator); + dataMap.init(new DataMapModel( + CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()))); } catch (MemoryException ex) { - + throw new IOException(ex); } dataMapList.add(dataMap); return dataMapList; } /** - * - * @param segmentId + * @param segment * @return */ - @Override public List<DataMapDistributable> toDistributable(String segmentId) { + @Override public List<DataMapDistributable> toDistributable(Segment segment) { return null; } /** * Clear the DataMap. - * @param segmentId + * + * @param segment */ - @Override public void clear(String segmentId) { + @Override public void clear(Segment segment) { } /** * Clearing the data map. */ - @Override - public void clear() { + @Override public void clear() { } - @Override public DataMap getDataMap(DataMapDistributable distributable) { + @Override public List<AbstractCoarseGrainDataMap> getDataMaps(DataMapDistributable distributable) + throws IOException { return null; } - @Override - public void fireEvent(ChangeEvent event) { + @Override public void fireEvent(Event event) { } - @Override - public DataMapMeta getMeta() { - return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")), FilterType.EQUALTO); + @Override public DataMapMeta getMeta() { + return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")), + new ArrayList<ExpressionType>()); } } \ No newline at end of file
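The next file in this commit implements the new AbstractDataMapWriter contract that the factory above now returns from createWriter. As a reading aid, here is a hypothetical no-op writer sketching the callback order driven by DataMapWriterListener (modified later in this patch): one onBlockStart/onBlockEnd pair per carbondata file, onBlockletStart/onBlockletEnd per blocklet, onPageAdded per page, and a single finish() at the end of the load. The class name NoOpDataMapWriter and the empty bodies are illustrative only and are not part of the commit; the method signatures are taken from the diffs in this mail.

import java.io.IOException;

import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

// Hypothetical minimal writer showing the callback sequence a data map writer receives.
public class NoOpDataMapWriter extends AbstractDataMapWriter {

  public NoOpDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId,
      String dataWritePath) {
    super(identifier, segmentId, dataWritePath);
  }

  // Called once per carbondata file; blockId is the file name (no path is passed any more).
  @Override public void onBlockStart(String blockId) { }

  @Override public void onBlockEnd(String blockId) { }

  // Called once per blocklet inside the current block.
  @Override public void onBlockletStart(int blockletId) { }

  @Override public void onBlockletEnd(int blockletId) { }

  // Pages arrive in the same order as DataMapMeta.getIndexedColumns(); copy what is needed,
  // because the backing memory may be freed after this call when unsafe pages are used.
  @Override public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) { }

  // Called when the writer is closed; a real implementation would flush its index file here
  // and call commitFile(path) so the file is moved to its final location.
  @Override public void finish() throws IOException { }
}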
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java index 78544d3..fe0bbcf 100644 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java @@ -19,7 +19,6 @@ package org.apache.carbondata.datamap.examples; import java.io.BufferedWriter; import java.io.DataOutputStream; -import java.io.File; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.ArrayList; @@ -29,17 +28,18 @@ import java.util.Map; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; import com.google.gson.Gson; -public class MinMaxDataWriter implements DataMapWriter { +public class MinMaxDataWriter extends AbstractDataMapWriter { private static final LogService LOGGER = LogServiceFactory.getLogService(TableInfo.class.getName()); @@ -50,17 +50,23 @@ public class MinMaxDataWriter implements DataMapWriter { private Map<Integer, BlockletMinMax> blockMinMaxMap; - private String blockPath; + private String dataWritePath; + public MinMaxDataWriter(AbsoluteTableIdentifier identifier, String segmentId, + String dataWritePath) { + super(identifier, segmentId, dataWritePath); + this.identifier = identifier; + this.segmentId = segmentId; + this.dataWritePath = dataWritePath; + } - @Override public void onBlockStart(String blockId, String blockPath) { + @Override public void onBlockStart(String blockId) { pageLevelMax = null; pageLevelMin = null; blockletLevelMax = null; blockletLevelMin = null; blockMinMaxMap = null; blockMinMaxMap = new HashMap<Integer, BlockletMinMax>(); - this.blockPath = blockPath; } @Override public void onBlockEnd(String blockId) { @@ -161,7 +167,7 @@ public class MinMaxDataWriter implements DataMapWriter { List<MinMaxIndexBlockDetails> tempMinMaxIndexBlockDetails = null; tempMinMaxIndexBlockDetails = loadBlockDetails(); try { - writeMinMaxIndexFile(tempMinMaxIndexBlockDetails, blockPath, blockId); + writeMinMaxIndexFile(tempMinMaxIndexBlockDetails, blockId); } catch (IOException ex) { LOGGER.info(" Unable to write the file"); } @@ -178,7 +184,6 @@ public class MinMaxDataWriter implements DataMapWriter { tmpminMaxIndexBlockDetails.setMinValues(blockMinMaxMap.get(index).getMin()); tmpminMaxIndexBlockDetails.setMaxValues(blockMinMaxMap.get(index).getMax()); tmpminMaxIndexBlockDetails.setBlockletId(index); - 
tmpminMaxIndexBlockDetails.setFilePath(this.blockPath); minMaxIndexBlockDetails.add(tmpminMaxIndexBlockDetails); } return minMaxIndexBlockDetails; @@ -187,22 +192,19 @@ public class MinMaxDataWriter implements DataMapWriter { /** * Write the data to a file. This is JSON format file. * @param minMaxIndexBlockDetails - * @param blockPath * @param blockId * @throws IOException */ public void writeMinMaxIndexFile(List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails, - String blockPath, String blockId) throws IOException { - String filePath = blockPath.substring(0, blockPath.lastIndexOf(File.separator) + 1) + blockId - + ".minmaxindex"; + String blockId) throws IOException { + String filePath = dataWritePath +"/" + blockId + ".minmaxindex"; BufferedWriter brWriter = null; DataOutputStream dataOutStream = null; try { FileFactory.createNewFile(filePath, FileFactory.getFileType(filePath)); dataOutStream = FileFactory.getDataOutputStream(filePath, FileFactory.getFileType(filePath)); Gson gsonObjectToWrite = new Gson(); - brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream, - CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT)); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream, "UTF-8")); String minmaxIndexData = gsonObjectToWrite.toJson(minMaxIndexBlockDetails); brWriter.write(minmaxIndexData); } catch (IOException ioe) { @@ -215,7 +217,11 @@ public class MinMaxDataWriter implements DataMapWriter { dataOutStream.flush(); } CarbonUtil.closeStreams(brWriter, dataOutStream); + commitFile(filePath); } } + @Override public void finish() throws IOException { + + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java index 0596db5..93a453e 100644 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexBlockDetails.java @@ -33,11 +33,6 @@ public class MinMaxIndexBlockDetails implements Serializable { private byte[][] maxValues; /** - * filePath pointing to the block. - */ - private String filePath; - - /** * BlockletID of the block. 
*/ private Integer BlockletId; @@ -59,14 +54,6 @@ public class MinMaxIndexBlockDetails implements Serializable { this.maxValues = maxValues; } - public String getFilePath() { - return filePath; - } - - public void setFilePath(String filePath) { - this.filePath = filePath; - } - public Integer getBlockletId() { return BlockletId; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index a4c6e4a..c586f3c 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -87,6 +87,8 @@ public class CarbonInputSplit extends FileSplit private FileFormat fileFormat = FileFormat.COLUMNAR_V3; + private String dataMapWritePath; + public CarbonInputSplit() { segmentId = null; taskId = "0"; @@ -98,7 +100,8 @@ public class CarbonInputSplit extends FileSplit } private CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length, - String[] locations, ColumnarFormatVersion version, String[] deleteDeltaFiles) { + String[] locations, ColumnarFormatVersion version, String[] deleteDeltaFiles, + String dataMapWritePath) { super(path, start, length, locations); this.segmentId = segmentId; String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName()); @@ -111,12 +114,13 @@ public class CarbonInputSplit extends FileSplit this.invalidSegments = new ArrayList<>(); this.version = version; this.deleteDeltaFiles = deleteDeltaFiles; + this.dataMapWritePath = dataMapWritePath; } public CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length, String[] locations, int numberOfBlocklets, ColumnarFormatVersion version, String[] deleteDeltaFiles) { - this(segmentId, blockletId, path, start, length, locations, version, deleteDeltaFiles); + this(segmentId, blockletId, path, start, length, locations, version, deleteDeltaFiles, null); this.numberOfBlocklets = numberOfBlocklets; } @@ -166,9 +170,9 @@ public class CarbonInputSplit extends FileSplit } public static CarbonInputSplit from(String segmentId, String blockletId, FileSplit split, - ColumnarFormatVersion version) throws IOException { + ColumnarFormatVersion version, String dataMapWritePath) throws IOException { return new CarbonInputSplit(segmentId, blockletId, split.getPath(), split.getStart(), - split.getLength(), split.getLocations(), version, null); + split.getLength(), split.getLocations(), version, null, dataMapWritePath); } public static List<TableBlockInfo> createBlocks(List<CarbonInputSplit> splitList) { @@ -182,6 +186,7 @@ public class CarbonInputSplit extends FileSplit split.getSegmentId(), split.getLocations(), split.getLength(), blockletInfos, split.getVersion(), split.getDeleteDeltaFiles()); blockInfo.setDetailInfo(split.getDetailInfo()); + blockInfo.setDataMapWriterPath(split.dataMapWritePath); blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset()); tableBlockInfoList.add(blockInfo); } catch (IOException e) { @@ -233,6 +238,10 @@ public class CarbonInputSplit extends FileSplit detailInfo = new BlockletDetailInfo(); detailInfo.readFields(in); } + boolean dataMapWriterPathExists = in.readBoolean(); + if (dataMapWriterPathExists) { + dataMapWritePath = in.readUTF(); + 
} } @Override public void write(DataOutput out) throws IOException { @@ -255,6 +264,10 @@ public class CarbonInputSplit extends FileSplit if (detailInfo != null) { detailInfo.write(out); } + out.writeBoolean(dataMapWritePath != null); + if (dataMapWritePath != null) { + out.writeUTF(dataMapWritePath); + } } public List<String> getInvalidSegments() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 9b86e4f..5cebc12 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -34,6 +34,7 @@ import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.DataMapType; import org.apache.carbondata.core.datamap.TableDataMap; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -738,16 +739,17 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { // get tokens for all the required FileSystem for table path TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration()); - - TableDataMap blockletMap = DataMapStoreManager.getInstance() - .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME, - BlockletDataMapFactory.class.getName()); + boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, + CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT)); + TableDataMap blockletMap = + DataMapStoreManager.getInstance().chooseDataMap(absoluteTableIdentifier); DataMapJob dataMapJob = getDataMapJob(job.getConfiguration()); List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration()); List<ExtendedBlocklet> prunedBlocklets; - if (dataMapJob != null) { + if (distributedCG || blockletMap.getDataMapFactory().getDataMapType() == DataMapType.FG) { DistributableDataMapFormat datamapDstr = - new DistributableDataMapFormat(absoluteTableIdentifier, BlockletDataMap.NAME, + new DistributableDataMapFormat(absoluteTableIdentifier, blockletMap.getDataMapName(), segmentIds, partitionsToPrune, BlockletDataMapFactory.class.getName()); prunedBlocklets = dataMapJob.execute(datamapDstr, resolver); @@ -800,7 +802,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(), blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0, blocklet.getLength(), blocklet.getLocations()), - ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber())); + ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()), + blocklet.getDataMapWriterPath()); split.setDetailInfo(blocklet.getDetailInfo()); return split; } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala new file mode 100644 index 0000000..4b6f231 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.spark.testsuite.datamap + +import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory} +import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel} +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} +import org.apache.carbondata.core.datastore.FileReader +import org.apache.carbondata.core.datastore.block.SegmentProperties +import org.apache.carbondata.core.datastore.compression.SnappyCompressor +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.datastore.page.ColumnPage +import org.apache.carbondata.core.indexstore.Blocklet +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression +import org.apache.carbondata.core.scan.filter.intf.ExpressionType +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf +import org.apache.carbondata.core.util.ByteUtil +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.Event +import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest + +class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory { + var identifier: AbsoluteTableIdentifier = _ + var dataMapName: String = _ + + /** + * 
Initialization of Datamap factory with the identifier and datamap name + */ + override def init(identifier: AbsoluteTableIdentifier, + dataMapName: String): Unit = { + this.identifier = identifier + this.dataMapName = dataMapName + } + + /** + * Return a new write for this datamap + */ + override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = { + new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName) + } + + /** + * Get the datamap for segmentid + */ + override def getDataMaps(segmentId: String): java.util.List[AbstractCoarseGrainDataMap] = { + val file = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + + val files = file.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") + }) + files.map {f => + val dataMap: AbstractCoarseGrainDataMap = new CGDataMap() + dataMap.init(new DataMapModel(f.getCanonicalPath)) + dataMap + }.toList.asJava + } + + + /** + * Get datamaps for distributable object. + */ + override def getDataMaps( + distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = { + val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable] + val dataMap: AbstractCoarseGrainDataMap = new CGDataMap() + dataMap.init(new DataMapModel(mapDistributable.getFilePath)) + Seq(dataMap).asJava + } + + /** + * + * @param event + */ + override def fireEvent(event: Event): Unit = { + ??? + } + + /** + * Get all distributable objects of a segmentid + * + * @return + */ + override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = { + val file = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + + val files = file.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") + }) + files.map { f => + val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath) + d + }.toList.asJava + } + + + /** + * Clears datamap of the segment + */ + override def clear(segmentId: String): Unit = { + + } + + /** + * Clear all datamaps from memory + */ + override def clear(): Unit = { + + } + + /** + * Return metadata of this datamap + */ + override def getMeta: DataMapMeta = { + new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava) + } +} + +class CGDataMap extends AbstractCoarseGrainDataMap { + + var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _ + var FileReader: FileReader = _ + var filePath: String = _ + val compressor = new SnappyCompressor + + /** + * It is called to load the data map to memory or to initialize it. + */ + override def init(dataMapModel: DataMapModel): Unit = { + this.filePath = dataMapModel.getFilePath + val size = FileFactory.getCarbonFile(filePath).getSize + FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath)) + val footerLen = FileReader.readInt(filePath, size-4) + val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen) + val in = new ByteArrayInputStream(compressor.unCompressByte(bytes)) + val obj = new ObjectInputStream(in) + maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]] + } + + /** + * Prune the datamap with filter expression. It returns the list of + * blocklets where these filters can exist. 
+ * + * @param filterExp + * @return + */ + override def prune( + filterExp: FilterResolverIntf, + segmentProperties: SegmentProperties, + partitions: java.util.List[String]): java.util.List[Blocklet] = { + val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]() + val expression = filterExp.getFilterExpression + getEqualToExpression(expression, buffer) + val value = buffer.map { f => + f.getChildren.get(1).evaluate(null).getString + } + val meta = findMeta(value(0).getBytes) + meta.map { f=> + new Blocklet(f._1, f._2+"") + }.asJava + } + + + private def findMeta(value: Array[Byte]) = { + val tuples = maxMin.filter { f => + ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 && + ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0 + } + tuples + } + + private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = { + if (expression.getChildren != null) { + expression.getChildren.asScala.map { f => + if (f.isInstanceOf[EqualToExpression]) { + buffer += f + } + getEqualToExpression(f, buffer) + } + } + } + + /** + * Clear complete index table and release memory. + */ + override def clear() = { + ??? + } + + override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ??? +} + +class CGDataMapWriter(identifier: AbsoluteTableIdentifier, + segmentId: String, + dataWritePath: String, + dataMapName: String) + extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) { + + var currentBlockId: String = null + val cgwritepath = dataWritePath + "/" + + dataMapName + System.nanoTime() + ".datamap" + lazy val stream: DataOutputStream = FileFactory + .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath)) + val blockletList = new ArrayBuffer[Array[Byte]]() + val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]() + val compressor = new SnappyCompressor + + /** + * Start of new block notification. + * + * @param blockId file name of the carbondata file + */ + override def onBlockStart(blockId: String): Unit = { + currentBlockId = blockId + } + + /** + * End of block notification + */ + override def onBlockEnd(blockId: String): Unit = { + + } + + /** + * Start of new blocklet notification. + * + * @param blockletId sequence number of blocklet in the block + */ + override def onBlockletStart(blockletId: Int): Unit = { + + } + + /** + * End of blocklet notification + * + * @param blockletId sequence number of blocklet in the block + */ + override def onBlockletEnd(blockletId: Int): Unit = { + val sorted = blockletList + .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0) + maxMin += + ((currentBlockId+"", blockletId, (sorted.last, sorted.head))) + blockletList.clear() + } + + /** + * Add the column pages row to the datamap, order of pages is same as `indexColumns` in + * DataMapMeta returned in DataMapFactory. + * + * Implementation should copy the content of `pages` as needed, because `pages` memory + * may be freed after this method returns, if using unsafe column page. + */ + override def onPageAdded(blockletId: Int, + pageId: Int, + pages: Array[ColumnPage]): Unit = { + val size = pages(0).getPageSize + val list = new ArrayBuffer[Array[Byte]]() + var i = 0 + while (i < size) { + val bytes = pages(0).getBytes(i) + val newBytes = new Array[Byte](bytes.length - 2) + System.arraycopy(bytes, 2, newBytes, 0, newBytes.length) + list += newBytes + i = i + 1 + } + // Sort based on the column data in order to create index. 
+ val sorted = list + .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0) + blockletList += sorted.head + blockletList += sorted.last + } + + + /** + * This is called during closing of writer.So after this call no more data will be sent to this + * class. + */ + override def finish(): Unit = { + val out = new ByteOutputStream() + val outStream = new ObjectOutputStream(out) + outStream.writeObject(maxMin) + outStream.close() + val bytes = compressor.compressByte(out.getBytes) + stream.write(bytes) + stream.writeInt(bytes.length) + stream.close() + commitFile(cgwritepath) + } + + +} + +class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll { + + val file2 = resourcesPath + "/compaction/fil2.csv" + override protected def beforeAll(): Unit = { + //n should be about 5000000 of reset if size is default 1024 + val n = 150000 + CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n) + sql("DROP TABLE IF EXISTS normal_test") + sql( + """ + | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") + } + + test("test cg datamap") { + sql("DROP TABLE IF EXISTS datamap_test_cg") + sql( + """ + | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg") + // register datamap writer + DataMapStoreManager.getInstance().createAndRegisterDataMap( + table.getAbsoluteTableIdentifier, + classOf[CGDataMapFactory].getName, "cgdatamap") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')") + checkAnswer(sql("select * from datamap_test_cg where name='n502670'"), + sql("select * from normal_test where name='n502670'")) + } + + override protected def afterAll(): Unit = { + CompactionSupportGlobalSortBigFileTest.deleteFile(file2) + sql("DROP TABLE IF EXISTS normal_test") + sql("DROP TABLE IF EXISTS datamap_test_cg") + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala index c80ee2b..2f8a1d1 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -20,25 +20,32 @@ package org.apache.carbondata.spark.testsuite.datamap import java.util import scala.collection.JavaConverters._ - -import org.apache.spark.sql.{DataFrame, SaveMode} import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.{DataFrame, SaveMode} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter} import 
org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment} +import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter +import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory} +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} import org.apache.carbondata.core.datastore.page.ColumnPage -import org.apache.carbondata.core.indexstore.schema.FilterType import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.scan.filter.intf.ExpressionType +import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event -class C2DataMapFactory() extends DataMapFactory { +class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory { + + var identifier: AbsoluteTableIdentifier = _ override def init(identifier: AbsoluteTableIdentifier, - dataMapName: String): Unit = {} + dataMapName: String): Unit = { + this.identifier = identifier + } override def fireEvent(event: Event): Unit = ??? @@ -46,13 +53,13 @@ class C2DataMapFactory() extends DataMapFactory { override def clear(): Unit = {} - override def getDataMaps(distributable: DataMapDistributable): java.util.List[DataMap] = ??? + override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ??? override def getDataMaps(segmentId: Segment): util.List[DataMap] = ??? - override def createWriter(segmentId: Segment): DataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock + override def createWriter(segmentId: Segment): AbstractDataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock - override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, FilterType.EQUALTO) + override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava) /** * Get all distributable objects of a segmentid @@ -62,6 +69,7 @@ class C2DataMapFactory() extends DataMapFactory { override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = { ??? } + } class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll { @@ -164,9 +172,12 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll { } object DataMapWriterSuite { + var callbackSeq: Seq[String] = Seq[String]() - val dataMapWriterC2Mock = new DataMapWriter { + def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String, + dataWritePath: String) = + new AbstractDataMapWriter(identifier, segmentId, dataWritePath) { override def onPageAdded( blockletId: Int, @@ -191,9 +202,21 @@ object DataMapWriterSuite { callbackSeq :+= s"blocklet start $blockletId" } - override def onBlockStart(blockId: String, blockPath: String): Unit = { + /** + * Start of new block notification. + * + * @param blockId file name of the carbondata file + */ + override def onBlockStart(blockId: String) = { callbackSeq :+= s"block start $blockId" } + /** + * This is called during closing of writer.So after this call no more data will be sent to this + * class. 
+ */ + override def finish() = { + + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala new file mode 100644 index 0000000..d1bb65f --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.spark.testsuite.datamap + +import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainDataMap, AbstractFineGrainDataMapFactory} +import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel} +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} +import org.apache.carbondata.core.datastore.FileReader +import org.apache.carbondata.core.datastore.block.SegmentProperties +import org.apache.carbondata.core.datastore.compression.SnappyCompressor +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.datastore.page.ColumnPage +import org.apache.carbondata.core.indexstore.FineGrainBlocklet +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression +import org.apache.carbondata.core.scan.filter.intf.ExpressionType +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf +import org.apache.carbondata.core.util.ByteUtil +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.Event +import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest + +class FGDataMapFactory extends AbstractFineGrainDataMapFactory { + var identifier: 
AbsoluteTableIdentifier = _ + var dataMapName: String = _ + + /** + * Initialization of Datamap factory with the identifier and datamap name + */ + override def init(identifier: AbsoluteTableIdentifier, + dataMapName: String): Unit = { + this.identifier = identifier + this.dataMapName = dataMapName + } + + /** + * Return a new write for this datamap + */ + override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = { + new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName) + } + + /** + * Get the datamap for segmentid + */ + override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainDataMap] = { + val file = FileFactory + .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + + val files = file.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") + }) + files.map { f => + val dataMap: AbstractFineGrainDataMap = new FGDataMap() + dataMap.init(new DataMapModel(f.getCanonicalPath)) + dataMap + }.toList.asJava + } + + /** + * Get datamap for distributable object. + */ + override def getDataMaps( + distributable: DataMapDistributable): java.util.List[AbstractFineGrainDataMap]= { + val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable] + val dataMap: AbstractFineGrainDataMap = new FGDataMap() + dataMap.init(new DataMapModel(mapDistributable.getFilePath)) + Seq(dataMap).asJava + } + + /** + * Get all distributable objects of a segmentid + * + * @return + */ + override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = { + val file = FileFactory + .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + + val files = file.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") + }) + files.map { f => + val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath) + d + }.toList.asJava + } + + + /** + * + * @param event + */ + override def fireEvent(event: Event):Unit = { + ??? + } + + /** + * Clears datamap of the segment + */ + override def clear(segmentId: String): Unit = { + } + + /** + * Clear all datamaps from memory + */ + override def clear(): Unit = { + } + + /** + * Return metadata of this datamap + */ + override def getMeta: DataMapMeta = { + new DataMapMeta(Seq("name").toList.asJava, new ArrayBuffer[ExpressionType]().toList.asJava) + } +} + +class FGDataMap extends AbstractFineGrainDataMap { + + var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _ + var FileReader: FileReader = _ + var filePath: String = _ + val compressor = new SnappyCompressor + + /** + * It is called to load the data map to memory or to initialize it. + */ + override def init(dataMapModel: DataMapModel): Unit = { + this.filePath = dataMapModel.getFilePath + val size = FileFactory.getCarbonFile(filePath).getSize + FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath)) + val footerLen = FileReader.readInt(filePath, size - 4) + val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen) + val in = new ByteArrayInputStream(compressor.unCompressByte(bytes)) + val obj = new ObjectInputStream(in) + maxMin = obj.readObject() + .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]] + } + + /** + * Prune the datamap with filter expression. It returns the list of + * blocklets where these filters can exist. 
+ * + * @param filterExp + * @return + */ + override def prune( + filterExp: FilterResolverIntf, + segmentProperties: SegmentProperties, + partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = { + val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]() + val expression = filterExp.getFilterExpression + getEqualToExpression(expression, buffer) + val value = buffer.map { f => + f.getChildren.get(1).evaluate(null).getString + } + val meta = findMeta(value(0).getBytes) + meta.map { f => + readAndFindData(f, value(0).getBytes()) + }.filter(_.isDefined).map(_.get).asJava + } + + private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int), + value: Array[Byte]): Option[FineGrainBlocklet] = { + val bytes = FileReader.readByteArray(filePath, meta._4, meta._5) + val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes)) + val obj = new ObjectInputStream(outputStream) + val blockletsData = obj.readObject() + .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]] + + import scala.collection.Searching._ + val searching = blockletsData + .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(new Ordering[ + (Array[Byte], Seq[Seq[Int]], Seq[Int])] { + override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]), + y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = { + ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1) + } + }) + if (searching.insertionPoint >= 0) { + val f = blockletsData(searching.insertionPoint) + val pages = f._3.zipWithIndex.map { p => + val pg = new FineGrainBlocklet.Page + pg.setPageId(p._1) + pg.setRowId(f._2(p._2).toArray) + pg + } + pages + Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava)) + } else { + None + } + + } + + private def findMeta(value: Array[Byte]) = { + val tuples = maxMin.filter { f => + ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 && + ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0 + } + tuples + } + + def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = { + if (expression.getChildren != null) { + expression.getChildren.asScala.map { f => + if (f.isInstanceOf[EqualToExpression]) { + buffer += f + } + getEqualToExpression(f, buffer) + } + } + } + + /** + * Clear complete index table and release memory. + */ + override def clear():Unit = { + ??? + } + + override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ??? +} + +class FGDataMapWriter(identifier: AbsoluteTableIdentifier, + segmentId: String, dataWriterPath: String, dataMapName: String) + extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) { + + var currentBlockId: String = null + val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap" + val stream: DataOutputStream = FileFactory + .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath)) + val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]() + val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]() + var position: Long = 0 + val compressor = new SnappyCompressor + + /** + * Start of new block notification. + * + * @param blockId file name of the carbondata file + */ + override def onBlockStart(blockId: String): Unit = { + currentBlockId = blockId + } + + /** + * End of block notification + */ + override def onBlockEnd(blockId: String): Unit = { + + } + + /** + * Start of new blocklet notification. 
+ * + * @param blockletId sequence number of blocklet in the block + */ + override def onBlockletStart(blockletId: Int): Unit = { + + } + + /** + * End of blocklet notification + * + * @param blockletId sequence number of blocklet in the block + */ + override def onBlockletEnd(blockletId: Int): Unit = { + val sorted = blockletList + .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0) + var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null + var addedLast: Boolean = false + val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]() + // Merge all same column values to single row. + sorted.foreach { f => + if (oldValue != null) { + if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) { + oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3) + addedLast = false + } else { + blockletListUpdated += oldValue + oldValue = (f._1, Seq(f._2), f._3) + addedLast = true + } + } else { + oldValue = (f._1, Seq(f._2), f._3) + addedLast = false + } + } + if (!addedLast && oldValue != null) { + blockletListUpdated += oldValue + } + + val out = new ByteOutputStream() + val outStream = new ObjectOutputStream(out) + outStream.writeObject(blockletListUpdated) + outStream.close() + val bytes = compressor.compressByte(out.getBytes) + stream.write(bytes) + maxMin += + ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last + ._1), position, bytes.length)) + position += bytes.length + blockletList.clear() + } + + /** + * Add the column pages row to the datamap, order of pages is same as `indexColumns` in + * DataMapMeta returned in DataMapFactory. + * + * Implementation should copy the content of `pages` as needed, because `pages` memory + * may be freed after this method returns, if using unsafe column page. + */ + override def onPageAdded(blockletId: Int, + pageId: Int, + pages: Array[ColumnPage]): Unit = { + val size = pages(0).getPageSize + val list = new ArrayBuffer[(Array[Byte], Int)]() + var i = 0 + while (i < size) { + val bytes = pages(0).getBytes(i) + val newBytes = new Array[Byte](bytes.length - 2) + System.arraycopy(bytes, 2, newBytes, 0, newBytes.length) + list += ((newBytes, i)) + i = i + 1 + } + // Sort based on the column data in order to create index. + val sorted = list + .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0) + var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null + var addedLast: Boolean = false + // Merge all same column values to single row. + sorted.foreach { f => + if (oldValue != null) { + if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) { + oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3) + addedLast = false + } else { + blockletList += oldValue + oldValue = (f._1, Seq(f._2), Seq(pageId)) + addedLast = true + } + } else { + oldValue = (f._1, Seq(f._2), Seq(pageId)) + addedLast = false + } + } + if (!addedLast && oldValue != null) { + blockletList += oldValue + } + } + + + /** + * This is called during closing of writer.So after this call no more data will be sent to this + * class. 
+ */ + override def finish(): Unit = { + val out = new ByteOutputStream() + val outStream = new ObjectOutputStream(out) + outStream.writeObject(maxMin) + outStream.close() + val bytes = compressor.compressByte(out.getBytes) + stream.write(bytes) + stream.writeInt(bytes.length) + stream.close() + commitFile(fgwritepath) + } + + +} + +class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll { + + val file2 = resourcesPath + "/compaction/fil2.csv" + + override protected def beforeAll(): Unit = { + //n should be about 5000000 of reset if size is default 1024 + val n = 150000 + CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n) + sql("DROP TABLE IF EXISTS normal_test") + sql( + """ + | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") + } + + test("test fg datamap") { + sql("DROP TABLE IF EXISTS datamap_test") + sql( + """ + | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") + // register datamap writer + DataMapStoreManager.getInstance().createAndRegisterDataMap( + table.getAbsoluteTableIdentifier, + classOf[FGDataMapFactory].getName, "fgdatamap") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + checkAnswer(sql("select * from datamap_test where name='n502670'"), + sql("select * from normal_test where name='n502670'")) + } + + override protected def afterAll(): Unit = { + CompactionSupportGlobalSortBigFileTest.deleteFile(file2) + sql("DROP TABLE IF EXISTS normal_test") + sql("DROP TABLE IF EXISTS datamap_test") + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index f2cdd67..5550358 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -28,11 +28,14 @@ import org.apache.spark.sql.{DataFrame, SaveMode} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter} +import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory} import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment} +import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory} +import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap} +import 
org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} import org.apache.carbondata.core.datastore.page.ColumnPage -import org.apache.carbondata.core.indexstore.schema.FilterType import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event import org.apache.carbondata.spark.exception.ConcurrentOperationException @@ -264,7 +267,7 @@ object Global { var overwriteRunning = false } -class WaitingDataMap() extends DataMapFactory { +class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory { override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { } @@ -274,12 +277,12 @@ class WaitingDataMap() extends DataMapFactory { override def clear(): Unit = {} - override def getDataMaps(distributable: DataMapDistributable): java.util.List[DataMap] = ??? + override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ??? override def getDataMaps(segmentId: Segment): util.List[DataMap] = ??? - override def createWriter(segmentId: Segment): DataMapWriter = { - new DataMapWriter { + override def createWriter(segmentId: Segment): AbstractDataMapWriter = { + new AbstractDataMapWriter { override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { } override def onBlockletEnd(blockletId: Int): Unit = { } @@ -295,10 +298,14 @@ class WaitingDataMap() extends DataMapFactory { // wait for 1 second to let second SQL to finish Thread.sleep(1000) } + + override def finish(): Unit = { + + } } } - override def getMeta: DataMapMeta = new DataMapMeta(List("o_country").asJava, FilterType.EQUALTO) + override def getMeta: DataMapMeta = new DataMapMeta(List("o_country").asJava, Seq(ExpressionType.EQUALS).asJava) override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = ??? 
} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 97be1fb..ddb9b32 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -448,9 +448,10 @@ class CarbonScanRDD( CarbonTableInputFormat.setQuerySegment(conf, identifier) CarbonTableInputFormat.setFilterPredicates(conf, filterExpression) CarbonTableInputFormat.setColumnProjection(conf, columnProjection) - if (CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, - CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) { + CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob) + if (CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, + CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) { CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index dfffc8e..7dc6275 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -1227,8 +1227,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { .start() qry.awaitTermination() } catch { - case ex => - throw new Exception(ex.getMessage) + case ex: Throwable => + LOGGER.error(ex.getMessage) + throw new Exception(ex.getMessage, ex) } finally { if (null != qry) { qry.stop() http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index 6fbbd3e..5083ab5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -17,6 +17,7 @@ package org.apache.carbondata.processing.datamap; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -29,8 +30,8 @@ import org.apache.carbondata.core.datamap.DataMapMeta; import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.TableDataMap; +import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter; import 
org.apache.carbondata.core.datamap.dev.DataMapFactory; -import org.apache.carbondata.core.datamap.dev.DataMapWriter; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.processing.store.TablePage; @@ -44,25 +45,26 @@ public class DataMapWriterListener { DataMapWriterListener.class.getCanonicalName()); // list indexed column name -> list of data map writer - private Map<List<String>, List<DataMapWriter>> registry = new ConcurrentHashMap<>(); + private Map<List<String>, List<AbstractDataMapWriter>> registry = new ConcurrentHashMap<>(); /** * register all datamap writer for specified table and segment */ - public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId) { + public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId, + String dataWritePath) { List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier); if (tableDataMaps != null) { for (TableDataMap tableDataMap : tableDataMaps) { DataMapFactory factory = tableDataMap.getDataMapFactory(); - register(factory, segmentId); + register(factory, segmentId, dataWritePath); } } } /** - * Register a DataMapWriter + * Register a AbstractDataMapWriter */ - private void register(DataMapFactory factory, String segmentId) { + private void register(DataMapFactory factory, String segmentId, String dataWritePath) { assert (factory != null); assert (segmentId != null); DataMapMeta meta = factory.getMeta(); @@ -71,8 +73,8 @@ public class DataMapWriterListener { return; } List<String> columns = factory.getMeta().getIndexedColumns(); - List<DataMapWriter> writers = registry.get(columns); - DataMapWriter writer = factory.createWriter(new Segment(segmentId, null)); + List<AbstractDataMapWriter> writers = registry.get(columns); + AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null)); if (writers != null) { writers.add(writer); } else { @@ -80,36 +82,36 @@ public class DataMapWriterListener { writers.add(writer); registry.put(columns, writers); } - LOG.info("DataMapWriter " + writer + " added"); + LOG.info("AbstractDataMapWriter " + writer + " added"); } public void onBlockStart(String blockId, String blockPath) { - for (List<DataMapWriter> writers : registry.values()) { - for (DataMapWriter writer : writers) { - writer.onBlockStart(blockId, blockPath); + for (List<AbstractDataMapWriter> writers : registry.values()) { + for (AbstractDataMapWriter writer : writers) { + writer.onBlockStart(blockId); } } } public void onBlockEnd(String blockId) { - for (List<DataMapWriter> writers : registry.values()) { - for (DataMapWriter writer : writers) { + for (List<AbstractDataMapWriter> writers : registry.values()) { + for (AbstractDataMapWriter writer : writers) { writer.onBlockEnd(blockId); } } } public void onBlockletStart(int blockletId) { - for (List<DataMapWriter> writers : registry.values()) { - for (DataMapWriter writer : writers) { + for (List<AbstractDataMapWriter> writers : registry.values()) { + for (AbstractDataMapWriter writer : writers) { writer.onBlockletStart(blockletId); } } } public void onBlockletEnd(int blockletId) { - for (List<DataMapWriter> writers : registry.values()) { - for (DataMapWriter writer : writers) { + for (List<AbstractDataMapWriter> writers : registry.values()) { + for (AbstractDataMapWriter writer : writers) { writer.onBlockletEnd(blockletId); } } @@ -122,18 +124,29 @@ public class DataMapWriterListener { * @param 
tablePage page data */ public void onPageAdded(int blockletId, int pageId, TablePage tablePage) { - Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet(); - for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) { + Set<Map.Entry<List<String>, List<AbstractDataMapWriter>>> entries = registry.entrySet(); + for (Map.Entry<List<String>, List<AbstractDataMapWriter>> entry : entries) { List<String> indexedColumns = entry.getKey(); ColumnPage[] pages = new ColumnPage[indexedColumns.size()]; for (int i = 0; i < indexedColumns.size(); i++) { pages[i] = tablePage.getColumnPage(indexedColumns.get(i)); } - List<DataMapWriter> writers = entry.getValue(); - for (DataMapWriter writer : writers) { + List<AbstractDataMapWriter> writers = entry.getValue(); + for (AbstractDataMapWriter writer : writers) { writer.onPageAdded(blockletId, pageId, pages); } } } + /** + * Finish all datamap writers + */ + public void finish() throws IOException { + for (List<AbstractDataMapWriter> writers : registry.values()) { + for (AbstractDataMapWriter writer : writers) { + writer.finish(); + } + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 5062a78..d6af747 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.block.SegmentProperties; @@ -259,7 +260,8 @@ public class CarbonFactDataHandlerModel { carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration); DataMapWriterListener listener = new DataMapWriterListener(); - listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId()); + listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId(), + storeLocation[new Random().nextInt(storeLocation.length)]); carbonFactDataHandlerModel.dataMapWriterlistener = listener; carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount(); @@ -322,6 +324,12 @@ public class CarbonFactDataHandlerModel { segmentProperties.getDimensions(), segmentProperties.getMeasures()); + DataMapWriterListener listener = new DataMapWriterListener(); + listener.registerAllWriter( + loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(), + loadModel.getSegmentId(), + tempStoreLocation[new Random().nextInt(tempStoreLocation.length)]); + carbonFactDataHandlerModel.dataMapWriterlistener = listener; return carbonFactDataHandlerModel; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb5bb00a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index 02391cf..8d26ad2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -17,8 +17,6 @@ package org.apache.carbondata.processing.store.writer; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.File; import java.io.FileFilter; import java.io.FileNotFoundException; @@ -41,14 +39,11 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor; import org.apache.carbondata.core.metadata.converter.SchemaConverter; import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; import org.apache.carbondata.core.metadata.index.BlockIndexInfo; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonMergerUtil; import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.CarbonProperties; @@ -63,7 +58,6 @@ import org.apache.carbondata.processing.datamap.DataMapWriterListener; import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; import org.apache.commons.lang3.ArrayUtils; -import org.apache.hadoop.io.IOUtils; public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { @@ -71,12 +65,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { LogServiceFactory.getLogService(AbstractFactDataWriter.class.getName()); /** - * dfs.bytes-per-checksum - * HDFS checksum length, block size for a file should be exactly divisible - * by this value - */ - private static final int HDFS_CHECKSUM_LENGTH = 512; - /** * file channel */ protected FileChannel fileChannel; @@ -208,35 +196,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { } /** - * This method will return max of block size and file size - * - * @param blockSize - * @param fileSize - * @return - */ - private static long getMaxOfBlockAndFileSize(long blockSize, long fileSize) { - long maxSize = blockSize; - if (fileSize > blockSize) { - maxSize = fileSize; - } - // block size should be exactly divisible by 512 which is maintained by HDFS as bytes - // per checksum, dfs.bytes-per-checksum=512 must divide block size - long remainder = maxSize % HDFS_CHECKSUM_LENGTH; - if (remainder > 0) { - maxSize = maxSize + HDFS_CHECKSUM_LENGTH - remainder; - } - // convert to make block size more readable. 
- String readableBlockSize = ByteUtil.convertByteToReadable(blockSize); - String readableFileSize = ByteUtil.convertByteToReadable(fileSize); - String readableMaxSize = ByteUtil.convertByteToReadable(maxSize); - LOGGER.info( - "The configured block size is " + readableBlockSize + ", the actual carbon file size is " - + readableFileSize + ", choose the max value " + readableMaxSize - + " as the block size on HDFS"); - return maxSize; - } - - /** * This method will be used to update the file channel with new file if exceeding block size * threshold, new file will be created once existing file reached the file size limit This * method will first check whether existing file size is exceeded the file @@ -282,7 +241,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { private String constructFactFileFullPath() { String factFilePath = - this.dataWriterVo.getCarbonDataDirectoryPath() + File.separator + this.carbonDataFileName; + this.model.getCarbonDataDirectoryPath() + File.separator + this.carbonDataFileName; return factFilePath; } /** @@ -293,7 +252,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { notifyDataMapBlockEnd(); CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel); if (copyInCurrentThread) { - copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath); + CarbonUtil.copyCarbonDataFileToCarbonStorePath( + carbonDataFileTempPath, model.getCarbonDataDirectoryPath(), + fileSizeInBytes); } else { executorServiceSubmitList.add(executorService.submit(new CopyThread(carbonDataFileTempPath))); } @@ -446,7 +407,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { } writer.close(); // copy from temp to actual store location - copyCarbonDataFileToCarbonStorePath(fileName); + CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName, + model.getCarbonDataDirectoryPath(), + fileSizeInBytes); } /** @@ -456,80 +419,20 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { * @throws CarbonDataWriterException */ protected void closeExecutorService() throws CarbonDataWriterException { - executorService.shutdown(); try { + listener.finish(); + executorService.shutdown(); executorService.awaitTermination(2, TimeUnit.HOURS); - } catch (InterruptedException e) { - throw new CarbonDataWriterException(e.getMessage()); - } - for (int i = 0; i < executorServiceSubmitList.size(); i++) { - try { + for (int i = 0; i < executorServiceSubmitList.size(); i++) { executorServiceSubmitList.get(i).get(); - } catch (InterruptedException e) { - throw new CarbonDataWriterException(e.getMessage()); - } catch (ExecutionException e) { - throw new CarbonDataWriterException(e.getMessage()); } + } catch (InterruptedException | ExecutionException | IOException e) { + LOGGER.error(e, "Error while finishing writer"); + throw new CarbonDataWriterException(e.getMessage()); } } - /** - * This method will copy the given file to carbon store location - * - * @param localFileName local file name with full path - * @throws CarbonDataWriterException - */ - protected void copyCarbonDataFileToCarbonStorePath(String localFileName) - throws CarbonDataWriterException { - long copyStartTime = System.currentTimeMillis(); - LOGGER.info("Copying " + localFileName + " --> " + model.getCarbonDataDirectoryPath()); - try { - CarbonFile localCarbonFile = - FileFactory.getCarbonFile(localFileName, FileFactory.getFileType(localFileName)); - String carbonFilePath = model.getCarbonDataDirectoryPath() + localFileName - 
.substring(localFileName.lastIndexOf(File.separator)); - copyLocalFileToCarbonStore(carbonFilePath, localFileName, - CarbonCommonConstants.BYTEBUFFER_SIZE, - getMaxOfBlockAndFileSize(fileSizeInBytes, localCarbonFile.getSize())); - } catch (IOException e) { - throw new CarbonDataWriterException( - "Problem while copying file from local store to carbon store", e); - } - LOGGER.info( - "Total copy time (ms) to copy file " + localFileName + " is " + (System.currentTimeMillis() - - copyStartTime)); - } - - /** - * This method will read the local carbon data file and write to carbon data file in HDFS - * - * @param carbonStoreFilePath - * @param localFilePath - * @param bufferSize - * @param blockSize - * @throws IOException - */ - private void copyLocalFileToCarbonStore(String carbonStoreFilePath, String localFilePath, - int bufferSize, long blockSize) throws IOException { - DataOutputStream dataOutputStream = null; - DataInputStream dataInputStream = null; - try { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("HDFS file block size for file: " + carbonStoreFilePath + " is " + blockSize - + " (bytes"); - } - dataOutputStream = FileFactory - .getDataOutputStream(carbonStoreFilePath, FileFactory.getFileType(carbonStoreFilePath), - bufferSize, blockSize); - dataInputStream = FileFactory - .getDataInputStream(localFilePath, FileFactory.getFileType(localFilePath), bufferSize); - IOUtils.copyBytes(dataInputStream, dataOutputStream, bufferSize); - } finally { - CarbonUtil.closeStream(dataInputStream); - CarbonUtil.closeStream(dataOutputStream); - } - } /** * This method will copy the carbon data file from local store location to @@ -554,7 +457,10 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { * @throws Exception if unable to compute a result */ @Override public Void call() throws Exception { - copyCarbonDataFileToCarbonStorePath(fileName); + CarbonUtil.copyCarbonDataFileToCarbonStorePath( + fileName, + model.getCarbonDataDirectoryPath(), + fileSizeInBytes); return null; }
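Taken together, the hunks above restructure the datamap write path: DataMapWriter becomes AbstractDataMapWriter, onBlockStart now receives only the block id, a finish() callback is added and driven by DataMapWriterListener.finish() from closeExecutorService(), writer registration carries an explicit data write path, and the local-to-store file copy is centralized in CarbonUtil.copyCarbonDataFileToCarbonStorePath. The following is a minimal sketch of a writer against the new contract; the callback set mirrors the calls made by DataMapWriterListener in this commit, while the three-argument constructor and the exact list of abstract methods are assumptions inferred from how MinMaxDataWriter and the WaitingDataMap test writer are constructed, not a confirmed AbstractDataMapWriter signature.

// Hypothetical example writer against the refactored AbstractDataMapWriter contract.
// Constructor arguments and callback set are inferred from this commit's diffs
// (MinMaxDataWriter, WaitingDataMap, DataMapWriterListener) and may not match the
// real base class exactly.
import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

public class ExampleDataMapWriter extends AbstractDataMapWriter {

  public ExampleDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId,
      String writeDirectoryPath) {
    // assumed super constructor, mirroring how MinMaxDataWriter is instantiated
    super(identifier, segmentId, writeDirectoryPath);
  }

  @Override public void onBlockStart(String blockId) {
    // the block path argument was dropped in this commit; only the id is passed
  }

  @Override public void onBlockEnd(String blockId) {
  }

  @Override public void onBlockletStart(int blockletId) {
  }

  @Override public void onBlockletEnd(int blockletId) {
  }

  @Override public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) {
    // index the projected column pages for this blocklet/page here
  }

  @Override public void finish() {
    // new in this commit: flush index output and copy it to the segment path
    // before DataMapWriterListener.finish() returns in closeExecutorService()
  }
}

DataMapWriterListener drives this lifecycle per block, blocklet, and page during data load and, with this change, calls finish() on every registered writer once writing completes (see the closeExecutorService() hunk above), so any buffered index output should be committed in finish() rather than in onBlockEnd().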