[CARBONDATA-2714] Support merge index files for the segment

Problem:
The first query on a carbon table can be very slow because the driver has to
read and cache many small carbonindex files. Many carbonindex files are
created when data is loaded on a large cluster. For example, if the cluster
has 100 nodes, then each load creates 100 index files per segment, so after
100 loads there are 10,000 carbonindex files. Reading all of these files from
the driver is slow because it requires many namenode calls and IO operations.
Solution:
Merge the carbonindex files at two levels so that the IO calls to the
namenode are reduced and read performance improves.
Merge within a segment:
Merge the carbonindex files of a segment into a single file immediately after
the load completes. The merged file uses the .carbonindexmerge extension. It
is not a true data merge but a simple file merge, so the current structure of
the carbonindex files does not change. While reading, we read just one merged
file instead of many carbonindex files within the segment.
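
For reference, a minimal usage sketch (Scala) based on the property and the
SEGMENT_INDEX compaction type introduced by this change; the spark session
and the table name "sales" are assumptions for illustration only:

  import org.apache.carbondata.core.constants.CarbonCommonConstants
  import org.apache.carbondata.core.util.CarbonProperties

  // Merge the index files of each segment automatically when a load
  // completes (the property defaults to "true").
  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")

  // Merge the index files of already loaded segments explicitly using the
  // new SEGMENT_INDEX compaction type (spark is an assumed CarbonSession).
  spark.sql("ALTER TABLE sales COMPACT 'SEGMENT_INDEX'")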

This closes #2482


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/73419071
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/73419071
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/73419071

Branch: refs/heads/carbonstore
Commit: 73419071a308085be73c4a98fda57be241299fba
Parents: 6c5abdd
Author: dhatchayani <dhatcha.offic...@gmail.com>
Authored: Tue Jul 10 20:00:41 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Mon Jul 16 17:50:15 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  13 +-
 .../indexstore/BlockletDataMapIndexStore.java   |  12 +-
 .../core/metadata/SegmentFileStore.java         |  63 +++-
 .../core/util/BlockletDataMapUtil.java          |   2 +-
 .../sdv/generated/MergeIndexTestCase.scala      |  24 +-
 .../CarbonIndexFileMergeTestCase.scala          | 353 +++++++++++++++----
 .../dataload/TestGlobalSortDataLoad.scala       |   4 +-
 .../StandardPartitionTableCleanTestCase.scala   |   2 +-
 .../StandardPartitionTableLoadingTestCase.scala |   5 +
 .../carbondata/events/AlterTableEvents.scala    |  11 +
 .../carbondata/spark/util/CommonUtil.scala      |  59 +++-
 .../apache/spark/rdd/CarbonMergeFilesRDD.scala  | 100 ++++++
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  15 +-
 .../sql/events/MergeIndexEventListener.scala    | 180 ++++++++++
 .../CarbonAlterTableCompactionCommand.scala     |  23 +-
 .../sql/test/Spark2TestQueryExecutor.scala      |   1 -
 .../partition/TestAlterPartitionTable.scala     |  24 +-
 .../TestStreamingTableWithRowParser.scala       |  50 ---
 .../CarbonGetTableDetailComandTestCase.scala    |   4 +-
 .../processing/merger/CompactionType.java       |   1 +
 20 files changed, 751 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 3e2843c..e7e074d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1394,9 +1394,6 @@ public final class CarbonCommonConstants {
   public static final String CARBON_SQLASTBUILDER_CLASSNAME =
       "spark.carbon.sqlastbuilder.classname";
 
-  public static final String CARBON_COMMON_LISTENER_REGISTER_CLASSNAME =
-      "spark.carbon.common.listener.register.classname";
-
   @CarbonProperty
   public static final String CARBON_LEASE_RECOVERY_RETRY_COUNT =
       "carbon.lease.recovery.retry.count";
@@ -1871,6 +1868,16 @@ public final class CarbonCommonConstants {
    */
   public static final String CACHE_LEVEL_DEFAULT_VALUE = "BLOCK";
 
+  /**
+   * It is an internal configuration used only for test purposes.
+   * It merges the carbon index files within a segment into a single file.
+   */
+  @CarbonProperty
+  public static final String CARBON_MERGE_INDEX_IN_SEGMENT =
+      "carbon.merge.index.in.segment";
+
+  public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index d84f977..3918f3e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -106,11 +106,13 @@ public class BlockletDataMapIndexStore
                 new 
TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
                     identifierWrapper.getCarbonTable()), indexFileStore, 
filesRead,
                 carbonDataFileBlockMetaInfoMapping);
-            BlockDataMap blockletDataMap =
-                loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, 
blockMetaInfoMap,
-                    identifierWrapper.getCarbonTable(),
-                    identifierWrapper.isAddTableBlockToUnsafe());
-            dataMaps.add(blockletDataMap);
+            if (!blockMetaInfoMap.isEmpty()) {
+              BlockDataMap blockletDataMap =
+                  loadAndGetDataMap(blockIndexUniqueIdentifier, 
indexFileStore, blockMetaInfoMap,
+                      identifierWrapper.getCarbonTable(),
+                      identifierWrapper.isAddTableBlockToUnsafe());
+              dataMaps.add(blockletDataMap);
+            }
           }
           blockletDataMapIndexWrapper = new 
BlockletDataMapIndexWrapper(dataMaps);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index ce79e65..3d08a2d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -375,9 +375,6 @@ public class SegmentFileStore {
         folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
         for (CarbonFile file : listFiles) {
           if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-            List<String> indexFiles =
-                new 
SegmentIndexFileStore().getIndexFilesFromMergeFile(file.getAbsolutePath());
-            folderDetails.getFiles().addAll(indexFiles);
             folderDetails.setMergeFileName(file.getName());
           } else {
             folderDetails.getFiles().add(file.getName());
@@ -470,10 +467,12 @@ public class SegmentFileStore {
    * @param ignoreStatus
    * @throws IOException
    */
-  private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) 
throws IOException {
+  private List<String> readIndexFiles(SegmentStatus status, boolean 
ignoreStatus)
+      throws IOException {
     if (indexFilesMap != null) {
-      return;
+      return new ArrayList<>();
     }
+    List<String> indexOrMergeFiles = new ArrayList<>();
     SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
     indexFilesMap = new HashMap<>();
     indexFileStore.readAllIIndexOfSegment(this.segmentFile, tablePath, status, 
ignoreStatus);
@@ -487,7 +486,22 @@ public class SegmentFileStore {
         blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
       }
       indexFilesMap.put(entry.getKey(), blocks);
+      boolean added = false;
+      for (Map.Entry<String, List<String>> mergeFile : indexFileStore
+          .getCarbonMergeFileToIndexFilesMap().entrySet()) {
+        if (mergeFile.getValue().contains(entry.getKey()
+            
.substring(entry.getKey().lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1,
+                entry.getKey().length()))) {
+          indexOrMergeFiles.add(mergeFile.getKey());
+          added = true;
+          break;
+        }
+      }
+      if (!added) {
+        indexOrMergeFiles.add(entry.getKey());
+      }
     }
+    return indexOrMergeFiles;
   }
 
   /**
@@ -693,11 +707,13 @@ public class SegmentFileStore {
         // take the list of files from this segment.
         SegmentFileStore fileStore =
             new SegmentFileStore(table.getTablePath(), 
segment.getSegmentFile());
-        fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false);
+        List<String> indexOrMergeFiles =
+            fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false);
         if (forceDelete) {
           deletePhysicalPartition(
               partitionSpecs,
               fileStore.getIndexFilesMap(),
+              indexOrMergeFiles,
               table.getTablePath());
         }
         for (Map.Entry<String, List<String>> entry : 
fileStore.indexFilesMap.entrySet()) {
@@ -707,11 +723,25 @@ public class SegmentFileStore {
               .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 
1,
                   indexFile.length() - 
CarbonTablePath.INDEX_FILE_EXT.length()));
           if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || 
forceDelete) {
-            toBeDeletedIndexFiles.add(indexFile);
             // Add the corresponding carbondata files to the delete list.
             toBeDeletedDataFiles.addAll(entry.getValue());
           }
         }
+        for (String indexFile : indexOrMergeFiles) {
+          Long fileTimestamp = 0L;
+          if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+            fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile
+                .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) 
+ 1,
+                    indexFile.length() - 
CarbonTablePath.INDEX_FILE_EXT.length()));
+          } else if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) 
{
+            fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile
+                
.substring(indexFile.lastIndexOf(CarbonCommonConstants.UNDERSCORE) + 1,
+                    indexFile.length() - 
CarbonTablePath.MERGE_INDEX_FILE_EXT.length()));
+          }
+          if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || 
forceDelete) {
+            toBeDeletedIndexFiles.add(indexFile);
+          }
+        }
         if (toBeDeletedIndexFiles.size() > 0) {
           for (String dataFile : toBeDeletedIndexFiles) {
             FileFactory.deleteFile(dataFile, 
FileFactory.getFileType(dataFile));
@@ -734,7 +764,7 @@ public class SegmentFileStore {
   public static void deleteSegment(String tablePath, String segmentFile,
       List<PartitionSpec> partitionSpecs) throws IOException {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, segmentFile);
-    fileStore.readIndexFiles(SegmentStatus.SUCCESS, true);
+    List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true);
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
       FileFactory.deleteFile(entry.getKey(), 
FileFactory.getFileType(entry.getKey()));
@@ -742,7 +772,7 @@ public class SegmentFileStore {
         FileFactory.deleteFile(file, FileFactory.getFileType(file));
       }
     }
-    deletePhysicalPartition(partitionSpecs, indexFilesMap, tablePath);
+    deletePhysicalPartition(partitionSpecs, indexFilesMap, indexOrMergeFiles, 
tablePath);
     String segmentFilePath =
         CarbonTablePath.getSegmentFilesLocation(tablePath) + 
CarbonCommonConstants.FILE_SEPARATOR
             + segmentFile;
@@ -757,7 +787,20 @@ public class SegmentFileStore {
    * If partition specs are null, then directly delete parent directory in 
locationMap.
    */
   private static void deletePhysicalPartition(List<PartitionSpec> 
partitionSpecs,
-      Map<String, List<String>> locationMap, String tablePath) throws 
IOException {
+      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, 
String tablePath)
+      throws IOException {
+    for (String indexOrMergFile : indexOrMergeFiles) {
+      if (null != partitionSpecs) {
+        Path location = new Path(indexOrMergFile);
+        boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
+        if (!exists) {
+          
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+        }
+      } else {
+        Path location = new Path(indexOrMergFile);
+        
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+      }
+    }
     for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) {
       if (partitionSpecs != null) {
         Path location = new Path(entry.getKey());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 8fa48a8..58df0f7 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -112,7 +112,7 @@ public class BlockletDataMapUtil {
         CarbonTable.updateTableByTableInfo(carbonTable, 
carbonTable.getTableInfo());
       }
       String blockPath = 
footer.getBlockInfo().getTableBlockInfo().getFilePath();
-      if (null == blockMetaInfoMap.get(blockPath)) {
+      if (null == blockMetaInfoMap.get(blockPath) && 
FileFactory.isFileExist(blockPath)) {
         blockMetaInfoMap.put(blockPath, 
createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath));
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index 215ad0d..5ab88bc 100644
--- 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -23,11 +23,13 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.common.util._
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -39,30 +41,37 @@ class MergeIndexTestCase extends QueryTest with 
BeforeAndAfterAll {
 
 
   override protected def afterAll(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+        CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql("DROP TABLE IF EXISTS indexmerge")
   }
 
   test("Verify correctness of index merge sdv") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"false")
     sql(s"""drop table if exists carbon_automation_nonmerge""").collect
 
     sql(s"""create table carbon_automation_nonmerge (imei 
string,deviceInformationId int,MAC string,deviceColor string,device_backColor 
string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit 
string,CPIClocked string,series string,productionDate timestamp,bomCode 
string,internalModels string, deliveryTime string, channelsId string, 
channelsName string , deliveryAreaId string, deliveryCountry string, 
deliveryProvince string, deliveryCity string,deliveryDistrict string, 
deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, 
ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity 
string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, 
Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion 
string, Active_BacVerNumber string, Active_BacFlashVer string, 
Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, 
Active_phoneP
 ADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY 
int, Latest_HOUR string, Latest_areaId string, Latest_country string, 
Latest_province string, Latest_city string, Latest_district string, 
Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, 
Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer 
string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, 
Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, 
Latest_phonePADPartitionedVersions string, Latest_operatorId string, 
gamePointDescription string,gamePointId double,contractNumber double) STORED BY 
'org.apache.carbondata.format' TBLPROPERTIES 
('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
 
     sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE 
carbon_automation_nonmerge 
OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSy
 
sVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") >= 
1)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("DROP TABLE IF EXISTS carbon_automation_merge")
     sql(s"""create table carbon_automation_merge (imei 
string,deviceInformationId int,MAC string,deviceColor string,device_backColor 
string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit 
string,CPIClocked string,series string,productionDate timestamp,bomCode 
string,internalModels string, deliveryTime string, channelsId string, 
channelsName string , deliveryAreaId string, deliveryCountry string, 
deliveryProvince string, deliveryCity string,deliveryDistrict string, 
deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, 
ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity 
string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, 
Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion 
string, Active_BacVerNumber string, Active_BacFlashVer string, 
Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, 
Active_phonePADP
 artitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, 
Latest_HOUR string, Latest_areaId string, Latest_country string, 
Latest_province string, Latest_city string, Latest_district string, 
Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, 
Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer 
string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, 
Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, 
Latest_phonePADPartitionedVersions string, Latest_operatorId string, 
gamePointDescription string,gamePointId double,contractNumber double) STORED BY 
'org.apache.carbondata.format' TBLPROPERTIES 
('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
 
     sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE 
carbon_automation_merge 
OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
 
rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
 
-    val table = 
CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_merge")
-    new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", 
table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
     assert(getIndexFileCount("default", "carbon_automation_merge", "0") == 0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""),
       sql("""Select count(*) from carbon_automation_merge"""))
   }
 
   test("Verify command of index merge  sdv") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"false")
     sql(s"""drop table if exists carbon_automation_nonmerge""").collect
 
     sql(s"""create table carbon_automation_nonmerge (imei 
string,deviceInformationId int,MAC string,deviceColor string,device_backColor 
string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit 
string,CPIClocked string,series string,productionDate timestamp,bomCode 
string,internalModels string, deliveryTime string, channelsId string, 
channelsName string , deliveryAreaId string, deliveryCountry string, 
deliveryProvince string, deliveryCity string,deliveryDistrict string, 
deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, 
ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity 
string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, 
Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion 
string, Active_BacVerNumber string, Active_BacFlashVer string, 
Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, 
Active_phoneP
 ADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY 
int, Latest_HOUR string, Latest_areaId string, Latest_country string, 
Latest_province string, Latest_city string, Latest_district string, 
Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, 
Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer 
string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, 
Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, 
Latest_phonePADPartitionedVersions string, Latest_operatorId string, 
gamePointDescription string,gamePointId double,contractNumber double) STORED BY 
'org.apache.carbondata.format' TBLPROPERTIES 
('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
@@ -72,15 +81,16 @@ class MergeIndexTestCase extends QueryTest with 
BeforeAndAfterAll {
     val rows = sql("""Select count(*) from 
carbon_automation_nonmerge""").collect()
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") >= 
1)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") >= 
1)
-    val table = 
CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
-    new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", 
table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
-    new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("1", 
table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
+    sql("alter table carbon_automation_nonmerge compact 'SEGMENT_INDEX'")
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 
0)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 
0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), 
rows)
   }
 
   test("Verify index index merge with compaction  sdv") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"false")
+
     sql(s"""drop table if exists carbon_automation_nonmerge""").collect
 
     sql(s"""create table carbon_automation_nonmerge (imei 
string,deviceInformationId int,MAC string,deviceColor string,device_backColor 
string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit 
string,CPIClocked string,series string,productionDate timestamp,bomCode 
string,internalModels string, deliveryTime string, channelsId string, 
channelsName string , deliveryAreaId string, deliveryCountry string, 
deliveryProvince string, deliveryCity string,deliveryDistrict string, 
deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, 
ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity 
string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, 
Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion 
string, Active_BacVerNumber string, Active_BacFlashVer string, 
Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, 
Active_phoneP
 ADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY 
int, Latest_HOUR string, Latest_areaId string, Latest_country string, 
Latest_province string, Latest_city string, Latest_district string, 
Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, 
Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer 
string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, 
Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, 
Latest_phonePADPartitionedVersions string, Latest_operatorId string, 
gamePointDescription string,gamePointId double,contractNumber double) STORED BY 
'org.apache.carbondata.format' TBLPROPERTIES 
('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
@@ -93,9 +103,9 @@ class MergeIndexTestCase extends QueryTest with 
BeforeAndAfterAll {
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") >= 
1)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") >= 
1)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "2") >= 
1)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'minor'").collect()
-    val table = 
CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
-    new 
CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0.1", 
table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1") 
== 0)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1", 
true) >= 1)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), 
rows)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 5871262..8ee2275 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -17,22 +17,19 @@
 
 package org.apache.carbondata.spark.testsuite.datacompaction
 
-import org.junit.Assert
-
-import scala.collection.JavaConverters._
-
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
+import org.junit.Assert
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
-import org.apache.carbondata.core.datamap.Segment
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 
 class CarbonIndexFileMergeTestCase
   extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -47,9 +44,14 @@ class CarbonIndexFileMergeTestCase
     CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql("DROP TABLE IF EXISTS indexmerge")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+        CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)
   }
 
   test("Verify correctness of index merge") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"false")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -60,6 +62,8 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("DROP TABLE IF EXISTS indexmerge")
     sql(
       """
@@ -69,15 +73,14 @@ class CarbonIndexFileMergeTestCase
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge 
OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"indexmerge")
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, 
String.valueOf(System.currentTimeMillis()))
     assert(getIndexFileCount("default_indexmerge", "0") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""),
       sql("""Select count(*) from indexmerge"""))
   }
 
   test("Verify command of index merge") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"false")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -92,17 +95,17 @@ class CarbonIndexFileMergeTestCase
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"nonindexmerge")
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, 
String.valueOf(System.currentTimeMillis()))
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false, 
String.valueOf(System.currentTimeMillis()))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
   test("Verify command of index merge without enabling property") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"false")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -117,17 +120,16 @@ class CarbonIndexFileMergeTestCase
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"nonindexmerge")
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, 
String.valueOf(System.currentTimeMillis()))
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false, 
String.valueOf(System.currentTimeMillis()))
+    sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
-  test("Verify index index merge with compaction") {
+  test("Verify index merge with compaction") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"false")
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, 
"2,2")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -139,21 +141,23 @@ class CarbonIndexFileMergeTestCase
         s"'GLOBAL_SORT_PARTITIONS'='100')")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
-        s"'GLOBAL_SORT_PARTITIONS'='100')")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"nonindexmerge")
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false, 
String.valueOf(System.currentTimeMillis()))
     assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
   }
 
-  test("Verify index index merge for compacted segments") {
+  test("Verify index merge for compacted segments") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"false")
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, 
"2,3")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -165,41 +169,71 @@ class CarbonIndexFileMergeTestCase
         s"'GLOBAL_SORT_PARTITIONS'='100')")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
-        s"'GLOBAL_SORT_PARTITIONS'='100')")
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
-        s"'GLOBAL_SORT_PARTITIONS'='100')")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"nonindexmerge")
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false, 
String.valueOf(System.currentTimeMillis()))
+    sql("ALTER TABLE nonindexmerge COMPACT 'segment_index'").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
   }
 
   test("Query should not fail after iud operation on a table having merge 
indexes") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("drop table if exists mitable")
     sql("create table mitable(id int, issue date) stored by 'carbondata'")
     sql("insert into table mitable select '1','2000-02-01'")
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"mitable")
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, 
String.valueOf(System.currentTimeMillis()))
     sql("update mitable set(id)=(2) where issue = '2000-02-01'").show()
     sql("clean files for table mitable")
     sql("select * from mitable").show()
+}
+
+  test("Verify index merge for compacted segments MINOR") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, 
"2,3")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"false")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    val rows = sql("""Select count(*) from nonindexmerge""").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+    checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
   // CARBONDATA-2704, test the index file size after merge
   test("Verify the size of the index file after merge") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
 "false")
     sql("DROP TABLE IF EXISTS fileSize")
     sql(
       """
@@ -215,43 +249,215 @@ class CarbonIndexFileMergeTestCase
     Assert
       .assertEquals(getIndexOrMergeIndexFileSize(table, "0", 
CarbonTablePath.INDEX_FILE_EXT),
         segment0.head.getIndexSize.toLong)
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, 
String.valueOf(System.currentTimeMillis()))
+    sql("Alter table fileSize compact 'segment_index'")
     loadMetadataDetails = SegmentStatusManager
       
.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(table.getTablePath))
     segment0 = loadMetadataDetails.filter(x=> 
x.getLoadName.equalsIgnoreCase("0"))
     Assert
       .assertEquals(getIndexOrMergeIndexFileSize(table, "0", 
CarbonTablePath.MERGE_INDEX_FILE_EXT),
         segment0.head.getIndexSize.toLong)
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
 "true")
     sql("DROP TABLE IF EXISTS fileSize")
   }
 
-  private def getIndexFileCount(tableName: String, segmentNo: String): Int = {
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableName)
-    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, 
segmentNo)
-    if (FileFactory.isFileExist(segmentDir)) {
-      val indexFiles = new 
SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir)
-      indexFiles.asScala.map { f =>
-        if (f._2 == null) {
-          1
-        } else {
-          0
-        }
-      }.sum
-    } else {
-      val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
-      if (segment != null) {
-        val store = new SegmentFileStore(carbonTable.getTablePath, 
segment.getSegmentFileName)
-        store.getSegmentFile.getLocationMap.values().asScala.map { f =>
-          if (f.getMergeFileName == null) {
-            f.getFiles.size()
-          } else {
-            0
-          }
-        }.sum
-      } else {
-        0
+  test("Verify index merge for compacted segments MINOR - level 2") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, 
"2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"false")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    val rows = sql("""Select count(*) from nonindexmerge""").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2.1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "0.2") == 0)
+    checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
+  }
+
+  test("Verify index merge for compacted segments Auto Compaction") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, 
"2,3")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"false")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    val rows = sql("""Select count(*) from nonindexmerge""").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"true")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, 
"true")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')"
+    )
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "4") == 0)
+    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+    checkAnswer(sql("""Select count(*) from nonindexmerge"""), 
Seq(Row(3000000)))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, 
"false")
+  }
+
+  test("Verify index merge for compacted segments Auto Compaction - level 2") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, 
"2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"false")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    val rows = sql("""Select count(*) from nonindexmerge""").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, 
"true")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')"
+    )
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "4") == 0)
+    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+    assert(getIndexFileCount("default_nonindexmerge", "0.2") == 0)
+    checkAnswer(sql("""Select count(*) from nonindexmerge"""), 
Seq(Row(3000000)))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, 
"false")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+  }
+
+
+  test("Verify index merge for partition table") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("DROP TABLE IF EXISTS partitionTable")
+    sql(
+      """
+        | CREATE TABLE partitionTable(id INT, name STRING, city STRING)
+        | PARTITIONED BY(age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE partitionTable 
OPTIONS('header'='false')")
+    assert(getIndexFileCount("default_partitionTable", "0") == 0)
+    sql("DROP TABLE IF EXISTS partitionTable")
+  }
+
+  test("Verify index merge for pre-aggregate table") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("DROP TABLE IF EXISTS preAggTable")
+    sql(
+      """
+        | CREATE TABLE preAggTable(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE preAggTable 
OPTIONS('header'='false')")
+    assert(getIndexFileCount("default_preAggTable", "0") == 0)
+    sql("create datamap preAggSum on table preAggTable using 'preaggregate' as 
" +
+        "select city,sum(age) as sum from preAggTable group by city")
+    assert(getIndexFileCount("default_preAggTable_preAggSum", "0") == 0)
+    sql("DROP TABLE IF EXISTS partitionTable")
+  }
+
+  test("Verify index merge for streaming table") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("DROP TABLE IF EXISTS streamingTable")
+    sql(
+      """
+        | CREATE TABLE streamingTable(id INT, name STRING, city STRING, age 
INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable 
OPTIONS('header'='false')")
+    assert(getIndexFileCount("default_streamingTable", "0") >= 1)
+    val exceptionMessage = intercept[Exception] {
+      sql("alter table streamingTable compact 'segment_index'")
+    }.getMessage
+    assert(exceptionMessage.contains("Unsupported alter operation on carbon 
table: Merge index is not supported on streaming table"))
+    sql("DROP TABLE IF EXISTS streamingTable")
+  }
+
+  private def getIndexFileCount(tableName: String, segment: String): Int = {
+    val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
+    val path = CarbonTablePath
+      .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
+    val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new 
CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath
+          .INDEX_FILE_EXT)
       }
+    })
+    if (carbonFiles != null) {
+      carbonFiles.length
+    } else {
+      0
     }
   }
 
@@ -271,5 +477,4 @@ class CarbonIndexFileMergeTestCase
     })
     size
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index c40526d..0c42264 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -277,13 +277,13 @@ class TestGlobalSortDataLoad extends QueryTest with 
BeforeAndAfterEach with Befo
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
     val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
     if (FileFactory.isFileExist(segmentDir)) {
-      assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
+      assertResult(Math.max(4, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
     } else {
       val segment = Segment.getSegment("0", carbonTable.getTablePath)
       val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
       store.readIndexFiles()
       val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
-      assertResult(Math.max(7, defaultParallelism) + 1)(size + store.getIndexFilesMap.size())
+      assertResult(Math.max(4, defaultParallelism) + 1)(size + store.getIndexFilesMap.size())
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index cfc6983..319953c 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -60,7 +60,7 @@ class StandardPartitionTableCleanTestCase extends QueryTest 
with BeforeAndAfterA
     val details = 
SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
     val segLoad = details.find(_.getLoadName.equals(segmentId)).get
     val seg = new SegmentFileStore(carbonTable.getTablePath, 
segLoad.getSegmentFile)
-    assert(seg.getIndexFiles.size == indexes)
+    assert(seg.getIndexOrMergeFiles.size == indexes)
   }
 
   test("clean up partition table for int partition column") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 1db1f4a..b61583e 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -337,6 +337,8 @@ class StandardPartitionTableLoadingTestCase extends 
QueryTest with BeforeAndAfte
   }
 
   test("merge carbon index disable data loading for partition table for three 
partition column") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql(
       """
         | CREATE TABLE mergeindexpartitionthree (empno int, doj Timestamp,
@@ -354,6 +356,9 @@ class StandardPartitionTableLoadingTestCase extends 
QueryTest with BeforeAndAfte
     store.readIndexFiles()
     store.getIndexFiles
     assert(store.getIndexFiles.size() == 10)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+        CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)
   }
 
   test("load static partition table for one static partition column with load 
syntax issue") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 37abd73..1810df6 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -204,6 +204,17 @@ case class AlterTableCompactionAbortEvent(sparkSession: 
SparkSession,
     alterTableModel: AlterTableModel) extends Event with 
AlterTableCompactionEventInfo
 
 /**
+ * Compaction Event for handling merge index in alter DDL
+ *
+ * @param sparkSession
+ * @param carbonTable
+ * @param alterTableModel
+ */
+case class AlterTableMergeIndexEvent(sparkSession: SparkSession,
+    carbonTable: CarbonTable,
+    alterTableModel: AlterTableModel) extends Event with AlterTableCompactionEventInfo
+
+/**
  * pre event for standard hive partition
  * @param sparkSession
  * @param carbonTable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 39530f4..1cd4d77 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -29,6 +29,7 @@ import scala.util.Random
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.rdd.CarbonMergeFilesRDD
 import org.apache.spark.sql.{Row, RowFactory}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
@@ -53,10 +54,8 @@ import org.apache.carbondata.core.util.{ByteUtil, 
CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.comparator.Comparator
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
-import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 
 
 object CommonUtil {
@@ -827,4 +826,60 @@ object CommonUtil {
     }
   }
 
+  /**
+   * Merge the carbonindex files within the segment into a carbonindexmerge file inside
+   * the same segment.
+   *
+   * @param sparkContext
+   * @param segmentIds
+   * @param segmentFileNameToSegmentIdMap
+   * @param tablePath
+   * @param carbonTable
+   * @param mergeIndexProperty
+   * @param readFileFooterFromCarbonDataFile flag to read the file footer information from the
+   *                                         carbondata file. This is used when upgrading from a
+   *                                         version that does not store the blocklet info to the
+   *                                         current version
+   */
+  def mergeIndexFiles(sparkContext: SparkContext,
+    segmentIds: Seq[String],
+    segmentFileNameToSegmentIdMap: java.util.Map[String, String],
+    tablePath: String,
+    carbonTable: CarbonTable,
+    mergeIndexProperty: Boolean,
+    readFileFooterFromCarbonDataFile: Boolean = false): Unit = {
+    if (mergeIndexProperty) {
+      new CarbonMergeFilesRDD(
+        sparkContext,
+        carbonTable,
+        segmentIds,
+        segmentFileNameToSegmentIdMap,
+        carbonTable.isHivePartitionTable,
+        readFileFooterFromCarbonDataFile).collect()
+    } else {
+      try {
+        if (CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
+          new CarbonMergeFilesRDD(
+            sparkContext,
+            carbonTable,
+            segmentIds,
+            segmentFileNameToSegmentIdMap,
+            carbonTable.isHivePartitionTable,
+            readFileFooterFromCarbonDataFile).collect()
+        }
+      } catch {
+        case _: Exception =>
+          if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
+            new CarbonMergeFilesRDD(
+              sparkContext,
+              carbonTable,
+              segmentIds,
+              segmentFileNameToSegmentIdMap,
+              carbonTable.isHivePartitionTable,
+              readFileFooterFromCarbonDataFile).collect()
+          }
+      }
+    }
+  }
+
 }
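
For orientation, a caller-side sketch of the new helper; the real call sites are the MergeIndexEventListener and the compaction command later in this diff. The loadModel, carbonTable and sparkSession handles and the segment id "0" are assumed here for illustration:

      // Sketch: merge the index files of a freshly loaded segment "0".
      val segmentFileNameMap = new java.util.HashMap[String, String]()
      // segment id -> segment file name / fact timestamp, as the listener below builds it
      segmentFileNameMap.put("0", String.valueOf(loadModel.getFactTimeStamp))
      CommonUtil.mergeIndexFiles(
        sparkSession.sparkContext,
        Seq("0"),
        segmentFileNameMap,
        carbonTable.getTablePath,
        carbonTable,
        mergeIndexProperty = false)  // false: honour CARBON_MERGE_INDEX_IN_SEGMENT rather than forcing the merge

Passing mergeIndexProperty = true skips the property check and always triggers the merge, which is what the ALTER TABLE ... COMPACT 'SEGMENT_INDEX' path further down does.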

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
new file mode 100644
index 0000000..1acdf7e
--- /dev/null
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonRDD
+
+case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String)
+  extends Partition {
+
+  override val index: Int = idx
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
+
+/**
+ * RDD that merges all the carbonindex files of each segment into a single merged index file
+ * within the same segment.
+ * @param sc
+ * @param carbonTable
+ * @param segments segments to be merged
+ */
+class CarbonMergeFilesRDD(
+  sc: SparkContext,
+  carbonTable: CarbonTable,
+  segments: Seq[String],
+  segmentFileNameToSegmentIdMap: java.util.Map[String, String],
+  isHivePartitionedTable: Boolean,
+  readFileFooterFromCarbonDataFile: Boolean)
+  extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {
+
+  override def getPartitions: Array[Partition] = {
+    segments.zipWithIndex.map {s =>
+      CarbonMergeFilePartition(id, s._2, s._1)
+    }.toArray
+  }
+
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
+    val tablePath = carbonTable.getTablePath
+    val iter = new Iterator[String] {
+      val split = theSplit.asInstanceOf[CarbonMergeFilePartition]
+      logInfo("Merging carbon index files of segment : " +
+              CarbonTablePath.getSegmentPath(tablePath, split.segmentId))
+
+      if (isHivePartitionedTable) {
+        CarbonLoaderUtil
+          .mergeIndexFilesinPartitionedSegment(carbonTable, split.segmentId,
+            segmentFileNameToSegmentIdMap.get(split.segmentId))
+      } else {
+        new CarbonIndexFileMergeWriter(carbonTable)
+          .mergeCarbonIndexFilesOfSegment(split.segmentId,
+            tablePath,
+            readFileFooterFromCarbonDataFile,
+            segmentFileNameToSegmentIdMap.get(split.segmentId))
+      }
+
+      var havePair = false
+      var finished = false
+
+      override def hasNext: Boolean = {
+        if (!finished && !havePair) {
+          finished = true
+          havePair = !finished
+        }
+        !finished
+      }
+
+      override def next(): String = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        ""
+      }
+
+    }
+    iter
+  }
+
+}
+
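
Each segment maps to exactly one partition, so collect() launches one merge task per segment; the iterator yields a single placeholder value and the RDD is executed purely for its side effect of writing the merged index file. A driver-side sketch, with the argument values assumed for illustration:

      // Sketch: one merge task per segment; the collected result is discarded.
      new CarbonMergeFilesRDD(
        sparkContext,
        carbonTable,
        Seq("0", "1", "2"),                     // segment ids to merge (assumed)
        segmentFileNameMap,                     // segment id -> segment file name (assumed)
        carbonTable.isHivePartitionTable,
        readFileFooterFromCarbonDataFile = false
      ).collect()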

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 70c4f12..074568d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -25,6 +25,7 @@ import scala.util.Try
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.events.MergeIndexEventListener
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive._
@@ -39,7 +40,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.datamap.{TextMatchMaxDocUDF, TextMatchUDF}
 import org.apache.carbondata.events._
-import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, 
LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, 
LoadTablePreStatusUpdateEvent}
+import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, 
LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, 
LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -151,7 +152,6 @@ object CarbonEnv {
    */
   def init(sparkSession: SparkSession): Unit = {
     initListeners
-    registerCommonListener(sparkSession)
   }
 
   /**
@@ -181,14 +181,9 @@ object CarbonEnv {
       .addListener(classOf[AlterTableDropPartitionPostStatusEvent],
         AlterTableDropPartitionPostStatusListener)
       .addListener(classOf[AlterTableDropPartitionMetaEvent], 
AlterTableDropPartitionMetaListener)
-  }
-
-  def registerCommonListener(sparkSession: SparkSession): Unit = {
-    val clsName = Try(sparkSession.sparkContext.conf
-      
.get(CarbonCommonConstants.CARBON_COMMON_LISTENER_REGISTER_CLASSNAME)).toOption.getOrElse("")
-    if (null != clsName && !clsName.isEmpty) {
-      CarbonReflectionUtils.createObject(clsName)
-    }
+      .addListener(classOf[LoadTablePostExecutionEvent], new MergeIndexEventListener)
+      .addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
+      .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
new file mode 100644
index 0000000..a58e405
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.events
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.util.CarbonException
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.events.{AlterTableCompactionPostEvent, 
AlterTableMergeIndexEvent, Event, OperationContext, OperationEventListener}
+import 
org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostExecutionEvent
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+import org.apache.carbondata.spark.util.CommonUtil
+
+class MergeIndexEventListener extends OperationEventListener with Logging {
+  val LOGGER: LogService = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  override def onEvent(event: Event, operationContext: OperationContext): Unit 
= {
+    event match {
+      case preStatusUpdateEvent: LoadTablePostExecutionEvent =>
+        LOGGER.audit("Load post status event-listener called for merge index")
+        val loadTablePreStatusUpdateEvent = 
event.asInstanceOf[LoadTablePostExecutionEvent]
+        val carbonTableIdentifier = 
loadTablePreStatusUpdateEvent.getCarbonTableIdentifier
+        val loadModel = loadTablePreStatusUpdateEvent.getCarbonLoadModel
+        val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
+        val compactedSegments = loadModel.getMergedSegmentIds
+        val sparkSession = SparkSession.getActiveSession.get
+        if(!carbonTable.isStreamingSink) {
+          if (null != compactedSegments && !compactedSegments.isEmpty) {
+            mergeIndexFilesForCompactedSegments(sparkSession.sparkContext,
+              carbonTable,
+              compactedSegments)
+          } else {
+            val segmentFileNameMap: java.util.Map[String, String] = new 
util.HashMap[String,
+              String]()
+
+            segmentFileNameMap
+              .put(loadModel.getSegmentId, 
String.valueOf(loadModel.getFactTimeStamp))
+            CommonUtil.mergeIndexFiles(sparkSession.sparkContext,
+              Seq(loadModel.getSegmentId),
+              segmentFileNameMap,
+              carbonTable.getTablePath,
+              carbonTable, false)
+          }
+        }
+      case alterTableCompactionPostEvent: AlterTableCompactionPostEvent =>
+        LOGGER.audit("Merge index for compaction called")
+        val alterTableCompactionPostEvent = 
event.asInstanceOf[AlterTableCompactionPostEvent]
+        val carbonTable = alterTableCompactionPostEvent.carbonTable
+        val mergedLoads = alterTableCompactionPostEvent.compactedLoads
+        val sparkContext = 
alterTableCompactionPostEvent.sparkSession.sparkContext
+        if(!carbonTable.isStreamingSink) {
+          mergeIndexFilesForCompactedSegments(sparkContext, carbonTable, 
mergedLoads)
+        }
+      case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
+        val exceptionEvent = event.asInstanceOf[AlterTableMergeIndexEvent]
+        val alterTableModel = exceptionEvent.alterTableModel
+        val carbonMainTable = exceptionEvent.carbonTable
+        val compactionType = alterTableModel.compactionType
+        val sparkSession = exceptionEvent.sparkSession
+        if (!carbonMainTable.isStreamingSink) {
+          LOGGER.audit(s"Compaction request received for table " +
+                       s"${ carbonMainTable.getDatabaseName }.${ 
carbonMainTable.getTableName }")
+          LOGGER.info(s"Merge Index request received for table " +
+                      s"${ carbonMainTable.getDatabaseName }.${ 
carbonMainTable.getTableName }")
+          val lock = CarbonLockFactory.getCarbonLockObj(
+            carbonMainTable.getAbsoluteTableIdentifier,
+            LockUsage.COMPACTION_LOCK)
+
+          try {
+            if (lock.lockWithRetries()) {
+              LOGGER.info("Acquired the compaction lock for table" +
+                          s" ${ carbonMainTable.getDatabaseName }.${
+                            carbonMainTable
+                              .getTableName
+                          }")
+              val validSegments: mutable.Buffer[Segment] = 
CarbonDataMergerUtil.getValidSegmentList(
+                carbonMainTable.getAbsoluteTableIdentifier).asScala
+              val validSegmentIds: mutable.Buffer[String] = 
mutable.Buffer[String]()
+              validSegments.foreach { segment =>
+                validSegmentIds += segment.getSegmentNo
+              }
+              val loadFolderDetailsArray = SegmentStatusManager
+                .readLoadMetadata(carbonMainTable.getMetadataPath)
+              val segmentFileNameMap: java.util.Map[String, String] = new 
util.HashMap[String,
+                String]()
+              loadFolderDetailsArray.foreach(loadMetadataDetails => {
+                segmentFileNameMap
+                  .put(loadMetadataDetails.getLoadName, 
loadMetadataDetails.getSegmentFile)
+              })
+              CommonUtil.mergeIndexFiles(sparkSession.sparkContext,
+                validSegmentIds,
+                segmentFileNameMap,
+                carbonMainTable.getTablePath,
+                carbonMainTable,
+                true)
+              val requestMessage = "Compaction request completed for table " +
+                                   s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
+              LOGGER.audit(requestMessage)
+              LOGGER.info(requestMessage)
+            } else {
+              val lockMessage = "Not able to acquire the compaction lock for table " +
+                                s"${ carbonMainTable.getDatabaseName }.${
+                                  carbonMainTable
+                                    .getTableName
+                                }"
+
+              LOGGER.audit(lockMessage)
+              LOGGER.error(lockMessage)
+              CarbonException.analysisException(
+                "Table is already locked for compaction. Please try after some 
time.")
+            }
+          } finally {
+            lock.unlock()
+          }
+        }
+    }
+  }
+
+  def mergeIndexFilesForCompactedSegments(sparkContext: SparkContext,
+    carbonTable: CarbonTable,
+    mergedLoads: util.List[String]): Unit = {
+    // get only the valid segments of the table
+    val validSegments: mutable.Buffer[Segment] = 
CarbonDataMergerUtil.getValidSegmentList(
+      carbonTable.getAbsoluteTableIdentifier).asScala
+    val mergedSegmentIds = new util.ArrayList[String]()
+    mergedLoads.asScala.foreach(mergedLoad => {
+      val loadName = mergedLoad
+        .substring(mergedLoad.indexOf(CarbonCommonConstants.LOAD_FOLDER) +
+                   CarbonCommonConstants.LOAD_FOLDER.length)
+      mergedSegmentIds.add(loadName)
+    })
+    val loadFolderDetailsArray = SegmentStatusManager
+      .readLoadMetadata(carbonTable.getMetadataPath)
+    val segmentFileNameMap: java.util.Map[String, String] = new 
util.HashMap[String, String]()
+    loadFolderDetailsArray.foreach(loadMetadataDetails => {
+      segmentFileNameMap.put(loadMetadataDetails.getLoadName, 
loadMetadataDetails.getSegmentFile)
+    })
+    // filter out only the valid segments from the list of compacted segments
+    // Example: say compacted segments list contains 0.1, 3.1, 6.1, 0.2.
+    // In this list 0.1, 3.1 and 6.1 are compacted to 0.2 in the level 2 compaction.
+    // So, it is enough to do merge index only for 0.2 as it is the only valid segment in this list
+    val validMergedSegIds = validSegments
+      .filter { seg => mergedSegmentIds.contains(seg.getSegmentNo) }.map(_.getSegmentNo)
+    if (validMergedSegIds.nonEmpty) {
+      CommonUtil.mergeIndexFiles(sparkContext,
+          validMergedSegIds,
+          segmentFileNameMap,
+          carbonTable.getTablePath,
+          carbonTable,
+          false)
+    }
+  }
+}
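
A worked example of the load-name extraction in mergeIndexFilesForCompactedSegments above, assuming CarbonCommonConstants.LOAD_FOLDER is the "Segment_" prefix used in segment folder paths; the path itself is hypothetical:

      val mergedLoad = "/store/default/tbl/Fact/Part0/Segment_0.2"  // hypothetical compacted load path
      val loadName = mergedLoad.substring(
        mergedLoad.indexOf(CarbonCommonConstants.LOAD_FOLDER) +
        CarbonCommonConstants.LOAD_FOLDER.length)                   // yields "0.2"
      // "0.2" is then merged only if it still appears in the valid (non-compacted) segment list.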

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index d467017..a4e52c3 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -102,23 +102,24 @@ case class CarbonAlterTableCompactionCommand(
     if (SegmentStatusManager.isOverwriteInProgressInTable(table)) {
       throw new ConcurrentOperationException(table, "insert overwrite", 
"compaction")
     }
-    operationContext.setProperty("compactionException", "true")
     var compactionType: CompactionType = null
-    var compactionException = "true"
     try {
       compactionType = 
CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
     } catch {
       case _: Exception =>
-        val alterTableCompactionExceptionEvent: AlterTableCompactionAbortEvent 
=
-          AlterTableCompactionAbortEvent(sparkSession, table, alterTableModel)
-        OperationListenerBus.getInstance
-          .fireEvent(alterTableCompactionExceptionEvent, operationContext)
-        compactionException = 
operationContext.getProperty("compactionException").toString
+        throw new MalformedCarbonCommandException(
+          "Unsupported alter operation on carbon table")
     }
-    if (compactionException.equalsIgnoreCase("true") && null == 
compactionType) {
-      throw new MalformedCarbonCommandException(
-        "Unsupported alter operation on carbon table")
-    } else if (compactionException.equalsIgnoreCase("false")) {
+    if (compactionType == CompactionType.SEGMENT_INDEX) {
+      if (table.isStreamingSink) {
+        throw new MalformedCarbonCommandException(
+          "Unsupported alter operation on carbon table: Merge index is not 
supported on streaming" +
+          " table")
+      }
+      val alterTableMergeIndexEvent: AlterTableMergeIndexEvent =
+        AlterTableMergeIndexEvent(sparkSession, table, alterTableModel)
+      OperationListenerBus.getInstance
+        .fireEvent(alterTableMergeIndexEvent, operationContext)
       Seq.empty
     } else {
 

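With the rewrite above, the SEGMENT_INDEX compaction type is dispatched to the merge-index listener instead of the regular compaction flow. A usage sketch, assuming an existing non-streaming table named indexMergeTable (the DDL syntax matches the streaming-table negative test earlier in this diff):

      // Merge the carbonindex files of all valid segments of the table on demand.
      sql("ALTER TABLE indexMergeTable COMPACT 'SEGMENT_INDEX'")
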
http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index d30e96d..b341d6a 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -67,7 +67,6 @@ object Spark2TestQueryExecutor {
     .enableHiveSupport()
     .config("spark.sql.warehouse.dir", warehouse)
     .config("spark.sql.crossJoin.enabled", "true")
-    .config(CarbonCommonConstants.CARBON_COMMON_LISTENER_REGISTER_CLASSNAME, 
"")
     .getOrCreateCarbonSession(null, TestQueryExecutor.metastoredb)
   if (warehouse.startsWith("hdfs://")) {
     System.setProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, warehouse)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
 
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 7cee409..6bfeb06 100644
--- 
a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -859,22 +859,14 @@ class TestAlterPartitionTable extends QueryTest with 
BeforeAndAfterAll {
 
   def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[String] 
= {
     val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
-    if (segment.getSegmentFileName != null) {
-      val sfs = new SegmentFileStore(carbonTable.getTablePath, 
segment.getSegmentFileName)
-      sfs.readIndexFiles()
-      val indexFilesMap = sfs.getIndexFilesMap
-      val dataFiles = indexFilesMap.asScala.flatMap(_._2.asScala).map(f => new 
Path(f).getName)
-      dataFiles.toArray
-    } else {
-      val segmentDir = 
CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
-      val carbonFile = FileFactory.getCarbonFile(segmentDir, 
FileFactory.getFileType(segmentDir))
-      val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
-        override def accept(file: CarbonFile): Boolean = {
-          return file.getName.endsWith(".carbondata")
-        }
-      })
-      dataFiles.map(_.getName)
-    }
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
+      override def accept(file: CarbonFile): Boolean = {
+        return file.getName.endsWith(".carbondata")
+      }
+    })
+    dataFiles.map(_.getName)
   }
 
   /**
