This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit be9580b5768389d6adf0392fc820fb7e7186bd4c
Author: akashrn5 <akashnilu...@gmail.com>
AuthorDate: Thu Sep 12 14:30:48 2019 +0530

    [CARBONDATA-3526] Fix cache issue during update and query

    Problem:
    When multiple updates happen on a table, the cache is loaded during the
    update operation. On the second update, horizontal compaction runs inside
    the segment, so the files already loaded into the cache become invalid.
    If clean files is then run, the horizontally compacted files are physically
    deleted, but the cache still refers to the old files. When a select query
    is fired, it therefore fails with a file-not-found exception.

    Solution:
    Once horizontal compaction finishes, new compacted files are generated and
    the segments inside the cache become invalid, so clear the cache of the
    invalid segments after horizontal compaction. During the drop cache
    command, clear the cache of the segmentMap as well.

    This closes #3385
---
 .../sql/execution/command/cache/CarbonDropCacheCommand.scala  | 8 +++-----
 .../sql/execution/command/mutation/HorizontalCompaction.scala | 9 ++++++++-
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
index 1554f6a..7b8e10f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command.MetadataCommand

 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.datamap.DataMapUtil
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{DropTableCacheEvent, OperationContext, OperationListenerBus}
@@ -55,13 +55,11 @@ case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall
         carbonTable.getTableName)) {
         DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME)
       } else {
-        val allIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)(sparkSession)
         // Extract dictionary keys for the table and create cache keys from those
         val dictKeys: List[String] = CacheUtil.getAllDictCacheKeys(carbonTable)
-        // Remove elements from cache
-        val keysToRemove = allIndexFiles ++ dictKeys
-        cache.removeAll(keysToRemove.asJava)
+        cache.removeAll(dictKeys.asJava)
+        DataMapStoreManager.getInstance().clearDataMaps(carbonTable.getAbsoluteTableIdentifier)
       }
     }
     LOGGER.info("Drop cache request served for table " + carbonTable.getTableUniqueName)
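Note: the change above stops enumerating index files via CacheUtil.getAllIndexFiles and instead
delegates index-cache eviction to DataMapStoreManager.clearDataMaps, which also drops the
segment mapping kept for the table (the segmentMap mentioned in the commit message). A minimal
usage sketch of this command path, assuming a Carbon table named t (name hypothetical) and the
SHOW/DROP METACACHE SQL syntax of this release:

    // Scala, inside a Spark shell with CarbonData on the classpath
    spark.sql("SHOW METACACHE ON TABLE t").show() // inspect cached index/dictionary entries
    spark.sql("DROP METACACHE ON TABLE t")        // runs CarbonDropCacheCommand for the table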
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index fb20e4f..62a3486 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompact
 import org.apache.spark.sql.util.SparkSQLUtil

 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
@@ -106,6 +106,13 @@
       segmentUpdateStatusManager,
       deleteTimeStamp,
       segLists)
+
+    // If index and data files from an older update operation are already present, the cache
+    // is loaded for those files during the current update. Once horizontal compaction
+    // finishes, new compacted files are generated and the segments inside the cache become
+    // invalid, so clear the cache of the invalid segments after horizontal compaction.
+    DataMapStoreManager.getInstance()
+      .clearInvalidSegments(carbonTable, segLists.asScala.map(_.getSegmentNo).asJava)
   }

  /**
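For reference, a minimal sequence that reproduced the original failure before this fix; a
hedged sketch, assuming a Carbon table t with columns id and c (table and column names
hypothetical):

    // Scala, inside a Spark shell with CarbonData on the classpath
    spark.sql("UPDATE t SET (c) = ('x1') WHERE id = 1") // first update: index files cached
    spark.sql("UPDATE t SET (c) = ('x2') WHERE id = 2") // second update: horizontal compaction
                                                        // rewrites files inside the segment
    spark.sql("CLEAN FILES FOR TABLE t")                // physically deletes pre-compaction
                                                        // files still referenced by the cache
    spark.sql("SELECT * FROM t").show()                 // previously failed with a
                                                        // file-not-found exception

With the change above, clearInvalidSegments drops the stale segment entries right after
horizontal compaction, so the subsequent select no longer resolves to deleted files.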