http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 04902f9..98c3398 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -732,7 +732,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       DistributableDataMapFormat datamapDstr =
           new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
               segmentIds, partitionsToPrune,
-              BlockletDataMapFactory.class.getName());
+              BlockletIndexDataMapFactory.class.getName());
       prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
       // Apply expression on the blocklets.
       prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
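The hunk above swaps the default blocklet index, BlockletDataMapFactory, for the renamed BlockletIndexDataMapFactory. The rest of this commit applies the same "Index" rename to the pluggable coarse-grain (CG) and fine-grain (FG) datamap abstractions and migrates their test implementations. For orientation, here is a minimal skeleton of a custom coarse-grain factory against the renamed API, assuming only the signatures visible in the test diffs below; the SkeletonCGFactory name and the ??? bodies are illustrative placeholders, not part of the commit.

    import java.util

    import scala.collection.JavaConverters._

    import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
    import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
    import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory}
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
    import org.apache.carbondata.core.scan.filter.intf.ExpressionType
    import org.apache.carbondata.events.Event

    // Sketch only: the overridable surface of the renamed CG factory, as seen
    // in the CGIndexDataMapTestCase diff below.
    class SkeletonCGFactory extends AbstractCoarseGrainIndexDataMapFactory {
      private var identifier: AbsoluteTableIdentifier = _
      private var schema: DataMapSchema = _

      override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
        this.identifier = identifier
        this.schema = dataMapSchema
      }

      // One writer per segment; it receives block/blocklet/page callbacks during load.
      override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = ???

      // Load every index file written for the segment.
      override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainIndexDataMap] = ???

      // Load the single index addressed by one distributable split.
      override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainIndexDataMap] = ???

      // Describe each index file as a splittable unit for distributed pruning.
      override def toDistributable(segmentId: String): util.List[DataMapDistributable] = ???

      override def fireEvent(event: Event): Unit = ()
      override def clear(segmentId: String): Unit = ()
      override def clear(): Unit = ()

      // Declare the indexed columns and the filter operators the index can serve.
      override def getMeta: DataMapMeta =
        new DataMapMeta(List("name").asJava, List(ExpressionType.EQUALS).asJava)
    }

A factory like this is wired in with CREATE DATAMAP dm ON TABLE t USING '<factory class name>', exactly as the CG and FG test suites below do.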
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 0d33797..d84c0e7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -265,7 +265,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
           | GROUP BY dob,name
         """.stripMargin)
     }
-    assert(e.getMessage.contains(s"$timeSeries keyword missing"))
+    assert(e.getMessage.contains("Only 'path' dmproperty is allowed for this datamap"))
     sql("DROP TABLE IF EXISTS maintabletime")
   }
@@ -282,7 +282,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
           | GROUP BY column3,column5,column2
         """.stripMargin)
     }
-    assert(e.getMessage.contains("DataMap class 'abc' not found"))
+    assert(e.getMessage.contains("DataMap 'abc' not found"))
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index f9ac354..a973bfd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -293,8 +293,8 @@ test("test PreAggregate table selection with timeseries and normal together") {
         | GROUP BY dob,name
       """.stripMargin)
-  val df = sql("SELECT timeseries(dob,'year') FROM maintabletime GROUP BY timeseries(dob,'year')")
-  preAggTableValidator(df.queryExecution.analyzed, "maintabletime_agg1_year")
+    val df = sql("SELECT timeseries(dob,'year') FROM maintabletime GROUP BY timeseries(dob,'year')")
+    preAggTableValidator(df.queryExecution.analyzed, "maintabletime_agg1_year")
     sql("DROP TABLE IF EXISTS maintabletime")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index 49cabea..0868eed 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -200,7 +200,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
         | GROUP BY dataTime
       """.stripMargin)
     }
-    assert(e.getMessage.equals("DataMap class 'abc' not found"))
+    assert(e.getMessage.equals("DataMap 'abc' not found"))
   }

   test("test timeseries create table: USING and catch MalformedCarbonCommandException") {
@@ -215,7 +215,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
         | GROUP BY dataTime
       """.stripMargin)
     }
-    assert(e.getMessage.equals("DataMap class 'abc' not found"))
+    assert(e.getMessage.equals("DataMap 'abc' not found"))
   }

   test("test timeseries create table: Only one granularity level can be defined 1") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
deleted file mode 100644
index d4c49d2..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.carbondata.spark.testsuite.datamap - -import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream} - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer - -import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory} -import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel} -import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} -import org.apache.carbondata.core.datastore.FileReader -import org.apache.carbondata.core.datastore.block.SegmentProperties -import org.apache.carbondata.core.datastore.compression.SnappyCompressor -import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.datastore.page.ColumnPage -import org.apache.carbondata.core.indexstore.Blocklet -import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} -import org.apache.carbondata.core.scan.expression.Expression -import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression -import org.apache.carbondata.core.scan.filter.intf.ExpressionType -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf -import org.apache.carbondata.core.util.ByteUtil -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.events.Event -import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest - -class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory { - var identifier: AbsoluteTableIdentifier = _ - var dataMapSchema: DataMapSchema = _ - - /** - * Initialization of Datamap factory with the identifier and datamap name - */ - override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { - this.identifier = identifier - this.dataMapSchema = dataMapSchema - } - - /** - * Return a new write for this datamap - */ - override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = { - new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema) - } - - /** - * Get the datamap for segmentid - */ - override def getDataMaps(segmentId: String) = { - val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) - - val files = file.listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") - }) - files.map {f => - val dataMap: AbstractCoarseGrainDataMap = new CGDataMap() - dataMap.init(new DataMapModel(f.getCanonicalPath)) - dataMap - }.toList.asJava - } - - - /** - * Get datamaps for distributable object. 
- */ - override def getDataMaps( - distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = { - val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable] - val dataMap: AbstractCoarseGrainDataMap = new CGDataMap() - dataMap.init(new DataMapModel(mapDistributable.getFilePath)) - Seq(dataMap).asJava - } - - /** - * - * @param event - */ - override def fireEvent(event: Event): Unit = { - ??? - } - - /** - * Get all distributable objects of a segmentid - * - * @return - */ - override def toDistributable(segmentId: String) = { - val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) - - val files = file.listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") - }) - files.map { f => - val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath) - d - }.toList.asJava - } - - - /** - * Clears datamap of the segment - */ - override def clear(segmentId: String): Unit = { - - } - - /** - * Clear all datamaps from memory - */ - override def clear(): Unit = { - - } - - /** - * Return metadata of this datamap - */ - override def getMeta: DataMapMeta = { - new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava, - List(ExpressionType.EQUALS, ExpressionType.IN).asJava) - } -} - -class CGDataMap extends AbstractCoarseGrainDataMap { - - var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _ - var FileReader: FileReader = _ - var filePath: String = _ - val compressor = new SnappyCompressor - - /** - * It is called to load the data map to memory or to initialize it. - */ - override def init(dataMapModel: DataMapModel): Unit = { - this.filePath = dataMapModel.getFilePath - val size = FileFactory.getCarbonFile(filePath).getSize - FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath)) - val footerLen = FileReader.readInt(filePath, size-4) - val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen) - val in = new ByteArrayInputStream(compressor.unCompressByte(bytes)) - val obj = new ObjectInputStream(in) - maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]] - } - - /** - * Prune the datamap with filter expression. It returns the list of - * blocklets where these filters can exist. 
- * - * @param filterExp - * @return - */ - override def prune( - filterExp: FilterResolverIntf, - segmentProperties: SegmentProperties, - partitions: java.util.List[String]): java.util.List[Blocklet] = { - val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]() - val expression = filterExp.getFilterExpression - getEqualToExpression(expression, buffer) - val value = buffer.map { f => - f.getChildren.get(1).evaluate(null).getString - } - val meta = findMeta(value(0).getBytes) - meta.map { f=> - new Blocklet(f._1, f._2+"") - }.asJava - } - - - private def findMeta(value: Array[Byte]) = { - val tuples = maxMin.filter { f => - ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 && - ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0 - } - tuples - } - - private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = { - if (expression.isInstanceOf[EqualToExpression]) { - buffer += expression - } else { - if (expression.getChildren != null) { - expression.getChildren.asScala.map { f => - if (f.isInstanceOf[EqualToExpression]) { - buffer += f - } - getEqualToExpression(f, buffer) - } - } - } - } - - /** - * Clear complete index table and release memory. - */ - override def clear() = { - ??? - } - - override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ??? -} - -class CGDataMapWriter(identifier: AbsoluteTableIdentifier, - segmentId: String, - dataWritePath: String, - dataMapSchema: DataMapSchema) - extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) { - - var currentBlockId: String = null - val cgwritepath = dataWritePath + "/" + - dataMapSchema.getDataMapName + System.nanoTime() + ".datamap" - lazy val stream: DataOutputStream = FileFactory - .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath)) - val blockletList = new ArrayBuffer[Array[Byte]]() - val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]() - val compressor = new SnappyCompressor - - /** - * Start of new block notification. - * - * @param blockId file name of the carbondata file - */ - override def onBlockStart(blockId: String): Unit = { - currentBlockId = blockId - } - - /** - * End of block notification - */ - override def onBlockEnd(blockId: String): Unit = { - - } - - /** - * Start of new blocklet notification. - * - * @param blockletId sequence number of blocklet in the block - */ - override def onBlockletStart(blockletId: Int): Unit = { - - } - - /** - * End of blocklet notification - * - * @param blockletId sequence number of blocklet in the block - */ - override def onBlockletEnd(blockletId: Int): Unit = { - val sorted = blockletList - .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0) - maxMin += - ((currentBlockId+"", blockletId, (sorted.last, sorted.head))) - blockletList.clear() - } - - /** - * Add the column pages row to the datamap, order of pages is same as `indexColumns` in - * DataMapMeta returned in DataMapFactory. - * - * Implementation should copy the content of `pages` as needed, because `pages` memory - * may be freed after this method returns, if using unsafe column page. 
- */ - override def onPageAdded(blockletId: Int, - pageId: Int, - pages: Array[ColumnPage]): Unit = { - val size = pages(0).getPageSize - val list = new ArrayBuffer[Array[Byte]]() - var i = 0 - while (i < size) { - val bytes = pages(0).getBytes(i) - val newBytes = new Array[Byte](bytes.length - 2) - System.arraycopy(bytes, 2, newBytes, 0, newBytes.length) - list += newBytes - i = i + 1 - } - // Sort based on the column data in order to create index. - val sorted = list - .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0) - blockletList += sorted.head - blockletList += sorted.last - } - - - /** - * This is called during closing of writer.So after this call no more data will be sent to this - * class. - */ - override def finish(): Unit = { - val out = new ByteOutputStream() - val outStream = new ObjectOutputStream(out) - outStream.writeObject(maxMin) - outStream.close() - val bytes = compressor.compressByte(out.getBytes) - stream.write(bytes) - stream.writeInt(bytes.length) - stream.close() - commitFile(cgwritepath) - } - - -} - -class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll { - - val file2 = resourcesPath + "/compaction/fil2.csv" - override protected def beforeAll(): Unit = { - //n should be about 5000000 of reset if size is default 1024 - val n = 150000 - CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n) - sql("DROP TABLE IF EXISTS normal_test") - sql( - """ - | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") - } - - test("test cg datamap") { - sql("DROP TABLE IF EXISTS datamap_test_cg") - sql( - """ - | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg") - // register datamap writer - sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')") - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')") - checkAnswer(sql("select * from datamap_test_cg where name='n502670'"), - sql("select * from normal_test where name='n502670'")) - } - - test("test cg datamap with 2 datamaps ") { - sql("DROP TABLE IF EXISTS datamap_test") - sql( - """ - | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") - // register datamap writer - sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')") - sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')") - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") - checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"), - sql("select * from normal_test where name='n502670' and city='c2670'")) - } - - override protected def afterAll(): Unit = { - 
CompactionSupportGlobalSortBigFileTest.deleteFile(file2) - sql("DROP TABLE IF EXISTS normal_test") - sql("DROP TABLE IF EXISTS datamap_test_cg") - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala new file mode 100644 index 0000000..5e944fb --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.spark.testsuite.datamap + +import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} +import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel} +import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory} +import org.apache.carbondata.core.datastore.FileReader +import org.apache.carbondata.core.datastore.block.SegmentProperties +import org.apache.carbondata.core.datastore.compression.SnappyCompressor +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.datastore.page.ColumnPage +import org.apache.carbondata.core.indexstore.Blocklet +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression +import org.apache.carbondata.core.scan.filter.intf.ExpressionType +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf +import org.apache.carbondata.core.util.ByteUtil +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.Event +import 
org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest + +class CGIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory { + var identifier: AbsoluteTableIdentifier = _ + var dataMapSchema: DataMapSchema = _ + + /** + * Initialization of Datamap factory with the identifier and datamap name + */ + override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { + this.identifier = identifier + this.dataMapSchema = dataMapSchema + } + + /** + * Return a new write for this datamap + */ + override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = { + new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema) + } + + /** + * Get the datamap for segmentid + */ + override def getDataMaps(segmentId: String) = { + val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + + val files = file.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") + }) + files.map {f => + val dataMap: AbstractCoarseGrainIndexDataMap = new CGIndexDataMap() + dataMap.init(new DataMapModel(f.getCanonicalPath)) + dataMap + }.toList.asJava + } + + + /** + * Get datamaps for distributable object. + */ + override def getDataMaps( + distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainIndexDataMap] = { + val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable] + val dataMap: AbstractCoarseGrainIndexDataMap = new CGIndexDataMap() + dataMap.init(new DataMapModel(mapDistributable.getFilePath)) + Seq(dataMap).asJava + } + + /** + * + * @param event + */ + override def fireEvent(event: Event): Unit = { + ??? + } + + /** + * Get all distributable objects of a segmentid + * + * @return + */ + override def toDistributable(segmentId: String) = { + val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + + val files = file.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") + }) + files.map { f => + val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath) + d + }.toList.asJava + } + + + /** + * Clears datamap of the segment + */ + override def clear(segmentId: String): Unit = { + + } + + /** + * Clear all datamaps from memory + */ + override def clear(): Unit = { + + } + + /** + * Return metadata of this datamap + */ + override def getMeta: DataMapMeta = { + new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava, + List(ExpressionType.EQUALS, ExpressionType.IN).asJava) + } +} + +class CGIndexDataMap extends AbstractCoarseGrainIndexDataMap { + + var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _ + var FileReader: FileReader = _ + var filePath: String = _ + val compressor = new SnappyCompressor + + /** + * It is called to load the data map to memory or to initialize it. 
+ */ + override def init(dataMapModel: DataMapModel): Unit = { + this.filePath = dataMapModel.getFilePath + val size = FileFactory.getCarbonFile(filePath).getSize + FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath)) + val footerLen = FileReader.readInt(filePath, size-4) + val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen) + val in = new ByteArrayInputStream(compressor.unCompressByte(bytes)) + val obj = new ObjectInputStream(in) + maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]] + } + + /** + * Prune the datamap with filter expression. It returns the list of + * blocklets where these filters can exist. + * + * @param filterExp + * @return + */ + override def prune( + filterExp: FilterResolverIntf, + segmentProperties: SegmentProperties, + partitions: java.util.List[String]): java.util.List[Blocklet] = { + val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]() + val expression = filterExp.getFilterExpression + getEqualToExpression(expression, buffer) + val value = buffer.map { f => + f.getChildren.get(1).evaluate(null).getString + } + val meta = findMeta(value(0).getBytes) + meta.map { f=> + new Blocklet(f._1, f._2+"") + }.asJava + } + + + private def findMeta(value: Array[Byte]) = { + val tuples = maxMin.filter { f => + ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 && + ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0 + } + tuples + } + + private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = { + if (expression.isInstanceOf[EqualToExpression]) { + buffer += expression + } else { + if (expression.getChildren != null) { + expression.getChildren.asScala.map { f => + if (f.isInstanceOf[EqualToExpression]) { + buffer += f + } + getEqualToExpression(f, buffer) + } + } + } + } + + /** + * Clear complete index table and release memory. + */ + override def clear() = { + ??? + } + + override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ??? +} + +class CGDataMapWriter(identifier: AbsoluteTableIdentifier, + segmentId: String, + dataWritePath: String, + dataMapSchema: DataMapSchema) + extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) { + + var currentBlockId: String = null + val cgwritepath = dataWritePath + "/" + + dataMapSchema.getDataMapName + System.nanoTime() + ".datamap" + lazy val stream: DataOutputStream = FileFactory + .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath)) + val blockletList = new ArrayBuffer[Array[Byte]]() + val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]() + val compressor = new SnappyCompressor + + /** + * Start of new block notification. + * + * @param blockId file name of the carbondata file + */ + override def onBlockStart(blockId: String): Unit = { + currentBlockId = blockId + } + + /** + * End of block notification + */ + override def onBlockEnd(blockId: String): Unit = { + + } + + /** + * Start of new blocklet notification. 
+ * + * @param blockletId sequence number of blocklet in the block + */ + override def onBlockletStart(blockletId: Int): Unit = { + + } + + /** + * End of blocklet notification + * + * @param blockletId sequence number of blocklet in the block + */ + override def onBlockletEnd(blockletId: Int): Unit = { + val sorted = blockletList + .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0) + maxMin += + ((currentBlockId+"", blockletId, (sorted.last, sorted.head))) + blockletList.clear() + } + + /** + * Add the column pages row to the datamap, order of pages is same as `indexColumns` in + * DataMapMeta returned in IndexDataMapFactory. + * + * Implementation should copy the content of `pages` as needed, because `pages` memory + * may be freed after this method returns, if using unsafe column page. + */ + override def onPageAdded(blockletId: Int, + pageId: Int, + pages: Array[ColumnPage]): Unit = { + val size = pages(0).getPageSize + val list = new ArrayBuffer[Array[Byte]]() + var i = 0 + while (i < size) { + val bytes = pages(0).getBytes(i) + val newBytes = new Array[Byte](bytes.length - 2) + System.arraycopy(bytes, 2, newBytes, 0, newBytes.length) + list += newBytes + i = i + 1 + } + // Sort based on the column data in order to create index. + val sorted = list + .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0) + blockletList += sorted.head + blockletList += sorted.last + } + + + /** + * This is called during closing of writer.So after this call no more data will be sent to this + * class. + */ + override def finish(): Unit = { + val out = new ByteOutputStream() + val outStream = new ObjectOutputStream(out) + outStream.writeObject(maxMin) + outStream.close() + val bytes = compressor.compressByte(out.getBytes) + stream.write(bytes) + stream.writeInt(bytes.length) + stream.close() + commitFile(cgwritepath) + } + + +} + +class CGIndexDataMapTestCase extends QueryTest with BeforeAndAfterAll { + + val file2 = resourcesPath + "/compaction/fil2.csv" + override protected def beforeAll(): Unit = { + //n should be about 5000000 of reset if size is default 1024 + val n = 150000 + CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n) + sql("DROP TABLE IF EXISTS normal_test") + sql( + """ + | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") + } + + test("test cg datamap") { + sql("DROP TABLE IF EXISTS datamap_test_cg") + sql( + """ + | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg") + // register datamap writer + sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')") + checkAnswer(sql("select * from datamap_test_cg where name='n502670'"), + sql("select * from normal_test where name='n502670'")) + } + + test("test cg datamap with 2 datamaps ") { + sql("DROP TABLE IF EXISTS datamap_test") + sql( + """ + | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) + | STORED BY 
'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") + // register datamap writer + sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')") + sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"), + sql("select * from normal_test where name='n502670' and city='c2670'")) + } + + override protected def afterAll(): Unit = { + CompactionSupportGlobalSortBigFileTest.deleteFile(file2) + sql("DROP TABLE IF EXISTS normal_test") + sql("DROP TABLE IF EXISTS datamap_test_cg") + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala deleted file mode 100644 index 903610a..0000000 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.spark.testsuite.datamap - -import java.util - -import scala.collection.JavaConverters._ -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{DataFrame, SaveMode} -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter -import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory} -import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} -import org.apache.carbondata.core.datastore.page.ColumnPage -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.datatype.DataTypes -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema -import org.apache.carbondata.core.scan.filter.intf.ExpressionType -import org.apache.carbondata.core.metadata.datatype.DataTypes -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.events.Event - -class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory { - - var identifier: AbsoluteTableIdentifier = _ - - override def init(identifier: AbsoluteTableIdentifier, - dataMapSchema: DataMapSchema): Unit = { - this.identifier = identifier - } - - override def fireEvent(event: Event): Unit = ??? - - override def clear(segmentId: String): Unit = {} - - override def clear(): Unit = {} - - override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ??? - - override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainDataMap] = ??? - - override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = - DataMapWriterSuite.dataMapWriterC2Mock(identifier, segmentId, dataWritePath) - - override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava) - - /** - * Get all distributable objects of a segmentid - * - * @return - */ - override def toDistributable(segmentId: String): util.List[DataMapDistributable] = { - ??? 
- } - -} - -class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll { - def buildTestData(numRows: Int): DataFrame = { - import sqlContext.implicits._ - sqlContext.sparkContext.parallelize(1 to numRows, 1) - .map(x => ("a" + x, "b", x)) - .toDF("c1", "c2", "c3") - } - - def dropTable(): Unit = { - sql("DROP TABLE IF EXISTS carbon1") - sql("DROP TABLE IF EXISTS carbon2") - } - - override def beforeAll { - dropTable() - } - - test("test write datamap 2 pages") { - sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'") - // register datamap writer - sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'") - val df = buildTestData(33000) - - // save dataframe to carbon file - df.write - .format("carbondata") - .option("tableName", "carbon1") - .option("tempCSV", "false") - .option("sort_columns","c1") - .mode(SaveMode.Overwrite) - .save() - - assert(DataMapWriterSuite.callbackSeq.head.contains("block start")) - assert(DataMapWriterSuite.callbackSeq.last.contains("block end")) - assert( - DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq( - "blocklet start 0", - "add page data: blocklet 0, page 0", - "add page data: blocklet 0, page 1", - "blocklet end: 0" - )) - DataMapWriterSuite.callbackSeq = Seq() - } - - test("test write datamap 2 blocklet") { - sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'") - sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2DataMapFactory].getName}'") - - CarbonProperties.getInstance() - .addProperty("carbon.blockletgroup.size.in.mb", "1") - CarbonProperties.getInstance() - .addProperty("carbon.number.of.cores.while.loading", - CarbonCommonConstants.NUM_CORES_DEFAULT_VAL) - - val df = buildTestData(300000) - - // save dataframe to carbon file - df.write - .format("carbondata") - .option("tableName", "carbon2") - .option("tempCSV", "false") - .option("sort_columns","c1") - .option("SORT_SCOPE","GLOBAL_SORT") - .mode(SaveMode.Overwrite) - .save() - - assert(DataMapWriterSuite.callbackSeq.head.contains("block start")) - assert(DataMapWriterSuite.callbackSeq.last.contains("block end")) - // corrected test case the min "carbon.blockletgroup.size.in.mb" size could not be less than - // 64 MB - assert( - DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq( - "blocklet start 0", - "add page data: blocklet 0, page 0", - "add page data: blocklet 0, page 1", - "add page data: blocklet 0, page 2", - "add page data: blocklet 0, page 3", - "add page data: blocklet 0, page 4", - "add page data: blocklet 0, page 5", - "add page data: blocklet 0, page 6", - "add page data: blocklet 0, page 7", - "add page data: blocklet 0, page 8", - "add page data: blocklet 0, page 9", - "blocklet end: 0" - )) - DataMapWriterSuite.callbackSeq = Seq() - } - - override def afterAll { - dropTable() - } -} - -object DataMapWriterSuite { - - var callbackSeq: Seq[String] = Seq[String]() - - def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String, - dataWritePath: String) = - new AbstractDataMapWriter(identifier, segmentId, dataWritePath) { - - override def onPageAdded( - blockletId: Int, - pageId: Int, - pages: Array[ColumnPage]): Unit = { - assert(pages.length == 1) - assert(pages(0).getDataType == DataTypes.STRING) - val bytes: Array[Byte] = pages(0).getByteArrayPage()(0) - assert(bytes.sameElements(Seq(0, 1, 'b'.toByte))) - callbackSeq :+= s"add page data: 
blocklet $blockletId, page $pageId" - } - - override def onBlockletEnd(blockletId: Int): Unit = { - callbackSeq :+= s"blocklet end: $blockletId" - } - - override def onBlockEnd(blockId: String): Unit = { - callbackSeq :+= s"block end $blockId" - } - - override def onBlockletStart(blockletId: Int): Unit = { - callbackSeq :+= s"blocklet start $blockletId" - } - - /** - * Start of new block notification. - * - * @param blockId file name of the carbondata file - */ - override def onBlockStart(blockId: String) = { - callbackSeq :+= s"block start $blockId" - } - - /** - * This is called during closing of writer.So after this call no more data will be sent to this - * class. - */ - override def finish() = { - - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala deleted file mode 100644 index 8031dc2..0000000 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala +++ /dev/null @@ -1,476 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.spark.testsuite.datamap - -import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream} - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer - -import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainDataMap, AbstractFineGrainDataMapFactory} -import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel} -import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} -import org.apache.carbondata.core.datastore.FileReader -import org.apache.carbondata.core.datastore.block.SegmentProperties -import org.apache.carbondata.core.datastore.compression.SnappyCompressor -import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.datastore.page.ColumnPage -import org.apache.carbondata.core.indexstore.FineGrainBlocklet -import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} -import org.apache.carbondata.core.scan.expression.Expression -import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression -import org.apache.carbondata.core.scan.filter.intf.ExpressionType -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf -import org.apache.carbondata.core.util.ByteUtil -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.events.Event -import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest - -class FGDataMapFactory extends AbstractFineGrainDataMapFactory { - var identifier: AbsoluteTableIdentifier = _ - var dataMapSchema: DataMapSchema = _ - - /** - * Initialization of Datamap factory with the identifier and datamap name - */ - override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { - this.identifier = identifier - this.dataMapSchema = dataMapSchema - } - - /** - * Return a new write for this datamap - */ - override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = { - new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema) - } - - /** - * Get the datamap for segmentid - */ - override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainDataMap] = { - val file = FileFactory - .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) - - val files = file.listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") - }) - files.map { f => - val dataMap: AbstractFineGrainDataMap = new FGDataMap() - dataMap.init(new DataMapModel(f.getCanonicalPath)) - dataMap - }.toList.asJava - } - - /** - * Get datamap for distributable object. 
- */ - override def getDataMaps( - distributable: DataMapDistributable): java.util.List[AbstractFineGrainDataMap]= { - val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable] - val dataMap: AbstractFineGrainDataMap = new FGDataMap() - dataMap.init(new DataMapModel(mapDistributable.getFilePath)) - Seq(dataMap).asJava - } - - /** - * Get all distributable objects of a segmentid - * - * @return - */ - override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = { - val file = FileFactory - .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) - - val files = file.listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") - }) - files.map { f => - val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath) - d - }.toList.asJava - } - - - /** - * - * @param event - */ - override def fireEvent(event: Event):Unit = { - ??? - } - - /** - * Clears datamap of the segment - */ - override def clear(segmentId: String): Unit = { - } - - /** - * Clear all datamaps from memory - */ - override def clear(): Unit = { - } - - /** - * Return metadata of this datamap - */ - override def getMeta: DataMapMeta = { - new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava, - List(ExpressionType.EQUALS, ExpressionType.IN).asJava) - } -} - -class FGDataMap extends AbstractFineGrainDataMap { - - var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _ - var FileReader: FileReader = _ - var filePath: String = _ - val compressor = new SnappyCompressor - - /** - * It is called to load the data map to memory or to initialize it. - */ - override def init(dataMapModel: DataMapModel): Unit = { - this.filePath = dataMapModel.getFilePath - val size = FileFactory.getCarbonFile(filePath).getSize - FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath)) - val footerLen = FileReader.readInt(filePath, size - 4) - val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen) - val in = new ByteArrayInputStream(compressor.unCompressByte(bytes)) - val obj = new ObjectInputStream(in) - maxMin = obj.readObject() - .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]] - } - - /** - * Prune the datamap with filter expression. It returns the list of - * blocklets where these filters can exist. 
- * - * @param filterExp - * @return - */ - override def prune( - filterExp: FilterResolverIntf, - segmentProperties: SegmentProperties, - partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = { - val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]() - val expression = filterExp.getFilterExpression - getEqualToExpression(expression, buffer) - val value = buffer.map { f => - f.getChildren.get(1).evaluate(null).getString - } - val meta = findMeta(value(0).getBytes) - meta.map { f => - readAndFindData(f, value(0).getBytes()) - }.filter(_.isDefined).map(_.get).asJava - } - - private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int), - value: Array[Byte]): Option[FineGrainBlocklet] = { - val bytes = FileReader.readByteArray(filePath, meta._4, meta._5) - val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes)) - val obj = new ObjectInputStream(outputStream) - val blockletsData = obj.readObject() - .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]] - - import scala.collection.Searching._ - val searching = blockletsData - .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(new Ordering[ - (Array[Byte], Seq[Seq[Int]], Seq[Int])] { - override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]), - y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = { - ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1) - } - }) - if (searching.insertionPoint >= 0) { - val f = blockletsData(searching.insertionPoint) - val pages = f._3.zipWithIndex.map { p => - val pg = new FineGrainBlocklet.Page - pg.setPageId(p._1) - pg.setRowId(f._2(p._2).toArray) - pg - } - pages - Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava)) - } else { - None - } - - } - - private def findMeta(value: Array[Byte]) = { - val tuples = maxMin.filter { f => - ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 && - ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0 - } - tuples - } - - def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = { - if (expression.isInstanceOf[EqualToExpression]) { - buffer += expression - } else { - if (expression.getChildren != null) { - expression.getChildren.asScala.map { f => - if (f.isInstanceOf[EqualToExpression]) { - buffer += f - } - getEqualToExpression(f, buffer) - } - } - } - } - - /** - * Clear complete index table and release memory. - */ - override def clear():Unit = { - ??? - } - - override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ??? -} - -class FGDataMapWriter(identifier: AbsoluteTableIdentifier, - segmentId: String, dataWriterPath: String, dataMapSchema: DataMapSchema) - extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) { - - var currentBlockId: String = null - val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() + - ".datamap" - val stream: DataOutputStream = FileFactory - .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath)) - val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]() - val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]() - var position: Long = 0 - val compressor = new SnappyCompressor - - /** - * Start of new block notification. 
- * - * @param blockId file name of the carbondata file - */ - override def onBlockStart(blockId: String): Unit = { - currentBlockId = blockId - } - - /** - * End of block notification - */ - override def onBlockEnd(blockId: String): Unit = { - - } - - /** - * Start of new blocklet notification. - * - * @param blockletId sequence number of blocklet in the block - */ - override def onBlockletStart(blockletId: Int): Unit = { - - } - - /** - * End of blocklet notification - * - * @param blockletId sequence number of blocklet in the block - */ - override def onBlockletEnd(blockletId: Int): Unit = { - val sorted = blockletList - .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0) - var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null - var addedLast: Boolean = false - val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]() - // Merge all same column values to single row. - sorted.foreach { f => - if (oldValue != null) { - if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) { - oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3) - addedLast = false - } else { - blockletListUpdated += oldValue - oldValue = (f._1, Seq(f._2), f._3) - addedLast = true - } - } else { - oldValue = (f._1, Seq(f._2), f._3) - addedLast = false - } - } - if (!addedLast && oldValue != null) { - blockletListUpdated += oldValue - } - - val out = new ByteOutputStream() - val outStream = new ObjectOutputStream(out) - outStream.writeObject(blockletListUpdated) - outStream.close() - val bytes = compressor.compressByte(out.getBytes) - stream.write(bytes) - maxMin += - ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last - ._1), position, bytes.length)) - position += bytes.length - blockletList.clear() - } - - /** - * Add the column pages row to the datamap, order of pages is same as `indexColumns` in - * DataMapMeta returned in DataMapFactory. - * - * Implementation should copy the content of `pages` as needed, because `pages` memory - * may be freed after this method returns, if using unsafe column page. - */ - override def onPageAdded(blockletId: Int, - pageId: Int, - pages: Array[ColumnPage]): Unit = { - val size = pages(0).getPageSize - val list = new ArrayBuffer[(Array[Byte], Int)]() - var i = 0 - while (i < size) { - val bytes = pages(0).getBytes(i) - val newBytes = new Array[Byte](bytes.length - 2) - System.arraycopy(bytes, 2, newBytes, 0, newBytes.length) - list += ((newBytes, i)) - i = i + 1 - } - // Sort based on the column data in order to create index. - val sorted = list - .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0) - var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null - var addedLast: Boolean = false - // Merge all same column values to single row. - sorted.foreach { f => - if (oldValue != null) { - if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) { - oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3) - addedLast = false - } else { - blockletList += oldValue - oldValue = (f._1, Seq(f._2), Seq(pageId)) - addedLast = true - } - } else { - oldValue = (f._1, Seq(f._2), Seq(pageId)) - addedLast = false - } - } - if (!addedLast && oldValue != null) { - blockletList += oldValue - } - } - - - /** - * This is called during closing of writer.So after this call no more data will be sent to this - * class. 
- */ - override def finish(): Unit = { - val out = new ByteOutputStream() - val outStream = new ObjectOutputStream(out) - outStream.writeObject(maxMin) - outStream.close() - val bytes = compressor.compressByte(out.getBytes) - stream.write(bytes) - stream.writeInt(bytes.length) - stream.close() - commitFile(fgwritepath) - } - - -} - -class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll { - - val file2 = resourcesPath + "/compaction/fil2.csv" - - override protected def beforeAll(): Unit = { - //n should be about 5000000 of reset if size is default 1024 - val n = 150000 - CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n) - sql("DROP TABLE IF EXISTS normal_test") - sql( - """ - | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") - } - - test("test fg datamap") { - sql("DROP TABLE IF EXISTS datamap_test") - sql( - """ - | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") - // register datamap writer - sql( - s""" - | CREATE DATAMAP ggdatamap ON TABLE datamap_test - | USING '${classOf[FGDataMapFactory].getName}' - | DMPROPERTIES('indexcolumns'='name') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") - checkAnswer(sql("select * from datamap_test where name='n502670'"), - sql("select * from normal_test where name='n502670'")) - } - - test("test fg datamap with 2 datamaps ") { - sql("DROP TABLE IF EXISTS datamap_test") - sql( - """ - | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") - // register datamap writer - sql( - s""" - | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test - | USING '${classOf[FGDataMapFactory].getName}' - | DMPROPERTIES('indexcolumns'='name') - """.stripMargin) - sql( - s""" - | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test - | USING '${classOf[FGDataMapFactory].getName}' - | DMPROPERTIES('indexcolumns'='city') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") - checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"), - sql("select * from normal_test where name='n502670' and city='c2670'")) - } - - override protected def afterAll(): Unit = { - CompactionSupportGlobalSortBigFileTest.deleteFile(file2) - sql("DROP TABLE IF EXISTS normal_test") - sql("DROP TABLE IF EXISTS datamap_test") - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala 
new file mode 100644
index 0000000..8ddad75
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainIndexDataMap, AbstractFineGrainIndexDataMapFactory}
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
+import org.apache.carbondata.core.datastore.FileReader
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.FineGrainBlocklet
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.ByteUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
+
+class FGIndexDataMapFactory extends AbstractFineGrainIndexDataMapFactory {
+  var identifier: AbsoluteTableIdentifier = _
+  var dataMapSchema: DataMapSchema = _
+
+  /**
+   * Initialization of Datamap factory with the identifier and datamap name
+   */
+  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = identifier
+    this.dataMapSchema = dataMapSchema
+  }
+
+  /**
+   * Return a new writer for this datamap
+   */
+  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
+    new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema)
+  }
+
+  /**
+   * Get the datamaps for the given segment id
+   */
+  override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainIndexDataMap] = {
+    val file = FileFactory
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val dataMap: AbstractFineGrainIndexDataMap = new FGIndexDataMap()
+      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap
+    }.toList.asJava
+  }
+
+  /**
+   * Get the datamap for a distributable object.
+   */
+  override def getDataMaps(
+      distributable: DataMapDistributable): java.util.List[AbstractFineGrainIndexDataMap] = {
+    val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
+    val dataMap: AbstractFineGrainIndexDataMap = new FGIndexDataMap()
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    Seq(dataMap).asJava
+  }
+
+  /**
+   * Get all distributable objects of a segment id
+   */
+  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
+    val file = FileFactory
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+
+    val files = file.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
+    })
+    files.map { f =>
+      val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath)
+      d
+    }.toList.asJava
+  }
+
+  /**
+   * Notify the datamap of an event; not needed for this test.
+   */
+  override def fireEvent(event: Event): Unit = {
+    ???
+  }
+
+  /**
+   * Clears datamap of the segment
+   */
+  override def clear(segmentId: String): Unit = {
+  }
+
+  /**
+   * Clear all datamaps from memory
+   */
+  override def clear(): Unit = {
+  }
+
+  /**
+   * Return metadata of this datamap
+   */
+  override def getMeta: DataMapMeta = {
+    new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
+      List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
+  }
+}
+
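+// Note on the file format used by this test datamap: FGDataMapWriter (below)
+// appends one compressed, sorted index block per blocklet, then a compressed
+// `maxMin` footer holding one (blockId, blockletId, (min, max), offset, length)
+// entry per index block, and finally the footer length as a trailing 4-byte
+// int. init() therefore reads that trailing int first and works backwards.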
+class FGIndexDataMap extends AbstractFineGrainIndexDataMap {
+
+  var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _
+  var fileReader: FileReader = _
+  var filePath: String = _
+  val compressor = new SnappyCompressor
+
+  /**
+   * It is called to load the data map to memory or to initialize it.
+   */
+  override def init(dataMapModel: DataMapModel): Unit = {
+    this.filePath = dataMapModel.getFilePath
+    val size = FileFactory.getCarbonFile(filePath).getSize
+    fileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    val footerLen = fileReader.readInt(filePath, size - 4)
+    val bytes = fileReader.readByteArray(filePath, size - footerLen - 4, footerLen)
+    val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(in)
+    maxMin = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]]
+  }
+
+  /**
+   * Prune the datamap with the filter expression. Returns the list of
+   * blocklets where the filter values may exist.
+   */
+  override def prune(
+      filterExp: FilterResolverIntf,
+      segmentProperties: SegmentProperties,
+      partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = {
+    val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
+    val expression = filterExp.getFilterExpression
+    getEqualToExpression(expression, buffer)
+    val value = buffer.map { f =>
+      f.getChildren.get(1).evaluate(null).getString
+    }
+    val meta = findMeta(value(0).getBytes)
+    meta.map { f =>
+      readAndFindData(f, value(0).getBytes())
+    }.filter(_.isDefined).map(_.get).asJava
+  }
+
+  private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
+      value: Array[Byte]): Option[FineGrainBlocklet] = {
+    val bytes = fileReader.readByteArray(filePath, meta._4, meta._5)
+    val inputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
+    val obj = new ObjectInputStream(inputStream)
+    val blockletsData = obj.readObject()
+      .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]]
+
+    import scala.collection.Searching._
+    val searching = blockletsData
+      .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(new Ordering[
+        (Array[Byte], Seq[Seq[Int]], Seq[Int])] {
+        override def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]),
+            y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = {
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1)
+        }
+      })
+    if (searching.insertionPoint >= 0) {
+      val f = blockletsData(searching.insertionPoint)
+      val pages = f._3.zipWithIndex.map { p =>
+        val pg = new FineGrainBlocklet.Page
+        pg.setPageId(p._1)
+        pg.setRowId(f._2(p._2).toArray)
+        pg
+      }
+      Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
+    } else {
+      None
+    }
+  }
+
+  private def findMeta(value: Array[Byte]) = {
+    maxMin.filter { f =>
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
+        ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
+    }
+  }
+
+  def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
+    if (expression.isInstanceOf[EqualToExpression]) {
+      buffer += expression
+    } else if (expression.getChildren != null) {
+      expression.getChildren.asScala.foreach { f =>
+        getEqualToExpression(f, buffer)
+      }
+    }
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   */
+  override def clear(): Unit = {
+    ???
+  }
+
+  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+}
+
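+// FGDataMapWriter builds the index as data is written: onPageAdded merges the
+// sorted (value, rowId) pairs of each page, onBlockletEnd flushes one
+// compressed run per blocklet and records its min/max plus file offset in
+// `maxMin`, and finish() writes the footer described above.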
+class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
+    segmentId: String, dataWriterPath: String, dataMapSchema: DataMapSchema)
+  extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
+
+  var currentBlockId: String = null
+  val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() +
+                    ".datamap"
+  val stream: DataOutputStream = FileFactory
+    .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath))
+  val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]()
+  val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]()
+  var position: Long = 0
+  val compressor = new SnappyCompressor
+
+  /**
+   * Start of new block notification.
+   *
+   * @param blockId file name of the carbondata file
+   */
+  override def onBlockStart(blockId: String): Unit = {
+    currentBlockId = blockId
+  }
+
+  /**
+   * End of block notification
+   */
+  override def onBlockEnd(blockId: String): Unit = {
+  }
+
+  /**
+   * Start of new blocklet notification.
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletStart(blockletId: Int): Unit = {
+  }
+
+  /**
+   * End of blocklet notification
+   *
+   * @param blockletId sequence number of blocklet in the block
+   */
+  override def onBlockletEnd(blockletId: Int): Unit = {
+    val sorted = blockletList
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
+    var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null
+    var addedLast: Boolean = false
+    val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]()
+    // Merge all same column values to single row.
+    sorted.foreach { f =>
+      if (oldValue != null) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
+          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3)
+          addedLast = false
+        } else {
+          blockletListUpdated += oldValue
+          oldValue = (f._1, Seq(f._2), f._3)
+          addedLast = true
+        }
+      } else {
+        oldValue = (f._1, Seq(f._2), f._3)
+        addedLast = false
+      }
+    }
+    if (!addedLast && oldValue != null) {
+      blockletListUpdated += oldValue
+    }
+
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(blockletListUpdated)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    maxMin +=
+      ((currentBlockId + "", blockletId,
+        (blockletListUpdated.head._1, blockletListUpdated.last._1), position, bytes.length))
+    position += bytes.length
+    blockletList.clear()
+  }
+
+  /**
+   * Add the column page rows to the datamap; the order of pages is the same as
+   * `indexColumns` in the DataMapMeta returned by IndexDataMapFactory.
+   *
+   * Implementation should copy the content of `pages` as needed, because `pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   */
+  override def onPageAdded(blockletId: Int,
+      pageId: Int,
+      pages: Array[ColumnPage]): Unit = {
+    val size = pages(0).getPageSize
+    val list = new ArrayBuffer[(Array[Byte], Int)]()
+    var i = 0
+    while (i < size) {
+      val bytes = pages(0).getBytes(i)
+      // Strip the 2-byte length prefix stored with each value.
+      val newBytes = new Array[Byte](bytes.length - 2)
+      System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
+      list += ((newBytes, i))
+      i = i + 1
+    }
+    // Sort based on the column data in order to create index.
+    val sorted = list
+      .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0)
+    var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null
+    var addedLast: Boolean = false
+    // Merge all same column values to single row.
+    sorted.foreach { f =>
+      if (oldValue != null) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) {
+          oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3)
+          addedLast = false
+        } else {
+          blockletList += oldValue
+          oldValue = (f._1, Seq(f._2), Seq(pageId))
+          addedLast = true
+        }
+      } else {
+        oldValue = (f._1, Seq(f._2), Seq(pageId))
+        addedLast = false
+      }
+    }
+    if (!addedLast && oldValue != null) {
+      blockletList += oldValue
+    }
+  }
+
+  /**
+   * This is called while the writer is being closed, so no more data will be
+   * sent to this class after this call.
+   */
+  override def finish(): Unit = {
+    val out = new ByteOutputStream()
+    val outStream = new ObjectOutputStream(out)
+    outStream.writeObject(maxMin)
+    outStream.close()
+    val bytes = compressor.compressByte(out.getBytes)
+    stream.write(bytes)
+    stream.writeInt(bytes.length)
+    stream.close()
+    commitFile(fgwritepath)
+  }
+}
+
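+// End-to-end check: load the same CSV into a plain table and into a table
+// carrying the FG datamap, then verify both return identical rows for
+// EQUALS filters on the indexed columns.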
+class FGIndexDataMapTestCase extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/compaction/fil2.csv"
+
+  override protected def beforeAll(): Unit = {
+    // n should be about 5000000 if the size is the default 1024
+    val n = 150000
+    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+  }
+
+  test("test fg datamap") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap ON TABLE datamap_test
+         | USING '${classOf[FGIndexDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='name')
+       """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670'"),
+      sql("select * from normal_test where name='n502670'"))
+  }
+
+  test("test fg datamap with 2 datamaps") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test")
+    // register datamap writer
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test
+         | USING '${classOf[FGIndexDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='name')
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test
+         | USING '${classOf[FGIndexDataMapFactory].getName}'
+         | DMPROPERTIES('indexcolumns'='city')
+       """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"),
+      sql("select * from normal_test where name='n502670' and city='c2670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
new file mode 100644
index 0000000..5fd8ae9
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.Event
+
+class C2IndexDataMapFactory() extends AbstractCoarseGrainIndexDataMapFactory {
+
+  var identifier: AbsoluteTableIdentifier = _
+
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = identifier
+  }
+
+  override def fireEvent(event: Event): Unit = ???
+
+  override def clear(segmentId: String): Unit = {}
+
+  override def clear(): Unit = {}
+
+  override def getDataMaps(
+      distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainIndexDataMap] = ???
+
+  override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainIndexDataMap] = ???
+
+  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter =
+    IndexDataMapWriterSuite.dataMapWriterC2Mock(identifier, segmentId, dataWritePath)
+
+  override def getMeta: DataMapMeta =
+    new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
+
+  /**
+   * Get all distributable objects of a segment id
+   */
+  override def toDistributable(segmentId: String): util.List[DataMapDistributable] = {
+    ???
+  }
+
+}
+
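+// The suite below registers C2IndexDataMapFactory, whose mock writer (defined
+// in the companion object at the bottom of this file) appends every callback
+// it receives to IndexDataMapWriterSuite.callbackSeq, letting the tests assert
+// the exact block/blocklet/page event order produced by a data load.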
+class IndexDataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
+  def buildTestData(numRows: Int): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numRows, 1)
+      .map(x => ("a" + x, "b", x))
+      .toDF("c1", "c2", "c3")
+  }
+
+  def dropTable(): Unit = {
+    sql("DROP TABLE IF EXISTS carbon1")
+    sql("DROP TABLE IF EXISTS carbon2")
+  }
+
+  override def beforeAll {
+    dropTable()
+  }
+
+  test("test write datamap 2 pages") {
+    sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+    // register datamap writer
+    sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2IndexDataMapFactory].getName}'")
+    val df = buildTestData(33000)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon1")
+      .option("tempCSV", "false")
+      .option("sort_columns", "c1")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(IndexDataMapWriterSuite.callbackSeq.head.contains("block start"))
+    assert(IndexDataMapWriterSuite.callbackSeq.last.contains("block end"))
+    assert(
+      IndexDataMapWriterSuite.callbackSeq.slice(1, IndexDataMapWriterSuite.callbackSeq.length - 1) == Seq(
+        "blocklet start 0",
+        "add page data: blocklet 0, page 0",
+        "add page data: blocklet 0, page 1",
+        "blocklet end: 0"
+      ))
+    IndexDataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  test("test write datamap 2 blocklet") {
+    sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+    sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2IndexDataMapFactory].getName}'")
+
+    CarbonProperties.getInstance()
+      .addProperty("carbon.blockletgroup.size.in.mb", "1")
+    CarbonProperties.getInstance()
+      .addProperty("carbon.number.of.cores.while.loading",
+        CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
+
+    val df = buildTestData(300000)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon2")
+      .option("tempCSV", "false")
+      .option("sort_columns", "c1")
+      .option("SORT_SCOPE", "GLOBAL_SORT")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(IndexDataMapWriterSuite.callbackSeq.head.contains("block start"))
+    assert(IndexDataMapWriterSuite.callbackSeq.last.contains("block end"))
+    // Corrected test case: the minimum "carbon.blockletgroup.size.in.mb" cannot
+    // be less than 64 MB, so a single blocklet is expected here.
+    assert(
+      IndexDataMapWriterSuite.callbackSeq.slice(1, IndexDataMapWriterSuite.callbackSeq.length - 1) == Seq(
+        "blocklet start 0",
+        "add page data: blocklet 0, page 0",
+        "add page data: blocklet 0, page 1",
+        "add page data: blocklet 0, page 2",
+        "add page data: blocklet 0, page 3",
+        "add page data: blocklet 0, page 4",
+        "add page data: blocklet 0, page 5",
+        "add page data: blocklet 0, page 6",
+        "add page data: blocklet 0, page 7",
+        "add page data: blocklet 0, page 8",
+        "add page data: blocklet 0, page 9",
+        "blocklet end: 0"
+      ))
+    IndexDataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  override def afterAll {
+    dropTable()
+  }
+}
+
+object IndexDataMapWriterSuite {
+
+  var callbackSeq: Seq[String] = Seq[String]()
+
+  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String,
+      dataWritePath: String) =
+    new AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+
+      override def onPageAdded(
+          blockletId: Int,
+          pageId: Int,
+          pages: Array[ColumnPage]): Unit = {
+        assert(pages.length == 1)
+        assert(pages(0).getDataType == DataTypes.STRING)
+        val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
+        assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
+        callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
+      }
+
+      override def onBlockletEnd(blockletId: Int): Unit = {
+        callbackSeq :+= s"blocklet end: $blockletId"
+      }
+
+      override def onBlockEnd(blockId: String): Unit = {
+        callbackSeq :+= s"block end $blockId"
+      }
+
+      override def onBlockletStart(blockletId: Int): Unit = {
+        callbackSeq :+= s"blocklet start $blockletId"
+      }
+
+      /**
+       * Start of new block notification.
+       *
+       * @param blockId file name of the carbondata file
+       */
+      override def onBlockStart(blockId: String) = {
+        callbackSeq :+= s"block start $blockId"
+      }
+
+      /**
+       * This is called while the writer is being closed, so no more data will
+       * be sent to this class after this call.
+       */
+      override def finish() = {
+      }
+    }
+}
\ No newline at end of file