Github user qiuchenjian commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3023#discussion_r243901940

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---
    @@ -24,59 +24,88 @@ import org.apache.spark.internal.Logging
     import org.apache.spark.sql.SparkSession
     
     import org.apache.carbondata.common.logging.LogServiceFactory
    -import org.apache.carbondata.core.datamap.DataMapStoreManager
    +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap}
     import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable
     import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD
     import org.apache.carbondata.events._
    +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
    +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
     
     class MergeBloomIndexEventListener extends OperationEventListener with Logging {
       val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     
       override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    +    val sparkSession = SparkSession.getActiveSession.get
         event match {
    +      case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =>
    +        // For loading process, segment can not be accessed at this time
    +        val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel
    +        val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
    +        val segmentId = loadModel.getSegmentId
    +
    +        // filter out bloom datamap, skip lazy datamap
    +        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +            DataMapClassProvider.BLOOMFILTER.getShortName))
    +          .filter(!_.getDataMapSchema.isLazy).toList
    +
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId))
    +
    +      case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =>
    +        // For compact process, segment can not be accessed at this time
    +        val carbonTable = compactPreStatusUpdateEvent.carbonTable
    +        val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName
    +        val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
    +
    +        // filter out bloom datamap, skip lazy datamap
    +        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +            DataMapClassProvider.BLOOMFILTER.getShortName))
    +          .filter(!_.getDataMapSchema.isLazy).toList
    +
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId))
    +
           case datamapPostEvent: BuildDataMapPostExecutionEvent =>
    -        LOGGER.info("Load post status event-listener called for merge bloom index")
    +        // For rebuild datamap process, datamap is disabled when rebuilding
    +        if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) {
    +          // ignore datamapPostEvent from loading and compaction
    --- End diff --

It's better to add a log here, inside the
if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) branch,
so that events skipped by this check are visible in the logs.
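For concreteness, a minimal sketch of what the suggested log could look like in that
branch; the exact message wording is an assumption, not code from the PR:

      case datamapPostEvent: BuildDataMapPostExecutionEvent =>
        // For rebuild datamap process, datamap is disabled when rebuilding
        if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) {
          // ignore datamapPostEvent from loading and compaction;
          // hypothetical log for this review suggestion, using the class's existing LOGGER
          LOGGER.info("Ignore BuildDataMapPostExecutionEvent from loading and compaction, " +
            "bloom index merging for those cases is handled by the pre status update events")
        }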
---