Repository: incubator-carbondata
Updated Branches:
  refs/heads/master e7e370cac -> 9dd09659a


Fix bug when reading a dataframe concurrently


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/9dcdf7de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/9dcdf7de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/9dcdf7de

Branch: refs/heads/master
Commit: 9dcdf7de6bde64d1c800fd268f2099d2278e8f33
Parents: e7e370c
Author: QiangCai <qiang...@qq.com>
Authored: Fri Dec 2 17:41:23 2016 +0800
Committer: QiangCai <qiang...@qq.com>
Committed: Fri Dec 2 17:46:56 2016 +0800

----------------------------------------------------------------------
 .../carbondata/spark/rdd/CarbonDataLoadRDD.scala     | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9dcdf7de/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 319d85c..5d6a663 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.spark.rdd
 
 import java.lang.Long
+import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
 import java.util
 import java.util.UUID
@@ -548,10 +549,22 @@ class DataFrameLoaderRDD[K, V](
 class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]],
     carbonLoadModel: CarbonLoadModel,
     context: TaskContext) extends JavaRddIterator[JavaRddIterator[Array[String]]] {
+  val serializer = SparkEnv.get.closureSerializer.newInstance()
+  var serializeBuffer: ByteBuffer = null
   def hasNext: Boolean = partitionIter.hasNext
+
   def next: JavaRddIterator[Array[String]] = {
     val value = partitionIter.next
-    new RddIterator(value.rdd.iterator(value.partition, context),
+    // An RDD that comes from a Hive table does not support being read
+    // concurrently, so create a separate RDD instance for each thread.
+    val newInstance = {
+      if (serializeBuffer == null) {
+        serializeBuffer = serializer.serialize[RDD[Row]](value.rdd)
+      }
+      serializeBuffer.rewind()
+      serializer.deserialize[RDD[Row]](serializeBuffer)
+    }
+    new RddIterator(newInstance.iterator(value.partition, context),
         carbonLoadModel,
         context)
   }
