This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 56879f747d150f4efae0f2998a38f4297706bc5e
Author: kunal642 <kunalkapoor...@gmail.com>
AuthorDate: Fri Jul 26 14:52:36 2019 +0530

    [CARBONDATA-3480] Fixed unnecessary refresh for table by removing modified mdt file
    
    This closes #3339
---
 .../carbondata/core/datamap/DataMapFilter.java     |  47 +++
 .../core/datamap/DataMapStoreManager.java          |  14 +-
 .../carbondata/core/metadata/CarbonMetadata.java   |   9 +
 .../core/metadata/schema/table/CarbonTable.java    |   4 +-
 .../core/metadata/schema/table/TableSchema.java    |   4 +
 .../statusmanager/SegmentUpdateStatusManager.java  |  26 --
 .../apache/carbondata/core/util/CarbonUtil.java    |   1 -
 .../core/metadata/CarbonMetadataTest.java          |   7 +-
 .../ThriftWrapperSchemaConverterImplTest.java      |   4 +-
 .../metadata/schema/table/CarbonTableTest.java     |   8 +-
 .../table/CarbonTableWithComplexTypesTest.java     |   6 +-
 .../dblocation/DBLocationCarbonTableTestCase.scala |  25 --
 .../apache/spark/sql/hive/CarbonSessionUtil.scala  |   6 +-
 .../carbondata/indexserver/IndexServer.scala       |  10 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |  51 ++-
 .../command/datamap/CarbonDropDataMapCommand.scala |   1 -
 .../management/RefreshCarbonTableCommand.scala     |   2 -
 .../CarbonAlterTableDropPartitionCommand.scala     |  12 +-
 .../CarbonAlterTableSplitPartitionCommand.scala    |   3 -
 .../command/preaaggregate/PreAggregateUtil.scala   |  19 +-
 .../command/table/CarbonDropTableCommand.scala     |  13 +
 .../spark/sql/hive/CarbonFileMetastore.scala       | 425 +++++++++------------
 .../spark/sql/hive/CarbonHiveMetaStore.scala       |  10 +-
 .../apache/spark/sql/hive/CarbonMetaStore.scala    |  10 +-
 .../scala/org/apache/spark/util/CleanFiles.scala   |   3 -
 .../scala/org/apache/spark/util/Compaction.scala   |   2 -
 .../apache/spark/util/DeleteSegmentByDate.scala    |   2 -
 .../org/apache/spark/util/DeleteSegmentById.scala  |   2 -
 .../scala/org/apache/spark/util/TableLoader.scala  |   2 -
 .../apache/spark/sql/hive/CarbonSessionState.scala |  31 +-
 .../AlterTableColumnRenameTestCase.scala           |   4 +-
 31 files changed, 322 insertions(+), 441 deletions(-)
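
For context: the commit replaces the single shared "modified mdt" timestamp file, which forced a refresh check for every table, with a per-table schema modified-time cache keyed by table id (see the CarbonFileMetastore changes below). A minimal standalone Scala sketch of that caching pattern, using hypothetical names rather than CarbonData's actual classes:

    import java.util.concurrent.ConcurrentHashMap

    // Hypothetical sketch: remember the last-seen schema-file timestamp per
    // table id and re-read the schema only when the file on disk is newer.
    object SchemaModifiedTimeStore {
      private val store = new ConcurrentHashMap[String, java.lang.Long]()

      def put(tableId: String, modifiedTime: Long): Unit =
        store.put(tableId, modifiedTime)

      // true when a refresh is needed: table never seen, or schema file changed
      def isRefreshNeeded(tableId: String, schemaFileModifiedTime: Long): Boolean = {
        val cached = store.get(tableId) // null when the table was never cached
        cached == null || cached.longValue() != schemaFileModifiedTime
      }

      def remove(tableId: String): Unit = store.remove(tableId)
    }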

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
index c20d0d5..ac4886d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
@@ -18,10 +18,15 @@
 package org.apache.carbondata.core.datamap;
 
 import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
@@ -39,9 +44,51 @@ public class DataMapFilter implements Serializable {
   public DataMapFilter(CarbonTable table, Expression expression) {
     this.table = table;
     this.expression = expression;
+    if (expression != null) {
+      checkIfFilterColumnExistsInTable();
+    }
     resolve();
   }
 
+  private Set<String> extractColumnExpressions(Expression expression) {
+    Set<String> columnExpressionList = new HashSet<>();
+    for (Expression expressions: expression.getChildren()) {
+      if (expressions != null && expressions.getChildren() != null
+          && expressions.getChildren().size() > 0) {
+        columnExpressionList.addAll(extractColumnExpressions(expressions));
+      } else if (expressions instanceof ColumnExpression) {
+        columnExpressionList.add(((ColumnExpression) expressions).getColumnName());
+      }
+    }
+    return columnExpressionList;
+  }
+
+  private void checkIfFilterColumnExistsInTable() {
+    Set<String> columnExpressionList = extractColumnExpressions(expression);
+    for (String colExpression : columnExpressionList) {
+      if (colExpression.equalsIgnoreCase("positionid")) {
+        continue;
+      }
+      boolean exists = false;
+      for (CarbonMeasure carbonMeasure : table.getAllMeasures()) {
+        if (!carbonMeasure.isInvisible() && carbonMeasure.getColName()
+            .equalsIgnoreCase(colExpression)) {
+          exists = true;
+        }
+      }
+      for (CarbonDimension carbonDimension : table.getAllDimensions()) {
+        if (!carbonDimension.isInvisible() && carbonDimension.getColName()
+            .equalsIgnoreCase(colExpression)) {
+          exists = true;
+        }
+      }
+      if (!exists) {
+        throw new RuntimeException(
+            "Column " + colExpression + " not found in table " + 
table.getTableUniqueName());
+      }
+    }
+  }
+
   public DataMapFilter(FilterResolverIntf resolver) {
     this.resolver = resolver;
   }
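
The validation added above walks the filter expression tree, collects every ColumnExpression name, and fails fast when a name matches neither a visible measure nor a visible dimension (the implicit "positionid" column is skipped). A rough standalone Scala equivalent of that recursive walk, with simplified stand-in types for the expression classes:

    // Simplified stand-ins for the Expression/ColumnExpression hierarchy.
    sealed trait Expr { def children: Seq[Expr] = Nil }
    case class ColumnExpr(columnName: String) extends Expr
    case class AndExpr(left: Expr, right: Expr) extends Expr {
      override def children: Seq[Expr] = Seq(left, right)
    }

    // Collect every column name referenced by the filter, as in
    // extractColumnExpressions.
    def extractColumnNames(expr: Expr): Set[String] = expr match {
      case ColumnExpr(name) => Set(name)
      case other => other.children.flatMap(extractColumnNames).toSet
    }

    // Fail fast when a filter column is not part of the table schema, as in
    // checkIfFilterColumnExistsInTable.
    def validate(filter: Expr, tableColumns: Set[String]): Unit =
      extractColumnNames(filter)
        .filterNot(_.equalsIgnoreCase("positionid"))
        .foreach { col =>
          require(tableColumns.exists(_.equalsIgnoreCase(col)),
            s"Column $col not found in table")
        }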
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index a6a2031..ce0d6a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -699,8 +699,13 @@ public final class DataMapStoreManager {
       SegmentUpdateDetails[] updateStatusDetails = statusManager.getUpdateStatusDetails();
       for (SegmentUpdateDetails updateDetails : updateStatusDetails) {
         UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName());
-        segmentRefreshTime.put(updateVO.getSegmentId(),
-            new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 0));
+        SegmentRefreshInfo segmentRefreshInfo;
+        if (updateVO != null && updateVO.getLatestUpdateTimestamp() != null) {
+          segmentRefreshInfo = new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 0);
+        } else {
+          segmentRefreshInfo = new SegmentRefreshInfo(0L, 0);
+        }
+        segmentRefreshTime.put(updateVO.getSegmentId(), segmentRefreshInfo);
       }
     }
 
@@ -708,8 +713,11 @@ public final class DataMapStoreManager {
       SegmentRefreshInfo segmentRefreshInfo =
           seg.getSegmentRefreshInfo(updateVo);
       String segmentId = seg.getSegmentNo();
+      if (segmentRefreshInfo.getSegmentUpdatedTimestamp() == null) {
+        return false;
+      }
       if (segmentRefreshTime.get(segmentId) == null
-          && segmentRefreshInfo.getSegmentUpdatedTimestamp() != null) {
+          && segmentRefreshInfo.getSegmentUpdatedTimestamp() != 0) {
         segmentRefreshTime.put(segmentId, segmentRefreshInfo);
         return true;
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
index e44092e..9c10a03 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.core.metadata;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -180,4 +181,12 @@ public final class CarbonMetadata {
     }
     return null;
   }
+
+  public List<CarbonTable> getAllTables() {
+    return new ArrayList<>(tableInfoMap.values());
+  }
+
+  public void clearAll() {
+    tableInfoMap.clear();
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 1be1624..aa82b64 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
@@ -286,7 +287,8 @@ public class CarbonTable implements Serializable, Writable {
    * @return
    */
   public static String buildUniqueName(String databaseName, String tableName) {
-    return databaseName + CarbonCommonConstants.UNDERSCORE + tableName;
+    return (databaseName + CarbonCommonConstants.UNDERSCORE + tableName).toLowerCase(
+        Locale.getDefault());
   }
 
   /**
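
Lowercasing the unique name keeps metadata cache keys consistent no matter how the identifier was typed; TableSchema.setTableName in the next hunk applies the same normalization. A quick illustration of the resulting key, assuming the "_" separator that CarbonCommonConstants.UNDERSCORE denotes above:

    import java.util.Locale

    // Mirrors buildUniqueName: "<db>_<table>", lowercased.
    def buildUniqueName(databaseName: String, tableName: String): String =
      (databaseName + "_" + tableName).toLowerCase(Locale.getDefault)

    // e.g. buildUniqueName("dbName", "tableName") == "dbname_tablename",
    // which is why the test expectations below move to lowercase literals.
    assert(buildUniqueName("dbName", "tableName") == "dbname_tablename")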
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
index 4425697..61b2987 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
@@ -24,6 +24,7 @@ import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -142,6 +143,9 @@ public class TableSchema implements Serializable, Writable {
    * @param tableName the tableName to set
    */
   public void setTableName(String tableName) {
+    if (tableName != null) {
+      tableName = tableName.toLowerCase(Locale.getDefault());
+    }
     this.tableName = tableName;
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index eace1b7..f7083dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -775,32 +775,6 @@ public class SegmentUpdateStatusManager {
   }
 
   /**
-   * Returns the invalid timestamp range of a segment.
-   * @return
-   */
-  public List<UpdateVO> getInvalidTimestampRange() {
-    List<UpdateVO> ranges = new ArrayList<UpdateVO>();
-    for (LoadMetadataDetails segment : segmentDetails) {
-      if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus()
-          || SegmentStatus.COMPACTED == segment.getSegmentStatus()
-          || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
-        UpdateVO range = new UpdateVO();
-        range.setSegmentId(segment.getLoadName());
-        range.setFactTimestamp(segment.getLoadStartTime());
-        if (!segment.getUpdateDeltaStartTimestamp().isEmpty() &&
-            !segment.getUpdateDeltaEndTimestamp().isEmpty()) {
-          range.setUpdateDeltaStartTimestamp(
-              CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaStartTimestamp()));
-          range.setLatestUpdateTimestamp(
-              CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaEndTimestamp()));
-        }
-        ranges.add(range);
-      }
-    }
-    return ranges;
-  }
-
-  /**
    *
    * @param block
    * @param needCompleteList
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index a86690c..cb23e3e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2239,7 +2239,6 @@ public final class CarbonUtil {
     org.apache.carbondata.format.TableInfo tableInfo =
         new org.apache.carbondata.format.TableInfo(thriftFactTable,
             new ArrayList<org.apache.carbondata.format.TableSchema>());
-
     tableInfo.setDataMapSchemas(null);
     return tableInfo;
   }
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
index 0ec19f2..6d8dac6 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
@@ -47,6 +47,7 @@ public class CarbonMetadataTest {
 
   @BeforeClass public static void setUp() {
     carbonMetadata = CarbonMetadata.getInstance();
+    carbonMetadata.clearAll();
     carbonMetadata.loadTableMetadata(getTableInfo(10000));
     tableUniqueName = CarbonTable.buildUniqueName("carbonTestDatabase", "carbonTestTable");
   }
@@ -77,13 +78,13 @@ public class CarbonMetadataTest {
   @Test public void testGetCarbonTableReturingProperTableWithProperDimensionCount() {
     int expectedResult = 1;
     assertEquals(expectedResult,
-        carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfDimensions("carbonTestTable"));
+        carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfDimensions("carbontesttable"));
   }
 
   @Test public void testGetCarbonTableReturingProperTableWithProperMeasureCount() {
     int expectedResult = 1;
     assertEquals(expectedResult,
-        carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfMeasures("carbonTestTable"));
+        carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfMeasures("carbontesttable"));
   }
 
   @Test public void testGetCarbonTableReturingProperTableWithProperDatabaseName() {
@@ -92,7 +93,7 @@ public class CarbonMetadataTest {
   }
 
   @Test public void testGetCarbonTableReturingProperTableWithProperFactTableName() {
-    String expectedResult = "carbonTestTable";
+    String expectedResult = "carbontesttable";
     assertEquals(expectedResult, carbonMetadata.getCarbonTable(tableUniqueName).getTableName());
   }
 
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
index 897c3cf..f03e193 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
@@ -1704,7 +1704,7 @@ public class ThriftWrapperSchemaConverterImplTest {
 
   @Test public void testFromExternalToWrapperTableSchema() {
     String tableId = "1";
-    String tableName = "tableName";
+    String tableName = "tablename";
     TableSchema actualResult =
         thriftWrapperSchemaConverter.fromExternalToWrapperTableSchema(tabSchema, "tableName");
     assertEquals(tableId, actualResult.getTableId());
@@ -1729,7 +1729,7 @@ public class ThriftWrapperSchemaConverterImplTest {
     TableInfo actualResult = thriftWrapperSchemaConverter
         .fromExternalToWrapperTableInfo(externalTableInfo, "dbName", "tableName", "/path");
     assertEquals(time, actualResult.getLastUpdatedTime());
-    assertEquals("dbName_tableName", actualResult.getTableUniqueName());
+    assertEquals("dbname_tablename", actualResult.getTableUniqueName());
   }
 
 }
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
index ec1303f..0f9e252 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
@@ -44,11 +44,11 @@ public class CarbonTableTest extends TestCase {
   }
 
   @Test public void testNumberOfDimensionReturnsProperCount() {
-    assertEquals(1, carbonTable.getNumberOfDimensions("carbonTestTable"));
+    assertEquals(1, carbonTable.getNumberOfDimensions("carbontesttable"));
   }
 
   @Test public void testNumberOfMeasureReturnsProperCount() {
-    assertEquals(1, carbonTable.getNumberOfMeasures("carbonTestTable"));
+    assertEquals(1, carbonTable.getNumberOfMeasures("carbontesttable"));
   }
 
   @Test public void testGetDatabaseNameResturnsDatabaseName() {
@@ -56,7 +56,7 @@ public class CarbonTableTest extends TestCase {
   }
 
   @Test public void testFactTableNameReturnsProperFactTableName() {
-    assertEquals("carbonTestTable", carbonTable.getTableName());
+    assertEquals("carbontesttable", carbonTable.getTableName());
   }
 
   @Test public void testTableUniqueNameIsProper() {
@@ -65,7 +65,7 @@ public class CarbonTableTest extends TestCase {
 
   @Test public void testDimensionPresentInTableIsProper() {
     CarbonDimension dimension = new CarbonDimension(getColumnarDimensionColumn(), 0, -1, -1);
-    assertTrue(carbonTable.getDimensionByName("carbonTestTable", "IMEI").equals(dimension));
+    assertTrue(carbonTable.getDimensionByName("carbontesttable", "IMEI").equals(dimension));
   }
 
   static ColumnSchema getColumnarDimensionColumn() {
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
index d3403d5..0d0d4df 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
@@ -43,11 +43,11 @@ class CarbonTableWithComplexTypesTest extends TestCase {
   }
 
   @Test public void testNumberOfDimensionReturnsProperCount() {
-    assertEquals(2, carbonTable.getNumberOfDimensions("carbonTestTable"));
+    assertEquals(2, carbonTable.getNumberOfDimensions("carbontesttable"));
   }
 
   @Test public void testNumberOfMeasureReturnsProperCount() {
-    assertEquals(1, carbonTable.getNumberOfMeasures("carbonTestTable"));
+    assertEquals(1, carbonTable.getNumberOfMeasures("carbontesttable"));
   }
 
   @Test public void testGetDatabaseNameResturnsDatabaseName() {
@@ -55,7 +55,7 @@ class CarbonTableWithComplexTypesTest extends TestCase {
   }
 
   @Test public void testFactTableNameReturnsProperFactTableName() {
-    assertEquals("carbonTestTable", carbonTable.getTableName());
+    assertEquals("carbontesttable", carbonTable.getTableName());
   }
 
   @Test public void testTableUniqueNameIsProper() {
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
index 37ad08c..236dcfc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
@@ -261,31 +261,6 @@ class DBLocationCarbonTableTestCase extends QueryTest with BeforeAndAfterEach {
     sql("drop table carbontable")
   }
 
-  test("test mdt file path with configured paths") {
-    sql(s"create database carbon location '$dblocation'")
-    sql("use carbon")
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER, "/tmp/carbondata1/carbondata2/")
-    val (timestampFile, timestampFileType) = getMdtFileAndType()
-    FileFactory.deleteFile(timestampFile, timestampFileType)
-    sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 
string) STORED BY 'org.apache.carbondata.format'""")
-    sql("drop table carbontable")
-    // perform file check
-    assert(FileFactory.isFileExist(timestampFile, true) ||
-           CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore)
-
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
-        CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT)
-    val (timestampFile2, timestampFileType2) = getMdtFileAndType()
-    FileFactory.deleteFile(timestampFile2, timestampFileType2)
-    sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 
string) STORED BY 'org.apache.carbondata.format'""")
-    sql("drop table carbontable")
-    // perform file check
-    assert(FileFactory.isFileExist(timestampFile, true) ||
-           CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore)
-  }
-
   override def afterEach {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index febba35..e3f1d3f 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -71,7 +71,7 @@ object CarbonSessionUtil {
             case _ =>
           }
           isRelationRefreshed =
-            CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
+            CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession)
         case _ =>
       }
     }
@@ -79,12 +79,12 @@ object CarbonSessionUtil {
     rtnRelation match {
       case SubqueryAlias(_,
       MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable)) =>
-        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+        isRelationRefreshed = CarbonEnv.isRefreshRequired(name)(sparkSession)
         if (catalogTable.isInstanceOf[Option[CatalogTable]]) {
           catalogTable.asInstanceOf[Option[CatalogTable]].foreach(setStatsNone)
         }
       case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable) =>
-        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+        isRelationRefreshed = CarbonEnv.isRefreshRequired(name)(sparkSession)
         if (catalogTable.isInstanceOf[Option[CatalogTable]]) {
           catalogTable.asInstanceOf[Option[CatalogTable]].foreach(setStatsNone)
         }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 718bb74..fdaa3d1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -106,6 +106,11 @@ object IndexServer extends ServerInterface {
         sparkSession.sparkContext
           .setLocalProperty("spark.job.description", request.getTaskGroupDesc)
       }
+      if (!request.getInvalidSegments.isEmpty) {
+        DistributedRDDUtils
+          .invalidateSegmentMapping(request.getCarbonTable.getTableUniqueName,
+            request.getInvalidSegments.asScala)
+      }
       val splits = new DistributedPruneRDD(sparkSession, request).collect()
       if (!request.isFallbackJob) {
         DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
@@ -113,11 +118,6 @@ object IndexServer extends ServerInterface {
       if (request.isJobToClearDataMaps) {
         DistributedRDDUtils.invalidateTableMapping(request.getCarbonTable.getTableUniqueName)
       }
-      if (!request.getInvalidSegments.isEmpty) {
-        DistributedRDDUtils
-          .invalidateSegmentMapping(request.getCarbonTable.getTableUniqueName,
-            request.getInvalidSegments.asScala)
-      }
       new ExtendedBlockletWrapperContainer(splits.map(_._2), request.isFallbackJob)
     }
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index e7a6d65..c13e7b9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -33,7 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.datamap.{TextMatchMaxDocUDF, TextMatchUDF}
@@ -215,38 +215,35 @@ object CarbonEnv {
       databaseNameOp: Option[String],
       tableName: String)
     (sparkSession: SparkSession): CarbonTable = {
-    refreshRelationFromCache(TableIdentifier(tableName, databaseNameOp))(sparkSession)
-    val databaseName = getDatabaseName(databaseNameOp)(sparkSession)
     val catalog = getInstance(sparkSession).carbonMetaStore
-    // refresh cache
-    catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, databaseNameOp))
-
-    // try to get it from catch, otherwise lookup in catalog
-    catalog.getTableFromMetadataCache(databaseName, tableName)
-      .getOrElse(
-        catalog
-          .lookupRelation(databaseNameOp, tableName)(sparkSession)
-          .asInstanceOf[CarbonRelation]
-          .carbonTable)
+    // if the relation is not refreshed or the table does not exist in the cache, look it up
+    if (isRefreshRequired(TableIdentifier(tableName, databaseNameOp))(sparkSession)) {
+      catalog
+        .lookupRelation(databaseNameOp, tableName)(sparkSession)
+        .asInstanceOf[CarbonRelation]
+        .carbonTable
+    } else {
+      CarbonMetadata.getInstance().getCarbonTable(databaseNameOp.getOrElse(sparkSession
+        .catalog.currentDatabase), tableName)
+    }
   }
 
-  def refreshRelationFromCache(identifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
-    var isRefreshed = false
+  /**
+   *
+   * @return true if the relation was changed and removed from the cache, false if there is no
+   *         change in the relation.
+   */
+  def isRefreshRequired(identifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
     val carbonEnv = getInstance(sparkSession)
-    val table = carbonEnv.carbonMetaStore.getTableFromMetadataCache(
-      identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
-      identifier.table)
-    if (carbonEnv.carbonMetaStore
-          .checkSchemasModifiedTimeAndReloadTable(identifier)  && table.isDefined) {
-      sparkSession.sessionState.catalog.refreshTable(identifier)
-      val tablePath = table.get.getTablePath
-      DataMapStoreManager.getInstance().
-        clearDataMaps(AbsoluteTableIdentifier.from(tablePath,
+    val databaseName = identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val table = CarbonMetadata.getInstance().getCarbonTable(databaseName, identifier.table)
+    if (table == null) {
+      true
+    } else {
+      carbonEnv.carbonMetaStore.isSchemaRefreshed(AbsoluteTableIdentifier.from(table.getTablePath,
           identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
-          identifier.table, table.get.getTableInfo.getFactTable.getTableId))
-      isRefreshed = true
+          identifier.table, table.getTableInfo.getFactTable.getTableId), sparkSession)
     }
-    isRefreshed
   }
 
   /**
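With the mdt file gone, getCarbonTable above reduces to a cache-or-lookup decision: serve the table from the in-memory CarbonMetadata cache unless isRefreshRequired reports a miss or a changed schema. A hedged sketch of that control flow, with hypothetical function parameters standing in for the metastore calls:

    // Hypothetical cache-or-lookup helper: `lookup` re-reads the schema and
    // repopulates the cache; `cached` is the fast path served from memory.
    def getOrLookup[T](name: String,
        refreshRequired: String => Boolean,
        cached: String => Option[T],
        lookup: String => T): T =
      if (refreshRequired(name)) {
        lookup(name) // miss or stale schema: full lookup
      } else {
        cached(name).getOrElse(lookup(name)) // serve from cache when present
      }
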
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index b4e60fb..1fa1337 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -67,7 +67,6 @@ case class CarbonDropDataMapCommand(
       val carbonEnv = CarbonEnv.getInstance(sparkSession)
       val catalog = carbonEnv.carbonMetaStore
       val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
-      catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
       if (mainTable == null) {
         mainTable = try {
           CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 6a9ac0a..cebf606 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -105,8 +105,6 @@ case class RefreshCarbonTableCommand(
         }
       }
     }
-    // update the schema modified time
-    metaStore.updateAndTouchSchemasUpdatedTime()
     Seq.empty
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 566e44e..507fe02 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -60,14 +60,8 @@ case class CarbonAlterTableDropPartitionCommand(
     setAuditTable(dbName, tableName)
     setAuditInfo(Map("partition" -> model.partitionId))
     val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-    val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
-      .asInstanceOf[CarbonRelation]
-    val tablePath = relation.carbonTable.getTablePath
-    carbonMetaStore.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
-    if (relation == null || CarbonMetadata.getInstance.getCarbonTable(dbName, tableName) == null) {
-      throwMetadataException(dbName, tableName, "table not found")
-    }
-    val carbonTable = relation.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sparkSession)
+    val tablePath = carbonTable.getTablePath
     val partitionInfo = carbonTable.getPartitionInfo(tableName)
     if (partitionInfo == null) {
       throwMetadataException(dbName, tableName, "table is not a partition 
table")
@@ -116,8 +110,6 @@ case class CarbonAlterTableDropPartitionCommand(
       thriftTable,
       null,
       carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
-    // update the schema modified time
-    carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     // sparkSession.catalog.refreshTable(tableName)
     Seq.empty
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 72c3142..36ddce4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -69,7 +69,6 @@ case class CarbonAlterTableSplitPartitionCommand(
     if (relation == null) {
       throwMetadataException(dbName, tableName, "table not found")
     }
-    carbonMetaStore.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     if (null == (CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession))) {
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       throwMetadataException(dbName, tableName, "table not found")
@@ -107,8 +106,6 @@ case class CarbonAlterTableSplitPartitionCommand(
       thriftTable,
       null,
       carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
-    // update the schema modified time
-    carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     Seq.empty
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index db52361..480d1f7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -471,7 +471,7 @@ object PreAggregateUtil {
     val dbName = carbonTable.getDatabaseName
     val tableName = carbonTable.getTableName
     CarbonEnv.getInstance(sparkSession).carbonMetaStore
-      .updateTableSchemaForDataMap(carbonTable.getCarbonTableIdentifier,
+      .updateTableSchema(carbonTable.getCarbonTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
         thriftTable,
         carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
@@ -539,23 +539,6 @@ object PreAggregateUtil {
     }
   }
 
-  def getChildCarbonTable(databaseName: String, tableName: String)
-    (sparkSession: SparkSession): Option[CarbonTable] = {
-    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-    val carbonTable = metaStore.getTableFromMetadataCache(databaseName, tableName)
-    if (carbonTable.isEmpty) {
-      try {
-        Some(metaStore.lookupRelation(Some(databaseName), tableName)(sparkSession)
-          .asInstanceOf[CarbonRelation].metaData.carbonTable)
-      } catch {
-        case _: Exception =>
-          None
-      }
-    } else {
-      carbonTable
-    }
-  }
-
   /**
    * Below method will be used to update logical plan
    * this is required for creating pre aggregate tables,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index ff0177b..6b80bbe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
+import org.apache.spark.sql.hive.CarbonFileMetastore
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
@@ -183,6 +184,18 @@ case class CarbonDropTableCommand(
           ifExistsSet,
           sparkSession)
       OperationListenerBus.getInstance.fireEvent(dropTablePostEvent, operationContext)
+      // Remove all invalid entries of carbonTable and corresponding updated timestamp
+      // values from the cache. This case arises when there are two JDBCServers and one of them
+      // drops the table; the other server would otherwise not be able to clear its cache.
+      try {
+        CarbonEnv.getInstance(sparkSession).carbonMetaStore match {
+          case metastore: CarbonFileMetastore => metastore.removeStaleTimeStampEntries(sparkSession)
+          case _ =>
+        }
+      } catch {
+        case _: Exception =>
+          // Do nothing
+      }
     } catch {
       case ex: NoSuchTableException =>
         if (!ifExistsSet) {
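The cleanup above targets the scenario in the comment: each JDBCServer keeps its own timestamp cache, so after one server drops a table the other server's entries for that table go stale until they are swept. A hedged sketch of such a sweep over a per-table timestamp store, reusing the ConcurrentHashMap layout sketched earlier:

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    // Hypothetical stale-entry sweep in the spirit of
    // removeStaleTimeStampEntries: drop every cached timestamp whose table id
    // no longer belongs to a live table.
    def removeStaleEntries(store: ConcurrentHashMap[String, java.lang.Long],
        liveTableIds: Set[String]): Unit =
      store.keySet().asScala
        .filterNot(liveTableIds.contains)
        .toList // snapshot the stale keys before mutating the map
        .foreach(key => store.remove(key))
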
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index ea3bba8..7ab2d47 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 import java.net.URI
+import java.util.Locale
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
 import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -36,7 +38,6 @@ import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -87,59 +88,95 @@ object MatchLogicalRelation {
   }
 }
 
-class CarbonFileMetastore extends CarbonMetaStore {
+private object CarbonFileMetastore {
+
+  final val tableModifiedTimeStore = new ConcurrentHashMap[String, Long]()
+
+  def checkIfRefreshIsNeeded(absoluteTableIdentifier: AbsoluteTableIdentifier,
+      localTimeStamp: Long): Boolean = synchronized {
+    val schemaFilePath = CarbonTablePath.getSchemaFilePath(absoluteTableIdentifier.getTablePath)
+    val schemaCarbonFile = FileFactory.getCarbonFile(schemaFilePath)
+    if (schemaCarbonFile.exists()) {
+      val oldTime = Option(CarbonFileMetastore.getTableModifiedTime(absoluteTableIdentifier
+        .getCarbonTableIdentifier
+        .getTableId))
+      val newTime = schemaCarbonFile.getLastModifiedTime
+      val isSchemaModified = oldTime match {
+        case Some(cacheTime) =>
+          cacheTime != newTime
+        case None => true
+      }
+      if (isSchemaModified) {
+        CarbonMetadata.getInstance().removeTable(absoluteTableIdentifier
+          .getCarbonTableIdentifier.getTableUniqueName)
+        DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
+        true
+      } else {
+        localTimeStamp != newTime
+      }
+    } else {
+      false
+    }
+  }
 
-  @transient
-  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
+  def updateTableSchemaModifiedTime(tableUniqueId: String, timeStamp: Long): Unit = {
+    tableModifiedTimeStore.put(tableUniqueId, timeStamp)
+  }
 
-  val tableModifiedTimeStore = new java.util.HashMap[String, Long]()
-  tableModifiedTimeStore
-    .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis())
+  def getTableModifiedTime(tableUniqueId: String): Long = {
+    tableModifiedTimeStore.get(tableUniqueId)
+  }
 
-  def nextQueryId: String = {
-    System.nanoTime() + ""
+  def removeStaleEntries(invalidTableUniqueIds: List[String]) {
+    for (invalidKey <- invalidTableUniqueIds) {
+      tableModifiedTimeStore.remove(invalidKey)
+    }
   }
+}
 
-  val metadata = MetaData(new ArrayBuffer[CarbonTable]())
+class CarbonFileMetastore extends CarbonMetaStore {
 
+  @transient private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  final val tableModifiedTimeStore = new ConcurrentHashMap[String, Long]()
 
   /**
-   * Create spark session from paramters.
-   *
-   * @param parameters
-   * @param absIdentifier
-   * @param sparkSession
+   * Create Carbon Relation by reading the schema file
    */
   override def createCarbonRelation(parameters: Map[String, String],
       absIdentifier: AbsoluteTableIdentifier,
       sparkSession: SparkSession): CarbonRelation = {
     val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = absIdentifier.getCarbonTableIdentifier.getTableName
-    val tables = getTableFromMetadataCache(database, tableName)
+    val tables = Option(CarbonMetadata.getInstance.getCarbonTable(database, tableName))
     tables match {
       case Some(t) =>
-        CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t)
-      case None =>
-        readCarbonSchema(absIdentifier,
-          !parameters.getOrElse("isTransactional", "true").toBoolean) match {
-          case Some(meta) =>
-            CarbonRelation(database, tableName,
-              CarbonSparkUtil.createSparkMeta(meta), meta)
-          case None =>
-            throw new NoSuchTableException(database, tableName)
+        if (isSchemaRefreshed(absIdentifier, sparkSession)) {
+          readCarbonSchema(absIdentifier, parameters)
+        } else {
+          CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t)
         }
+      case None =>
+        readCarbonSchema(absIdentifier, parameters)
+    }
+  }
+
+  private def readCarbonSchema(absIdentifier: AbsoluteTableIdentifier,
+      parameters: Map[String, String]): CarbonRelation = {
+    readCarbonSchema(absIdentifier,
+      !parameters.getOrElse("isTransactional", "true").toBoolean) match {
+      case Some(meta) =>
+        CarbonRelation(absIdentifier.getDatabaseName, absIdentifier.getTableName,
+          CarbonSparkUtil.createSparkMeta(meta), meta)
+      case None =>
+        throw new NoSuchTableException(absIdentifier.getDatabaseName, absIdentifier.getTableName)
     }
   }
 
   /**
-   * This method will overwrite the existing schema and update it with the given details
-   *
-   * @param newTableIdentifier
-   * @param thriftTableInfo
-   * @param carbonStorePath
-   * @param sparkSession
+   * This method will overwrite the existing schema and update it with the given details.
    */
-  def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+  def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       carbonStorePath: String)(sparkSession: SparkSession): String = {
@@ -174,13 +211,17 @@ class CarbonFileMetastore extends CarbonMetaStore {
         catalogTable.provider match {
           case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource")
             || name.equalsIgnoreCase("carbondata")) => name
-          case _ => throw new NoSuchTableException(database, tableIdentifier.table)
+          case _ =>
+            CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table)
+            throw new NoSuchTableException(database, tableIdentifier.table)
         }
         val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
            catalogTable.location.toString, database, tableIdentifier.table)
         CarbonEnv.getInstance(sparkSession).carbonMetaStore.
           createCarbonRelation(catalogTable.storage.properties, identifier, sparkSession)
-      case _ => throw new NoSuchTableException(database, tableIdentifier.table)
+      case _ =>
+        CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table)
+        throw new NoSuchTableException(database, tableIdentifier.table)
     }
 
     // fire post event after lookup relation
@@ -193,22 +234,6 @@ class CarbonFileMetastore extends CarbonMetaStore {
     relation
   }
 
-  /**
-   * This method will search for a table in the catalog metadata
-   *
-   * @param database
-   * @param tableName
-   * @return
-   */
-  def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] = {
-    metadata.readLock.lock()
-    val ret = metadata.carbonTables
-      .find(table => table.getDatabaseName.equalsIgnoreCase(database) &&
-        table.getTableName.equalsIgnoreCase(tableName))
-    metadata.readLock.unlock()
-    ret
-  }
-
   def tableExists(
       table: String,
       databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean = {
@@ -226,71 +251,60 @@ class CarbonFileMetastore extends CarbonMetaStore {
     true
   }
 
-  def isTableInMetastore(identifier: AbsoluteTableIdentifier,
-      sparkSession: SparkSession): Boolean = {
-    sparkSession.sessionState.catalog.listTables(identifier.getDatabaseName)
-      .exists(_.table.equalsIgnoreCase(identifier.getTableName))
-  }
-
-
   private def readCarbonSchema(identifier: AbsoluteTableIdentifier,
       inferSchema: Boolean): Option[CarbonTable] = {
-
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = identifier.getCarbonTableIdentifier.getTableName
     val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName)
     val tablePath = identifier.getTablePath
+    var schemaRefreshTime = System.currentTimeMillis()
     val wrapperTableInfo =
-    if (inferSchema) {
-      val carbonTbl = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName)
-      val tblInfoFromCache = if (carbonTbl != null) {
-        carbonTbl.getTableInfo
-      } else {
-        null
-      }
-
-      val thriftTableInfo : TableInfo = if (tblInfoFromCache != null) {
-        // In case the TableInfo is present in the Carbon Metadata Cache
-        // then get the tableinfo from the cache rather than infering from
-        // the CarbonData file.
-        schemaConverter
-          .fromWrapperToExternalTableInfo(tblInfoFromCache, dbName, tableName)
-      } else {
-        schemaConverter
-          .fromWrapperToExternalTableInfo(SchemaReader
-                      .inferSchema(identifier, false),
-            dbName, tableName)
-      }
+      if (inferSchema) {
+        val carbonTbl = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName)
+        val tblInfoFromCache = if (carbonTbl != null) {
+          carbonTbl.getTableInfo
+        } else {
+          null
+        }
 
-      val wrapperTableInfo =
-        schemaConverter
-          .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath)
-      wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true")
-      wrapperTableInfo.setTransactionalTable(false)
-      Some(wrapperTableInfo)
-    } else {
-      val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
-      val fileType = FileFactory.getFileType(tableMetadataFile)
-      if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-        val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+        val thriftTableInfo: TableInfo = if (tblInfoFromCache != null) {
+          // In case the TableInfo is present in the Carbon Metadata Cache
+          // then get the tableinfo from the cache rather than infering from
+          // the CarbonData file.
+          schemaConverter
+            .fromWrapperToExternalTableInfo(tblInfoFromCache, dbName, tableName)
+        } else {
+          schemaConverter
+            .fromWrapperToExternalTableInfo(SchemaReader
+              .inferSchema(identifier, false),
+              dbName, tableName)
+        }
         val wrapperTableInfo =
-          schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
+          schemaConverter
+            .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath)
+        wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true")
+        wrapperTableInfo.setTransactionalTable(false)
         Some(wrapperTableInfo)
       } else {
-        None
+        val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
+        schemaRefreshTime = FileFactory
+          .getCarbonFile(CarbonTablePath.getSchemaFilePath(tablePath)).getLastModifiedTime
+        val fileType = FileFactory.getFileType(tableMetadataFile)
+        if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+          val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+          val wrapperTableInfo =
+            schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
+          Some(wrapperTableInfo)
+        } else {
+          None
+        }
       }
-    }
-
-
     wrapperTableInfo.map { tableInfo =>
+      updateSchemasUpdatedTime(tableInfo.getFactTable.getTableId, schemaRefreshTime)
       CarbonMetadata.getInstance().removeTable(tableUniqueName)
       CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-      val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-      metadata.writeLock.lock()
-      metadata.carbonTables += carbonTable
-      metadata.writeLock.unlock()
-      carbonTable
+      CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
     }
   }
 
@@ -319,17 +333,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
       newTableIdentifier.getTableName,
       oldTableIdentifier.getTableId)
     val path = createSchemaThriftFile(newAbsoluteTableIdentifier, thriftTableInfo)
-    addTableCache(wrapperTableInfo, newAbsoluteTableIdentifier)
-
+    addCarbonTableToCache(wrapperTableInfo, newAbsoluteTableIdentifier)
     path
   }
 
   /**
    * This method will is used to remove the evolution entry in case of failure.
-   *
-   * @param carbonTableIdentifier
-   * @param thriftTableInfo
-   * @param sparkSession
    */
   def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
@@ -343,7 +352,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
     val path = createSchemaThriftFile(absoluteTableIdentifier, thriftTableInfo)
-    addTableCache(wrapperTableInfo, absoluteTableIdentifier)
+    addCarbonTableToCache(wrapperTableInfo, absoluteTableIdentifier)
     path
   }
 
@@ -360,7 +369,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val childSchemaList = wrapperTableInfo.getDataMapSchemaList
     childSchemaList.remove(childSchemaList.size() - 1)
     val path = createSchemaThriftFile(absoluteTableIdentifier, thriftTableInfo)
-    addTableCache(wrapperTableInfo, absoluteTableIdentifier)
+    addCarbonTableToCache(wrapperTableInfo, absoluteTableIdentifier)
     path
   }
 
@@ -393,7 +402,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     tableInfo.getFactTable.getSchemaEvolution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
     removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    addTableCache(tableInfo, absoluteTableIdentifier)
+    addCarbonTableToCache(tableInfo, absoluteTableIdentifier)
     CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
   }
 
@@ -417,61 +426,27 @@ class CarbonFileMetastore extends CarbonMetaStore {
     thriftWriter.open(FileWriteOperation.OVERWRITE)
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime())
+    val modifiedTime = System.currentTimeMillis()
+    FileFactory.getCarbonFile(schemaFilePath).setLastModifiedTime(modifiedTime)
+    updateSchemasUpdatedTime(identifier.getCarbonTableIdentifier.getTableId, modifiedTime)
     identifier.getTablePath
   }
 
-  protected def addTableCache(
+  protected def addCarbonTableToCache(
       tableInfo: table.TableInfo,
-      absoluteTableIdentifier: AbsoluteTableIdentifier): ArrayBuffer[CarbonTable] = {
+      absoluteTableIdentifier: AbsoluteTableIdentifier): Unit = {
     val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
-    CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
     removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    metadata.writeLock.lock()
-    metadata.carbonTables +=
-      CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName)
-    metadata.writeLock.unlock()
-    metadata.carbonTables
   }
 
   /**
-   * This method will remove the table meta from catalog metadata array
-   *
-   * @param dbName
-   * @param tableName
+   * This method will remove the table meta from CarbonMetadata cache.
    */
   def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
-    val carbonTableToBeRemoved: Option[CarbonTable] = getTableFromMetadataCache(dbName, tableName)
-    carbonTableToBeRemoved match {
-      case Some(carbonTable) =>
-        metadata.writeLock.lock()
-        metadata.carbonTables -= carbonTable
-        metadata.writeLock.unlock()
-      case None =>
-        if (LOGGER.isDebugEnabled) {
-          LOGGER.debug(s"No entry for table $tableName in database $dbName")
-        }
-    }
     CarbonMetadata.getInstance.removeTable(dbName, tableName)
   }
 
-  private def updateMetadataByWrapperTable(
-      wrapperTableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo): Unit = {
-
-    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-      wrapperTableInfo.getTableUniqueName)
-    for (i <- metadata.carbonTables.indices) {
-      metadata.writeLock.lock()
-      if (wrapperTableInfo.getTableUniqueName.equals(
-        metadata.carbonTables(i).getTableUniqueName)) {
-        metadata.carbonTables(i) = carbonTable
-      }
-      metadata.writeLock.unlock()
-    }
-  }
-
   def updateMetadataByThriftTable(schemaFilePath: String,
       tableInfo: TableInfo, dbName: String, tableName: String, tablePath: String): Unit = {
     tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
@@ -479,7 +454,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val wrapperTableInfo =
       schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
-    updateMetadataByWrapperTable(wrapperTableInfo)
+    addCarbonTableToCache(wrapperTableInfo,
+      wrapperTableInfo.getOrCreateAbsoluteTableIdentifier())
   }
 
 
@@ -496,133 +472,65 @@ class CarbonFileMetastore extends CarbonMetaStore {
   }
 
 
-  def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)
-    (sparkSession: SparkSession) {
+  def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession) {
     val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
-    val metadataFilePath = CarbonTablePath.getMetadataPath(absoluteTableIdentifier.getTablePath)
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
-    val fileType = FileFactory.getFileType(metadataFilePath)
-
-    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
-      // while drop we should refresh the schema modified time so that if any thing has changed
-      // in the other beeline need to update.
-      checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
-
-      CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, 
sparkSession)
-      updateSchemasUpdatedTime(touchSchemaFileSystemTime())
-      // discard cached table info in cachedDataSourceTables
-      val tableIdentifier = TableIdentifier(tableName, Option(dbName))
-      sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-      DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
-      
SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
-      removeTableFromMetadata(dbName, tableName)
-    } else {
-      if (!isTransactionalCarbonTable(absoluteTableIdentifier)) {
-        removeTableFromMetadata(dbName, tableName)
-        CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, 
sparkSession)
-        // discard cached table info in cachedDataSourceTables
-        val tableIdentifier = TableIdentifier(tableName, Option(dbName))
-        sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-        
DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
-        
SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
-        removeTableFromMetadata(dbName, tableName)
-      }
-    }
+    CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
+    // discard cached table info in cachedDataSourceTables
+    val tableIdentifier = TableIdentifier(tableName, Option(dbName))
+    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
+    SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
+    removeTableFromMetadata(dbName, tableName)
   }
 
 
   def isTransactionalCarbonTable(identifier: AbsoluteTableIdentifier): Boolean = {
-    val table = getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName)
-    table.map(_.getTableInfo.isTransactionalTable).getOrElse(true)
-  }
-
-  private def getTimestampFileAndType() = {
-    var basePath = CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
-        CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT)
-    basePath = CarbonUtil.checkAndAppendFileSystemURIScheme(basePath)
-    val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
-    val timestampFileType = FileFactory.getFileType(timestampFile)
-    if (!FileFactory.isFileExist(basePath, timestampFileType)) {
-      FileFactory
-        .createDirectoryAndSetPermission(basePath,
-          new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+    val table = Option(CarbonMetadata.getInstance()
+      .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName))
+    table match {
+      case Some(t) => t.isTransactionalTable
+      case None => true
     }
-    (timestampFile, timestampFileType)
   }
 
   /**
    * This method will put the updated timestamp of schema file in the table modified time store map
-   *
-   * @param timeStamp
    */
-  private def updateSchemasUpdatedTime(timeStamp: Long) {
-    tableModifiedTimeStore.put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, timeStamp)
-  }
-
-  def updateAndTouchSchemasUpdatedTime() {
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime())
+  private def updateSchemasUpdatedTime(tableUniqueId: String, timeStamp: Long) {
+    tableModifiedTimeStore.put(tableUniqueId, timeStamp)
+    CarbonFileMetastore.updateTableSchemaModifiedTime(tableUniqueId, timeStamp)
   }
 
 
-  /**
-   * This method will check and create an empty schema timestamp file
-   *
-   * @return
-   */
-  private def touchSchemaFileSystemTime(): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType()
-    if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      LOGGER.info(s"Creating timestamp file for $timestampFile")
-      FileFactory
-        .createNewFile(timestampFile,
-          timestampFileType,
-          true,
-          new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
-    }
-    FileFactory.getCarbonFile(timestampFile, timestampFileType)
-      .setLastModifiedTime(System.currentTimeMillis())
-    // since there is no guarantee that exact same set modified time returns when called
-    // lastmodified time, so better get the time from file.
-    FileFactory.getCarbonFile(timestampFile, timestampFileType)
-      .getLastModifiedTime
-  }
-
-  def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType()
-    var isRefreshed = false
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      val lastModifiedTime =
-        FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
-      if (!(lastModifiedTime ==
-            tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
-        metadata.writeLock.lock()
-        metadata.carbonTables = metadata.carbonTables.filterNot(
-          table => table.getTableName.equalsIgnoreCase(tableIdentifier.table) &&
-                   table.getDatabaseName
-                     .equalsIgnoreCase(tableIdentifier.database
-                       .getOrElse(SparkSession.getActiveSession.get.sessionState.catalog
-                         .getCurrentDatabase)))
-        metadata.writeLock.unlock()
-        updateSchemasUpdatedTime(lastModifiedTime)
-        isRefreshed = true
+  override def isSchemaRefreshed(absoluteTableIdentifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): Boolean = {
+    val localTimeStamp = Option(tableModifiedTimeStore.get(absoluteTableIdentifier
+      .getCarbonTableIdentifier
+      .getTableId))
+    if (localTimeStamp.isDefined) {
+      if (CarbonFileMetastore.checkIfRefreshIsNeeded(absoluteTableIdentifier, localTimeStamp.get)) {
+        sparkSession.sessionState
+          .catalog.refreshTable(TableIdentifier(absoluteTableIdentifier.getTableName,
+            Some(absoluteTableIdentifier.getDatabaseName)))
+        true
+      } else {
+        false
       }
+    } else {
+      true
     }
-    isRefreshed
   }
 
   override def isReadFromHiveMetaStore: Boolean = false
 
   override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = {
-    metadata.readLock.lock
-    val ret = metadata.carbonTables.clone()
-    metadata.readLock.unlock
-    ret
+    CarbonMetadata.getInstance().getAllTables.asScala
   }
 
 
@@ -672,4 +580,21 @@ class CarbonFileMetastore extends CarbonMetaStore {
       case _ => throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
     }
   }
+
+  def removeStaleTimeStampEntries(sparkSession: SparkSession): Unit = {
+    val tablesList = sparkSession.sessionState.catalog.listDatabases().flatMap {
+      database =>
+        sparkSession.sessionState.catalog.listTables(database)
+          .map(table => s"${ database }_${ table.table }".toLowerCase(Locale.getDefault()))
+    }
+    val invalidTableIds = CarbonMetadata.getInstance().getAllTables.asScala.collect {
+      case carbonTable if !tablesList.contains(carbonTable.getTableUniqueName
+        .toLowerCase(Locale.getDefault())) =>
+        CarbonMetadata.getInstance()
+          .removeTable(carbonTable.getTableUniqueName.toLowerCase(Locale.getDefault()))
+        carbonTable.getTableId
+    }
+    CarbonFileMetastore.removeStaleEntries(invalidTableIds.toList)
+  }
+
 }
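
    Note on the helpers used above: CarbonFileMetastore.updateTableSchemaModifiedTime,
    CarbonFileMetastore.checkIfRefreshIsNeeded and CarbonFileMetastore.removeStaleEntries
    live in the companion object, which is outside this hunk. A minimal sketch of the
    bookkeeping they imply, assuming nothing more than an in-memory map keyed by table
    id (names mirror the call sites; the bodies are illustrative, not the actual code):

    import java.util.concurrent.ConcurrentHashMap

    object SchemaModifiedTimeStore {
      // one timestamp per table id, replacing the single shared mdt file
      private val store = new ConcurrentHashMap[String, java.lang.Long]()

      def updateTableSchemaModifiedTime(tableId: String, timeStamp: Long): Unit =
        store.put(tableId, timeStamp)

      // a refresh is needed only when a newer schema timestamp was recorded
      // for this particular table, not when any table anywhere was modified
      def checkIfRefreshIsNeeded(tableId: String, localTimeStamp: Long): Boolean = {
        val recorded = store.get(tableId)
        recorded != null && recorded > localTimeStamp
      }

      def removeStaleEntries(invalidTableIds: List[String]): Unit =
        invalidTableIds.foreach(id => store.remove(id))
    }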
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index c8c7d31..a0a8c0f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -63,6 +63,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     carbonRelation
   }
 
+  override def isSchemaRefreshed(absoluteTableIdentifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): Boolean = true
 
   override def isTablePathExists(tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): Boolean = {
@@ -78,7 +80,6 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       // clear driver B-tree and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
-    checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
     val tableIdentifier = TableIdentifier(tableName, Option(dbName))
@@ -88,11 +89,6 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     removeTableFromMetadata(dbName, tableName)
   }
 
-  override def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = {
-    // do nothing
-    false
-  }
-
   override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = {
     // Todo
     Seq()
@@ -140,7 +136,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
    * @param carbonTablePath
    * @param sparkSession
    */
-  override def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+  override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       carbonTablePath: String)(sparkSession: SparkSession): String = {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index f97a8ae..95efa6a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -82,7 +82,7 @@ trait CarbonMetaStore {
    * @param carbonStorePath
    * @param sparkSession
    */
-  def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+  def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       carbonStorePath: String)(sparkSession: SparkSession): String
@@ -135,9 +135,8 @@ trait CarbonMetaStore {
   def dropTable(tableIdentifier: AbsoluteTableIdentifier)
     (sparkSession: SparkSession)
 
-  def updateAndTouchSchemasUpdatedTime()
-
-  def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean
+  def isSchemaRefreshed(absoluteTableIdentifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): Boolean
 
   def isReadFromHiveMetaStore: Boolean
 
@@ -147,8 +146,6 @@ trait CarbonMetaStore {
       carbonTable: CarbonTable
   ): org.apache.carbondata.format.TableInfo
 
-  def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
-
   /**
    * Method will be used to retrieve or create carbon data source relation
    *
@@ -173,6 +170,7 @@ trait CarbonMetaStore {
     val df: DataFrame = Dataset.ofRows(sparkSession, query)
     df.schema
   }
+
 }
 /**
  * Factory for Carbon metastore
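
    The net effect on the CarbonMetaStore contract: callers no longer touch a shared
    timestamp file themselves; they ask isSchemaRefreshed and the implementation
    decides. A simplified sketch of the two behaviours, with hypothetical types (the
    real methods take an AbsoluteTableIdentifier and a SparkSession, and the file-based
    implementation also refreshes the Spark relation before returning true):

    import scala.collection.mutable

    trait SchemaFreshness {
      def isSchemaRefreshed(tableId: String): Boolean
    }

    // file-based metastore: compare the locally cached timestamp for this
    // table with the globally recorded one; unknown tables must be reloaded
    class FileBacked(localTimes: mutable.Map[String, Long],
        globalTime: String => Option[Long]) extends SchemaFreshness {
      override def isSchemaRefreshed(tableId: String): Boolean =
        localTimes.get(tableId) match {
          case Some(local) => globalTime(tableId).exists(_ > local)
          case None => true
        }
    }

    // Hive-backed metastore: Hive is the source of truth, so the schema is
    // always treated as fresh (mirrors the override returning true above)
    object HiveBacked extends SchemaFreshness {
      override def isSchemaRefreshed(tableId: String): Boolean = true
    }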
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index cb3ae29..a9cb652 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -69,9 +69,6 @@ object CleanFiles {
       forceTableClean = args(2).toBoolean
     }
     val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
-    CarbonEnv.getInstance(spark).carbonMetaStore.
-      checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
-
     cleanFiles(spark, dbName, tableName, forceTableClean)
   }
 }
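
    The same deletion recurs in Compaction, DeleteSegmentByDate, DeleteSegmentById and
    TableLoader below: with the mdt file gone, these entry points no longer force a
    reload up front, since staleness is detected when the table is actually looked up.
    A generic sketch of that lazy revalidation pattern, with all names hypothetical:

    import java.util.concurrent.ConcurrentHashMap

    // revalidate on access instead of asking every caller to refresh eagerly
    final class LazyTableCache[T](load: String => T, lastModified: String => Long) {
      private final case class Entry(value: T, loadedAt: Long)
      private val cache = new ConcurrentHashMap[String, Entry]()

      def get(key: String): T = {
        val cached = cache.get(key)
        if (cached == null || cached.loadedAt < lastModified(key)) {
          val fresh = Entry(load(key), System.currentTimeMillis())
          cache.put(key, fresh) // last writer wins; fine for idempotent loads
          fresh.value
        } else {
          cached.value
        }
      }
    }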
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 0a3a870..91203a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -58,8 +58,6 @@ object Compaction {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val compactionType = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
-    CarbonEnv.getInstance(spark).carbonMetaStore.
-      checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     compaction(spark, dbName, tableName, compactionType)
   }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 90a37f6..6149421 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -45,8 +45,6 @@ object DeleteSegmentByDate {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val dateValue = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
-    CarbonEnv.getInstance(spark).carbonMetaStore.
-      checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     deleteSegmentByDate(spark, dbName, tableName, dateValue)
   }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index 15bec02..023c7bf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -50,8 +50,6 @@ object DeleteSegmentById {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
-    CarbonEnv.getInstance(spark).carbonMetaStore.
-      checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     deleteSegmentById(spark, dbName, tableName, segmentIds)
   }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index efaa191..ef9a931 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -82,8 +82,6 @@ object TableLoader {
 
     val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
 
-    CarbonEnv.getInstance(spark).carbonMetaStore.
-      checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     loadTable(spark, Option(dbName), tableName, inputPaths, map)
   }
 
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 26f778e..75e7d89 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -125,9 +125,9 @@ class CarbonHiveSessionCatalog(
     rtnRelation match {
       case SubqueryAlias(_,
       LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) =>
-        toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
-      case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
-        toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
+        toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
+      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+        toRefreshRelation = CarbonEnv.isRefreshRequired(name)(sparkSession)
       case _ =>
     }
 
@@ -138,31 +138,6 @@ class CarbonHiveSessionCatalog(
     }
   }
 
-  private def refreshRelationFromCache(identifier: TableIdentifier,
-      alias: Option[String],
-      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
-    var isRefreshed = false
-    val storePath = CarbonProperties.getStorePath
-    carbonEnv.carbonMetaStore.
-      checkSchemasModifiedTimeAndReloadTable(identifier)
-
-    val table = carbonEnv.carbonMetaStore.getTableFromMetadataCache(
-      carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
-      carbonDatasourceHadoopRelation.carbonTable.getTableName)
-    if (table.isEmpty || (table.isDefined &&
-        table.get.getTableLastUpdatedTime !=
-          carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
-      refreshTable(identifier)
-      DataMapStoreManager.getInstance().
-        clearDataMaps(AbsoluteTableIdentifier.from(storePath,
-          identifier.database.getOrElse("default"),
-          identifier.table))
-      isRefreshed = true
-      logInfo(s"Schema changes have been detected for table: $identifier")
-    }
-    isRefreshed
-  }
-
   /**
    * returns hive client from session state
    *
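
    The deleted refreshRelationFromCache compared cached CarbonTable timestamps inside
    the session catalog itself; the replacement delegates that decision to
    CarbonEnv.isRefreshRequired. A stripped-down sketch of the delegation with
    simplified stand-in types (only isRefreshRequired corresponds to a real name here):

    sealed trait Relation
    final case class CarbonRelation(name: String) extends Relation
    final case class OtherRelation(name: String) extends Relation

    // the catalog no longer inspects table timestamps; it just asks whether
    // the named Carbon relation must be refreshed
    def needsRefresh(relation: Relation, isRefreshRequired: String => Boolean): Boolean =
      relation match {
        case CarbonRelation(name) => isRefreshRequired(name)
        case _ => false // non-Carbon relations are never force-refreshed here
      }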
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
index dd1fa0f..d368a8e 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
@@ -133,10 +133,10 @@ class AlterTableColumnRenameTestCase extends Spark2QueryTest with BeforeAndAfter
     sql("alter table rename change empname name string")
     sql("update rename set (name) = ('joey') where workgroupcategory = 
'developer'").show()
     sql("insert into rename select 
20,'bill','PM','01-12-2015',3,'manager',14,'Learning',928479,'01-01-2016','30-11-2016',75,94,13547")
-    val df1 = sql("select * from rename where name = 'joey'")
+    val df1Count = sql("select * from rename where name = 'joey'").count
     sql("alter table rename change name empname string")
     val df2 = sql("select * from rename where empname = 'joey'")
-    assert(df1.count() == df2.count())
+    assert(df1Count == df2.count())
     sql("delete from rename where empname = 'joey'")
     val df3 = sql("select empname from rename")
     sql("alter table rename change empname newname string")
