Repository: carbondata
Updated Branches:
  refs/heads/master 55f4bc6c8 -> b3f782062
[CARBONDATA-2623][DataMap] Add DataMap Pre and Post Event listeners

Added Pre and Post Execution Events for index datamap

This closes #2389


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b3f78206
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b3f78206
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b3f78206

Branch: refs/heads/master
Commit: b3f7820623d4bc9ab4408beb8ad708ba9b19b899
Parents: 55f4bc6
Author: mohammadshahidkhan <mohdshahidkhan1...@gmail.com>
Authored: Wed Jun 20 19:52:51 2018 +0530
Committer: manishgupta88 <tomanishgupt...@gmail.com>
Committed: Thu Jun 21 17:37:48 2018 +0530

----------------------------------------------------------------------
 .../carbondata/events/DataMapEvents.scala       | 68 ++++++++++++++++++++
 .../org/apache/carbondata/events/Events.scala   | 18 +++++-
 .../datamap/IndexDataMapRebuildRDD.scala        | 11 +++-
 .../spark/rdd/CarbonTableCompactor.scala        | 23 ++++++-
 .../datamap/CarbonCreateDataMapCommand.scala    | 22 +++++++
 .../datamap/CarbonDataMapRebuildCommand.scala   | 12 ++++
 .../datamap/CarbonDropDataMapCommand.scala      | 11 ++++
 .../management/CarbonLoadDataCommand.scala      | 21 +++++-
 8 files changed, 181 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
new file mode 100644
index 0000000..8fb374f
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+
+/**
+ * For handling operations after index creation finishes on a table with an index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class CreateDataMapPostExecutionEvent(sparkSession: SparkSession,
+    storePath: String) extends Event with CreateDataMapEventsInfo
+
+/**
+ * For handling operations before the index datamap status update starts on a table with an
+ * index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class UpdateDataMapPreExecutionEvent(sparkSession: SparkSession,
+    storePath: String) extends Event with CreateDataMapEventsInfo
+
+/**
+ * For handling operations after the index datamap status update finishes on a table with an
+ * index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class UpdateDataMapPostExecutionEvent(sparkSession: SparkSession,
+    storePath: String) extends Event with CreateDataMapEventsInfo
+
+/**
+ * For handling operations before the index build starts on a table with an index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class BuildDataMapPreExecutionEvent(sparkSession: SparkSession,
+    identifier: AbsoluteTableIdentifier, dataMapNames: scala.collection.mutable.Seq[String])
+  extends Event with BuildDataMapEventsInfo
+
+/**
+ * For handling operations after the index build finishes on a table with an index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class BuildDataMapPostExecutionEvent(sparkSession: SparkSession,
+    identifier: AbsoluteTableIdentifier)
+  extends Event with TableEventInfo
+
+/**
+ * For handling operations before index creation starts on a table with an index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class CreateDataMapPreExecutionEvent(sparkSession: SparkSession,
+    storePath: String) extends Event with CreateDataMapEventsInfo


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index da62e02..1830a35 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -21,7 +21,6 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}

-import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -168,3 +167,20 @@ trait DeleteFromTableEventInfo {
 trait SessionEventInfo {
   val sparkSession: SparkSession
 }
+
+/**
+ * Event info for create datamap
+ */
+trait CreateDataMapEventsInfo {
+  val sparkSession: SparkSession
+  val storePath: String
+}
+
+/**
+ * Event info for build datamap
+ */
+trait BuildDataMapEventsInfo {
+  val sparkSession: SparkSession
+  val identifier: AbsoluteTableIdentifier
+  val dataMapNames: scala.collection.mutable.Seq[String]
+}
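
Note for reviewers: the events above only carry payload; they take effect once a listener subscribes on the OperationListenerBus. A minimal listener sketch (illustrative, not part of this patch; it assumes the existing OperationEventListener trait with its onEvent(event, operationContext) callback and the OperationListenerBus.addListener API):

    import org.apache.carbondata.events._

    // Logs every create-datamap completion; storePath comes from
    // CreateDataMapEventsInfo and points at the datamap system folder.
    class CreateDataMapAuditListener extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        event match {
          case e: CreateDataMapPostExecutionEvent =>
            println(s"DataMap schema persisted under ${e.storePath}")
          case _ => // ignore other events
        }
      }
    }

    // Registration, typically done once during session initialization:
    OperationListenerBus.getInstance()
      .addListener(classOf[CreateDataMapPostExecutionEvent], new CreateDataMapAuditListener)
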

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index cde6201..d064306 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -22,6 +22,7 @@ import java.text.SimpleDateFormat
 import java.util

 import scala.collection.JavaConverters._
+import scala.collection.mutable

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
@@ -42,6 +43,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.TaskMetricsMap
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
@@ -67,13 +69,20 @@ object IndexDataMapRebuildRDD {
     val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments()
     val validSegments = validAndInvalidSegments.getValidSegments
     val indexedCarbonColumns = carbonTable.getIndexedColumns(schema)
-
+    val operationContext = new OperationContext()
+    val buildDataMapPreExecutionEvent = new BuildDataMapPreExecutionEvent(sparkSession,
+      tableIdentifier,
+      mutable.Seq[String](schema.getDataMapName))
+    OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent, operationContext)
     // loop all segments to rebuild DataMap
     validSegments.asScala.foreach { segment =>
       // if the lucene datamap folder already exists, there is no need to build the lucene
       // datamap again
       refreshOneSegment(sparkSession, carbonTable, schema.getDataMapName,
         indexedCarbonColumns, segment.getSegmentNo);
     }
+    val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession,
+      tableIdentifier)
+    OperationListenerBus.getInstance().fireEvent(buildDataMapPostExecutionEvent, operationContext)
   }

   private def refreshOneSegment(
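
The rebuild flow above always fires the pre event, runs the per-segment refresh, then fires the post event with the same OperationContext instance, so a listener can hand state from pre to post. A hypothetical timing listener sketch (class and property names are illustrative only):

    import org.apache.carbondata.events._

    // Stashes a timestamp in the shared OperationContext at pre time and reads
    // it back at post time; this works because both events are fired with the
    // same context instance.
    class BuildDataMapTimer extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        event match {
          case _: BuildDataMapPreExecutionEvent =>
            operationContext.setProperty("buildStartMillis", System.currentTimeMillis.toString)
          case _: BuildDataMapPostExecutionEvent =>
            // assumes the pre event fired first and set the property
            val start = operationContext.getProperty("buildStartMillis").toString.toLong
            println(s"DataMap build finished in ${System.currentTimeMillis - start} ms")
          case _ =>
        }
      }
    }
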

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 7605b9d..fcc649e 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -22,12 +22,13 @@ import java.util.List
 import java.util.concurrent.ExecutorService

 import scala.collection.JavaConverters._
+import scala.collection.mutable

 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}

 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.readcommitter.{ReadCommittedScope, TableStatusReadCommittedScope}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
@@ -156,7 +157,18 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         carbonMergerMapping,
         mergedLoadName)
     OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
-
+    // Add pre event listener for index datamap
+    val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
+    val dataMapOperationContext = new OperationContext()
+    if (null != tableDataMaps) {
+      val dataMapNames: mutable.Buffer[String] =
+        tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
+      val dataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
+        new BuildDataMapPreExecutionEvent(sqlContext.sparkSession,
+          carbonTable.getAbsoluteTableIdentifier, dataMapNames)
+      OperationListenerBus.getInstance().fireEvent(dataMapPreExecutionEvent,
+        dataMapOperationContext)
+    }
     var execInstance = "1"
     // in case of non dynamic executor allocation, number of executors are fixed.
     if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
@@ -272,6 +284,13 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         mergedLoadName)
     OperationListenerBus.getInstance()
       .fireEvent(compactionLoadStatusPostEvent, operationContext)
+    if (null != tableDataMaps) {
+      val buildDataMapPostExecutionEvent: BuildDataMapPostExecutionEvent =
+        new BuildDataMapPostExecutionEvent(sqlContext.sparkSession,
+          carbonTable.getAbsoluteTableIdentifier)
+      OperationListenerBus.getInstance()
+        .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
+    }
     val commitDone = operationContext.getProperty("commitComplete")
     val commitComplete = if (null != commitDone) {
       commitDone.toString.toBoolean
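
Two details in the compaction change are worth calling out: getAllDataMap can return null for a table without datamaps, hence the null guard before firing, and the datamap events get their own dataMapOperationContext so datamap listeners never see properties set by the compaction listeners. A listener can inspect which datamaps are affected through BuildDataMapEventsInfo; a sketch (illustrative class name, assumes the onEvent callback shown earlier):

    import org.apache.carbondata.events._

    // Reports the datamaps about to be rebuilt and the table they belong to.
    class BuildDataMapLogger extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        event match {
          case e: BuildDataMapPreExecutionEvent =>
            println(s"Rebuilding datamaps ${e.dataMapNames.mkString(", ")} " +
              s"on table ${e.identifier.getCarbonTableIdentifier.getTableName}")
          case _ =>
        }
      }
    }
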

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 1ae872a..27e1720 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -31,7 +31,9 @@ import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty}
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
+import org.apache.carbondata.events._

 /**
  * Below command class will be used to create datamap on table
@@ -108,8 +110,18 @@ case class CarbonCreateDataMapCommand(
               "column '%s' already has datamap created", column.getColName))
           }
         }
+        val operationContext: OperationContext = new OperationContext()
+        val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
+        val createDataMapPreExecutionEvent: CreateDataMapPreExecutionEvent =
+          new CreateDataMapPreExecutionEvent(sparkSession, systemFolderLocation)
+        OperationListenerBus.getInstance().fireEvent(createDataMapPreExecutionEvent,
+          operationContext)
         dataMapProvider.initMeta(queryString.orNull)
         DataMapStatusManager.disableDataMap(dataMapName)
+        val createDataMapPostExecutionEvent: CreateDataMapPostExecutionEvent =
+          new CreateDataMapPostExecutionEvent(sparkSession, systemFolderLocation)
+        OperationListenerBus.getInstance().fireEvent(createDataMapPostExecutionEvent,
+          operationContext)
       case _ =>
         if (deferredRebuild) {
           throw new MalformedDataMapCommandException(
@@ -128,7 +140,17 @@ case class CarbonCreateDataMapCommand(
     if (mainTable != null && !deferredRebuild) {
       dataMapProvider.rebuild()
       if (dataMapSchema.isIndexDataMap) {
+        val operationContext: OperationContext = new OperationContext()
+        val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
+        val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent =
+          new UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation)
+        OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent,
+          operationContext)
         DataMapStatusManager.enableDataMap(dataMapName)
+        val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent =
+          new UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation)
+        OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent,
+          operationContext)
       }
     }
   }
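
The create and rebuild paths both bracket a DataMapStatusManager call with the same pre/post pair. The recurring shape, expressed as a hypothetical helper (not in this patch; it uses only APIs already visible in the diff: case-class construction, fireEvent, and CarbonProperties.getSystemFolderLocation):

    import org.apache.spark.sql.SparkSession
    import org.apache.carbondata.core.util.CarbonProperties
    import org.apache.carbondata.events._

    // Fires the update pre event, applies the status change, then fires the
    // post event on the same OperationContext.
    def withUpdateDataMapEvents(sparkSession: SparkSession)(statusChange: => Unit): Unit = {
      val operationContext = new OperationContext()
      val systemFolderLocation = CarbonProperties.getInstance().getSystemFolderLocation
      OperationListenerBus.getInstance().fireEvent(
        UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation), operationContext)
      statusChange // e.g. DataMapStatusManager.enableDataMap(dataMapName)
      OperationListenerBus.getInstance().fireEvent(
        UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation), operationContext)
    }
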

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
index 6493c83..beadc7e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
@@ -23,7 +23,9 @@ import org.apache.spark.sql.execution.command.DataCommand

 import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapRebuildRDD}
+import org.apache.carbondata.events.{UpdateDataMapPostExecutionEvent, _}

 /**
  * Rebuild the datamaps by syncing with the main table data. After the sync with the parent
  * table finishes, it enables
@@ -49,7 +51,17 @@ case class CarbonDataMapRebuildCommand(
     provider.rebuild()

     // After rebuild successfully enable the datamap.
+    val operationContext: OperationContext = new OperationContext()
+    val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
+    val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent =
+      new UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation)
+    OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent,
+      operationContext)
     DataMapStatusManager.enableDataMap(dataMapName)
+    val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent =
+      new UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation)
+    OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent,
+      operationContext)

     Seq.empty
   }


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index f1ed5d1..722119e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
 import org.apache.carbondata.events._

@@ -197,7 +198,17 @@ case class CarbonDropDataMapCommand(
       if (dataMapSchema != null) {
         dataMapProvider =
           DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
+        val operationContext: OperationContext = new OperationContext()
+        val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
+        val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent =
+          UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation)
+        OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent,
+          operationContext)
         DataMapStatusManager.dropDataMap(dataMapSchema.getDataMapName)
+        val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent =
+          UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation)
+        OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent,
+          operationContext)
         // if it is indexDataMap provider like lucene, then call cleanData, which will launch a job
         // to clear datamap from memory(clears from segmentMap and cache), This is called before
         // deleting the datamap schemas from _System folder
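
With enable, disable, and drop all bracketed the same way, an external module (an ACL or audit plugin, for instance) can observe every datamap status file change by registering one listener for both update event classes. A sketch (illustrative class name; assumes the addListener/onEvent API shown earlier):

    import org.apache.carbondata.events._

    // Audits every datamap status change; storePath identifies the system
    // folder whose status files are being rewritten.
    class DataMapStatusAudit extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        event match {
          case e: UpdateDataMapPreExecutionEvent =>
            println(s"About to update datamap status under ${e.storePath}")
          case e: UpdateDataMapPostExecutionEvent =>
            println(s"Datamap status updated under ${e.storePath}")
          case _ =>
        }
      }
    }

    val audit = new DataMapStatusAudit
    OperationListenerBus.getInstance().addListener(classOf[UpdateDataMapPreExecutionEvent], audit)
    OperationListenerBus.getInstance().addListener(classOf[UpdateDataMapPostExecutionEvent], audit)
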

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 69db3ea..38bdbcf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -62,7 +62,7 @@ import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil, ObjectSerializationUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
@@ -233,6 +233,18 @@ case class CarbonLoadDataCommand(
           isOverwriteTable)
       operationContext.setProperty("isOverwrite", isOverwriteTable)
       OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
+      // Add pre event listener for index datamap
+      val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(table)
+      val dataMapOperationContext = new OperationContext()
+      if (null != tableDataMaps) {
+        val dataMapNames: mutable.Buffer[String] =
+          tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
+        val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
+          new BuildDataMapPreExecutionEvent(sparkSession,
+            table.getAbsoluteTableIdentifier, dataMapNames)
+        OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent,
+          dataMapOperationContext)
+      }
       // First, the system has to partition the data and then call the data load
       LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
       // Clean up the old invalid segment data before creating a new entry for new load.
@@ -300,6 +312,13 @@ case class CarbonLoadDataCommand(
           table.getCarbonTableIdentifier,
           carbonLoadModel)
       OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
+      if (null != tableDataMaps) {
+        val buildDataMapPostExecutionEvent: BuildDataMapPostExecutionEvent =
+          BuildDataMapPostExecutionEvent(sparkSession, table.getAbsoluteTableIdentifier)
+        OperationListenerBus.getInstance()
+          .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
+      }
+
     } catch {
       case CausedBy(ex: NoRetryException) =>
         // update the load entry in table status file for changing the status to marked for delete
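
End-to-end, once a listener is registered, an ordinary load triggers the new build events for any table carrying an index datamap. A usage sketch (illustrative: it reuses the BuildDataMapLogger sketch from above and assumes a SparkSession named spark plus a hypothetical table and input path):

    OperationListenerBus.getInstance()
      .addListener(classOf[BuildDataMapPreExecutionEvent], new BuildDataMapLogger)
    // The load below fires BuildDataMapPreExecutionEvent before the data load
    // starts and BuildDataMapPostExecutionEvent after it completes.
    spark.sql("LOAD DATA INPATH '/tmp/sample.csv' INTO TABLE main_table")
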