Repository: incubator-carbondata Updated Branches: refs/heads/master 95d38d651 -> ce4566038
fixDictLockIssue fix test case Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/13cd53d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/13cd53d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/13cd53d3 Branch: refs/heads/master Commit: 13cd53d3d300f44d40775e83364312eaf9d76847 Parents: 95d38d6 Author: QiangCai <qiang...@qq.com> Authored: Tue Jan 10 14:34:20 2017 +0800 Committer: chenliang613 <chenliang...@apache.org> Committed: Tue Jan 10 17:14:10 2017 +0800 ---------------------------------------------------------------------- .../spark/rdd/CarbonGlobalDictionaryRDD.scala | 11 +-- .../spark/util/GlobalDictionaryUtil.scala | 4 +- .../execution/command/carbonTableSchema.scala | 3 + .../spark/util/AllDictionaryTestCase.scala | 4 ++ .../AutoHighCardinalityIdentifyTestCase.scala | 5 +- .../util/ExternalColumnDictionaryTestCase.scala | 7 +- ...GlobalDictionaryUtilConcurrentTestCase.scala | 3 + .../util/GlobalDictionaryUtilTestCase.scala | 4 ++ .../execution/command/carbonTableSchema.scala | 70 ++++++++++---------- .../processing/model/CarbonLoadModel.java | 13 ++++ 10 files changed, 80 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala index 2db2ed8..e995553 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala @@ -161,7 +161,8 @@ case class DictionaryLoadModel(table: CarbonTableIdentifier, hdfsTempLocation: String, lockType: String, zooKeeperUrl: String, - serializationNullFormat: String) extends Serializable + serializationNullFormat: String, + defaultTimestampFormat: String) extends Serializable case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable @@ -276,6 +277,8 @@ class CarbonBlockDistinctValuesCombineRDD( override def compute(split: Partition, context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, + model.hdfsLocation) CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime() val distinctValuesList = new ArrayBuffer[(Int, mutable.HashSet[String])] var rowCount = 0L @@ -285,9 +288,7 @@ class CarbonBlockDistinctValuesCombineRDD( val dimNum = model.dimensions.length var row: Row = null val rddIter = firstParent[Row].iterator(split, context) - val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants - .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) - val format = new SimpleDateFormat(formatString) + val format = new SimpleDateFormat(model.defaultTimestampFormat) // generate block distinct value set while (rddIter.hasNext) { row = rddIter.next() @@ -329,6 +330,8 @@ class CarbonGlobalDictionaryGenerateRDD( override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, + model.hdfsLocation) val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS var isHighCardinalityColumn = false val iter = new Iterator[(Int, String, Boolean)] { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala index 6877354..2c8340c 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala @@ -352,8 +352,8 @@ object GlobalDictionaryUtil { hdfsTempLocation, lockType, zookeeperUrl, - serializationNullFormat - ) + serializationNullFormat, + carbonLoadModel.getDefaultTimestampFormat) } /** http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 8de8236..a26e0e1 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -433,6 +433,9 @@ case class LoadTable( carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\"")) carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#")) carbonLoadModel.setDateFormat(dateFormat) + carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) carbonLoadModel .setSerializationNullFormat( TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + serializationNullFormat) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala index ff8adca..2b1b615 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala @@ -26,6 +26,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.carbon.CarbonDataLoadSchema import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.processing.constants.TableOptionConstant import org.apache.carbondata.processing.model.CarbonLoadModel @@ -60,6 +61,9 @@ class AllDictionaryTestCase extends QueryTest with BeforeAndAfterAll { carbonLoadModel.setAllDictPath(allDictFilePath) carbonLoadModel.setSerializationNullFormat( TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N") + carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) carbonLoadModel } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala index 0738085..49585ee 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala @@ -30,7 +30,7 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable import org.apache.carbondata.core.carbon.path.CarbonStorePath import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.processing.model.CarbonLoadModel /** @@ -57,6 +57,9 @@ class AutoHighCardinalityIdentifyTestCase extends QueryTest with BeforeAndAfterA carbonLoadModel.setCsvDelimiter(",") carbonLoadModel.setComplexDelimiterLevel1("\\$") carbonLoadModel.setComplexDelimiterLevel2("\\:") + carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) carbonLoadModel } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala index 625b74e..e029572 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala @@ -18,15 +18,13 @@ */ package org.apache.carbondata.spark.util -import java.io.File - import org.apache.spark.sql.common.util.QueryTest -import org.apache.spark.sql.test.TestQueryExecutor import org.apache.spark.sql.{CarbonEnv, CarbonRelation} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.carbon.CarbonDataLoadSchema import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.processing.constants.TableOptionConstant import org.apache.carbondata.processing.etl.DataLoadingException import org.apache.carbondata.processing.model.CarbonLoadModel @@ -144,6 +142,9 @@ class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll carbonLoadModel.setQuoteChar("\""); carbonLoadModel.setSerializationNullFormat( TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N") + carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) carbonLoadModel } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala index f565eda..5a5547d 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala @@ -60,6 +60,9 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft carbonLoadModel.setQuoteChar("\"") carbonLoadModel.setSerializationNullFormat( TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N") + carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) carbonLoadModel } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala index c2e3790..458764f 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala @@ -26,6 +26,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.carbon.CarbonDataLoadSchema import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.processing.constants.TableOptionConstant import org.apache.carbondata.processing.model.CarbonLoadModel @@ -65,6 +66,9 @@ class GlobalDictionaryUtilTestCase extends QueryTest with BeforeAndAfterAll { carbonLoadModel.setQuoteChar("\"") carbonLoadModel.setSerializationNullFormat( TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N") + carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) carbonLoadModel } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index cf6d8a6..e943d61 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -418,6 +418,9 @@ case class LoadTable( carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\"")) carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#")) carbonLoadModel.setDateFormat(dateFormat) + carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) carbonLoadModel .setSerializationNullFormat( TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + serializationNullFormat) @@ -453,40 +456,39 @@ case class LoadTable( carbonLoadModel.setDirectLoad(true) GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata - -val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) { - val fields = dataFrame.get.schema.fields - import org.apache.spark.sql.functions.udf - // extracting only segment from tupleId - val getSegIdUDF = udf((tupleId: String) => - CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID)) - // getting all fields except tupleId field as it is not required in the value - var otherFields = fields.toSeq - .filter(field => !field.name - .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) - .map(field => { - if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) { - new Column(field.name - .substring(0, - field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))) - } else { - - new Column(field.name) - } - }) - - // extract tupleId field which will be used as a key - val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute - .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId") - // use dataFrameWithoutTupleId as dictionaryDataFrame - val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*) - otherFields = otherFields :+ segIdColumn - // use dataFrameWithTupleId as loadDataFrame - val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*) - (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId)) - } else { - (dataFrame, dataFrame) - } + val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) { + val fields = dataFrame.get.schema.fields + import org.apache.spark.sql.functions.udf + // extracting only segment from tupleId + val getSegIdUDF = udf((tupleId: String) => + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID)) + // getting all fields except tupleId field as it is not required in the value + var otherFields = fields.toSeq + .filter(field => !field.name + .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) + .map(field => { + if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) { + new Column(field.name + .substring(0, + field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))) + } else { + + new Column(field.name) + } + }) + + // extract tupleId field which will be used as a key + val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute + .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId") + // use dataFrameWithoutTupleId as dictionaryDataFrame + val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*) + otherFields = otherFields :+ segIdColumn + // use dataFrameWithTupleId as loadDataFrame + val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*) + (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId)) + } else { + (dataFrame, dataFrame) + } GlobalDictionaryUtil .generateGlobalDictionary( sparkSession.sqlContext, http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java index fdd314c..2d66038 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java @@ -112,6 +112,8 @@ public class CarbonLoadModel implements Serializable { private String dateFormat; + private String defaultTimestampFormat; + /** * defines the string that should be treated as null while loadind data */ @@ -363,6 +365,8 @@ public class CarbonLoadModel implements Serializable { copy.escapeChar = escapeChar; copy.quoteChar = quoteChar; copy.commentChar = commentChar; + copy.dateFormat = dateFormat; + copy.defaultTimestampFormat = defaultTimestampFormat; copy.maxColumns = maxColumns; copy.storePath = storePath; copy.useOnePass = useOnePass; @@ -410,6 +414,7 @@ public class CarbonLoadModel implements Serializable { copyObj.quoteChar = quoteChar; copyObj.commentChar = commentChar; copyObj.dateFormat = dateFormat; + copyObj.defaultTimestampFormat = defaultTimestampFormat; copyObj.maxColumns = maxColumns; copyObj.storePath = storePath; copyObj.useOnePass = useOnePass; @@ -641,6 +646,14 @@ public class CarbonLoadModel implements Serializable { public void setDateFormat(String dateFormat) { this.dateFormat = dateFormat; } + public String getDefaultTimestampFormat() { + return defaultTimestampFormat; + } + + public void setDefaultTimestampFormat(String defaultTimestampFormat) { + this.defaultTimestampFormat = defaultTimestampFormat; + } + /** * @return */