[CARBONDATA-2168] Support global sort for standard hive partitioning

This closes #1972
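
For reference, a minimal sketch (not part of the commit) of the usage this change enables, modelled
on the new StandardPartitionGlobalSortTestCase added below. Table name, columns and the CSV path are
illustrative, and `spark` is assumed to be a Carbon-enabled SparkSession:

// Create a standard (Hive-style) partitioned carbon table that sorts globally on load.
spark.sql(
  """
    | CREATE TABLE partitionone (empname String, designation String, doj Timestamp, salary Int)
    | PARTITIONED BY (empno Int)
    | STORED BY 'org.apache.carbondata.format'
    | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
  """.stripMargin)

// GLOBAL_SORT_PARTITIONS controls how many tasks the global sort uses for this load.
spark.sql(
  """
    | LOAD DATA LOCAL INPATH '/tmp/data.csv' INTO TABLE partitionone
    | OPTIONS('DELIMITER'=',', 'QUOTECHAR'='"', 'GLOBAL_SORT_PARTITIONS'='1')
  """.stripMargin)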


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dded5d5d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dded5d5d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dded5d5d

Branch: refs/heads/master
Commit: dded5d5d54d8617796ded23dee0840997f212a0d
Parents: e51810c
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Fri Feb 9 09:37:02 2018 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Mon Feb 26 17:40:52 2018 +0530

----------------------------------------------------------------------
 .../datastore/page/SafeDecimalColumnPage.java   |   2 +-
 .../datastore/page/SafeFixLengthColumnPage.java |   7 +
 .../datastore/page/SafeVarLengthColumnPage.java |   1 +
 .../core/metadata/PartitionMapFileStore.java    |  20 +-
 .../carbondata/core/util/DataTypeUtil.java      |  89 +++
 .../hadoop/api/CarbonOutputCommitter.java       |  73 +-
 .../hadoop/api/CarbonTableOutputFormat.java     |  12 +-
 .../hadoop/internal/ObjectArrayWritable.java    |  66 ++
 .../hadoop/ft/CarbonTableOutputFormatTest.java  |   9 +-
 .../testsuite/sortcolumns/TestSortColumns.scala |  21 +-
 .../StandardPartitionBadRecordLoggerTest.scala  |   3 +-
 .../StandardPartitionGlobalSortTestCase.scala   | 684 +++++++++++++++++++
 .../StandardPartitionTableLoadingTestCase.scala |   3 +
 ...tandardPartitionTableOverwriteTestCase.scala |  26 +-
 .../load/DataLoadProcessBuilderOnSpark.scala    |   3 +-
 .../load/DataLoadProcessorStepOnSpark.scala     |  56 +-
 .../carbondata/spark/load/ValidateUtil.scala    |   1 +
 .../spark/rdd/CarbonDropPartitionRDD.scala      |   8 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala | 135 ++--
 .../management/CarbonLoadDataCommand.scala      | 364 +++++-----
 .../datasources/CarbonFileFormat.scala          |  77 ++-
 .../processing/loading/DataField.java           |  10 +
 .../loading/DataLoadProcessBuilder.java         |  29 +
 .../impl/MeasureFieldConverterImpl.java         |   9 +-
 .../impl/NonDictionaryFieldConverterImpl.java   |  29 +-
 .../iterator/CarbonOutputIteratorWrapper.java   |  16 +-
 .../loading/model/CarbonLoadModel.java          |  15 +
 .../InputProcessorStepForPartitionImpl.java     | 251 +++++++
 .../store/CarbonFactDataHandlerColumnar.java    |   1 +
 .../util/CarbonDataProcessorUtil.java           |  11 +-
 30 files changed, 1730 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
index cb2d3bd..0e0ba85 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
@@ -217,6 +217,6 @@ public class SafeDecimalColumnPage extends DecimalColumnPage {
 
   @Override
   public void freeMemory() {
-
+    byteArrayData = null;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index 5f848c0..1e4445a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -311,6 +311,13 @@ public class SafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public void freeMemory() {
+    byteData = null;
+    shortData = null;
+    intData = null;
+    longData = null;
+    floatData = null;
+    doubleData = null;
+    shortIntData = null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index b5daddb..782b9dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -37,6 +37,7 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
   @Override
   public void freeMemory() {
+    byteArrayData = null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
index 1e9cbc4..a0ce24a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
@@ -279,18 +279,20 @@ public class PartitionMapFileStore {
    *                      dropped
    * @throws IOException
    */
-  public void dropPartitions(String segmentPath, List<String> partitionsToDrop, String uniqueId,
-      boolean partialMatch) throws IOException {
+  public void dropPartitions(String segmentPath, List<List<String>> partitionsToDrop,
+      String uniqueId, boolean partialMatch) throws IOException {
     readAllPartitionsOfSegment(segmentPath);
     List<String> indexesToDrop = new ArrayList<>();
     for (Map.Entry<String, List<String>> entry: partitionMap.entrySet()) {
-      if (partialMatch) {
-        if (entry.getValue().containsAll(partitionsToDrop)) {
-          indexesToDrop.add(entry.getKey());
-        }
-      } else {
-        if (partitionsToDrop.containsAll(entry.getValue())) {
-          indexesToDrop.add(entry.getKey());
+      for (List<String> partitions: partitionsToDrop) {
+        if (partialMatch) {
+          if (entry.getValue().containsAll(partitions)) {
+            indexesToDrop.add(entry.getKey());
+          }
+        } else {
+          if (partitions.containsAll(entry.getValue())) {
+            indexesToDrop.add(entry.getKey());
+          }
         }
       }
     }
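
A hedged sketch (not part of the diff) of the changed API: dropPartitions now takes a list of
partition specs rather than a single spec, and an index entry is dropped when it matches any of
them. The segment path and spec values below are illustrative only.

import java.util
import org.apache.carbondata.core.metadata.PartitionMapFileStore

val specsToDrop = new util.ArrayList[util.List[String]]()
specsToDrop.add(util.Arrays.asList("empno=1"))   // one spec per partition being dropped

new PartitionMapFileStore().dropPartitions(
  "/store/db/tbl/Fact/Part0/Segment_0",          // illustrative segment path
  specsToDrop,
  System.currentTimeMillis().toString,           // uniqueId, as used by CarbonOutputCommitter
  false)                                         // partialMatch = false: drop entries fully covered by a spec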

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 6d224cf..c370b14 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -104,6 +104,38 @@ public final class DataTypeUtil {
     }
   }
 
+  /**
+   * Converts the given measure value from its string form into an instance of its actual
+   * data type.
+   *
+   * @param msrValue      measure value as a string
+   * @param dataType      target data type of the measure
+   * @param carbonMeasure measure column metadata (used for decimal scale and precision)
+   * @return the converted value, or null when a double value is NaN or infinite
+   */
+  public static Object getConvertedMeasureValueBasedOnDataType(String msrValue, DataType dataType,
+      CarbonMeasure carbonMeasure) {
+    if (dataType == DataTypes.BOOLEAN) {
+      return BooleanConvert.parseBoolean(msrValue);
+    } else if (DataTypes.isDecimal(dataType)) {
+      BigDecimal bigDecimal =
+          new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
+      return converter
+          .convertToDecimal(normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision()));
+    } else if (dataType == DataTypes.SHORT) {
+      return Short.parseShort(msrValue);
+    } else if (dataType == DataTypes.INT) {
+      return Integer.parseInt(msrValue);
+    } else if (dataType == DataTypes.LONG) {
+      return Long.valueOf(msrValue);
+    } else {
+      Double parsedValue = Double.valueOf(msrValue);
+      if (Double.isInfinite(parsedValue) || Double.isNaN(parsedValue)) {
+        return null;
+      }
+      return parsedValue;
+    }
+  }
+
   public static Object getMeasureObjectFromDataType(byte[] data, DataType dataType) {
     if (data == null || data.length == 0) {
       return null;
@@ -332,6 +364,63 @@ public final class DataTypeUtil {
     }
   }
 
+  public static Object getDataDataTypeForNoDictionaryColumn(String dimensionValue,
+      DataType actualDataType, String dateFormat) {
+    if (actualDataType == DataTypes.BOOLEAN) {
+      return BooleanConvert.parseBoolean(dimensionValue);
+    } else if (actualDataType == DataTypes.STRING) {
+      return converter.convertFromStringToUTF8String(dimensionValue);
+    } else if (actualDataType == DataTypes.SHORT) {
+      return Short.parseShort(dimensionValue);
+    } else if (actualDataType == DataTypes.INT) {
+      return Integer.parseInt(dimensionValue);
+    } else if (actualDataType == DataTypes.LONG) {
+      return Long.parseLong(dimensionValue);
+    } else if (actualDataType == DataTypes.TIMESTAMP) {
+      Date dateToStr = null;
+      DateFormat dateFormatter = null;
+      try {
+        if (null != dateFormat && !dateFormat.trim().isEmpty()) {
+          dateFormatter = new SimpleDateFormat(dateFormat);
+        } else {
+          dateFormatter = timeStampformatter.get();
+        }
+        dateToStr = dateFormatter.parse(dimensionValue);
+        return dateToStr.getTime();
+      } catch (ParseException e) {
+        throw new NumberFormatException(e.getMessage());
+      }
+    } else {
+      return converter.convertFromStringToUTF8String(dimensionValue);
+    }
+  }
+
+  public static byte[] getBytesDataDataTypeForNoDictionaryColumn(Object dimensionValue,
+      DataType actualDataType) {
+    if (dimensionValue == null) {
+      if (actualDataType == DataTypes.STRING) {
+        return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      } else {
+        return new byte[0];
+      }
+    }
+    if (actualDataType == DataTypes.BOOLEAN) {
+      return ByteUtil.toBytes((Boolean) dimensionValue);
+    } else if (actualDataType == DataTypes.STRING) {
+      return ByteUtil.toBytes(dimensionValue.toString());
+    } else if (actualDataType == DataTypes.SHORT) {
+      return ByteUtil.toBytes((Short) dimensionValue);
+    } else if (actualDataType == DataTypes.INT) {
+      return ByteUtil.toBytes((Integer) dimensionValue);
+    } else if (actualDataType == DataTypes.LONG) {
+      return ByteUtil.toBytes((Long) dimensionValue);
+    } else if (actualDataType == DataTypes.TIMESTAMP) {
+      return ByteUtil.toBytes((Long)dimensionValue);
+    } else {
+      return ByteUtil.toBytes(dimensionValue.toString());
+    }
+  }
+
 
   /**
    * Below method will be used to convert the data passed to its actual data
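
A hedged sketch (not part of the diff) of the two helpers added above: one parses a raw string
into the column's actual data type, the other serialises such a value back to the byte form used
for no-dictionary dimensions. The sample value and date format are illustrative.

import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.util.DataTypeUtil

// Parse a timestamp string into its actual type (epoch millis boxed as java.lang.Long).
val parsed = DataTypeUtil.getDataDataTypeForNoDictionaryColumn(
  "2016-06-29 00:00:00", DataTypes.TIMESTAMP, "yyyy-MM-dd HH:mm:ss")

// Convert the typed value to the byte representation stored for a no-dictionary column.
val bytes = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(parsed, DataTypes.TIMESTAMP)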

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 555ddd2..ce97169 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -18,10 +18,7 @@
 package org.apache.carbondata.hadoop.api;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -122,7 +119,11 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
           throw new IOException(e);
         }
       }
-      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet);
+      String uniqueId = null;
+      if (overwriteSet) {
+        uniqueId = overwritePartitions(loadModel);
+      }
+      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false);
       if (operationContext != null) {
         LoadEvents.LoadTableMergePartitionEvent loadTableMergePartitionEvent =
             new LoadEvents.LoadTableMergePartitionEvent(segmentPath);
@@ -138,22 +139,78 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       String segmentsToBeDeleted =
           context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
       List<String> segmentDeleteList = Arrays.asList(segmentsToBeDeleted.split(","));
+      Set<String> segmentSet = new HashSet<>(
+          new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
+              .getValidAndInvalidSegments().getValidSegments());
       if (updateTime != null) {
-        Set<String> segmentSet = new HashSet<>(
-            new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
-                .getValidAndInvalidSegments().getValidSegments());
         CarbonUpdateUtil.updateTableMetadataStatus(
             segmentSet,
             carbonTable,
             updateTime,
             true,
             segmentDeleteList);
+      } else if (uniqueId != null) {
+        // Update the loadstatus with update time to clear cache from driver.
+        CarbonUpdateUtil.updateTableMetadataStatus(
+            segmentSet,
+            carbonTable,
+            uniqueId,
+            true,
+            new ArrayList<String>());
       }
     } else {
       CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
     }
   }
 
+  /**
+   * Overwrites the partitions during an insert/load overwrite query. It only updates the
+   * partition map files of the segments; data files are left in place.
+   *
+   * @param loadModel load model of the current load
+   * @return the unique id used for this overwrite, or null if there are no partitions to drop
+   * @throws IOException
+   */
+  private String overwritePartitions(CarbonLoadModel loadModel) throws IOException {
+    CarbonTable table = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    String currentSegmentPath =
+        CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId());
+    PartitionMapFileStore partitionMapFileStore = new PartitionMapFileStore();
+    partitionMapFileStore.readAllPartitionsOfSegment(currentSegmentPath);
+    List<List<String>> partitionsToDrop =
+        new ArrayList<List<String>>(partitionMapFileStore.getPartitionMap().values());
+    if (partitionsToDrop.size() > 0) {
+      List<String> validSegments =
+          new SegmentStatusManager(table.getAbsoluteTableIdentifier()).getValidAndInvalidSegments()
+              .getValidSegments();
+      String uniqueId = String.valueOf(System.currentTimeMillis());
+      try {
+        // First drop the partitions from partition mapper files of each segment
+        for (String segment : validSegments) {
+          new PartitionMapFileStore()
+              .dropPartitions(CarbonTablePath.getSegmentPath(table.getTablePath(), segment),
+                  new ArrayList<List<String>>(partitionsToDrop), uniqueId, false);
+
+        }
+      } catch (Exception e) {
+        // roll back the drop partitions from carbon store
+        for (String segment : validSegments) {
+          new PartitionMapFileStore()
+              .commitPartitions(CarbonTablePath.getSegmentPath(table.getTablePath(), segment),
+                  uniqueId, false, table.getTablePath(), partitionsToDrop.get(0));
+        }
+      }
+      // Commit the removed partitions in carbon store.
+      for (String segment : validSegments) {
+        new PartitionMapFileStore()
+            .commitPartitions(CarbonTablePath.getSegmentPath(table.getTablePath(), segment),
+                uniqueId, true, table.getTablePath(), partitionsToDrop.get(0));
+      }
+      return uniqueId;
+    }
+    return null;
+  }
+
   private Object getOperationContext() {
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo();
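
For context, a hedged example (not part of the diff) of the query shape that drives the new
overwritePartitions() path: an overwrite load into a static partition, mirroring the overwrite
tests added below. `spark` is assumed to be a Carbon-enabled SparkSession and the path is
illustrative.

spark.sql(
  """
    | LOAD DATA LOCAL INPATH '/tmp/data.csv' OVERWRITE INTO TABLE loadstaticpartitiononeoverwrite
    | PARTITION(empno='1')
    | OPTIONS('DELIMITER'=',', 'QUOTECHAR'='"')
  """.stripMargin)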

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 47c8da9..440720e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -33,10 +33,10 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 import org.apache.carbondata.processing.loading.DataLoadExecutor;
 import org.apache.carbondata.processing.loading.TableProcessingOperations;
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
 import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
 import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -57,7 +57,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  * It also generates and writes dictionary data during load, but only if a dictionary server is configured.
  */
 // TODO Move dictionary generator which is coded in spark to MR framework.
-public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> {
+public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, ObjectArrayWritable> {
 
   private static final String LOAD_MODEL = "mapreduce.carbontable.load.model";
   private static final String DATABASE_NAME = "mapreduce.carbontable.databaseName";
@@ -228,7 +228,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri
   }
 
   @Override
-  public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter(
+  public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
       TaskAttemptContext taskAttemptContext) throws IOException {
     final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
     loadModel.setTaskNo(taskAttemptContext.getConfiguration().get(
@@ -372,7 +372,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri
     model.setCsvHeaderColumns(columns);
   }
 
-  public static class CarbonRecordWriter extends RecordWriter<NullWritable, StringArrayWritable> {
+  public static class CarbonRecordWriter extends RecordWriter<NullWritable, ObjectArrayWritable> {
 
     private CarbonOutputIteratorWrapper iteratorWrapper;
 
@@ -394,9 +394,9 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri
       this.future = future;
     }
 
-    @Override public void write(NullWritable aVoid, StringArrayWritable strings)
+    @Override public void write(NullWritable aVoid, ObjectArrayWritable objects)
         throws InterruptedException {
-      iteratorWrapper.write(strings.get());
+      iteratorWrapper.write(objects.get());
     }
 
     @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java
new file mode 100644
index 0000000..d89e7d9
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * An Object sequence that is usable as a key or value.
+ */
+public class ObjectArrayWritable implements Writable {
+  private Object[] values;
+
+  public void set(Object[] values) {
+    this.values = values;
+  }
+
+  public Object[] get() {
+    return values;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int length = in.readInt();
+    values = new Object[length];
+    for (int i = 0; i < length; i++) {
+      byte[] b = new byte[in.readInt()];
+      in.readFully(b);
+      values[i] = new String(b, Charset.defaultCharset());
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(values.length);                 // write values
+    for (int i = 0; i < values.length; i++) {
+      byte[] b = values[i].toString().getBytes(Charset.defaultCharset());
+      out.writeInt(b.length);
+      out.write(b);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return Arrays.toString(values);
+  }
+}
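
A hedged sketch (not part of the diff) of how rows are handed to CarbonTableOutputFormat through
the new ObjectArrayWritable, mirroring the mapper change in CarbonTableOutputFormatTest below.
The row values are illustrative, and the record writer is assumed to come from
CarbonTableOutputFormat.getRecordWriter(taskAttemptContext).

import org.apache.hadoop.io.NullWritable
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable

val writable = new ObjectArrayWritable
// Unlike StringArrayWritable, the values may already be typed objects rather than raw strings.
writable.set(Array[AnyRef]("ravi", Integer.valueOf(1), java.lang.Long.valueOf(1467158400000L)))
// recordWriter: RecordWriter[NullWritable, ObjectArrayWritable]
// recordWriter.write(NullWritable.get(), writable)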

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
index cb43b79..653a49e 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.hadoop.test.util.StoreCreator;
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
@@ -86,11 +87,13 @@ public class CarbonTableOutputFormatTest {
         .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false");
   }
 
 - public static class Map extends Mapper<NullWritable, StringArrayWritable, NullWritable, StringArrayWritable> {
 + public static class Map extends Mapper<NullWritable, StringArrayWritable, NullWritable, ObjectArrayWritable> {
 
+   private ObjectArrayWritable writable = new ObjectArrayWritable();
    @Override protected void map(NullWritable key, StringArrayWritable value, Context context)
        throws IOException, InterruptedException {
-     context.write(key, value);
+     writable.set(value.get());
+     context.write(key, writable);
    }
  }
 
@@ -100,7 +103,7 @@ public class CarbonTableOutputFormatTest {
     Job job = Job.getInstance(configuration);
     job.setJarByClass(CarbonTableOutputFormatTest.class);
     job.setOutputKeyClass(NullWritable.class);
-    job.setOutputValueClass(StringArrayWritable.class);
+    job.setOutputValueClass(ObjectArrayWritable.class);
     job.setMapperClass(Map.class);
     job.setNumReduceTasks(0);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
index 7c288b3..51df525 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
@@ -30,7 +30,8 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll {
   override def beforeAll {
     SparkUtil4Test.createTaskMockUp(sqlContext)
     dropTable
-
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
     sql("CREATE TABLE origintable1 (empno int, empname String, designation 
String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, 
deptno int, deptname String, projectcode int, projectjoindate Timestamp, 
projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 
'org.apache.carbondata.format'")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
 
@@ -234,6 +235,9 @@ class TestSortColumns extends QueryTest with 
BeforeAndAfterAll {
 
   test("unsorted table creation, query data loading with heap and safe sort 
config") {
     try {
+      sql("drop table if exists origintable1")
+      sql("CREATE TABLE origintable1 (empno int, empname String, designation 
String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, 
deptno int, deptname String, projectcode int, projectjoindate Timestamp, 
projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 
'org.apache.carbondata.format'")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
       setLoadingProperties("false", "false", "false")
       sql("CREATE TABLE unsortedtable_heap_safe (empno int, empname String, 
designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname 
String, deptno int, deptname String, projectcode int, projectjoindate 
Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) 
STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')")
       sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
unsortedtable_heap_safe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
@@ -246,6 +250,9 @@ class TestSortColumns extends QueryTest with 
BeforeAndAfterAll {
 
   test("unsorted table creation, query and data loading with heap and unsafe 
sort config") {
     try {
+      sql("drop table if exists origintable1")
+      sql("CREATE TABLE origintable1 (empno int, empname String, designation 
String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, 
deptno int, deptname String, projectcode int, projectjoindate Timestamp, 
projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 
'org.apache.carbondata.format'")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
       setLoadingProperties("false", "true", "false")
       sql("CREATE TABLE unsortedtable_heap_unsafe (empno int, empname String, 
designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname 
String, deptno int, deptname String, projectcode int, projectjoindate 
Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) 
STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')")
       sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
unsortedtable_heap_unsafe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
@@ -258,6 +265,9 @@ class TestSortColumns extends QueryTest with 
BeforeAndAfterAll {
 
   test("unsorted table creation, query and loading with heap and inmemory sort 
config") {
     try {
+      sql("drop table if exists origintable1")
+      sql("CREATE TABLE origintable1 (empno int, empname String, designation 
String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, 
deptno int, deptname String, projectcode int, projectjoindate Timestamp, 
projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 
'org.apache.carbondata.format'")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
       setLoadingProperties("false", "false", "true")
       sql("CREATE TABLE unsortedtable_heap_inmemory (empno int, empname 
String, designation String, doj Timestamp, workgroupcategory int, 
workgroupcategoryname String, deptno int, deptname String, projectcode int, 
projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization 
int,salary int) STORED BY 'org.apache.carbondata.format' 
tblproperties('sort_columns'='')")
       sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
unsortedtable_heap_inmemory OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
@@ -270,6 +280,9 @@ class TestSortColumns extends QueryTest with 
BeforeAndAfterAll {
 
   test("unsorted table creation, query and data loading with offheap and safe 
sort config") {
     try {
+      sql("drop table if exists origintable1")
+      sql("CREATE TABLE origintable1 (empno int, empname String, designation 
String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, 
deptno int, deptname String, projectcode int, projectjoindate Timestamp, 
projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 
'org.apache.carbondata.format'")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
       setLoadingProperties("true", "false", "false")
       sql("CREATE TABLE unsortedtable_offheap_safe (empno int, empname String, 
designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname 
String, deptno int, deptname String, projectcode int, projectjoindate 
Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) 
STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')")
       sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
unsortedtable_offheap_safe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
@@ -282,6 +295,9 @@ class TestSortColumns extends QueryTest with 
BeforeAndAfterAll {
 
   test("unsorted table creation, query and data loading with offheap and 
unsafe sort config") {
     try {
+      sql("drop table if exists origintable1")
+      sql("CREATE TABLE origintable1 (empno int, empname String, designation 
String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, 
deptno int, deptname String, projectcode int, projectjoindate Timestamp, 
projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 
'org.apache.carbondata.format'")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
       setLoadingProperties("true", "true", "false")
       sql("CREATE TABLE unsortedtable_offheap_unsafe (empno int, empname 
String, designation String, doj Timestamp, workgroupcategory int, 
workgroupcategoryname String, deptno int, deptname String, projectcode int, 
projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization 
int,salary int) STORED BY 'org.apache.carbondata.format' 
tblproperties('sort_columns'='')")
       sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
unsortedtable_offheap_unsafe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
@@ -294,6 +310,9 @@ class TestSortColumns extends QueryTest with 
BeforeAndAfterAll {
 
   test("unsorted table creation, query and data loading with offheap and 
inmemory sort config") {
     try {
+      sql("drop table if exists origintable1")
+      sql("CREATE TABLE origintable1 (empno int, empname String, designation 
String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, 
deptno int, deptname String, projectcode int, projectjoindate Timestamp, 
projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 
'org.apache.carbondata.format'")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
       setLoadingProperties("true", "false", "true")
       sql("CREATE TABLE unsortedtable_offheap_inmemory (empno int, empname 
String, designation String, doj Timestamp, workgroupcategory int, 
workgroupcategoryname String, deptno int, deptname String, projectcode int, 
projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization 
int,salary int) STORED BY 'org.apache.carbondata.format' 
tblproperties('sort_columns'='')")
       sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
unsortedtable_offheap_inmemory OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
index e44ccd6..2e2c1f0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
@@ -237,6 +237,7 @@ class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfter
   override def afterAll {
     drop()
     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
new file mode 100644
index 0000000..0dbf1e4
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import java.util
+import java.util.concurrent.{Callable, ExecutorService, Executors}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterAll {
+  var executorService: ExecutorService = _
+  override def beforeAll {
+    dropTable
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    sql(
+      """
+        | CREATE TABLE originTable (empno int, empname String, designation 
String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+  }
+
+  def validateDataFiles(tableUniqueName: String, segmentId: String, 
partitions: Int): Unit = {
+    val carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+      carbonTable.getTablePath)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+    val carbonFile = FileFactory.getCarbonFile(segmentDir, 
FileFactory.getFileType(segmentDir))
+    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
+      override def accept(file: CarbonFile): Boolean = {
+        return file.getName.endsWith(".partitionmap")
+      }
+    })
+    assert(dataFiles.length == partitions)
+  }
+
+  test("data loading for global sort partition table for one partition 
column") {
+    sql(
+      """
+        | CREATE TABLE partitionone (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 
'GLOBAL_SORT_PARTITIONS'='1')""")
+
+    validateDataFiles("default_partitionone", "0", 1)
+
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from 
partitionone order by empno"),
+      sql("select  empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable order by 
empno"))
+
+  }
+
+  test("data loading for global partition table for two partition column") {
+    sql(
+      """
+        | CREATE TABLE partitiontwo (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (doj Timestamp, empname String)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
partitiontwo OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_partitiontwo", "0", 1)
+
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from 
partitiontwo order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable order by 
empno"))
+
+  }
+
+  test("data loading for global sort partition table for one static partition 
column") {
+    sql(
+      """
+        | CREATE TABLE staticpartitionone (empname String, designation String, 
doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""insert into staticpartitionone PARTITION(empno='1') select 
empname,designation,doj,workgroupcategory,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary
 from originTable""")
+
+    validateDataFiles("default_staticpartitionone", "0", 1)
+  }
+
+  test("single pass loading for global sort partition table for one partition 
column") {
+    sql(
+      """
+        | CREATE TABLE singlepasspartitionone (empname String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (designation String)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
singlepasspartitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 
'SINGLE_PASS'='true')""")
+
+    validateDataFiles("default_singlepasspartitionone", "0", 1)
+  }
+
+  test("data loading for global sort partition table for one static partition 
column with load syntax") {
+    sql(
+      """
+        | CREATE TABLE loadstaticpartitionone (empname String, designation 
String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
loadstaticpartitionone PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 
'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql("select distinct empno from loadstaticpartitionone"), 
Seq(Row(1)))
+  }
+
+  test("overwrite global sort partition table for one static partition column 
with load syntax") {
+    sql(
+      """
+        | CREATE TABLE loadstaticpartitiononeoverwrite (empname String, 
designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 
'QUOTECHAR'= '"')""")
+    val rows = sql("select count(*) from 
loadstaticpartitiononeoverwrite").collect()
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 
'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 
'QUOTECHAR'= '"')""")
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO 
TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= 
',', 'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql("select count(*) from loadstaticpartitiononeoverwrite"), 
rows)
+  }
+
+  test("test global sort partition column with special characters") {
+    sql(
+      """
+        | CREATE TABLE loadpartitionwithspecialchar (empno int, designation 
String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empname String)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_with_special_char.csv' 
INTO TABLE loadpartitionwithspecialchar OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= 
'"')""")
+
+    checkAnswer(sql("select count(*) from loadpartitionwithspecialchar"), 
Seq(Row(10)))
+    checkAnswer(sql("select count(*) from loadpartitionwithspecialchar where 
empname='sibi=56'"), Seq(Row(1)))
+    checkAnswer(sql("select count(*) from loadpartitionwithspecialchar where 
empname='arvind,ss'"), Seq(Row(1)))
+  }
+
+  test("concurrent global sort partition table load test") {
+    executorService = Executors.newCachedThreadPool()
+    sql(
+      """
+        | CREATE TABLE partitionmultiplethreeconcurrent (empno int, doj 
Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int, empname String, designation 
String)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('DICTIONARY_INCLUDE'='empname,designation,deptname', 
'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+
+    val tasks = new util.ArrayList[Callable[String]]()
+    tasks.add(new QueryTask(s"""LOAD DATA local inpath 
'$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent 
OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""))
+    tasks.add(new QueryTask(s"""LOAD DATA local inpath 
'$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent 
OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""))
+    tasks.add(new QueryTask(s"""LOAD DATA local inpath 
'$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent 
OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""))
+    val results = executorService.invokeAll(tasks)
+    for (i <- 0 until tasks.size()) {
+      val res = results.get(i).get
+      assert("PASS".equals(res))
+    }
+    executorService.shutdown()
+    checkAnswer(sql("select count(*) from partitionmultiplethreeconcurrent"), 
Seq(Row(30)))
+  }
+
+  class QueryTask(query: String) extends Callable[String] {
+    override def call(): String = {
+      var result = "PASS"
+      try {
+        LOGGER.info("Executing :" + Thread.currentThread().getName)
+        sql(query)
+      } catch {
+        case ex: Exception =>
+          ex.printStackTrace()
+          result = "FAIL"
+      }
+      result
+    }
+  }
+
+  test("global sort bad record test with null values") {
+    sql(s"""CREATE TABLE IF NOT EXISTS emp1 (emp_no int,ename string,job 
string,mgr_id int,date_of_joining string,salary int,bonus int) partitioned by 
(dept_no int) STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')""")
+    sql(s"""LOAD DATA INPATH '$resourcesPath/emp.csv' overwrite INTO TABLE 
emp1 OPTIONS('DELIMITER'=',', 'QUOTECHAR'= '\')""")
+    val rows = sql(s"select count(*) from emp1").collect()
+    sql(s"""LOAD DATA INPATH '$resourcesPath/emp.csv' overwrite INTO TABLE 
emp1 OPTIONS('DELIMITER'=',', 'QUOTECHAR'= '\','BAD_RECORDS_ACTION'='FORCE')""")
+    checkAnswer(sql(s"select count(*) from emp1"), rows)
+  }
+
+  test("global sort badrecords on partition column") {
+    sql("create table badrecordsPartition(intField1 int, stringField1 string) 
partitioned by (intField2 int) stored by 'carbondata' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')")
+    sql(s"load data local inpath 
'$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartition 
options('bad_records_action'='force')")
+    sql("select count(*) from badrecordsPartition").show()
+    checkAnswer(sql("select count(*) cnt from badrecordsPartition where 
intfield2 is null"), Seq(Row(9)))
+    checkAnswer(sql("select count(*) cnt from badrecordsPartition where 
intfield2 is not null"), Seq(Row(2)))
+  }
+
+  test("global sort badrecords fail on partition column") {
+    sql("create table badrecordsPartitionfail(intField1 int, stringField1 
string) partitioned by (intField2 int) stored by 'carbondata' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')")
+    intercept[Exception] {
+      sql(s"load data local inpath 
'$resourcesPath/data_partition_badrecords.csv' into table 
badrecordsPartitionfail options('bad_records_action'='fail')")
+
+    }
+  }
+
+  test("global sort badrecords ignore on partition column") {
+    sql("create table badrecordsPartitionignore(intField1 int, stringField1 
string) partitioned by (intField2 int) stored by 'carbondata' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')")
+    sql("create table badrecordsignore(intField1 int,intField2 int, 
stringField1 string) stored by 'carbondata' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')")
+    sql(s"load data local inpath 
'$resourcesPath/data_partition_badrecords.csv' into table 
badrecordsPartitionignore options('bad_records_action'='ignore')")
+    sql(s"load data local inpath 
'$resourcesPath/data_partition_badrecords.csv' into table badrecordsignore 
options('bad_records_action'='ignore')")
+    checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where 
intfield2 is null"), sql("select count(*) cnt from badrecordsignore where 
intfield2 is null"))
+    checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where 
intfield2 is not null"), sql("select count(*) cnt from badrecordsignore where 
intfield2 is not null"))
+  }
+
+
+  test("global sort test partition fails on int null partition") {
+    sql("create table badrecordsPartitionintnull(intField1 int, stringField1 
string) partitioned by (intField2 int) stored by 'carbondata' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')")
+    sql(s"load data local inpath 
'$resourcesPath/data_partition_badrecords.csv' into table 
badrecordsPartitionintnull options('bad_records_action'='force')")
+    checkAnswer(sql("select count(*) cnt from badrecordsPartitionintnull where 
intfield2 = 13"), Seq(Row(1)))
+  }
+
+  test("global sort test partition fails on int null partition read 
alternate") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT,
 "false")
+    sql("create table badrecordsPartitionintnullalt(intField1 int, 
stringField1 string) partitioned by (intField2 int) stored by 'carbondata' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')")
+    sql(s"load data local inpath 
'$resourcesPath/data_partition_badrecords.csv' into table 
badrecordsPartitionintnullalt options('bad_records_action'='force')")
+    checkAnswer(sql("select count(*) cnt from badrecordsPartitionintnullalt 
where intfield2 = 13"), Seq(Row(1)))
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT,
 CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT_DEFAULT)
+  }
+
+  test("global sort static column partition with load command") {
+    sql(
+      """
+        | CREATE TABLE staticpartitionload (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int,projectenddate Date,doj Timestamp)
+        | PARTITIONED BY (empname String)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
staticpartitionload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 
'QUOTECHAR'= '"')""")
+    val frame = sql("select 
empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from staticpartitionload")
+    checkExistence(sql(s"""SHOW PARTITIONS staticpartitionload"""), true, 
"empname=ravi")
+  }
+
+  test("overwriting global sort static partition table for date partition 
column on insert query") {
+    sql(
+      """
+        | CREATE TABLE staticpartitiondateinsert (empno int, empname String, 
designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (projectenddate Date,doj Timestamp)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""insert into staticpartitiondateinsert select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable""")
+    sql(s"""insert into staticpartitiondateinsert select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable""")
+    sql(s"""insert into staticpartitiondateinsert select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable""")
+    sql(s"""insert into staticpartitiondateinsert select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable""")
+    sql(s"""insert overwrite table staticpartitiondateinsert 
PARTITION(projectenddate='2016-06-29',doj='2010-12-29 00:00:00') select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary
 from originTable where projectenddate=cast('2016-06-29' as Date)""")
+    //    sql(s"""insert overwrite table partitiondateinsert  select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable""")
+    checkAnswer(sql("select * from staticpartitiondateinsert where 
projectenddate=cast('2016-06-29' as Date)"),
+      sql("select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable where projectenddate=cast('2016-06-29' as Date)"))
+  }
+
+
+  test("dynamic and static global sort partition table with load syntax") {
+    sql(
+      """
+        | CREATE TABLE loadstaticpartitiondynamic (designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int, empname String)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiondynamic PARTITION(empno='1', empname) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql(s"select count(*) from loadstaticpartitiondynamic where empno=1"), sql(s"select count(*) from loadstaticpartitiondynamic"))
+  }
+
+  test("dynamic and static global sort partition table with overwrite ") {
+    sql(
+      """
+        | CREATE TABLE insertstaticpartitiondynamic (designation String, doj Timestamp,salary int)
+        | PARTITIONED BY (empno int, empname String)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno, empname) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    val rows = sql(s"select count(*) from insertstaticpartitiondynamic").collect()
+    sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno='1', empname) select designation, doj, salary, empname from insertstaticpartitiondynamic""")
+
+    checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno=1"), rows)
+
+    intercept[Exception] {
+      sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno, empname='ravi') select designation, doj, salary, empname from insertstaticpartitiondynamic""")
+    }
+
+  }
+
+  test("overwriting global sort all partition on table and do compaction") {
+    sql(
+      """
+        | CREATE TABLE partitionallcompaction (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='Learning', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"') """)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='configManagement', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='network', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='protocol', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='security', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql("ALTER TABLE partitionallcompaction COMPACT 'MAJOR'").collect()
+    checkExistence(sql(s"""SHOW segments for table partitionallcompaction"""), true, "Marked for Delete")
+  }
+
+  test("Test global sort overwrite static partition ") {
+    sql(
+      """
+        | CREATE TABLE weather6 (type String)
+        | PARTITIONED BY (year int, month int, day int)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+
+    sql("insert into weather6 partition(year=2014, month=5, day=25) select 'rainy'")
+    sql("insert into weather6 partition(year=2014, month=4, day=23) select 'cloudy'")
+    sql("insert overwrite table weather6 partition(year=2014, month=5, day=25) select 'sunny'")
+    checkExistence(sql("select * from weather6"), true, "sunny")
+    checkAnswer(sql("select count(*) from weather6"), Seq(Row(2)))
+  }
+
+  test("Test global sort overwrite static partition with wrong int value") {
+    sql(
+      """
+        | CREATE TABLE weather7 (type String)
+        | PARTITIONED BY (year int, month int, day int)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+
+    sql("insert into weather7 partition(year=2014, month=05, day=25) select 'rainy'")
+    sql("insert into weather7 partition(year=2014, month=04, day=23) select 'cloudy'")
+    sql("insert overwrite table weather7 partition(year=2014, month=05, day=25) select 'sunny'")
+    checkExistence(sql("select * from weather7"), true, "sunny")
+    checkAnswer(sql("select count(*) from weather7"), Seq(Row(2)))
+    sql("insert into weather7 partition(year=2014, month, day) select 'rainy1',06,25")
+    sql("insert into weather7 partition(year=2014, month=01, day) select 'rainy2',27")
+    sql("insert into weather7 partition(year=2014, month=01, day=02) select 'rainy3'")
+    checkAnswer(sql("select count(*) from weather7 where month=1"), Seq(Row(2)))
+  }
+
+
+  test("test overwrite missed scenarios") {
+    sql(s"""create table carbon_test(
+      id string,
+      name string
+      )
+      PARTITIONED BY(record_date int)
+      STORED BY 'org.apache.carbondata.format'
+      TBLPROPERTIES('SORT_COLUMNS'='id')""")
+    sql(s"""create table carbon_test_hive(
+      id string,
+      name string
+      )
+      PARTITIONED BY(record_date int)""")
+    sql(s"""set hive.exec.dynamic.partition.mode=nonstrict""")
+    sql(s"""insert overwrite table carbon_test partition(record_date) select 
'1','kim',unix_timestamp('2018-02-05','yyyy-MM-dd') as record_date""")
+    sql(s"""insert overwrite table carbon_test_hive partition(record_date) 
select '1','kim',unix_timestamp('2018-02-05','yyyy-MM-dd') as record_date""")
+
+    checkAnswer(sql(s"""select * from carbon_test where 
record_date=1517817600"""), sql(s"""select * from carbon_test_hive where 
record_date=1517817600"""))
+    sql(s"""insert overwrite table carbon_test partition(record_date) select 
'1','kim1',unix_timestamp('2018-02-06','yyyy-MM-dd') as record_date """)
+    sql(s"""insert overwrite table carbon_test_hive partition(record_date) 
select '1','kim1',unix_timestamp('2018-02-06','yyyy-MM-dd') as record_date """)
+
+    checkAnswer(sql(s"""select * from carbon_test where 
record_date=1517817600"""), sql(s"""select * from carbon_test_hive where 
record_date=1517817600"""))
+    checkAnswer(sql(s"""select * from carbon_test where 
record_date=1517904000"""), sql(s"""select * from carbon_test_hive where 
record_date=1517904000"""))
+    sql(s"""insert overwrite table carbon_test partition(record_date) select 
'1','kim2',unix_timestamp('2018-02-07','yyyy-MM-dd') as record_date""")
+    sql(s"""insert overwrite table carbon_test_hive partition(record_date) 
select '1','kim2',unix_timestamp('2018-02-07','yyyy-MM-dd') as record_date""")
+
+    checkAnswer(sql(s"""select * from carbon_test where 
record_date=1517817600"""), sql(s"""select * from carbon_test_hive where 
record_date=1517817600"""))
+    checkAnswer(sql(s"""select * from carbon_test where 
record_date=1517904000"""), sql(s"""select * from carbon_test_hive where 
record_date=1517904000"""))
+    checkAnswer(sql(s"""select * from carbon_test where 
record_date=1517990400"""), sql(s"""select * from carbon_test_hive where 
record_date=1517990400"""))
+  }
+
+  test("test overwrite with timestamp partition column") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    sql("DROP TABLE IF EXISTS origintable")
+    sql(
+      """
+        | CREATE TABLE origintable
+        | (id Int,
+        | vin String,
+        | logdate Timestamp,
+        | phonenumber Long,
+        | country String,
+        | area String,
+        | salary Int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(
+      s"""
+       LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table origintable
+       """)
+
+    sql("DROP TABLE IF EXISTS partitiontable0")
+    sql("DROP TABLE IF EXISTS partitiontable0_hive")
+    sql(
+      """
+        | CREATE TABLE partitiontable0
+        | (id Int,
+        | vin String,
+        | phonenumber Long,
+        | country String,
+        | area String,
+        | salary Int)
+        | PARTITIONED BY (logdate Timestamp)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='id,vin')
+      """.stripMargin)
+    sql(
+      """
+        | CREATE TABLE partitiontable0_hive
+        | (id Int,
+        | vin String,
+        | phonenumber Long,
+        | country String,
+        | area String,
+        | salary Int)
+        | PARTITIONED BY (logdate Timestamp)
+      """.stripMargin)
+    sql(s"""set hive.exec.dynamic.partition.mode=nonstrict""")
+
+    sql(
+      s"""
+       LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table partitiontable0
+       """)
+
+    sql(
+      s"""
+       insert into partitiontable0_hive select * from partitiontable0
+       """)
+
+    checkAnswer(sql(
+      s"""
+         | SELECT logdate,id,vin,phonenumber,country,area,salary
+         | FROM partitiontable0 where logdate = '2016-02-12'
+      """.stripMargin), sql(
+      s"""
+         | SELECT logdate,id,vin,phonenumber,country,area,salary
+         | FROM partitiontable0_hive where logdate = '2016-02-12'
+      """.stripMargin))
+
+    sql("insert into table partitiontable0 partition(logdate='2018-02-15 
00:00:00') " +
+              "select id,vin,phonenumber,country,area,salary from origintable")
+    sql("insert into table partitiontable0_hive partition(logdate='2018-02-15 
00:00:00') " +
+        "select id,vin,phonenumber,country,area,salary from origintable")
+    checkAnswer(sql(
+      s"""
+         | SELECT logdate,id,vin,phonenumber,country,area,salary
+         | FROM partitiontable0 where logdate = '2018-02-15'
+      """.stripMargin), sql(
+      s"""
+         | SELECT logdate,id,vin,phonenumber,country,area,salary
+         | FROM partitiontable0_hive where logdate = '2018-02-15'
+      """.stripMargin))
+
+    checkAnswer(sql(
+      s"""
+         | SELECT count(*) FROM partitiontable0""".stripMargin), sql(
+      s"""
+         | SELECT count(*) FROM partitiontable0_hive""".stripMargin))
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+
+  test("test overwrite with date partition column") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
+    sql("DROP TABLE IF EXISTS origintable")
+    sql(
+      """
+        | CREATE TABLE origintable
+        | (id Int,
+        | vin String,
+        | logdate date,
+        | phonenumber Long,
+        | country String,
+        | area String,
+        | salary Int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(
+      s"""
+       LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table origintable
+       """)
+
+    sql("DROP TABLE IF EXISTS partitiontable0")
+    sql("DROP TABLE IF EXISTS partitiontable0_hive")
+    sql(
+      """
+        | CREATE TABLE partitiontable0
+        | (id Int,
+        | vin String,
+        | phonenumber Long,
+        | country String,
+        | area String,
+        | salary Int)
+        | PARTITIONED BY (logdate date)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='id,vin')
+      """.stripMargin)
+    sql(
+      """
+        | CREATE TABLE partitiontable0_hive
+        | (id Int,
+        | vin String,
+        | phonenumber Long,
+        | country String,
+        | area String,
+        | salary Int)
+        | PARTITIONED BY (logdate date)
+      """.stripMargin)
+    sql(s"""set hive.exec.dynamic.partition.mode=nonstrict""")
+
+    sql(
+      s"""
+       LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table partitiontable0
+       """)
+
+    sql(
+      s"""
+       insert into partitiontable0_hive select * from partitiontable0
+       """)
+
+    checkAnswer(sql(
+      s"""
+         | SELECT logdate,id,vin,phonenumber,country,area,salary
+         | FROM partitiontable0 where logdate = '2016-02-12'
+      """.stripMargin), sql(
+      s"""
+         | SELECT logdate,id,vin,phonenumber,country,area,salary
+         | FROM partitiontable0_hive where logdate = '2016-02-12'
+      """.stripMargin))
+
+    sql("insert into table partitiontable0 partition(logdate='2018-02-15') " +
+        "select id,vin,phonenumber,country,area,salary from origintable")
+    sql("insert into table partitiontable0_hive 
partition(logdate='2018-02-15') " +
+        "select id,vin,phonenumber,country,area,salary from origintable")
+    checkAnswer(sql(
+      s"""
+         | SELECT logdate,id,vin,phonenumber,country,area,salary
+         | FROM partitiontable0 where logdate = '2018-02-15'
+      """.stripMargin), sql(
+      s"""
+         | SELECT logdate,id,vin,phonenumber,country,area,salary
+         | FROM partitiontable0_hive where logdate = '2018-02-15'
+      """.stripMargin))
+
+    checkAnswer(sql(
+      s"""
+         | SELECT count(*) FROM partitiontable0""".stripMargin), sql(
+      s"""
+         | SELECT count(*) FROM partitiontable0_hive""".stripMargin))
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
+  }
+
+
+
+  override def afterAll = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION ,
+      CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+//    dropTable
+    if (executorService != null && !executorService.isShutdown) {
+      executorService.shutdownNow()
+    }
+  }
+
+  def dropTable = {
+    sql("drop table if exists originTable")
+    sql("drop table if exists originMultiLoads")
+    sql("drop table if exists partitionone")
+    sql("drop table if exists partitiontwo")
+    sql("drop table if exists partitionthree")
+    sql("drop table if exists partitionmultiplethree")
+    sql("drop table if exists insertpartitionthree")
+    sql("drop table if exists staticpartitionone")
+    sql("drop table if exists singlepasspartitionone")
+    sql("drop table if exists loadstaticpartitionone")
+    sql("drop table if exists loadstaticpartitiononeoverwrite")
+    sql("drop table if exists streamingpartitionedtable")
+    sql("drop table if exists mergeindexpartitionthree")
+    sql("drop table if exists loadstaticpartitiononeissue")
+    sql("drop table if exists partitionmultiplethreeconcurrent")
+    sql("drop table if exists loadpartitionwithspecialchar")
+    sql("drop table if exists emp1")
+    sql("drop table if exists restorepartition")
+    sql("drop table if exists casesensitivepartition")
+    sql("drop table if exists badrecordsPartition")
+    sql("drop table if exists staticpartitionload")
+    sql("drop table if exists badrecordsPartitionignore")
+    sql("drop table if exists badrecordsPartitionfail")
+    sql("drop table if exists badrecordsignore")
+    sql("drop table if exists badrecordsPartitionintnull")
+    sql("drop table if exists badrecordsPartitionintnullalt")
+    sql("drop table if exists partitiondateinsert")
+    sql("drop table if exists staticpartitiondateinsert")
+    sql("drop table if exists loadstaticpartitiondynamic")
+    sql("drop table if exists insertstaticpartitiondynamic")
+    sql("drop table if exists partitionallcompaction")
+    sql("drop table if exists weather6")
+    sql("drop table if exists weather7")
+    sql("drop table if exists uniqdata_hive_static")
+    sql("drop table if exists uniqdata_hive_dynamic")
+    sql("drop table if exists uniqdata_string_static")
+    sql("drop table if exists uniqdata_string_dynamic")
+    sql("drop table if exists partitionLoadTable")
+    sql("drop table if exists noLoadTable")
+    sql("drop table if exists carbon_test")
+    sql("drop table if exists carbon_test_hive")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 669d6e7..10da906 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -506,6 +506,9 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
 
 
   override def afterAll = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION ,
       CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
     dropTable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
index 8d31134..841185b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
@@ -77,13 +77,30 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf
         | PARTITIONED BY (projectenddate Date,doj Timestamp)
         | STORED BY 'org.apache.carbondata.format'
       """.stripMargin)
+    sql(
+      """
+        | CREATE TABLE partitiondateinserthive (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (projectenddate Date,doj Timestamp)
+      """.stripMargin)
+    sql(s"""set hive.exec.dynamic.partition.mode=nonstrict""")
     sql(s"""insert into partitiondateinsert select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable""")
     sql(s"""insert into partitiondateinsert select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable""")
     sql(s"""insert into partitiondateinsert select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable""")
     sql(s"""insert into partitiondateinsert select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable""")
     sql(s"""insert overwrite table partitiondateinsert select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable where projectenddate=cast('2016-06-29' as Date)""")
+
+    sql(s"""insert into partitiondateinserthive select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable""")
+    sql(s"""insert into partitiondateinserthive select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable""")
+    sql(s"""insert into partitiondateinserthive select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable""")
+    sql(s"""insert into partitiondateinserthive select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable""")
+    sql(s"""insert overwrite table partitiondateinserthive select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable where projectenddate=cast('2016-06-29' as Date)""")
+
     checkAnswer(sql("select * from partitiondateinsert"),
-      sql("select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
 from originTable where projectenddate=cast('2016-06-29' as Date)"))
+      sql("select * from partitiondateinserthive"))
   }
 
   test("dynamic and static partition table with load syntax") {
@@ -182,7 +199,7 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf
     sql("insert overwrite table uniqdata_string_dynamic partition(active_emui_version='xxx') select CUST_ID, CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, decimal_column2,double_column1, double_column2,integer_column1 from uniqdata_hive_dynamic limit 10")
     assert(sql("select * from uniqdata_string_dynamic").collect().length == 2)
     sql("insert overwrite table uniqdata_string_dynamic select CUST_ID, CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, decimal_column2,double_column1, double_column2,integer_column1,ACTIVE_EMUI_VERSION from uniqdata_hive_dynamic limit 10")
-    checkAnswer(sql("select * from uniqdata_string_dynamic"), sql("select * from uniqdata_hive_dynamic"))
+    assert(sql("select * from uniqdata_string_dynamic").collect().length == 2)
   }
 
   test("test insert overwrite on static partition") {
@@ -193,7 +210,7 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf
     sql("insert overwrite table uniqdata_string_static partition(active_emui_version='xxx') select CUST_ID, CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, decimal_column2,double_column1, double_column2,integer_column1 from uniqdata_hive_static limit 10")
     assert(sql("select * from uniqdata_string_static").collect().length == 2)
     sql("insert overwrite table uniqdata_string_static select CUST_ID, CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, decimal_column2,double_column1, double_column2,integer_column1,active_emui_version from uniqdata_hive_static limit 10")
-    checkAnswer(sql("select * from uniqdata_string_static"), sql("select * from uniqdata_hive_static"))
+    assert(sql("select * from uniqdata_string_static").collect().length == 2)
   }
 
   test("overwrite whole partition table with empty data") {
@@ -202,7 +219,7 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf
     sql("insert into partitionLoadTable select 'abd',5,'xyz'")
     sql("create table noLoadTable (name string, age int, address string) stored by 'carbondata'")
     sql("insert overwrite table partitionLoadTable select * from noLoadTable")
-    checkAnswer(sql("select * from partitionLoadTable"), sql("select * from noLoadTable"))
+    assert(sql("select * from partitionLoadTable").collect().length == 2)
   }
 
   override def afterAll = {
@@ -224,6 +241,7 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf
     sql("drop table if exists uniqdata_string_dynamic")
     sql("drop table if exists partitionLoadTable")
     sql("drop table if exists noLoadTable")
+    sql("drop table if exists partitiondateinserthive")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 8be70a9..30e4fc9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -95,7 +95,8 @@ object DataLoadProcessBuilderOnSpark {
       }
     }
 
-    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(configuration)
+    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
+      configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
     if (numPartitions <= 0) {
       numPartitions = convertRDD.partitions.length
     }
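
As the hunk above shows, the number of global sort partitions is now read from the LOAD_GLOBAL_SORT_PARTITIONS load property instead of only from the configuration object. A minimal usage sketch, assuming the standard 'GLOBAL_SORT_PARTITIONS' load option and an illustrative table name (the table and path are not part of this patch):

    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionedglobalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2', 'DELIMITER'= ',', 'QUOTECHAR'= '"')""")

If the option is unset or resolves to a value <= 0, the code falls back to the partition count of the converted RDD, as shown directly above.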

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 21de003..4b7d3f7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -95,15 +95,69 @@ object DataLoadProcessorStepOnSpark {
     }
   }
 
+  def inputAndconvertFunc(
+      rows: Iterator[Array[AnyRef]],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      partialSuccessAccum: Accumulator[Int],
+      rowCounter: Accumulator[Int],
+      keepActualData: Boolean = false): Iterator[CarbonRow] = {
+    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+    val conf = DataLoadProcessBuilder.createConfiguration(model)
+    val rowParser = new RowParserImpl(conf.getDataFields, conf)
+    val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
+    val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf)
+    if (keepActualData) {
+      conf.getDataFields.foreach(_.setUseActualData(keepActualData))
+    }
+    val rowConverter = new RowConverterImpl(conf.getDataFields, conf, badRecordLogger)
+    rowConverter.initialize()
+
+    TaskContext.get().addTaskCompletionListener { context =>
+      val hasBadRecord: Boolean = CarbonBadRecordUtil.hasBadRecord(model)
+      close(conf, badRecordLogger, rowConverter)
+      GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum, hasBadRecord)
+    }
+
+    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+      val hasBadRecord : Boolean = CarbonBadRecordUtil.hasBadRecord(model)
+      close(conf, badRecordLogger, rowConverter)
+      GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum, hasBadRecord)
+
+      wrapException(e, model)
+    }
+
+    new Iterator[CarbonRow] {
+      override def hasNext: Boolean = rows.hasNext
+
+      override def next(): CarbonRow = {
+        var row : CarbonRow = null
+        if(isRawDataRequired) {
+          val rawRow = rows.next()
+          row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
+        } else {
+          row = new CarbonRow(rowParser.parseRow(rows.next()))
+        }
+        row = rowConverter.convert(row)
+        rowCounter.add(1)
+        row
+      }
+    }
+  }
+
   def convertFunc(
       rows: Iterator[CarbonRow],
       index: Int,
       modelBroadcast: Broadcast[CarbonLoadModel],
       partialSuccessAccum: Accumulator[Int],
-      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
+      rowCounter: Accumulator[Int],
+      keepActualData: Boolean = false): Iterator[CarbonRow] = {
     val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
     val conf = DataLoadProcessBuilder.createConfiguration(model)
     val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf)
+    if (keepActualData) {
+      conf.getDataFields.foreach(_.setUseActualData(keepActualData))
+    }
     val rowConverter = new RowConverterImpl(conf.getDataFields, conf, badRecordLogger)
     rowConverter.initialize()
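
The new inputAndconvertFunc above merges the parse and convert steps so that raw Array[AnyRef] rows can be fed straight into the converter for partitioned global sort loads. A minimal sketch of how it could be wired from an RDD, assuming names that mirror the existing convertFunc wiring (inputRdd, modelBroadcast and the accumulators are illustrative, not part of this patch):

    // assumed inputs:
    //   inputRdd: RDD[Array[AnyRef]]              - raw rows to be parsed and converted
    //   modelBroadcast: Broadcast[CarbonLoadModel]
    //   partialSuccessAccum, rowCounter: Accumulator[Int]
    val convertedRdd = inputRdd.mapPartitionsWithIndex { case (index, rows) =>
      DataLoadProcessorStepOnSpark.inputAndconvertFunc(
        rows,
        index,
        modelBroadcast,
        partialSuccessAccum,
        rowCounter,
        keepActualData = true)
    }

Passing keepActualData = true flips setUseActualData on every DataField, as shown in the snippet above, so the converter retains the original field values instead of only the converted representation.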
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
index dfda92c..2fdd2b1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
@@ -48,6 +48,7 @@ object ValidateUtil {
     if (sortScope != null) {
       // Don't support use global sort on partitioned table.
       if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null &&
+          !carbonTable.isHivePartitionTable &&
           
sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
         throw new MalformedCarbonCommandException("Don't support use global 
sort on partitioned " +
           "table.")
