Github user qiuchenjian commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3023#discussion_r243901940
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
 ---
    @@ -24,59 +24,88 @@ import org.apache.spark.internal.Logging
     import org.apache.spark.sql.SparkSession
     
     import org.apache.carbondata.common.logging.LogServiceFactory
    -import org.apache.carbondata.core.datamap.DataMapStoreManager
    +import org.apache.carbondata.core.datamap.{DataMapStoreManager, 
TableDataMap}
     import 
org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable
     import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD
     import org.apache.carbondata.events._
    +import 
org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
    +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
     
     class MergeBloomIndexEventListener extends OperationEventListener with 
Logging {
       val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     
       override def onEvent(event: Event, operationContext: OperationContext): 
Unit = {
    +    val sparkSession = SparkSession.getActiveSession.get
         event match {
    +      case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =>
    +        // For loading process, segment can not be accessed at this time
    +        val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel
    +        val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
    +        val segmentId = loadModel.getSegmentId
    +
    +        // filter out bloom datamap, skip lazy datamap
    +        val bloomDatamaps = 
DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +            DataMapClassProvider.BLOOMFILTER.getShortName))
    +          .filter(!_.getDataMapSchema.isLazy).toList
    +
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, 
Seq(segmentId))
    +
    +      case compactPreStatusUpdateEvent: 
AlterTableCompactionPreStatusUpdateEvent =>
    +        // For compact process, segment can not be accessed at this time
    +        val carbonTable = compactPreStatusUpdateEvent.carbonTable
    +        val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName
    +        val segmentId = 
CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
    +
    +        // filter out bloom datamap, skip lazy datamap
    +        val bloomDatamaps = 
DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +            DataMapClassProvider.BLOOMFILTER.getShortName))
    +          .filter(!_.getDataMapSchema.isLazy).toList
    +
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, 
Seq(segmentId))
    +
           case datamapPostEvent: BuildDataMapPostExecutionEvent =>
    -        LOGGER.info("Load post status event-listener called for merge 
bloom index")
    +        // For rebuild datamap process, datamap is disabled when rebuilding
    +        if (!datamapPostEvent.isFromRebuild || null == 
datamapPostEvent.dmName) {
    +          // ignore datamapPostEvent from loading and compaction
    --- End diff --
    
    It would be better to add a log statement here, in the branch guarded by
    `if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName)`.


---

Reply via email to