fix testcase

clean

clean

clean

fix comment


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/7b8b1959
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/7b8b1959
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/7b8b1959

Branch: refs/heads/master
Commit: 7b8b19598bf61001b2c4e07debbe26868f64d95c
Parents: b966043
Author: jackylk <jacky.li...@huawei.com>
Authored: Fri Dec 30 14:44:14 2016 +0800
Committer: chenliang613 <chenliang...@apache.org>
Committed: Wed Jan 4 17:11:54 2017 +0800

----------------------------------------------------------------------
 .../scan/executor/impl/DetailQueryExecutor.java |  13 ++-
 .../impl/VectorDetailQueryExecutor.java         |  11 +-
 .../scan/executor/util/QueryUtil.java           |   4 +-
 .../carbondata/scan/filter/FilterUtil.java      |   6 +-
 .../AbstractDetailQueryResultIterator.java      |   2 +-
 .../iterator/DetailQueryResultIterator.java     |   2 +-
 .../VectorDetailQueryResultIterator.java        |   2 +-
 .../MalformedCarbonCommandException.java        |  14 ---
 .../carbondata/spark/load/CarbonLoaderUtil.java |  18 ++-
 .../spark/load/DeleteLoadFolders.java           |  54 +--------
 .../spark/load/DeletedLoadMetadata.java         |  53 ---------
 .../spark/merger/CarbonCompactionExecutor.java  |  35 +-----
 .../spark/merger/CarbonDataMergerUtil.java      |  48 +-------
 .../spark/merger/RowResultMerger.java           |   8 +-
 .../spark/merger/TupleConversionAdapter.java    |   6 +-
 .../spark/partition/api/DataPartitioner.java    |  19 +--
 .../spark/partition/api/Partition.java          |   5 -
 .../api/impl/DataPartitionerProperties.java     |  87 --------------
 .../partition/api/impl/DefaultLoadBalancer.java |   4 -
 .../spark/partition/api/impl/PartitionImpl.java |  54 ---------
 .../api/impl/PartitionMultiFileImpl.java        |   5 -
 .../api/impl/QueryPartitionHelper.java          |   3 +-
 .../api/impl/SampleDataPartitionerImpl.java     | 117 +------------------
 .../carbondata/spark/util/CarbonQueryUtil.java  |  14 +--
 .../apache/carbondata/spark/CarbonFilters.scala | 104 +++++++++++------
 .../spark/rdd/CarbonDataLoadRDD.scala           |  10 +-
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |   4 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  36 +++---
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   6 +-
 .../spark/rdd/DataLoadPartitionCoalescer.scala  |   8 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  59 +++++-----
 .../spark/util/GlobalDictionaryUtil.scala       |   5 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  35 +++---
 .../CarbonTableIdentifierImplicit.scala         |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   1 -
 .../spark/sql/CarbonDatasourceRelation.scala    |  10 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   2 +-
 .../scala/org/apache/spark/sql/CarbonScan.scala |   3 +-
 .../spark/sql/SparkUnknownExpression.scala      |   9 +-
 .../execution/command/carbonTableSchema.scala   |   9 +-
 .../spark/sql/optimizer/CarbonOptimizer.scala   |  15 ++-
 .../DataCompactionBlockletBoundryTest.scala     |   3 +-
 .../DataCompactionBoundaryConditionsTest.scala  |   1 +
 .../DataCompactionCardinalityBoundryTest.scala  |  12 +-
 .../datacompaction/DataCompactionLockTest.scala |   3 +-
 .../DataCompactionNoDictionaryTest.scala        |   5 +-
 .../datacompaction/DataCompactionTest.scala     |  12 +-
 .../MajorCompactionIgnoreInMinorTest.scala      |   1 +
 .../MajorCompactionStopsAfterCompaction.scala   |   1 +
 .../spark/util/DictionaryTestCaseUtil.scala     |   3 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   1 -
 51 files changed, 248 insertions(+), 696 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
index f2f4b58..7e0f5a9 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.scan.model.QueryModel;
+import org.apache.carbondata.scan.result.BatchResult;
 import org.apache.carbondata.scan.result.iterator.DetailQueryResultIterator;
 
 /**
@@ -31,13 +31,17 @@ import org.apache.carbondata.scan.result.iterator.DetailQueryResultIterator;
  * For executing the detail query it will pass all the block execution
  * info to detail query result iterator and iterator will be returned
  */
-public class DetailQueryExecutor extends AbstractQueryExecutor {
+public class DetailQueryExecutor extends AbstractQueryExecutor<BatchResult> {
 
-  @Override public CarbonIterator<Object[]> execute(QueryModel queryModel)
+  @Override
+  public CarbonIterator<BatchResult> execute(QueryModel queryModel)
       throws QueryExecutionException {
     List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
-    return new DetailQueryResultIterator(blockExecutionInfoList, queryModel,
-        queryProperties.executorService);
+    return new DetailQueryResultIterator(
+        blockExecutionInfoList,
+        queryModel,
+        queryProperties.executorService
+    );
   }
 
 }
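
For reference, a rough sketch of a call site under the new typed signature
(hypothetical caller; the QueryModel setup is assumed, not part of this diff):

    DetailQueryExecutor executor = new DetailQueryExecutor();
    CarbonIterator<BatchResult> iterator = executor.execute(queryModel);
    while (iterator.hasNext()) {
      // typed batches now, instead of the old CarbonIterator<Object[]>
      BatchResult batch = iterator.next();
    }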

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java
index 1cbf9c9..884a1eb 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java
@@ -29,13 +29,16 @@ import org.apache.carbondata.scan.result.iterator.VectorDetailQueryResultIterator
 /**
  * Below class will be used to execute the detail query and returns columnar vectors.
  */
-public class VectorDetailQueryExecutor extends AbstractQueryExecutor {
+public class VectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
 
-  @Override public CarbonIterator<Object[]> execute(QueryModel queryModel)
+  @Override public CarbonIterator<Object> execute(QueryModel queryModel)
       throws QueryExecutionException {
     List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
-    return new VectorDetailQueryResultIterator(blockExecutionInfoList, queryModel,
-        queryProperties.executorService);
+    return new VectorDetailQueryResultIterator(
+        blockExecutionInfoList,
+        queryModel,
+        queryProperties.executorService
+    );
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java
index 3f9ebb5..9fdb9ae 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java
@@ -336,9 +336,9 @@ public class QueryUtil {
         getDictionaryColumnUniqueIdentifierList(dictionaryColumnIdList,
             absoluteTableIdentifier.getCarbonTableIdentifier());
     CacheProvider cacheProvider = CacheProvider.getInstance();
-    Cache forwardDictionaryCache = cacheProvider
+    Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
         .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
-    List<Dictionary> columnDictionaryList = null;
+    List<Dictionary> columnDictionaryList;
     try {
       columnDictionaryList = forwardDictionaryCache.getAll(dictionaryColumnUniqueIdentifiers);
     } catch (CarbonUtilException e) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java
index c1985bc..943ea1a 100644
--- a/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/FilterUtil.java
@@ -951,12 +951,12 @@ public final class FilterUtil {
         new DictionaryColumnUniqueIdentifier(tableIdentifier.getCarbonTableIdentifier(),
             carbonDimension.getColumnIdentifier(), carbonDimension.getDataType());
     CacheProvider cacheProvider = CacheProvider.getInstance();
-    Cache forwardDictionaryCache =
+    Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache =
         cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, tableIdentifier.getStorePath());
     // get the forward dictionary object
-    Dictionary forwardDictionary = null;
+    Dictionary forwardDictionary;
     try {
-      forwardDictionary = (Dictionary) forwardDictionaryCache.get(dictionaryColumnUniqueIdentifier);
+      forwardDictionary = forwardDictionaryCache.get(dictionaryColumnUniqueIdentifier);
     } catch (CarbonUtilException e) {
       throw new QueryExecutionException(e);
     }
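
A short sketch of what the parameterized Cache changes for callers (the
identifier and store path variables are assumed from the surrounding code):

    // before: raw Cache, every lookup needed a cast
    Dictionary dict = (Dictionary) forwardDictionaryCache.get(dictionaryColumnUniqueIdentifier);

    // after: key/value types declared once, lookups are cast-free
    Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache =
        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, tableIdentifier.getStorePath());
    Dictionary typedDict = cache.get(dictionaryColumnUniqueIdentifier);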

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
index ff7a93c..f6b6ba1 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -46,7 +46,7 @@ import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
  * executing that query are returning a iterator over block and every time next
  * call will come it will execute the block and return the result
  */
-public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
+public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterator<E> {
 
   /**
    * LOGGER.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java
index a958195..7d944d6 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java
@@ -33,7 +33,7 @@ import org.apache.carbondata.scan.result.BatchResult;
  * executing that query are returning a iterator over block and every time next
  * call will come it will execute the block and return the result
  */
-public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator {
+public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<BatchResult> {
 
   private final Object lock = new Object();
   private Future<BatchResult> future;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java
index 8d0e80c..417f597 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java
@@ -28,7 +28,7 @@ import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
 /**
  * It reads the data vector batch format
  */
-public class VectorDetailQueryResultIterator extends AbstractDetailQueryResultIterator {
+public class VectorDetailQueryResultIterator extends AbstractDetailQueryResultIterator<Object> {
 
   private final Object lock = new Object();
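
Taken together, the three iterator changes thread one type parameter through
the hierarchy; a condensed sketch of the resulting shape:

    public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterator<E> { /* ... */ }
    // row-batch results
    public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<BatchResult> { /* ... */ }
    // columnar-batch results
    public class VectorDetailQueryResultIterator extends AbstractDetailQueryResultIterator<Object> { /* ... */ }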
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
index de7d4a2..e97d063 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
@@ -19,9 +19,6 @@
 
 package org.apache.carbondata.spark.exception;
 
-
-import java.util.Locale;
-
 public class MalformedCarbonCommandException extends Exception {
 
 
@@ -56,17 +53,6 @@ public class MalformedCarbonCommandException extends Exception {
   }
 
   /**
-   * This method is used to get the localized message.
-   *
-   * @param locale - A Locale object represents a specific geographical,
-   *               political, or cultural region.
-   * @return - Localized error message.
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-  /**
    * getLocalizedMessage
    */
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index eca34a6..419e833 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -241,7 +241,7 @@ public final class CarbonLoaderUtil {
     }
   }
 
-  public static void deleteStorePath(String path) {
+  private static void deleteStorePath(String path) {
     try {
       FileType fileType = FileFactory.getFileType(path);
       if (FileFactory.isFileExist(path, fileType)) {
@@ -311,7 +311,7 @@ public final class CarbonLoaderUtil {
     String localStoreLocation = CarbonProperties.getInstance()
         .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
     try {
-      CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation).getParentFile() });
+      CarbonUtil.deleteFoldersAndFiles(new File(localStoreLocation).getParentFile());
       LOGGER.info("Deleted the local store location" + localStoreLocation);
     } catch (CarbonUtilException e) {
       LOGGER.error(e, "Failed to delete local data load folder location");
@@ -325,15 +325,13 @@ public final class CarbonLoaderUtil {
    * @param storePath
    * @param carbonTableIdentifier
    * @param segmentId
-   * @param partitionId
    * @return
    */
   public static String getStoreLocation(String storePath,
-      CarbonTableIdentifier carbonTableIdentifier, String segmentId, String partitionId) {
+      CarbonTableIdentifier carbonTableIdentifier, String segmentId) {
     CarbonTablePath carbonTablePath =
         CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier);
-    String carbonDataFilePath = carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
-    return carbonDataFilePath;
+    return carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
   }
 
   /**
@@ -385,9 +383,7 @@ public final class CarbonLoaderUtil {
             new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
         if (null != listOfLoadFolderDetailsArray) {
-          for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
-            listOfLoadFolderDetails.add(loadMetadata);
-          }
+          Collections.addAll(listOfLoadFolderDetails, listOfLoadFolderDetailsArray);
         }
         listOfLoadFolderDetails.add(loadMetadataDetails);
 
@@ -463,9 +459,9 @@ public final class CarbonLoaderUtil {
 
   public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier,
       String carbonStorePath) throws CarbonUtilException {
-    Cache dictCache =
+    Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =
         CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, carbonStorePath);
-    return (Dictionary) dictCache.get(columnIdentifier);
+    return dictCache.get(columnIdentifier);
   }
 
   public static Dictionary getDictionary(CarbonTableIdentifier tableIdentifier,
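
Callers of getStoreLocation simply drop the partitionId argument; the method
now fixes the partition to "0" internally. Before and after at a typical call
site (mirrors the CarbonDataMergerUtil change below):

    // before
    String loadPath = CarbonLoaderUtil.getStoreLocation(storePath, tableIdentifier, segId, "0");
    // after
    String loadPath = CarbonLoaderUtil.getStoreLocation(storePath, tableIdentifier, segId);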

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
index 0663abc..9f092e7 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
@@ -26,6 +26,7 @@
  * Description   : for physical deletion of load folders.
  * Class Version  : 1.0
  */
+
 package org.apache.carbondata.spark.load;
 
 import java.io.IOException;
@@ -158,59 +159,6 @@ public final class DeleteLoadFolders {
     return false;
   }
 
-  private static void factFileRenaming(String loadFolderPath) {
-
-    FileFactory.FileType fileType = FileFactory.getFileType(loadFolderPath);
-    try {
-      if (FileFactory.isFileExist(loadFolderPath, fileType)) {
-        CarbonFile loadFolder = FileFactory.getCarbonFile(loadFolderPath, fileType);
-
-        CarbonFile[] listFiles = loadFolder.listFiles(new CarbonFileFilter() {
-
-          @Override public boolean accept(CarbonFile file) {
-            return (file.getName().endsWith('_' + CarbonCommonConstants.FACT_FILE_UPDATED));
-          }
-        });
-
-        for (CarbonFile file : listFiles) {
-          if (!file.renameTo(file.getName().substring(0,
-              file.getName().length() - CarbonCommonConstants.FACT_FILE_UPDATED.length()))) {
-            LOGGER.warn("could not rename the updated fact file.");
-          }
-        }
-
-      }
-    } catch (IOException e) {
-      LOGGER.error("exception" + e.getMessage());
-    }
-
-  }
-
-  private static void cleanDeletedFactFile(String loadFolderPath) {
-    FileFactory.FileType fileType = FileFactory.getFileType(loadFolderPath);
-    try {
-      if (FileFactory.isFileExist(loadFolderPath, fileType)) {
-        CarbonFile loadFolder = FileFactory.getCarbonFile(loadFolderPath, fileType);
-
-        CarbonFile[] listFiles = loadFolder.listFiles(new CarbonFileFilter() {
-
-          @Override public boolean accept(CarbonFile file) {
-            return (file.getName().endsWith(CarbonCommonConstants.FACT_DELETE_EXTENSION));
-          }
-        });
-
-        for (CarbonFile file : listFiles) {
-          if (!file.delete()) {
-            LOGGER.warn("could not delete the marked fact file.");
-          }
-        }
-
-      }
-    } catch (IOException e) {
-      LOGGER.error("exception" + e.getMessage());
-    }
-  }
-
   public static boolean deleteLoadFoldersFromFileSystem(String dbName, String tableName,
       String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
     List<LoadMetadataDetails> deletedLoads =

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
deleted file mode 100644
index 661e17c..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.load;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-public class DeletedLoadMetadata implements Serializable {
-
-  private static final long serialVersionUID = 7083059404172117208L;
-  private Map<String, String> deletedLoadMetadataMap =
-      new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  public void addDeletedLoadMetadata(String loadId, String status) {
-    deletedLoadMetadataMap.put(loadId, status);
-  }
-
-  public List<String> getDeletedLoadMetadataIds() {
-    return new ArrayList<String>(deletedLoadMetadataMap.keySet());
-  }
-
-  public String getDeletedLoadMetadataStatus(String loadId) {
-    if (deletedLoadMetadataMap.containsKey(loadId)) {
-      return deletedLoadMetadataMap.get(loadId);
-    } else {
-      return null;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
index dc5fb17..ef3d2e4 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
@@ -56,41 +56,23 @@ public class CarbonCompactionExecutor {
       LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
   private final Map<String, List<DataFileFooter>> dataFileMetadataSegMapping;
   private final SegmentProperties destinationSegProperties;
-  private final String databaseName;
-  private final String factTableName;
   private final Map<String, TaskBlockInfo> segmentMapping;
-  private final String storePath;
   private QueryExecutor queryExecutor;
   private CarbonTable carbonTable;
   private QueryModel queryModel;
 
   /**
    * Constructor
-   *
-   * @param segmentMapping
+   *  @param segmentMapping
    * @param segmentProperties
-   * @param databaseName
-   * @param factTableName
-   * @param storePath
    * @param carbonTable
    */
   public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
-      SegmentProperties segmentProperties, String databaseName, String factTableName,
-      String storePath, CarbonTable carbonTable,
+      SegmentProperties segmentProperties, CarbonTable carbonTable,
       Map<String, List<DataFileFooter>> dataFileMetadataSegMapping) {
-
     this.segmentMapping = segmentMapping;
-
     this.destinationSegProperties = segmentProperties;
-
-    this.databaseName = databaseName;
-
-    this.factTableName = factTableName;
-
-    this.storePath = storePath;
-
     this.carbonTable = carbonTable;
-
     this.dataFileMetadataSegMapping = dataFileMetadataSegMapping;
   }
 
@@ -143,18 +125,9 @@ public class CarbonCompactionExecutor {
    */
   private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
       throws QueryExecutionException {
-
     queryModel.setTableBlockInfos(blockList);
     this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
-    CarbonIterator<BatchResult> iter = null;
-    try {
-      iter = queryExecutor.execute(queryModel);
-    } catch (QueryExecutionException e) {
-      LOGGER.error(e.getMessage());
-      throw e;
-    }
-
-    return iter;
+    return queryExecutor.execute(queryModel);
   }
 
   /**
@@ -191,7 +164,7 @@ public class CarbonCompactionExecutor {
    * @param blockList
    * @return
    */
-  public QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
+  private QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
 
     QueryModel model = new QueryModel();
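
With the unused databaseName, factTableName and storePath fields removed, a
construction site shrinks from seven arguments to four; a hypothetical sketch:

    // before
    new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
        factTableName, storePath, carbonTable, dataFileMetadataSegMapping);
    // after
    new CarbonCompactionExecutor(segmentMapping, segmentProperties, carbonTable,
        dataFileMetadataSegMapping);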
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
index 836a757..41709f7 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -28,15 +28,12 @@ import java.util.Calendar;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.path.CarbonStorePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -161,7 +158,7 @@ public final class CarbonDataMergerUtil {
                 .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
              LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName()
                   + " is deleted after the compaction is started.");
-              return tableStatusUpdationStatus;
+              return false;
             }
             loadDetail.setLoadStatus(CarbonCommonConstants.SEGMENT_COMPACTED);
             loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
@@ -225,7 +222,7 @@ public final class CarbonDataMergerUtil {
       CarbonLoadModel carbonLoadModel, long compactionSize,
       List<LoadMetadataDetails> segments, CompactionType compactionType) {
 
-    List sortedSegments = new ArrayList(segments);
+    List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments);
 
     sortSegments(sortedSegments);
 
@@ -466,7 +463,7 @@ public final class CarbonDataMergerUtil {
   private static long getSizeOfSegment(String storeLocation,
       CarbonTableIdentifier tableIdentifier, String segId) {
     String loadPath = CarbonLoaderUtil
-        .getStoreLocation(storeLocation, tableIdentifier, segId, "0");
+        .getStoreLocation(storeLocation, tableIdentifier, segId);
     CarbonFile segmentFolder =
         FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
     return getSizeOfFactFileInLoad(segmentFolder);
@@ -557,11 +554,9 @@ public final class CarbonDataMergerUtil {
    */
   private static List<LoadMetadataDetails> checkPreserveSegmentsPropertyReturnRemaining(
       List<LoadMetadataDetails> segments) {
-
-    int numberOfSegmentsToBePreserved = 0;
     // check whether the preserving of the segments from merging is enabled or not.
     // get the number of loads to be preserved.
-    numberOfSegmentsToBePreserved =
+    int numberOfSegmentsToBePreserved =
         CarbonProperties.getInstance().getNumberOfSegmentsToBePreserved();
     // get the number of valid segments and retain the latest loads from merging.
     return CarbonDataMergerUtil
@@ -635,9 +630,9 @@ public final class CarbonDataMergerUtil {
     for (LoadMetadataDetails segment : loadMetadataDetails) {
       //check if this load is an already merged load.
       if (null != segment.getMergedLoadName()) {
-        builder.append(segment.getMergedLoadName() + ",");
+        builder.append(segment.getMergedLoadName()).append(",");
       } else {
-        builder.append(segment.getLoadName() + ",");
+        builder.append(segment.getLoadName()).append(",");
       }
     }
     builder.deleteCharAt(builder.length() - 1);
@@ -645,37 +640,6 @@ public final class CarbonDataMergerUtil {
   }
 
   /**
-   * Combining the list of maps to one map.
-   *
-   * @param mapsOfNodeBlockMapping
-   * @return
-   */
-  public static Map<String, List<TableBlockInfo>> combineNodeBlockMaps(
-      List<Map<String, List<TableBlockInfo>>> mapsOfNodeBlockMapping) {
-
-    Map<String, List<TableBlockInfo>> combinedMap =
-        new HashMap<String, List<TableBlockInfo>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    // traverse list of maps.
-    for (Map<String, List<TableBlockInfo>> eachMap : mapsOfNodeBlockMapping) {
-      // traverse inside each map.
-      for (Map.Entry<String, List<TableBlockInfo>> eachEntry : eachMap.entrySet()) {
-
-        String node = eachEntry.getKey();
-        List<TableBlockInfo> blocks = eachEntry.getValue();
-
-        // if already that node detail exist in the combined map.
-        if (null != combinedMap.get(node)) {
-          List<TableBlockInfo> blocksAlreadyPresent = combinedMap.get(node);
-          blocksAlreadyPresent.addAll(blocks);
-        } else { // if its not present in map then put to map.
-          combinedMap.put(node, blocks);
-        }
-      }
-    }
-    return combinedMap;
-  }
-
-  /**
    * Removing the already merged segments from list.
    */
   public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
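
The builder change above is the usual append-chaining fix: '+' inside an
append first allocates a temporary String, while chained appends write
straight into the builder's buffer. A minimal sketch:

    // before: builds "name," as a throwaway String each iteration
    builder.append(segment.getLoadName() + ",");
    // after: two appends, no intermediate String
    builder.append(segment.getLoadName()).append(",");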

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
index 44c05a1..5c534fa 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
@@ -63,7 +63,6 @@ public class RowResultMerger {
   private final String databaseName;
   private final String tableName;
   private final String tempStoreLocation;
-  private final int measureCount;
   private final String factStoreLocation;
   private CarbonFactHandler dataHandler;
   private List<RawResultIterator> rawResultIteratorList =
@@ -101,7 +100,7 @@ public class RowResultMerger {
     this.databaseName = databaseName;
     this.tableName = tableName;
 
-    this.measureCount = segprop.getMeasures().size();
+    int measureCount = segprop.getMeasures().size();
     CarbonTable carbonTable = CarbonMetadata.getInstance()
             .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
     CarbonFactDataHandlerModel carbonFactDataHandlerModel =
@@ -195,7 +194,7 @@ public class RowResultMerger {
    *
    * @throws SliceMergerException
    */
-  protected void addRow(Object[] carbonTuple) throws SliceMergerException {
+  private void addRow(Object[] carbonTuple) throws SliceMergerException {
     Object[] rowInWritableFormat;
 
     rowInWritableFormat = tupleConvertor.getObjectArray(carbonTuple);
@@ -270,12 +269,11 @@ public class RowResultMerger {
    */
   private String checkAndCreateCarbonStoreLocation(String factStoreLocation, String databaseName,
       String tableName, String partitionId, String segmentId) {
-    String carbonStorePath = factStoreLocation;
     CarbonTable carbonTable = CarbonMetadata.getInstance()
         .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
     CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
     CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
+        CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier);
     String carbonDataDirectoryPath =
         carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
     CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
index 94ebf40..35b8164 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
@@ -22,19 +22,16 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
 import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
 
 /**
  * This class will be used to convert the Result into the format used in data writer.
  */
-public class TupleConversionAdapter {
+class TupleConversionAdapter {
 
   private final SegmentProperties segmentproperties;
 
-  private final List<CarbonMeasure> measureList;
-
   private int noDictionaryPresentIndex;
 
   private int measureCount;
@@ -48,7 +45,6 @@ public class TupleConversionAdapter {
       noDictionaryPresentIndex++;
     }
     this.segmentproperties = segmentProperties;
-    measureList = segmentProperties.getMeasures();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
index 58f3a2d..f097b54 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
@@ -21,15 +21,7 @@ package org.apache.carbondata.spark.partition.api;
 
 import java.util.List;
 
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-
-import org.apache.spark.sql.execution.command.Partitioner;
-
 public interface DataPartitioner {
-  /**
-   * Initialise the partitioner based on the given columns
-   */
-  void initialize(String basePath, String[] columns, Partitioner partitioner);
 
   /**
    * All the partitions built by the Partitioner
@@ -37,18 +29,9 @@ public interface DataPartitioner {
   List<Partition> getAllPartitions();
 
   /**
-   * Partition where the tuple should be present. (API used for data loading purpose)
-   */
-  Partition getPartionForTuple(Object[] tuple, long rowCounter);
-
-  /**
    * Identifies the partitions applicable for the given filter (API used for For query)
    */
-  List<Partition> getPartitions(CarbonQueryPlan queryPlan);
-
-  String[] getPartitionedColumns();
-
-  Partitioner getPartitioner();
+  List<Partition> getPartitions();
 
 }
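
After this trim the interface has exactly two methods; a hypothetical minimal
implementor (class name invented for illustration):

    public class NoOpDataPartitioner implements DataPartitioner {
      @Override public List<Partition> getAllPartitions() {
        return java.util.Collections.emptyList();
      }
      @Override public List<Partition> getPartitions() {
        return java.util.Collections.emptyList();
      }
    }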
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
index 61639d3..ca6e7bd 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
@@ -29,11 +29,6 @@ public interface Partition extends Serializable {
   String getUniqueID();
 
   /**
-   * File path for the raw data represented by this partition
-   */
-  String getFilePath();
-
-  /**
    * result
    *
    * @return

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
deleted file mode 100644
index bc6e54f..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-public final class DataPartitionerProperties {
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataPartitionerProperties.class.getName());
-
-  private static DataPartitionerProperties instance;
-
-  private Properties properties;
-
-  private DataPartitionerProperties() {
-    properties = loadProperties();
-  }
-
-  public static DataPartitionerProperties getInstance() {
-    if (instance == null) {
-      instance = new DataPartitionerProperties();
-    }
-    return instance;
-  }
-
-  public String getValue(String key, String defaultVal) {
-    return properties.getProperty(key, defaultVal);
-  }
-
-  public String getValue(String key) {
-    return properties.getProperty(key);
-  }
-
-  /**
-   * Read the properties from CSVFilePartitioner.properties
-   */
-  private Properties loadProperties() {
-    Properties props = new Properties();
-
-    File file = new File("DataPartitioner.properties");
-    FileInputStream fis = null;
-    try {
-      if (file.exists()) {
-        fis = new FileInputStream(file);
-
-        props.load(fis);
-      }
-    } catch (Exception e) {
-      LOGGER
-          .error(e, e.getMessage());
-    } finally {
-      if (null != fis) {
-        try {
-          fis.close();
-        } catch (IOException e) {
-          LOGGER.error(e,
-              e.getMessage());
-        }
-      }
-    }
-
-    return props;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
index 9bee8a2..6594c79 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
@@ -59,10 +59,6 @@ public class DefaultLoadBalancer {
     }
   }
 
-  public List<Partition> getPartitionsForNode(String node) {
-    return nodeToPartitonMap.get(node);
-  }
-
   public String getNodeForPartitions(Partition partition) {
     return partitonToNodeMap.get(partition);
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
deleted file mode 100644
index bd7cc42..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.spark.partition.api.Partition;
-
-public class PartitionImpl implements Partition {
-  private static final long serialVersionUID = 3020172346383028547L;
-  private String uniqueID;
-  private String folderPath;
-
-
-  public PartitionImpl(String uniqueID, String folderPath) {
-    this.uniqueID = uniqueID;
-    this.folderPath = folderPath;
-  }
-
-  @Override public String getUniqueID() {
-    return uniqueID;
-  }
-
-  @Override public String getFilePath() {
-    return folderPath;
-  }
-
-  @Override public String toString() {
-    return "{PartitionID -> " + uniqueID + " Path: " + folderPath + '}';
-  }
-
-  @Override public List<String> getFilesPath() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
index de32b5c..f34e9c9 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
@@ -38,11 +38,6 @@ public class PartitionMultiFileImpl implements Partition {
     return uniqueID;
   }
 
-  @Override public String getFilePath() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
   @Override public List<String> getFilesPath() {
     // TODO Auto-generated method stub
     return folderPath;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
index e05be7d..a864ebf 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
@@ -52,8 +52,7 @@ public final class QueryPartitionHelper {
 
     DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
 
-    List<Partition> queryPartitions = dataPartitioner.getPartitions(queryPlan);
-    return queryPartitions;
+    return dataPartitioner.getPartitions();
   }
 
   public List<Partition> getAllPartitions(String databaseName, String tableName) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
index c9b434a..21132ab 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
@@ -19,133 +19,24 @@
 
 package org.apache.carbondata.spark.partition.api.impl;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.spark.partition.api.DataPartitioner;
 import org.apache.carbondata.spark.partition.api.Partition;
 
-import org.apache.spark.sql.execution.command.Partitioner;
-
 /**
  * Sample partition.
  */
 public class SampleDataPartitionerImpl implements DataPartitioner {
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(SampleDataPartitionerImpl.class.getName());
-  private int numberOfPartitions = 1;
-
-  private int partionColumnIndex = -1;
-
-  private String partitionColumn;
-
-  private Partitioner partitioner;
-  private List<Partition> allPartitions;
-  private String baseLocation;
-
-  public SampleDataPartitionerImpl() {
-  }
-
-  public void initialize(String basePath, String[] columns, Partitioner partitioner) {
-    this.partitioner = partitioner;
-    numberOfPartitions = partitioner.partitionCount();
-
-    partitionColumn = partitioner.partitionColumn()[0];
-    LOGGER.info("SampleDataPartitionerImpl initializing with following properties.");
-    LOGGER.info("partitionCount: " + numberOfPartitions);
-    LOGGER.info("partitionColumn: " + partitionColumn);
-    LOGGER.info("basePath: " + basePath);
-    LOGGER.info("columns: " + Arrays.toString(columns));
-
-    this.baseLocation = basePath;
-    allPartitions = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
-    for (int i = 0; i < columns.length; i++) {
-      if (columns[i].equalsIgnoreCase(partitionColumn)) {
-        partionColumnIndex = i;
-        break;
-      }
-    }
-
-    for (int partionCounter = 0; partionCounter < numberOfPartitions; partionCounter++) {
-      PartitionImpl partitionImpl =
-          new PartitionImpl("" + partionCounter, baseLocation + '/' + partionCounter);
-
-      List<Object> includedHashes = new ArrayList<Object>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      includedHashes.add(partionCounter);
-
-      allPartitions.add(partitionImpl);
-    }
-  }
-
-  @Override public Partition getPartionForTuple(Object[] tuple, long rowCounter) {
-    int hashCode;
-    if (partionColumnIndex == -1) {
-      hashCode = hashCode(rowCounter);
-    } else {
-      try {
-        hashCode = hashCode(((String) tuple[partionColumnIndex]).hashCode());
-      } catch (NumberFormatException e) {
-        hashCode = hashCode(0);
-      }
-    }
-    return allPartitions.get(hashCode);
-  }
 
-  /**
-   *
-   */
+  @Override
   public List<Partition> getAllPartitions() {
-    return allPartitions;
+    return null;
   }
 
-  /**
-   * @see DataPartitioner#getPartitions(CarbonQueryPlan)
-   */
-  public List<Partition> getPartitions(CarbonQueryPlan queryPlan) {
-    // TODO: this has to be redone during partitioning implmentatation
-    return allPartitions;
-  }
-
-  /**
-   * Identify the partitions applicable for the given filter
-   */
+  @Override
   public List<Partition> getPartitions() {
-    return allPartitions;
-
-    // TODO: this has to be redone during partitioning implementation
-    //    for (Partition aPartition : allPartitions) {
-    //      CarbonDimensionLevelFilter partitionFilterDetails =
-    //          aPartition.getPartitionDetails().get(partitionColumn);
-    //
-    //      //Check if the partition is serving any of the
-    //      //hash code generated for include filter of query
-    //      for (Object includeFilter : msisdnFilter.getIncludeFilter()) {
-    //        int hashCode = hashCode(((String) includeFilter).hashCode());
-    //        if (partitionFilterDetails.getIncludeFilter().contains(hashCode)) {
-    //          allowedPartitions.add(aPartition);
-    //          break;
-    //        }
-    //      }
-    //    }
-
-  }
-
-  private int hashCode(long key) {
-    return (int) (Math.abs(key) % numberOfPartitions);
-  }
-
-  @Override public String[] getPartitionedColumns() {
-    return new String[] { partitionColumn };
-  }
-
-  @Override public Partitioner getPartitioner() {
-    return partitioner;
+    return null;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
index 9d1a281..94e475e 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
@@ -19,8 +19,8 @@
 
 package org.apache.carbondata.spark.util;
 
-import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -49,7 +49,7 @@ public class CarbonQueryUtil {
    * It creates the one split for each region server.
    */
   public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
-      CarbonQueryPlan queryPlan) throws IOException {
+      CarbonQueryPlan queryPlan) {
 
     //Just create splits depends on locations of region servers
     List<Partition> allPartitions = null;
@@ -78,7 +78,7 @@ public class CarbonQueryUtil {
   /**
    * It creates the one split for each region server.
    */
-  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
+  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) {
 
     //Just create splits depends on locations of region servers
     DefaultLoadBalancer loadBalancer = null;
@@ -104,13 +104,11 @@ public class CarbonQueryUtil {
       String separator) {
     if (StringUtils.isNotEmpty(sourcePath)) {
       String[] files = sourcePath.split(separator);
-      for (String file : files) {
-        partitionsFiles.add(file);
-      }
+      Collections.addAll(partitionsFiles, files);
     }
   }
 
-  private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
+  private static List<Partition> getAllFilesForDataLoad(String sourcePath) {
     List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
     List<Partition> partitionList =
@@ -121,7 +119,7 @@ public class CarbonQueryUtil {
     partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
 
     for (int i = 0; i < files.size(); i++) {
-      partitionFiles.get(i % 1).add(files.get(i));
+      partitionFiles.get(0).add(files.get(i));
     }
     return partitionList;
   }
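
Both replaced loops are the textbook Collections.addAll case, copying an
array into an existing List; a small sketch of the equivalence:

    String[] files = sourcePath.split(separator);
    // hand-written form removed by this commit:
    //   for (String file : files) { partitionsFiles.add(file); }
    Collections.addAll(partitionsFiles, files);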

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 5e58235..6115519 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -151,31 +151,30 @@ object CarbonFilters {
           Some(sources.EqualTo(a.name, v))
         case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
           Some(sources.EqualTo(a.name, v))
-
-        case Not(EqualTo(a: Attribute, Literal(v, t))) => new
-            Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Literal(v, t), a: Attribute)) => new
-            Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new
-            Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new
-            Some(sources.Not(sources.EqualTo(a.name, v)))
-        case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
-        case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
+        case Not(EqualTo(a: Attribute, Literal(v, t))) =>
+          Some(sources.Not(sources.EqualTo(a.name, v)))
+        case Not(EqualTo(Literal(v, t), a: Attribute)) =>
+          Some(sources.Not(sources.EqualTo(a.name, v)))
+        case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
+          Some(sources.Not(sources.EqualTo(a.name, v)))
+        case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
+          Some(sources.Not(sources.EqualTo(a.name, v)))
+        case IsNotNull(a: Attribute) =>
+          Some(sources.IsNotNull(a.name))
+        case IsNull(a: Attribute) =>
+          Some(sources.IsNull(a.name))
         case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
           val hSet = list.map(e => e.eval(EmptyRow))
           Some(sources.Not(sources.In(a.name, hSet.toArray)))
         case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
           val hSet = list.map(e => e.eval(EmptyRow))
           Some(sources.In(a.name, hSet.toArray))
-        case Not(In(Cast(a: Attribute, _), list))
-          if !list.exists(!_.isInstanceOf[Literal]) =>
+        case Not(In(Cast(a: Attribute, _), list)) if !list.exists(!_.isInstanceOf[Literal]) =>
           val hSet = list.map(e => e.eval(EmptyRow))
           Some(sources.Not(sources.In(a.name, hSet.toArray)))
         case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
           val hSet = list.map(e => e.eval(EmptyRow))
           Some(sources.In(a.name, hSet.toArray))
-
         case GreaterThan(a: Attribute, Literal(v, t)) =>
           Some(sources.GreaterThan(a.name, v))
         case GreaterThan(Literal(v, t), a: Attribute) =>
@@ -184,7 +183,6 @@ object CarbonFilters {
           Some(sources.GreaterThan(a.name, v))
         case GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
           Some(sources.LessThan(a.name, v))
-
         case LessThan(a: Attribute, Literal(v, t)) =>
           Some(sources.LessThan(a.name, v))
         case LessThan(Literal(v, t), a: Attribute) =>
@@ -193,7 +191,6 @@ object CarbonFilters {
           Some(sources.LessThan(a.name, v))
         case LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
           Some(sources.GreaterThan(a.name, v))
-
         case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
           Some(sources.GreaterThanOrEqual(a.name, v))
         case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
@@ -202,7 +199,6 @@ object CarbonFilters {
           Some(sources.GreaterThanOrEqual(a.name, v))
         case GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
           Some(sources.LessThanOrEqual(a.name, v))
-
         case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
           Some(sources.LessThanOrEqual(a.name, v))
         case LessThanOrEqual(Literal(v, t), a: Attribute) =>
@@ -248,23 +244,63 @@ object CarbonFilters {
           (transformExpression(left) ++ transformExpression(right)).reduceOption(new
               AndExpression(_, _))
 
-        case EqualTo(a: Attribute, l@Literal(v, t)) => new
-            Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(l@Literal(v, t), a: Attribute) => new
-            Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) => new
-            Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) => new
-            Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case Not(EqualTo(a: Attribute, l@Literal(v, t))) => new
-            Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(l@Literal(v, t), a: Attribute)) => new
-            Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) => new
-            Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) => new
-            Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
+        case EqualTo(a: Attribute, l@Literal(v, t)) =>
+          Some(
+            new EqualToExpression(
+              transformExpression(a).get,
+              transformExpression(l).get
+            )
+          )
+        case EqualTo(l@Literal(v, t), a: Attribute) =>
+          Some(
+            new EqualToExpression(
+              transformExpression(a).get,
+              transformExpression(l).get
+            )
+          )
+        case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) =>
+          Some(
+            new EqualToExpression(
+              transformExpression(a).get,
+              transformExpression(l).get
+            )
+          )
+        case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(
+            new EqualToExpression(
+              transformExpression(a).get,
+              transformExpression(l).get
+            )
+          )
+
+        case Not(EqualTo(a: Attribute, l@Literal(v, t))) =>
+          Some(
+            new NotEqualsExpression(
+              transformExpression(a).get,
+              transformExpression(l).get
+            )
+          )
+        case Not(EqualTo(l@Literal(v, t), a: Attribute)) =>
+          Some(
+            new NotEqualsExpression(
+              transformExpression(a).get,
+              transformExpression(l).get
+            )
+          )
+        case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) =>
+          Some(
+            new NotEqualsExpression(
+              transformExpression(a).get,
+              transformExpression(l).get
+            )
+          )
+        case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) =>
+          Some(
+            new NotEqualsExpression(
+              transformExpression(a).get,
+              transformExpression(l).get
+            )
+          )
         case IsNotNull(child: Attribute) =>
             Some(new NotEqualsExpression(transformExpression(child).get,
              transformExpression(Literal(null)).get, true))
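
Besides the reindentation, the rewritten arms drop a stray `new` that used to end each case line (`=> new` with `Some(...)` on the next line, i.e. `new Some(...)`). That compiles, since Some is a case class with a public constructor, but it bypasses the companion apply and reads badly. A minimal sketch of the equivalence:

    // Both expressions build the same Option value; apply is idiomatic.
    val viaNew: Option[Int] = new Some(1)
    val viaApply: Option[Int] = Some(1)
    assert(viaNew == viaApply)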

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 5d6a663..57087f3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -231,8 +231,8 @@ class DataFileLoaderRDD[K, V](
       var partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
       var model: CarbonLoadModel = _
-      var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
-                               theSplit.index
+      val uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
+          theSplit.index
       try {
         loadMetadataDetails.setPartitionCount(partitionID)
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
@@ -500,10 +500,10 @@ class DataFrameLoaderRDD[K, V](
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val resultIter = new Iterator[(K, V)] {
-      var partitionID = "0"
+      val partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
-      var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
-                               theSplit.index
+      val uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
+          theSplit.index
       try {
         loadMetadataDetails.setPartitionCount(partitionID)
         carbonLoadModel.setPartitionId(partitionID)
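
This file and several below also tighten `var` to `val` wherever the binding is never reassigned (uniqueLoadStatusId here, partitionID in DataFrameLoaderRDD, and so on). A one-liner on why val is the safer default (values are illustrative):

    // A val cannot be rebound, so accidental mutation fails to compile.
    val uniqueLoadStatusId = "my_table" + "_" + 0
    // uniqueLoadStatusId = "other"   // rejected by the compiler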

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index 0534def..33f453b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -25,7 +25,6 @@ import org.apache.spark.rdd.RDD
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.LoadMetadataDetails
 import org.apache.carbondata.spark.DeletedLoadResult
-import org.apache.carbondata.spark.load.DeletedLoadMetadata
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
 class CarbonDeleteLoadByDateRDD[K, V](
@@ -53,7 +52,6 @@ class CarbonDeleteLoadByDateRDD[K, V](
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     new Iterator[(K, V)] {
-      val deletedMetaData = new DeletedLoadMetadata()
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
 
@@ -69,7 +67,7 @@ class CarbonDeleteLoadByDateRDD[K, V](
         case e: Exception => logInfo("Unable to parse with default time format " + dateValue)
       }
       // TODO: Implement it
-      var finished = false
+      val finished = false
 
       override def hasNext: Boolean = {
         finished

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 3c15818..9d9fa99 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
 import java.util.regex.Pattern
 
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.ArrayBuffer
 import scala.util.control.Breaks.{break, breakable}
 
 import au.com.bytecode.opencsv.CSVReader
@@ -34,17 +34,19 @@ import org.apache.spark.sql.Row
 
 import org.apache.carbondata.common.factory.CarbonCommonFactory
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.Dictionary
 import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.carbon.path.CarbonTablePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.service.PathService
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-import org.apache.carbondata.spark.util.GlobalDictionaryUtil
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
 
 /**
  * A partitioner partition by column.
@@ -70,9 +72,9 @@ case class DictionaryStats(distinctValues: java.util.List[String],
     dictWriteTime: Long, sortIndexWriteTime: Long)
 
 case class PrimitiveParser(dimension: CarbonDimension,
-    setOpt: Option[HashSet[String]]) extends GenericParser {
-  val (hasDictEncoding, set: HashSet[String]) = setOpt match {
-    case None => (false, new HashSet[String])
+    setOpt: Option[mutable.HashSet[String]]) extends GenericParser {
+  val (hasDictEncoding, set: mutable.HashSet[String]) = setOpt match {
+    case None => (false, new mutable.HashSet[String])
     case Some(x) => (true, x)
   }
 
@@ -183,7 +185,7 @@ class CarbonAllDictionaryCombineRDD(
   ): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
-    val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
+    val distinctValuesList = new ArrayBuffer[(Int, mutable.HashSet[String])]
     /*
      * for all dictionary, all columns need to encoding and checking
      * isHighCardinalityColumn, so no need to calculate rowcount
@@ -194,7 +196,7 @@ class CarbonAllDictionaryCombineRDD(
         GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList)
       val dimNum = model.dimensions.length
       // Map[dimColName -> dimColNameIndex]
-      val columnIndexMap = new HashMap[String, Int]()
+      val columnIndexMap = new mutable.HashMap[String, Int]()
       for (j <- 0 until dimNum) {
         columnIndexMap.put(model.dimensions(j).getColName, j)
       }
@@ -245,7 +247,7 @@ class CarbonBlockDistinctValuesCombineRDD(
       context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
-    val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
+    val distinctValuesList = new ArrayBuffer[(Int, mutable.HashSet[String])]
     var rowCount = 0L
     try {
       val dimensionParsers =
@@ -297,15 +299,15 @@ class CarbonGlobalDictionaryGenerateRDD(
 
   override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+    val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     var isHighCardinalityColumn = false
     val iter = new Iterator[(Int, String, Boolean)] {
-      var dictionaryForDistinctValueLookUp:
-      org.apache.carbondata.core.cache.dictionary.Dictionary = _
-      var dictionaryForSortIndexWriting: org.apache.carbondata.core.cache.dictionary.Dictionary = _
+      var dictionaryForDistinctValueLookUp: Dictionary = _
+      var dictionaryForSortIndexWriting: Dictionary = _
       var dictionaryForDistinctValueLookUpCleared: Boolean = false
-      val pathService = CarbonCommonFactory.getPathService
-      val carbonTablePath = pathService.getCarbonTablePath(model.hdfsLocation, model.table)
+      val pathService: PathService = CarbonCommonFactory.getPathService
+      val carbonTablePath: CarbonTablePath =
+        pathService.getCarbonTablePath(model.hdfsLocation, model.table)
       if (StringUtils.isNotBlank(model.hdfsTempLocation )) {
          CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
            model.hdfsTempLocation)
@@ -538,8 +540,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
         }
       }
     }
-    val mapIdWithSet = new HashMap[String, HashSet[String]]
-    val columnValues = new HashSet[String]
+    val mapIdWithSet = new mutable.HashMap[String, mutable.HashSet[String]]
+    val columnValues = new mutable.HashSet[String]
     val distinctValues = (theSplit.index, columnValues)
     mapIdWithSet.put(primDimension.getColumnId, columnValues)
     // use parser to generate new dict value
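
The import change above swaps scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} for a single scala.collection.mutable import, so every use site now reads mutable.HashSet, mutable.HashMap, etc. A small sketch of the style (values are illustrative):

    import scala.collection.mutable

    // Qualifying with `mutable.` keeps the mutability visible at the use
    // site and avoids clashing with the immutable defaults already in scope.
    val columnValues = new mutable.HashSet[String]
    columnValues += "value1"
    val columnIndexMap = new mutable.HashMap[String, Int]()
    columnIndexMap.put("col1", 0)
    assert(columnValues.contains("value1") && columnIndexMap("col1") == 0)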

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index e740caa..94fa601 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -123,10 +123,8 @@ class CarbonMergerRDD[K, V](
 
         carbonLoadModel.setStorePath(storePath)
 
-        exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
-          factTableName, storePath, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
-          dataFileMetadataSegMapping
-        )
+        exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
+          carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, dataFileMetadataSegMapping)
 
         // fire a query and get the results.
         var result2: util.List[RawResultIterator] = null

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
index 77402b4..02718e0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
@@ -71,7 +71,7 @@ class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) {
 
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
   val prevPartitions = prev.partitions
-  var numOfParts = Math.max(1, Math.min(nodeList.length, prevPartitions.length))
+  val numOfParts = Math.max(1, Math.min(nodeList.length, prevPartitions.length))
   // host => partition id list
   val hostMapPartitionIds = new HashMap[String, LinkedHashSet[Int]]
   // partition id => host list
@@ -268,7 +268,7 @@ class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) {
       prevPartIndexs(i % numOfParts) += prevPartitions(i).index
     }
     prevPartIndexs.filter(_.nonEmpty).zipWithIndex.map { x =>
-      new CoalescedRDDPartition(x._2, prev, x._1.toArray, getLocation(x._2))
+      CoalescedRDDPartition(x._2, prev, x._1.toArray, getLocation(x._2))
     }
   }
 
@@ -312,8 +312,8 @@ class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) {
       } else {
         Some(emptyHosts(index - localityResult.length))
       }
-      LOGGER.info(s"CoalescedRDDPartition ${index}, ${ids.length}, ${loc} ")
-      new CoalescedRDDPartition(index, prev, ids, loc)
+      LOGGER.info(s"CoalescedRDDPartition $index, ${ids.length}, $loc ")
+      CoalescedRDDPartition(index, prev, ids, loc)
     }.filter(_.parentsIndices.nonEmpty).toArray
 
   }
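
Two idioms drive the hunks above: dropping `new` in front of CoalescedRDDPartition (the diff compiles, so a companion apply is available) and writing $index instead of ${index}, since braces are only needed around full expressions. Sketch with a hypothetical stand-in case class:

    // Stand-in for a case class like CoalescedRDDPartition.
    case class Part(index: Int, ids: Array[Int])

    val viaApply = Part(3, Array(1, 2))     // companion apply
    val viaNew = new Part(3, Array(1, 2))   // equivalent, but noisier

    // For a bare identifier, $index and ${index} interpolate identically.
    val index = 3
    assert(s"partition $index" == s"partition ${index}")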

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 64b8b61..22d8406 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -144,7 +144,7 @@ class NewCarbonDataLoadRDD[K, V](
       var partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
       var model: CarbonLoadModel = _
-      var uniqueLoadStatusId =
+      val uniqueLoadStatusId =
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {
         loadMetadataDetails.setPartitionCount(partitionID)
@@ -279,38 +279,35 @@ class NewCarbonDataLoadRDD[K, V](
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    isTableSplitPartition match {
-      case true =>
-        // for table split partition
-        val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
-        val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
-        location
-      case false =>
-        // for node partition
-        val theSplit = split.asInstanceOf[CarbonNodePartition]
-        val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
-        logInfo("Preferred Location for split : " + firstOptionLocation.head)
-        val blockMap = new util.LinkedHashMap[String, Integer]()
-        val tableBlocks = theSplit.blocksDetails
-        tableBlocks.foreach { tableBlock =>
-          tableBlock.getLocations.foreach { location =>
-            if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
-              val currentCount = blockMap.get(location)
-              if (currentCount == null) {
-                blockMap.put(location, 1)
-              } else {
-                blockMap.put(location, currentCount + 1)
-              }
+    if (isTableSplitPartition) {
+      val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
+      val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
+      location
+    } else {
+      val theSplit = split.asInstanceOf[CarbonNodePartition]
+      val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+      logInfo("Preferred Location for split : " + firstOptionLocation.head)
+      val blockMap = new util.LinkedHashMap[String, Integer]()
+      val tableBlocks = theSplit.blocksDetails
+      tableBlocks.foreach { tableBlock =>
+        tableBlock.getLocations.foreach { location =>
+          if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
+            val currentCount = blockMap.get(location)
+            if (currentCount == null) {
+              blockMap.put(location, 1)
+            } else {
+              blockMap.put(location, currentCount + 1)
             }
           }
         }
+      }
 
-        val sortedList = blockMap.entrySet().asScala.toSeq.sortWith {(nodeCount1, nodeCount2) =>
-          nodeCount1.getValue > nodeCount2.getValue
-        }
+      val sortedList = blockMap.entrySet().asScala.toSeq.sortWith { (nodeCount1, nodeCount2) =>
+        nodeCount1.getValue > nodeCount2.getValue
+      }
 
-        val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
-        firstOptionLocation ++ sortedNodesList
+      val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+      firstOptionLocation ++ sortedNodesList
     }
   }
 }
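
The getPreferredLocations rewrite above replaces a `match` on isTableSplitPartition with a plain conditional; pattern-matching `case true`/`case false` on a Boolean is just a verbose if/else that costs one indent level. A minimal sketch of the equivalence (labels are illustrative):

    // The two forms always agree; the conditional is the idiomatic one.
    def describe(isTableSplitPartition: Boolean): String =
      if (isTableSplitPartition) "table split partition" else "node partition"

    assert(Seq(true, false).forall { flag =>
      describe(flag) == (flag match {
        case true => "table split partition"
        case false => "node partition"
      })
    })
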
@@ -333,10 +330,10 @@ class NewDataFrameLoaderRDD[K, V](
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
-      var partitionID = "0"
+      val partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
-      var model: CarbonLoadModel = carbonLoadModel
-      var uniqueLoadStatusId =
+      val model: CarbonLoadModel = carbonLoadModel
+      val uniqueLoadStatusId =
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7b8b1959/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 5d6badd..84c71ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -328,7 +328,7 @@ object GlobalDictionaryUtil {
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel, hdfsLocation)
     }
-    new DictionaryLoadModel(table,
+    DictionaryLoadModel(table,
       dimensions,
       hdfsLocation,
       dictfolderPath,
@@ -345,7 +345,8 @@ object GlobalDictionaryUtil {
       hdfsTempLocation,
       lockType,
       zookeeperUrl,
-      serializationNullFormat)
+      serializationNullFormat
+    )
   }
 
   /**
