[CARBONDATA-2168] Support global sort for standard hive partitioning. This closes #1972
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dded5d5d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dded5d5d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dded5d5d Branch: refs/heads/master Commit: dded5d5d54d8617796ded23dee0840997f212a0d Parents: e51810c Author: ravipesala <ravi.pes...@gmail.com> Authored: Fri Feb 9 09:37:02 2018 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Mon Feb 26 17:40:52 2018 +0530 ---------------------------------------------------------------------- .../datastore/page/SafeDecimalColumnPage.java | 2 +- .../datastore/page/SafeFixLengthColumnPage.java | 7 + .../datastore/page/SafeVarLengthColumnPage.java | 1 + .../core/metadata/PartitionMapFileStore.java | 20 +- .../carbondata/core/util/DataTypeUtil.java | 89 +++ .../hadoop/api/CarbonOutputCommitter.java | 73 +- .../hadoop/api/CarbonTableOutputFormat.java | 12 +- .../hadoop/internal/ObjectArrayWritable.java | 66 ++ .../hadoop/ft/CarbonTableOutputFormatTest.java | 9 +- .../testsuite/sortcolumns/TestSortColumns.scala | 21 +- .../StandardPartitionBadRecordLoggerTest.scala | 3 +- .../StandardPartitionGlobalSortTestCase.scala | 684 +++++++++++++++++++ .../StandardPartitionTableLoadingTestCase.scala | 3 + ...tandardPartitionTableOverwriteTestCase.scala | 26 +- .../load/DataLoadProcessBuilderOnSpark.scala | 3 +- .../load/DataLoadProcessorStepOnSpark.scala | 56 +- .../carbondata/spark/load/ValidateUtil.scala | 1 + .../spark/rdd/CarbonDropPartitionRDD.scala | 8 +- .../carbondata/spark/util/CarbonScalaUtil.scala | 135 ++-- .../management/CarbonLoadDataCommand.scala | 364 +++++----- .../datasources/CarbonFileFormat.scala | 77 ++- .../processing/loading/DataField.java | 10 + .../loading/DataLoadProcessBuilder.java | 29 + .../impl/MeasureFieldConverterImpl.java | 9 +- .../impl/NonDictionaryFieldConverterImpl.java | 29 +- 
.../iterator/CarbonOutputIteratorWrapper.java | 16 +- .../loading/model/CarbonLoadModel.java | 15 + .../InputProcessorStepForPartitionImpl.java | 251 +++++++ .../store/CarbonFactDataHandlerColumnar.java | 1 + .../util/CarbonDataProcessorUtil.java | 11 +- 30 files changed, 1730 insertions(+), 301 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java index cb2d3bd..0e0ba85 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java @@ -217,6 +217,6 @@ public class SafeDecimalColumnPage extends DecimalColumnPage { @Override public void freeMemory() { - + byteArrayData = null; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java index 5f848c0..1e4445a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java @@ -311,6 +311,13 @@ public class SafeFixLengthColumnPage extends ColumnPage { @Override public void freeMemory() { + byteData = null; + shortData = null; + intData = null; + longData = null; + 
floatData = null; + doubleData = null; + shortIntData = null; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java index b5daddb..782b9dc 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java @@ -37,6 +37,7 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase { @Override public void freeMemory() { + byteArrayData = null; } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java index 1e9cbc4..a0ce24a 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java @@ -279,18 +279,20 @@ public class PartitionMapFileStore { * dropped * @throws IOException */ - public void dropPartitions(String segmentPath, List<String> partitionsToDrop, String uniqueId, - boolean partialMatch) throws IOException { + public void dropPartitions(String segmentPath, List<List<String>> partitionsToDrop, + String uniqueId, boolean partialMatch) throws IOException { readAllPartitionsOfSegment(segmentPath); List<String> indexesToDrop = new ArrayList<>(); for (Map.Entry<String, List<String>> entry: 
partitionMap.entrySet()) { - if (partialMatch) { - if (entry.getValue().containsAll(partitionsToDrop)) { - indexesToDrop.add(entry.getKey()); - } - } else { - if (partitionsToDrop.containsAll(entry.getValue())) { - indexesToDrop.add(entry.getKey()); + for (List<String> partitions: partitionsToDrop) { + if (partialMatch) { + if (entry.getValue().containsAll(partitions)) { + indexesToDrop.add(entry.getKey()); + } + } else { + if (partitions.containsAll(entry.getValue())) { + indexesToDrop.add(entry.getKey()); + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java index 6d224cf..c370b14 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java @@ -104,6 +104,38 @@ public final class DataTypeUtil { } } + /** + * This method will convert a given value to its specific type + * + * @param msrValue + * @param dataType + * @param carbonMeasure + * @return + */ + public static Object getConvertedMeasureValueBasedOnDataType(String msrValue, DataType dataType, + CarbonMeasure carbonMeasure) { + if (dataType == DataTypes.BOOLEAN) { + return BooleanConvert.parseBoolean(msrValue); + } else if (DataTypes.isDecimal(dataType)) { + BigDecimal bigDecimal = + new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP); + return converter + .convertToDecimal(normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision())); + } else if (dataType == DataTypes.SHORT) { + return Short.parseShort(msrValue); + } else if (dataType == DataTypes.INT) { + return Integer.parseInt(msrValue); + } else if (dataType == DataTypes.LONG) { + return Long.valueOf(msrValue); + } 
else { + Double parsedValue = Double.valueOf(msrValue); + if (Double.isInfinite(parsedValue) || Double.isNaN(parsedValue)) { + return null; + } + return parsedValue; + } + } + public static Object getMeasureObjectFromDataType(byte[] data, DataType dataType) { if (data == null || data.length == 0) { return null; @@ -332,6 +364,63 @@ public final class DataTypeUtil { } } + public static Object getDataDataTypeForNoDictionaryColumn(String dimensionValue, + DataType actualDataType, String dateFormat) { + if (actualDataType == DataTypes.BOOLEAN) { + return BooleanConvert.parseBoolean(dimensionValue); + } else if (actualDataType == DataTypes.STRING) { + return converter.convertFromStringToUTF8String(dimensionValue); + } else if (actualDataType == DataTypes.SHORT) { + return Short.parseShort(dimensionValue); + } else if (actualDataType == DataTypes.INT) { + return Integer.parseInt(dimensionValue); + } else if (actualDataType == DataTypes.LONG) { + return Long.parseLong(dimensionValue); + } else if (actualDataType == DataTypes.TIMESTAMP) { + Date dateToStr = null; + DateFormat dateFormatter = null; + try { + if (null != dateFormat && !dateFormat.trim().isEmpty()) { + dateFormatter = new SimpleDateFormat(dateFormat); + } else { + dateFormatter = timeStampformatter.get(); + } + dateToStr = dateFormatter.parse(dimensionValue); + return dateToStr.getTime(); + } catch (ParseException e) { + throw new NumberFormatException(e.getMessage()); + } + } else { + return converter.convertFromStringToUTF8String(dimensionValue); + } + } + + public static byte[] getBytesDataDataTypeForNoDictionaryColumn(Object dimensionValue, + DataType actualDataType) { + if (dimensionValue == null) { + if (actualDataType == DataTypes.STRING) { + return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; + } else { + return new byte[0]; + } + } + if (actualDataType == DataTypes.BOOLEAN) { + return ByteUtil.toBytes((Boolean) dimensionValue); + } else if (actualDataType == DataTypes.STRING) { + return 
ByteUtil.toBytes(dimensionValue.toString()); + } else if (actualDataType == DataTypes.SHORT) { + return ByteUtil.toBytes((Short) dimensionValue); + } else if (actualDataType == DataTypes.INT) { + return ByteUtil.toBytes((Integer) dimensionValue); + } else if (actualDataType == DataTypes.LONG) { + return ByteUtil.toBytes((Long) dimensionValue); + } else if (actualDataType == DataTypes.TIMESTAMP) { + return ByteUtil.toBytes((Long)dimensionValue); + } else { + return ByteUtil.toBytes(dimensionValue.toString()); + } + } + /** * Below method will be used to convert the data passed to its actual data http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index 555ddd2..ce97169 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -18,10 +18,7 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -122,7 +119,11 @@ public class CarbonOutputCommitter extends FileOutputCommitter { throw new IOException(e); } } - CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); + String uniqueId = null; + if (overwriteSet) { + uniqueId = overwritePartitions(loadModel); + } + CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false); if (operationContext != null) { LoadEvents.LoadTableMergePartitionEvent 
loadTableMergePartitionEvent = new LoadEvents.LoadTableMergePartitionEvent(segmentPath); @@ -138,22 +139,78 @@ public class CarbonOutputCommitter extends FileOutputCommitter { String segmentsToBeDeleted = context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, ""); List<String> segmentDeleteList = Arrays.asList(segmentsToBeDeleted.split(",")); + Set<String> segmentSet = new HashSet<>( + new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier()) + .getValidAndInvalidSegments().getValidSegments()); if (updateTime != null) { - Set<String> segmentSet = new HashSet<>( - new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier()) - .getValidAndInvalidSegments().getValidSegments()); CarbonUpdateUtil.updateTableMetadataStatus( segmentSet, carbonTable, updateTime, true, segmentDeleteList); + } else if (uniqueId != null) { + // Update the loadstatus with update time to clear cache from driver. + CarbonUpdateUtil.updateTableMetadataStatus( + segmentSet, + carbonTable, + uniqueId, + true, + new ArrayList<String>()); } } else { CarbonLoaderUtil.updateTableStatusForFailure(loadModel); } } + /** + * Overwrite the partitions in case of overwrite query. It just updates the partition map files + * of all segment files. 
+ * + * @param loadModel + * @return + * @throws IOException + */ + private String overwritePartitions(CarbonLoadModel loadModel) throws IOException { + CarbonTable table = loadModel.getCarbonDataLoadSchema().getCarbonTable(); + String currentSegmentPath = + CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId()); + PartitionMapFileStore partitionMapFileStore = new PartitionMapFileStore(); + partitionMapFileStore.readAllPartitionsOfSegment(currentSegmentPath); + List<List<String>> partitionsToDrop = + new ArrayList<List<String>>(partitionMapFileStore.getPartitionMap().values()); + if (partitionsToDrop.size() > 0) { + List<String> validSegments = + new SegmentStatusManager(table.getAbsoluteTableIdentifier()).getValidAndInvalidSegments() + .getValidSegments(); + String uniqueId = String.valueOf(System.currentTimeMillis()); + try { + // First drop the partitions from partition mapper files of each segment + for (String segment : validSegments) { + new PartitionMapFileStore() + .dropPartitions(CarbonTablePath.getSegmentPath(table.getTablePath(), segment), + new ArrayList<List<String>>(partitionsToDrop), uniqueId, false); + + } + } catch (Exception e) { + // roll back the drop partitions from carbon store + for (String segment : validSegments) { + new PartitionMapFileStore() + .commitPartitions(CarbonTablePath.getSegmentPath(table.getTablePath(), segment), + uniqueId, false, table.getTablePath(), partitionsToDrop.get(0)); + } + } + // Commit the removed partitions in carbon store. 
+ for (String segment : validSegments) { + new PartitionMapFileStore() + .commitPartitions(CarbonTablePath.getSegmentPath(table.getTablePath(), segment), + uniqueId, true, table.getTablePath(), partitionsToDrop.get(0)); + } + return uniqueId; + } + return null; + } + private Object getOperationContext() { // when validate segments is disabled in thread local update it to CarbonTableInputFormat CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index 47c8da9..440720e 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -33,10 +33,10 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonThreadFactory; +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; import org.apache.carbondata.processing.loading.DataLoadExecutor; import org.apache.carbondata.processing.loading.TableProcessingOperations; -import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper; import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; @@ -57,7 +57,7 @@ import 
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; * It also generate and writes dictionary data during load only if dictionary server is configured. */ // TODO Move dictionary generater which is coded in spark to MR framework. -public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> { +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, ObjectArrayWritable> { private static final String LOAD_MODEL = "mapreduce.carbontable.load.model"; private static final String DATABASE_NAME = "mapreduce.carbontable.databaseName"; @@ -228,7 +228,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri } @Override - public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter( + public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter( TaskAttemptContext taskAttemptContext) throws IOException { final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration()); loadModel.setTaskNo(taskAttemptContext.getConfiguration().get( @@ -372,7 +372,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri model.setCsvHeaderColumns(columns); } - public static class CarbonRecordWriter extends RecordWriter<NullWritable, StringArrayWritable> { + public static class CarbonRecordWriter extends RecordWriter<NullWritable, ObjectArrayWritable> { private CarbonOutputIteratorWrapper iteratorWrapper; @@ -394,9 +394,9 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri this.future = future; } - @Override public void write(NullWritable aVoid, StringArrayWritable strings) + @Override public void write(NullWritable aVoid, ObjectArrayWritable objects) throws InterruptedException { - iteratorWrapper.write(strings.get()); + iteratorWrapper.write(objects.get()); } @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException { 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java new file mode 100644 index 0000000..d89e7d9 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.hadoop.internal; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Arrays; + +import org.apache.hadoop.io.Writable; + +/** + * An Object sequence that is usable as a key or value.
+ */ +public class ObjectArrayWritable implements Writable { + private Object[] values; + + public void set(Object[] values) { + this.values = values; + } + + public Object[] get() { + return values; + } + + @Override + public void readFields(DataInput in) throws IOException { + int length = in.readInt(); + values = new Object[length]; + for (int i = 0; i < length; i++) { + byte[] b = new byte[in.readInt()]; + in.readFully(b); + values[i] = new String(b, Charset.defaultCharset()); + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(values.length); // write values + for (int i = 0; i < values.length; i++) { + byte[] b = values[i].toString().getBytes(Charset.defaultCharset()); + out.writeInt(b.length); + out.write(b); + } + } + + @Override + public String toString() { + return Arrays.toString(values); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java index cb43b79..653a49e 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java @@ -25,6 +25,7 @@ import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; import org.apache.carbondata.hadoop.test.util.StoreCreator; import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; import 
org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; @@ -86,11 +87,13 @@ public class CarbonTableOutputFormatTest { .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false"); } - public static class Map extends Mapper<NullWritable, StringArrayWritable, NullWritable, StringArrayWritable> { + public static class Map extends Mapper<NullWritable, StringArrayWritable, NullWritable, ObjectArrayWritable> { + private ObjectArrayWritable writable = new ObjectArrayWritable(); @Override protected void map(NullWritable key, StringArrayWritable value, Context context) throws IOException, InterruptedException { - context.write(key, value); + writable.set(value.get()); + context.write(key, writable); } } @@ -100,7 +103,7 @@ public class CarbonTableOutputFormatTest { Job job = Job.getInstance(configuration); job.setJarByClass(CarbonTableOutputFormatTest.class); job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(StringArrayWritable.class); + job.setOutputValueClass(ObjectArrayWritable.class); job.setMapperClass(Map.class); job.setNumReduceTasks(0); http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala index 7c288b3..51df525 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala @@ -30,7 +30,8 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll { override def beforeAll { 
SparkUtil4Test.createTaskMockUp(sqlContext) dropTable - + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") sql("CREATE TABLE origintable1 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") @@ -234,6 +235,9 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll { test("unsorted table creation, query data loading with heap and safe sort config") { try { + sql("drop table if exists origintable1") + sql("CREATE TABLE origintable1 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") setLoadingProperties("false", "false", "false") sql("CREATE TABLE unsortedtable_heap_safe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE unsortedtable_heap_safe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") @@ -246,6 +250,9 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll { test("unsorted table creation, query and 
data loading with heap and unsafe sort config") { try { + sql("drop table if exists origintable1") + sql("CREATE TABLE origintable1 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") setLoadingProperties("false", "true", "false") sql("CREATE TABLE unsortedtable_heap_unsafe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE unsortedtable_heap_unsafe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") @@ -258,6 +265,9 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll { test("unsorted table creation, query and loading with heap and inmemory sort config") { try { + sql("drop table if exists origintable1") + sql("CREATE TABLE origintable1 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") setLoadingProperties("false", "false", "true") sql("CREATE TABLE unsortedtable_heap_inmemory (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, 
workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE unsortedtable_heap_inmemory OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") @@ -270,6 +280,9 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll { test("unsorted table creation, query and data loading with offheap and safe sort config") { try { + sql("drop table if exists origintable1") + sql("CREATE TABLE origintable1 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") setLoadingProperties("true", "false", "false") sql("CREATE TABLE unsortedtable_offheap_safe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE unsortedtable_offheap_safe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") @@ -282,6 +295,9 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll { test("unsorted table creation, query and data loading with offheap and unsafe sort config") { try { + sql("drop table if exists origintable1") + sql("CREATE TABLE origintable1 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, 
workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") setLoadingProperties("true", "true", "false") sql("CREATE TABLE unsortedtable_offheap_unsafe (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('sort_columns'='')") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE unsortedtable_offheap_unsafe OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") @@ -294,6 +310,9 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll { test("unsorted table creation, query and data loading with offheap and inmemory sort config") { try { + sql("drop table if exists origintable1") + sql("CREATE TABLE origintable1 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") setLoadingProperties("true", "false", "true") sql("CREATE TABLE unsortedtable_offheap_inmemory (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 
'org.apache.carbondata.format' tblproperties('sort_columns'='')") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE unsortedtable_offheap_inmemory OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala index e44ccd6..2e2c1f0 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala @@ -237,6 +237,7 @@ class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfter override def afterAll { drop() CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala new file mode 100644 index 0000000..0dbf1e4 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala @@ -0,0 +1,684 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.spark.testsuite.standardpartition + +import java.util +import java.util.concurrent.{Callable, ExecutorService, Executors} + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.{BeforeAndAfterAll, Ignore} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath + +class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterAll { + var executorService: ExecutorService = _ + override def beforeAll { + dropTable + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + sql( + """ + | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + } + + def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int): Unit = { + val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) + val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, + carbonTable.getTablePath) + val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId) + val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) + val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { + 
override def accept(file: CarbonFile): Boolean = { + return file.getName.endsWith(".partitionmap") + } + }) + assert(dataFiles.length == partitions) + } + + test("data loading for global sort partition table for one partition column") { + sql( + """ + | CREATE TABLE partitionone (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'GLOBAL_SORT_PARTITIONS'='1')""") + + validateDataFiles("default_partitionone", "0", 1) + + checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionone order by empno"), + sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) + + } + + test("data loading for global partition table for two partition column") { + sql( + """ + | CREATE TABLE partitiontwo (empno int, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (doj Timestamp, empname String) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiontwo OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= 
'"')""") + + validateDataFiles("default_partitiontwo", "0", 1) + + checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitiontwo order by empno"), + sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) + + } + + test("data loading for global sort partition table for one static partition column") { + sql( + """ + | CREATE TABLE staticpartitionone (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + sql(s"""insert into staticpartitionone PARTITION(empno='1') select empname,designation,doj,workgroupcategory,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary from originTable""") + + validateDataFiles("default_staticpartitionone", "0", 1) + } + + test("single pass loading for global sort partition table for one partition column") { + sql( + """ + | CREATE TABLE singlepasspartitionone (empname String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (designation String) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE singlepasspartitionone 
OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='true')""") + + validateDataFiles("default_singlepasspartitionone", "0", 1) + } + + test("data loading for global sort partition table for one static partition column with load syntax") { + sql( + """ + | CREATE TABLE loadstaticpartitionone (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitionone PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + checkAnswer(sql("select distinct empno from loadstaticpartitionone"), Seq(Row(1))) + } + + test("overwrite global sort partition table for one static partition column with load syntax") { + sql( + """ + | CREATE TABLE loadstaticpartitiononeoverwrite (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + val rows = sql("select count(*) from loadstaticpartitiononeoverwrite").collect() + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' 
INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + checkAnswer(sql("select count(*) from loadstaticpartitiononeoverwrite"), rows) + } + + test("test global sort partition column with special characters") { + sql( + """ + | CREATE TABLE loadpartitionwithspecialchar (empno int, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empname String) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data_with_special_char.csv' INTO TABLE loadpartitionwithspecialchar OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + checkAnswer(sql("select count(*) from loadpartitionwithspecialchar"), Seq(Row(10))) + checkAnswer(sql("select count(*) from loadpartitionwithspecialchar where empname='sibi=56'"), Seq(Row(1))) + checkAnswer(sql("select count(*) from loadpartitionwithspecialchar where empname='arvind,ss'"), Seq(Row(1))) + } + + test("concurrent global sort partition table load test") { + executorService = Executors.newCachedThreadPool() + sql( + """ + | CREATE TABLE partitionmultiplethreeconcurrent (empno int, doj Timestamp, + | workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (workgroupcategory int, empname String, designation String) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('DICTIONARY_INCLUDE'='empname,designation,deptname', 'SORT_SCOPE'='GLOBAL_SORT') 
+ """.stripMargin) + + val tasks = new util.ArrayList[Callable[String]]() + tasks.add(new QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")) + tasks.add(new QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")) + tasks.add(new QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")) + val results = executorService.invokeAll(tasks) + for (i <- 0 until tasks.size()) { + val res = results.get(i).get + assert("PASS".equals(res)) + } + executorService.shutdown() + checkAnswer(sql("select count(*) from partitionmultiplethreeconcurrent"), Seq(Row(30))) + } + + class QueryTask(query: String) extends Callable[String] { + override def call(): String = { + var result = "PASS" + try { + LOGGER.info("Executing :" + Thread.currentThread().getName) + sql(query) + } catch { + case ex: Exception => + ex.printStackTrace() + result = "FAIL" + } + result + } + } + + test("global sort bad record test with null values") { + sql(s"""CREATE TABLE IF NOT EXISTS emp1 (emp_no int,ename string,job string,mgr_id int,date_of_joining string,salary int,bonus int) partitioned by (dept_no int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')""") + sql(s"""LOAD DATA INPATH '$resourcesPath/emp.csv' overwrite INTO TABLE emp1 OPTIONS('DELIMITER'=',', 'QUOTECHAR'= '\')""") + val rows = sql(s"select count(*) from emp1").collect() + sql(s"""LOAD DATA INPATH '$resourcesPath/emp.csv' overwrite INTO TABLE emp1 OPTIONS('DELIMITER'=',', 'QUOTECHAR'= '\','BAD_RECORDS_ACTION'='FORCE')""") + checkAnswer(sql(s"select count(*) from emp1"), rows) + } + + test("global sort badrecords on partition column") { + sql("create table badrecordsPartition(intField1 int, stringField1 string) 
partitioned by (intField2 int) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')") + sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartition options('bad_records_action'='force')") + sql("select count(*) from badrecordsPartition").show() + checkAnswer(sql("select count(*) cnt from badrecordsPartition where intfield2 is null"), Seq(Row(9))) + checkAnswer(sql("select count(*) cnt from badrecordsPartition where intfield2 is not null"), Seq(Row(2))) + } + + test("global sort badrecords fail on partition column") { + sql("create table badrecordsPartitionfail(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')") + intercept[Exception] { + sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionfail options('bad_records_action'='fail')") + + } + } + + test("global sort badrecords ignore on partition column") { + sql("create table badrecordsPartitionignore(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')") + sql("create table badrecordsignore(intField1 int,intField2 int, stringField1 string) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')") + sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionignore options('bad_records_action'='ignore')") + sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsignore options('bad_records_action'='ignore')") + checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is null"), sql("select count(*) cnt from badrecordsignore where intfield2 is null")) + checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is not null"), sql("select count(*) cnt from badrecordsignore where intfield2 is not null")) + 
} + + + test("global sort test partition fails on int null partition") { + sql("create table badrecordsPartitionintnull(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')") + sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionintnull options('bad_records_action'='force')") + checkAnswer(sql("select count(*) cnt from badrecordsPartitionintnull where intfield2 = 13"), Seq(Row(1))) + } + + test("global sort test partition fails on int null partition read alternate") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT, "false") + sql("create table badrecordsPartitionintnullalt(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')") + sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionintnullalt options('bad_records_action'='force')") + checkAnswer(sql("select count(*) cnt from badrecordsPartitionintnullalt where intfield2 = 13"), Seq(Row(1))) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT, CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT_DEFAULT) + } + + test("global sort static column partition with load command") { + sql( + """ + | CREATE TABLE staticpartitionload (empno int, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp,attendance int, + | deptname String,projectcode int, + | utilization int,salary int,projectenddate Date,doj Timestamp) + | PARTITIONED BY (empname String) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 
'QUOTECHAR'= '"')""") + val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionload") + checkExistence(sql(s"""SHOW PARTITIONS staticpartitionload"""), true, "empname=ravi") + } + + test("overwriting global sort static partition table for date partition column on insert query") { + sql( + """ + | CREATE TABLE staticpartitiondateinsert (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp,attendance int, + | deptname String,projectcode int, + | utilization int,salary int) + | PARTITIONED BY (projectenddate Date,doj Timestamp) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert overwrite table staticpartitiondateinsert PARTITION(projectenddate='2016-06-29',doj='2010-12-29 00:00:00') select empno, 
empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary from originTable where projectenddate=cast('2016-06-29' as Date)""") + // sql(s"""insert overwrite table partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + checkAnswer(sql("select * from staticpartitiondateinsert where projectenddate=cast('2016-06-29' as Date)"), + sql("select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable where projectenddate=cast('2016-06-29' as Date)")) + } + + + test("dynamic and static global sort partition table with load syntax") { + sql( + """ + | CREATE TABLE loadstaticpartitiondynamic (designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int, empname String) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiondynamic PARTITION(empno='1', empname) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkAnswer(sql(s"select count(*) from loadstaticpartitiondynamic where empno=1"), sql(s"select count(*) from loadstaticpartitiondynamic")) + } + + test("dynamic and static global sort partition table with overwrite ") { + sql( + """ + | CREATE TABLE insertstaticpartitiondynamic (designation String, doj Timestamp,salary int) + | PARTITIONED BY (empno int, empname String) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + 
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno, empname) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + val rows = sql(s"select count(*) from insertstaticpartitiondynamic").collect() + sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno='1', empname) select designation, doj, salary, empname from insertstaticpartitiondynamic""") + + checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno=1"), rows) + + intercept[Exception] { + sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno, empname='ravi') select designation, doj, salary, empname from insertstaticpartitiondynamic""") + } + + } + + test("overwriting global sort all partition on table and do compaction") { + sql( + """ + | CREATE TABLE partitionallcompaction (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | PARTITIONED BY (deptname String,doj Timestamp,projectcode int) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='Learning', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"') """) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='configManagement', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='network', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= 
'"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='protocol', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='security', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql("ALTER TABLE partitionallcompaction COMPACT 'MAJOR'").collect() + checkExistence(sql(s"""SHOW segments for table partitionallcompaction"""), true, "Marked for Delete") + } + + test("Test global sort overwrite static partition ") { + sql( + """ + | CREATE TABLE weather6 (type String) + | PARTITIONED BY (year int, month int, day int) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + + sql("insert into weather6 partition(year=2014, month=5, day=25) select 'rainy'") + sql("insert into weather6 partition(year=2014, month=4, day=23) select 'cloudy'") + sql("insert overwrite table weather6 partition(year=2014, month=5, day=25) select 'sunny'") + checkExistence(sql("select * from weather6"), true, "sunny") + checkAnswer(sql("select count(*) from weather6"), Seq(Row(2))) + } + + test("Test global sort overwrite static partition with wrong int value") { + sql( + """ + | CREATE TABLE weather7 (type String) + | PARTITIONED BY (year int, month int, day int) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + + sql("insert into weather7 partition(year=2014, month=05, day=25) select 'rainy'") + sql("insert into weather7 partition(year=2014, month=04, day=23) select 'cloudy'") + sql("insert overwrite table weather7 partition(year=2014, month=05, day=25) select 'sunny'") + checkExistence(sql("select * from weather7"), true, "sunny") + checkAnswer(sql("select count(*) from weather7"), Seq(Row(2))) + sql("insert into weather7 partition(year=2014, month, day) select 
'rainy1',06,25") + sql("insert into weather7 partition(year=2014, month=01, day) select 'rainy2',27") + sql("insert into weather7 partition(year=2014, month=01, day=02) select 'rainy3'") + checkAnswer(sql("select count(*) from weather7 where month=1"), Seq(Row(2))) + } + + + test("test overwrite missed scenarios") { + sql(s"""create table carbon_test( + id string, + name string + ) + PARTITIONED BY(record_date int) + STORED BY 'org.apache.carbondata.format' + TBLPROPERTIES('SORT_COLUMNS'='id')""") + sql(s"""create table carbon_test_hive( + id string, + name string + ) + PARTITIONED BY(record_date int)""") + sql(s"""set hive.exec.dynamic.partition.mode=nonstrict""") + sql(s"""insert overwrite table carbon_test partition(record_date) select '1','kim',unix_timestamp('2018-02-05','yyyy-MM-dd') as record_date""") + sql(s"""insert overwrite table carbon_test_hive partition(record_date) select '1','kim',unix_timestamp('2018-02-05','yyyy-MM-dd') as record_date""") + + checkAnswer(sql(s"""select * from carbon_test where record_date=1517817600"""), sql(s"""select * from carbon_test_hive where record_date=1517817600""")) + sql(s"""insert overwrite table carbon_test partition(record_date) select '1','kim1',unix_timestamp('2018-02-06','yyyy-MM-dd') as record_date """) + sql(s"""insert overwrite table carbon_test_hive partition(record_date) select '1','kim1',unix_timestamp('2018-02-06','yyyy-MM-dd') as record_date """) + + checkAnswer(sql(s"""select * from carbon_test where record_date=1517817600"""), sql(s"""select * from carbon_test_hive where record_date=1517817600""")) + checkAnswer(sql(s"""select * from carbon_test where record_date=1517904000"""), sql(s"""select * from carbon_test_hive where record_date=1517904000""")) + sql(s"""insert overwrite table carbon_test partition(record_date) select '1','kim2',unix_timestamp('2018-02-07','yyyy-MM-dd') as record_date""") + sql(s"""insert overwrite table carbon_test_hive partition(record_date) select 
'1','kim2',unix_timestamp('2018-02-07','yyyy-MM-dd') as record_date""") + + checkAnswer(sql(s"""select * from carbon_test where record_date=1517817600"""), sql(s"""select * from carbon_test_hive where record_date=1517817600""")) + checkAnswer(sql(s"""select * from carbon_test where record_date=1517904000"""), sql(s"""select * from carbon_test_hive where record_date=1517904000""")) + checkAnswer(sql(s"""select * from carbon_test where record_date=1517990400"""), sql(s"""select * from carbon_test_hive where record_date=1517990400""")) + } + + test("test overwrite with timestamp partition column") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + sql("DROP TABLE IF EXISTS origintable") + sql( + """ + | CREATE TABLE origintable + | (id Int, + | vin String, + | logdate Timestamp, + | phonenumber Long, + | country String, + | area String, + | salary Int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql( + s""" + LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table origintable + """) + + sql("DROP TABLE IF EXISTS partitiontable0") + sql("DROP TABLE IF EXISTS partitiontable0_hive") + sql( + """ + | CREATE TABLE partitiontable0 + | (id Int, + | vin String, + | phonenumber Long, + | country String, + | area String, + | salary Int) + | PARTITIONED BY (logdate Timestamp) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='id,vin') + """.stripMargin) + sql( + """ + | CREATE TABLE partitiontable0_hive + | (id Int, + | vin String, + | phonenumber Long, + | country String, + | area String, + | salary Int) + | PARTITIONED BY (logdate Timestamp) + """.stripMargin) + sql(s"""set hive.exec.dynamic.partition.mode=nonstrict""") + + sql( + s""" + LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table partitiontable0 + """) + + sql( + s""" + insert into partitiontable0_hive select * from partitiontable0 + """) + + checkAnswer(sql( + s""" + | 
SELECT logdate,id,vin,phonenumber,country,area,salary + | FROM partitiontable0 where logdate = '2016-02-12' + """.stripMargin), sql( + s""" + | SELECT logdate,id,vin,phonenumber,country,area,salary + | FROM partitiontable0_hive where logdate = '2016-02-12' + """.stripMargin)) + + sql("insert into table partitiontable0 partition(logdate='2018-02-15 00:00:00') " + + "select id,vin,phonenumber,country,area,salary from origintable") + sql("insert into table partitiontable0_hive partition(logdate='2018-02-15 00:00:00') " + + "select id,vin,phonenumber,country,area,salary from origintable") + checkAnswer(sql( + s""" + | SELECT logdate,id,vin,phonenumber,country,area,salary + | FROM partitiontable0 where logdate = '2018-02-15' + """.stripMargin), sql( + s""" + | SELECT logdate,id,vin,phonenumber,country,area,salary + | FROM partitiontable0_hive where logdate = '2018-02-15' + """.stripMargin)) + + checkAnswer(sql( + s""" + | SELECT count(*) FROM partitiontable0""".stripMargin), sql( + s""" + | SELECT count(*) FROM partitiontable0_hive""".stripMargin)) + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + } + + test("test overwrite with date partition column") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") + sql("DROP TABLE IF EXISTS origintable") + sql( + """ + | CREATE TABLE origintable + | (id Int, + | vin String, + | logdate date, + | phonenumber Long, + | country String, + | area String, + | salary Int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql( + s""" + LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table origintable + """) + + sql("DROP TABLE IF EXISTS partitiontable0") + sql("DROP TABLE IF EXISTS partitiontable0_hive") + sql( + """ + | CREATE TABLE partitiontable0 + | (id Int, + | vin String, + | phonenumber Long, + | country String, + | area String, + | salary Int) + | PARTITIONED BY (logdate date) + | 
STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='id,vin') + """.stripMargin) + sql( + """ + | CREATE TABLE partitiontable0_hive + | (id Int, + | vin String, + | phonenumber Long, + | country String, + | area String, + | salary Int) + | PARTITIONED BY (logdate date) + """.stripMargin) + sql(s"""set hive.exec.dynamic.partition.mode=nonstrict""") + + sql( + s""" + LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table partitiontable0 + """) + + sql( + s""" + insert into partitiontable0_hive select * from partitiontable0 + """) + + checkAnswer(sql( + s""" + | SELECT logdate,id,vin,phonenumber,country,area,salary + | FROM partitiontable0 where logdate = '2016-02-12' + """.stripMargin), sql( + s""" + | SELECT logdate,id,vin,phonenumber,country,area,salary + | FROM partitiontable0_hive where logdate = '2016-02-12' + """.stripMargin)) + + sql("insert into table partitiontable0 partition(logdate='2018-02-15') " + + "select id,vin,phonenumber,country,area,salary from origintable") + sql("insert into table partitiontable0_hive partition(logdate='2018-02-15') " + + "select id,vin,phonenumber,country,area,salary from origintable") + checkAnswer(sql( + s""" + | SELECT logdate,id,vin,phonenumber,country,area,salary + | FROM partitiontable0 where logdate = '2018-02-15' + """.stripMargin), sql( + s""" + | SELECT logdate,id,vin,phonenumber,country,area,salary + | FROM partitiontable0_hive where logdate = '2018-02-15' + """.stripMargin)) + + checkAnswer(sql( + s""" + | SELECT count(*) FROM partitiontable0""".stripMargin), sql( + s""" + | SELECT count(*) FROM partitiontable0_hive""".stripMargin)) + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy") + } + + + + override def afterAll = { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + CarbonProperties.getInstance() + 
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION , + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT) +// dropTable + if (executorService != null && !executorService.isShutdown) { + executorService.shutdownNow() + } + } + + def dropTable = { + sql("drop table if exists originTable") + sql("drop table if exists originMultiLoads") + sql("drop table if exists partitionone") + sql("drop table if exists partitiontwo") + sql("drop table if exists partitionthree") + sql("drop table if exists partitionmultiplethree") + sql("drop table if exists insertpartitionthree") + sql("drop table if exists staticpartitionone") + sql("drop table if exists singlepasspartitionone") + sql("drop table if exists loadstaticpartitionone") + sql("drop table if exists loadstaticpartitiononeoverwrite") + sql("drop table if exists streamingpartitionedtable") + sql("drop table if exists mergeindexpartitionthree") + sql("drop table if exists loadstaticpartitiononeissue") + sql("drop table if exists partitionmultiplethreeconcurrent") + sql("drop table if exists loadpartitionwithspecialchar") + sql("drop table if exists emp1") + sql("drop table if exists restorepartition") + sql("drop table if exists casesensitivepartition") + sql("drop table if exists badrecordsPartition") + sql("drop table if exists staticpartitionload") + sql("drop table if exists badrecordsPartitionignore") + sql("drop table if exists badrecordsPartitionfail") + sql("drop table if exists badrecordsignore") + sql("drop table if exists badrecordsPartitionintnull") + sql("drop table if exists badrecordsPartitionintnullalt") + sql("drop table if exists partitiondateinsert") + sql("drop table if exists staticpartitiondateinsert") + sql("drop table if exists loadstaticpartitiondynamic") + sql("drop table if exists insertstaticpartitiondynamic") + sql("drop table if exists 
partitionallcompaction") + sql("drop table if exists weather6") + sql("drop table if exists weather7") + sql("drop table if exists uniqdata_hive_static") + sql("drop table if exists uniqdata_hive_dynamic") + sql("drop table if exists uniqdata_string_static") + sql("drop table if exists uniqdata_string_dynamic") + sql("drop table if exists partitionLoadTable") + sql("drop table if exists noLoadTable") + sql("drop table if exists carbon_test") + sql("drop table if exists carbon_test_hive") + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala index 669d6e7..10da906 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala @@ -506,6 +506,9 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte override def afterAll = { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION , CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT) dropTable 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala index 8d31134..841185b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala @@ -77,13 +77,30 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf | PARTITIONED BY (projectenddate Date,doj Timestamp) | STORED BY 'org.apache.carbondata.format' """.stripMargin) + sql( + """ + | CREATE TABLE partitiondateinserthive (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp,attendance int, + | deptname String,projectcode int, + | utilization int,salary int) + | PARTITIONED BY (projectenddate Date,doj Timestamp) + """.stripMargin) + sql(s"""set hive.exec.dynamic.partition.mode=nonstrict""") sql(s"""insert into partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") sql(s"""insert into partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj 
from originTable""") sql(s"""insert into partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") sql(s"""insert into partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") sql(s"""insert overwrite table partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable where projectenddate=cast('2016-06-29' as Date)""") + + sql(s"""insert into partitiondateinserthive select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert into partitiondateinserthive select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert into partitiondateinserthive select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert into partitiondateinserthive select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert overwrite table partitiondateinserthive select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable where projectenddate=cast('2016-06-29' as Date)""") + 
checkAnswer(sql("select * from partitiondateinsert"), - sql("select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable where projectenddate=cast('2016-06-29' as Date)")) + sql("select * from partitiondateinserthive")) } test("dynamic and static partition table with load syntax") { @@ -182,7 +199,7 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf sql("insert overwrite table uniqdata_string_dynamic partition(active_emui_version='xxx') select CUST_ID, CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, decimal_column2,double_column1, double_column2,integer_column1 from uniqdata_hive_dynamic limit 10") assert(sql("select * from uniqdata_string_dynamic").collect().length == 2) sql("insert overwrite table uniqdata_string_dynamic select CUST_ID, CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, decimal_column2,double_column1, double_column2,integer_column1,ACTIVE_EMUI_VERSION from uniqdata_hive_dynamic limit 10") - checkAnswer(sql("select * from uniqdata_string_dynamic"), sql("select * from uniqdata_hive_dynamic")) + assert(sql("select * from uniqdata_string_dynamic").collect().length == 2) } test("test insert overwrite on static partition") { @@ -193,7 +210,7 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf sql("insert overwrite table uniqdata_string_static partition(active_emui_version='xxx') select CUST_ID, CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, decimal_column2,double_column1, double_column2,integer_column1 from uniqdata_hive_static limit 10") assert(sql("select * from uniqdata_string_static").collect().length == 2) sql("insert overwrite table uniqdata_string_static select CUST_ID, CUST_NAME,DOB,doj, bigint_column1, bigint_column2, decimal_column1, decimal_column2,double_column1, 
double_column2,integer_column1,active_emui_version from uniqdata_hive_static limit 10") - checkAnswer(sql("select * from uniqdata_string_static"), sql("select * from uniqdata_hive_static")) + assert(sql("select * from uniqdata_string_static").collect().length == 2) } test("overwrite whole partition table with empty data") { @@ -202,7 +219,7 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf sql("insert into partitionLoadTable select 'abd',5,'xyz'") sql("create table noLoadTable (name string, age int, address string) stored by 'carbondata'") sql("insert overwrite table partitionLoadTable select * from noLoadTable") - checkAnswer(sql("select * from partitionLoadTable"), sql("select * from noLoadTable")) + assert(sql("select * from partitionLoadTable").collect().length == 2) } override def afterAll = { @@ -224,6 +241,7 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf sql("drop table if exists uniqdata_string_dynamic") sql("drop table if exists partitionLoadTable") sql("drop table if exists noLoadTable") + sql("drop table if exists partitiondateinserthive") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index 8be70a9..30e4fc9 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -95,7 +95,8 @@ object DataLoadProcessBuilderOnSpark { } } - var numPartitions = 
CarbonDataProcessorUtil.getGlobalSortPartitions(configuration) + var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions( + configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS)) if (numPartitions <= 0) { numPartitions = convertRDD.partitions.length } http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 21de003..4b7d3f7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -95,15 +95,69 @@ object DataLoadProcessorStepOnSpark { } } + def inputAndconvertFunc( + rows: Iterator[Array[AnyRef]], + index: Int, + modelBroadcast: Broadcast[CarbonLoadModel], + partialSuccessAccum: Accumulator[Int], + rowCounter: Accumulator[Int], + keepActualData: Boolean = false): Iterator[CarbonRow] = { + val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString) + val conf = DataLoadProcessBuilder.createConfiguration(model) + val rowParser = new RowParserImpl(conf.getDataFields, conf) + val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf) + val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf) + if (keepActualData) { + conf.getDataFields.foreach(_.setUseActualData(keepActualData)) + } + val rowConverter = new RowConverterImpl(conf.getDataFields, conf, badRecordLogger) + rowConverter.initialize() + + TaskContext.get().addTaskCompletionListener { context => + val hasBadRecord: Boolean 
= CarbonBadRecordUtil.hasBadRecord(model) + close(conf, badRecordLogger, rowConverter) + GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum, hasBadRecord) + } + + TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => + val hasBadRecord : Boolean = CarbonBadRecordUtil.hasBadRecord(model) + close(conf, badRecordLogger, rowConverter) + GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum, hasBadRecord) + + wrapException(e, model) + } + + new Iterator[CarbonRow] { + override def hasNext: Boolean = rows.hasNext + + override def next(): CarbonRow = { + var row : CarbonRow = null + if(isRawDataRequired) { + val rawRow = rows.next() + row = new CarbonRow(rowParser.parseRow(rawRow), rawRow) + } else { + row = new CarbonRow(rowParser.parseRow(rows.next())) + } + row = rowConverter.convert(row) + rowCounter.add(1) + row + } + } + } + def convertFunc( rows: Iterator[CarbonRow], index: Int, modelBroadcast: Broadcast[CarbonLoadModel], partialSuccessAccum: Accumulator[Int], - rowCounter: Accumulator[Int]): Iterator[CarbonRow] = { + rowCounter: Accumulator[Int], + keepActualData: Boolean = false): Iterator[CarbonRow] = { val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString) val conf = DataLoadProcessBuilder.createConfiguration(model) val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf) + if (keepActualData) { + conf.getDataFields.foreach(_.setUseActualData(keepActualData)) + } val rowConverter = new RowConverterImpl(conf.getDataFields, conf, badRecordLogger) rowConverter.initialize() http://git-wip-us.apache.org/repos/asf/carbondata/blob/dded5d5d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala index dfda92c..2fdd2b1 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala @@ -48,6 +48,7 @@ object ValidateUtil { if (sortScope != null) { // Don't support use global sort on partitioned table. if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null && + !carbonTable.isHivePartitionTable && sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) { throw new MalformedCarbonCommandException("Don't support use global sort on partitioned " + "table.")