Github user qiuchenjian commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3023#discussion_r243902213

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---
    @@ -24,59 +24,88 @@ import org.apache.spark.internal.Logging
     import org.apache.spark.sql.SparkSession

     import org.apache.carbondata.common.logging.LogServiceFactory
    -import org.apache.carbondata.core.datamap.DataMapStoreManager
    +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap}
     import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable
     import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD
     import org.apache.carbondata.events._
    +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
    +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil

     class MergeBloomIndexEventListener extends OperationEventListener with Logging {
       val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

       override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    +    val sparkSession = SparkSession.getActiveSession.get
         event match {
    +      case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =>
    +        // For loading process, segment can not be accessed at this time
    +        val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel
    +        val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
    +        val segmentId = loadModel.getSegmentId
    +
    +        // filter out bloom datamap, skip lazy datamap
    +        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +            DataMapClassProvider.BLOOMFILTER.getShortName))
    +          .filter(!_.getDataMapSchema.isLazy).toList
    +
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId))
    +
    +      case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =>
    +        // For compact process, segment can not be accessed at this time
    +        val carbonTable = compactPreStatusUpdateEvent.carbonTable
    +        val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName
    +        val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
    +
    +        // filter out bloom datamap, skip lazy datamap
    +        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +            DataMapClassProvider.BLOOMFILTER.getShortName))
    +          .filter(!_.getDataMapSchema.isLazy).toList
    +
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId))
    +
           case datamapPostEvent: BuildDataMapPostExecutionEvent =>
    -        LOGGER.info("Load post status event-listener called for merge bloom index")
    +        // For rebuild datamap process, datamap is disabled when rebuilding
    +        if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) {
    +          // ignore datamapPostEvent from loading and compaction
    +          return
    +        }
    +
             val carbonTableIdentifier = datamapPostEvent.identifier
             val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier)
    -        val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
    -        val sparkSession = SparkSession.getActiveSession.get
    -        // filter out bloom datamap
    -        var bloomDatamaps = tableDataMaps.asScala.filter(
    -          _.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +        // filter out current rebuilt bloom datamap
    +        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
                 DataMapClassProvider.BLOOMFILTER.getShortName))
    -
    -        if (datamapPostEvent.isFromRebuild) {
    -          if (null != datamapPostEvent.dmName) {
    -            // for rebuild process
    -            bloomDatamaps = bloomDatamaps.filter(
    -              _.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName))
    -          }
    -        } else {
    -          // for load process, skip lazy datamap
    -          bloomDatamaps = bloomDatamaps.filter(!_.getDataMapSchema.isLazy)
    -        }
    +          .filter(_.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName))
    +          .toList

             val segmentIds = datamapPostEvent.segmentIdList
    -        if (bloomDatamaps.size > 0 && segmentIds.size > 0) {
    -          // we extract bloom datamap name and index columns here
    -          // because TableDataMap is not serializable
    -          val bloomDMnames = ListBuffer.empty[String]
    -          val bloomIndexColumns = ListBuffer.empty[Seq[String]]
    -          bloomDatamaps.foreach( dm => {
    -            bloomDMnames += dm.getDataMapSchema.getDataMapName
    -            bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase)
    -          })
    -          new CarbonMergeBloomIndexFilesRDD(sparkSession, carbonTable,
    -            segmentIds, bloomDMnames, bloomIndexColumns).collect()
    -        }
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, segmentIds)
         }
       }

    -  private def clearBloomCache(carbonTable: CarbonTable, segmentIds: Seq[String]): Unit = {
    -    DataMapStoreManager.getInstance.clearDataMaps(carbonTable.getTableUniqueName)
    +  private def mergeBloomIndex(sparkSession: SparkSession, carbonTable: CarbonTable,
    +      bloomDatamaps: List[TableDataMap], segmentIds: Seq[String]) = {
    +    if (bloomDatamaps.nonEmpty && segmentIds.nonEmpty) {
    +      // we extract bloom datamap name and index columns here
    +      // because TableDataMap is not serializable
    +      val bloomDMnames = ListBuffer.empty[String]
    +      val bloomIndexColumns = ListBuffer.empty[Seq[String]]
    +      bloomDatamaps.foreach(dm => {
    +        bloomDMnames += dm.getDataMapSchema.getDataMapName
    +        bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase)
    +      })
    +      LOGGER.info("Start to merge bloom index file")
    --- End diff --

It's better to add the key information to the three logs, such as the tableName, segmentIds and bloomDatamaps.
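As an illustration of this suggestion (a hypothetical sketch, not code from the PR), the merge-start log inside mergeBloomIndex could carry that context, since carbonTable, segmentIds and bloomDMnames are all in scope at that point:

    // Hypothetical sketch only: enrich the merge-start log with the table name,
    // the segments being processed and the bloom datamaps involved.
    LOGGER.info(s"Start to merge bloom index files for table " +
      s"${carbonTable.getTableUniqueName}, segments [${segmentIds.mkString(", ")}], " +
      s"bloom datamaps [${bloomDMnames.mkString(", ")}]")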
---