Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 95d38d651 -> ce4566038


fixDictLockIssue

fix test case
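
In short: the dictionary-generation RDD tasks no longer read the timestamp format from the
driver-side CarbonProperties at execution time. LoadTable now resolves CARBON_TIMESTAMP_FORMAT
once and stores it on CarbonLoadModel, GlobalDictionaryUtil copies it into DictionaryLoadModel,
and the RDD tasks build their SimpleDateFormat from the model. The tasks also seed the
executor-local CarbonProperties with STORE_LOCATION from the model, presumably so that
store-relative paths such as the dictionary lock files resolve consistently on executors
(the "DictLockIssue" of the title). The test cases are updated to populate the new field.

A minimal standalone sketch of the pattern; the class, property key and fallback format below
are illustrative stand-ins, not the actual CarbonData types or defaults:

import java.text.SimpleDateFormat
import java.util.Date

// Stand-in for DictionaryLoadModel: a Serializable model carrying driver-resolved settings.
case class TimestampCarryingModel(defaultTimestampFormat: String) extends Serializable

object TimestampFormatSketch {
  // Stand-in for CarbonProperties.getInstance().getProperty(CARBON_TIMESTAMP_FORMAT,
  // CARBON_TIMESTAMP_DEFAULT_FORMAT) on the driver; the fallback pattern is a placeholder.
  def resolveTimestampFormat(configured: Option[String]): String =
    configured.getOrElse("yyyy-MM-dd HH:mm:ss")

  def main(args: Array[String]): Unit = {
    // Driver side: resolve once and put the value into the serializable model
    // (the system-property key here is only for illustration).
    val model = TimestampCarryingModel(resolveTimestampFormat(sys.props.get("carbon.timestamp.format")))
    // Executor side: build the formatter from the model field, mirroring
    // `new SimpleDateFormat(model.defaultTimestampFormat)` in the diff, instead of
    // consulting a properties singleton that may not be populated in that JVM.
    val format = new SimpleDateFormat(model.defaultTimestampFormat)
    println(format.format(new Date()))
  }
}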


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/13cd53d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/13cd53d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/13cd53d3

Branch: refs/heads/master
Commit: 13cd53d3d300f44d40775e83364312eaf9d76847
Parents: 95d38d6
Author: QiangCai <qiang...@qq.com>
Authored: Tue Jan 10 14:34:20 2017 +0800
Committer: chenliang613 <chenliang...@apache.org>
Committed: Tue Jan 10 17:14:10 2017 +0800

----------------------------------------------------------------------
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 11 +--
 .../spark/util/GlobalDictionaryUtil.scala       |  4 +-
 .../execution/command/carbonTableSchema.scala   |  3 +
 .../spark/util/AllDictionaryTestCase.scala      |  4 ++
 .../AutoHighCardinalityIdentifyTestCase.scala   |  5 +-
 .../util/ExternalColumnDictionaryTestCase.scala |  7 +-
 ...GlobalDictionaryUtilConcurrentTestCase.scala |  3 +
 .../util/GlobalDictionaryUtilTestCase.scala     |  4 ++
 .../execution/command/carbonTableSchema.scala   | 70 ++++++++++----------
 .../processing/model/CarbonLoadModel.java       | 13 ++++
 10 files changed, 80 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 2db2ed8..e995553 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -161,7 +161,8 @@ case class DictionaryLoadModel(table: CarbonTableIdentifier,
     hdfsTempLocation: String,
     lockType: String,
     zooKeeperUrl: String,
-    serializationNullFormat: String) extends Serializable
+    serializationNullFormat: String,
+    defaultTimestampFormat: String) extends Serializable
 
 case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable
 
@@ -276,6 +277,8 @@ class CarbonBlockDistinctValuesCombineRDD(
   override def compute(split: Partition,
       context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
+      model.hdfsLocation)
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
     val distinctValuesList = new ArrayBuffer[(Int, mutable.HashSet[String])]
     var rowCount = 0L
@@ -285,9 +288,7 @@ class CarbonBlockDistinctValuesCombineRDD(
       val dimNum = model.dimensions.length
       var row: Row = null
       val rddIter = firstParent[Row].iterator(split, context)
-      val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
-          .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-      val format = new SimpleDateFormat(formatString)
+      val format = new SimpleDateFormat(model.defaultTimestampFormat)
       // generate block distinct value set
       while (rddIter.hasNext) {
         row = rddIter.next()
@@ -329,6 +330,8 @@ class CarbonGlobalDictionaryGenerateRDD(
 
   override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
+      model.hdfsLocation)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     var isHighCardinalityColumn = false
     val iter = new Iterator[(Int, String, Boolean)] {
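
A note on the STORE_LOCATION lines added to both compute() methods above: each task now seeds
the executor-local CarbonProperties with the store path carried on the model before doing any
work, presumably so that dictionary files and lock paths resolve against the same store
location the driver used. A standalone sketch of the idea, with a plain map standing in for
the CarbonProperties singleton (the property key and lock path below are illustrative only):

import java.util.concurrent.ConcurrentHashMap

object StoreLocationSketch {
  // Stand-in for CarbonProperties: a process-local key/value store. On an executor JVM it
  // starts out empty, which is why each task seeds it from the serializable model first.
  object LocalProps {
    private val props = new ConcurrentHashMap[String, String]()
    def addProperty(key: String, value: String): Unit = props.put(key, value)
    def getProperty(key: String, default: String): String = props.getOrDefault(key, default)
  }

  // Hypothetical task body: seed the store location, then resolve store-relative paths
  // (for example a dictionary lock file) from the seeded value.
  def runTask(hdfsLocationFromModel: String): String = {
    LocalProps.addProperty("carbon.storelocation", hdfsLocationFromModel)
    val storePath = LocalProps.getProperty("carbon.storelocation", "/tmp/carbonstore")
    s"$storePath/LockFiles/dict.lock"
  }

  def main(args: Array[String]): Unit =
    println(runTask("hdfs://namenode:8020/user/warehouse/carbon.store"))
}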

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 6877354..2c8340c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -352,8 +352,8 @@ object GlobalDictionaryUtil {
       hdfsTempLocation,
       lockType,
       zookeeperUrl,
-      serializationNullFormat
-    )
+      serializationNullFormat,
+      carbonLoadModel.getDefaultTimestampFormat)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 8de8236..a26e0e1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -433,6 +433,9 @@ case class LoadTable(
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#"))
       carbonLoadModel.setDateFormat(dateFormat)
+      carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
       carbonLoadModel
         .setSerializationNullFormat(
           TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + serializationNullFormat)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index ff8adca..2b1b615 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -26,6 +26,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.model.CarbonLoadModel
 
@@ -60,6 +61,9 @@ class AllDictionaryTestCase extends QueryTest with BeforeAndAfterAll {
     carbonLoadModel.setAllDictPath(allDictFilePath)
     carbonLoadModel.setSerializationNullFormat(
           TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
+    carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
index 0738085..49585ee 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
@@ -30,7 +30,7 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 
 /**
@@ -57,6 +57,9 @@ class AutoHighCardinalityIdentifyTestCase extends QueryTest with BeforeAndAfterA
     carbonLoadModel.setCsvDelimiter(",")
     carbonLoadModel.setComplexDelimiterLevel1("\\$")
     carbonLoadModel.setComplexDelimiterLevel2("\\:")
+    carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 625b74e..e029572 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -18,15 +18,13 @@
   */
 package org.apache.carbondata.spark.util
 
-import java.io.File
-
 import org.apache.spark.sql.common.util.QueryTest
-import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -144,6 +142,9 @@ class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll
     carbonLoadModel.setQuoteChar("\"");
     carbonLoadModel.setSerializationNullFormat(
       TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
+    carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
index f565eda..5a5547d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
@@ -60,6 +60,9 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
     carbonLoadModel.setQuoteChar("\"")
     carbonLoadModel.setSerializationNullFormat(
       TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
+    carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
index c2e3790..458764f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
@@ -26,6 +26,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.model.CarbonLoadModel
 
@@ -65,6 +66,9 @@ class GlobalDictionaryUtilTestCase extends QueryTest with BeforeAndAfterAll {
     carbonLoadModel.setQuoteChar("\"")
     carbonLoadModel.setSerializationNullFormat(
       TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
+    carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index cf6d8a6..e943d61 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -418,6 +418,9 @@ case class LoadTable(
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#"))
       carbonLoadModel.setDateFormat(dateFormat)
+      carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
       carbonLoadModel
         .setSerializationNullFormat(
           TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + serializationNullFormat)
@@ -453,40 +456,39 @@ case class LoadTable(
         carbonLoadModel.setDirectLoad(true)
         GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
 
-
-val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
-            val fields = dataFrame.get.schema.fields
-            import org.apache.spark.sql.functions.udf
-            // extracting only segment from tupleId
-            val getSegIdUDF = udf((tupleId: String) =>
-              CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
-            // getting all fields except tupleId field as it is not required in the value
-            var otherFields = fields.toSeq
-              .filter(field => !field.name
-                .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-              .map(field => {
-                if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
-                  new Column(field.name
-                    .substring(0,
-                      field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
-                } else {
-
-                  new Column(field.name)
-                }
-              })
-
-            // extract tupleId field which will be used as a key
-            val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
-              .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId")
-            // use dataFrameWithoutTupleId as dictionaryDataFrame
-            val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
-            otherFields = otherFields :+ segIdColumn
-            // use dataFrameWithTupleId as loadDataFrame
-            val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
-            (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
-          } else {
-            (dataFrame, dataFrame)
-          }
+        val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
+          val fields = dataFrame.get.schema.fields
+          import org.apache.spark.sql.functions.udf
+          // extracting only segment from tupleId
+          val getSegIdUDF = udf((tupleId: String) =>
+            CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+          // getting all fields except tupleId field as it is not required in the value
+          var otherFields = fields.toSeq
+            .filter(field => !field.name
+              .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+            .map(field => {
+              if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
+                new Column(field.name
+                  .substring(0,
+                    field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
+              } else {
+
+                new Column(field.name)
+              }
+            })
+
+          // extract tupleId field which will be used as a key
+          val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
+            .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId")
+          // use dataFrameWithoutTupleId as dictionaryDataFrame
+          val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
+          otherFields = otherFields :+ segIdColumn
+          // use dataFrameWithTupleId as loadDataFrame
+          val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
+          (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
+        } else {
+          (dataFrame, dataFrame)
+        }
         GlobalDictionaryUtil
           .generateGlobalDictionary(
             sparkSession.sqlContext,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13cd53d3/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index fdd314c..2d66038 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -112,6 +112,8 @@ public class CarbonLoadModel implements Serializable {
 
   private String dateFormat;
 
+  private String defaultTimestampFormat;
+
   /**
    * defines the string that should be treated as null while loadind data
    */
@@ -363,6 +365,8 @@ public class CarbonLoadModel implements Serializable {
     copy.escapeChar = escapeChar;
     copy.quoteChar = quoteChar;
     copy.commentChar = commentChar;
+    copy.dateFormat = dateFormat;
+    copy.defaultTimestampFormat = defaultTimestampFormat;
     copy.maxColumns = maxColumns;
     copy.storePath = storePath;
     copy.useOnePass = useOnePass;
@@ -410,6 +414,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.quoteChar = quoteChar;
     copyObj.commentChar = commentChar;
     copyObj.dateFormat = dateFormat;
+    copyObj.defaultTimestampFormat = defaultTimestampFormat;
     copyObj.maxColumns = maxColumns;
     copyObj.storePath = storePath;
     copyObj.useOnePass = useOnePass;
@@ -641,6 +646,14 @@ public class CarbonLoadModel implements Serializable {
 
   public void setDateFormat(String dateFormat) { this.dateFormat = dateFormat; }
 
+  public String getDefaultTimestampFormat() {
+    return defaultTimestampFormat;
+  }
+
+  public void setDefaultTimestampFormat(String defaultTimestampFormat) {
+    this.defaultTimestampFormat = defaultTimestampFormat;
+  }
+
   /**
    * @return
    */
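
For reference, code that builds a CarbonLoadModel directly (as the updated test cases above do)
should now also populate the default timestamp format before dictionary generation. A minimal
sketch using only the accessors and constants that appear in this commit:

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.model.CarbonLoadModel

object LoadModelSetupSketch {
  // Mirrors the test-case setup above: resolve the default timestamp format on the driver
  // and store it on the load model so the dictionary RDD tasks can use it.
  def prepare(): CarbonLoadModel = {
    val carbonLoadModel = new CarbonLoadModel()
    carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
    carbonLoadModel
  }
}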
