Repository: incubator-carbondata Updated Branches: refs/heads/master e7e370cac -> 9dd09659a
fix bug for reading dataframe concurrently Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/9dcdf7de Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/9dcdf7de Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/9dcdf7de Branch: refs/heads/master Commit: 9dcdf7de6bde64d1c800fd268f2099d2278e8f33 Parents: e7e370c Author: QiangCai <qiang...@qq.com> Authored: Fri Dec 2 17:41:23 2016 +0800 Committer: QiangCai <qiang...@qq.com> Committed: Fri Dec 2 17:46:56 2016 +0800 ---------------------------------------------------------------------- .../carbondata/spark/rdd/CarbonDataLoadRDD.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9dcdf7de/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala index 319d85c..5d6a663 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala @@ -18,6 +18,7 @@ package org.apache.carbondata.spark.rdd import java.lang.Long +import java.nio.ByteBuffer import java.text.SimpleDateFormat import java.util import java.util.UUID @@ -548,10 +549,22 @@ class DataFrameLoaderRDD[K, V]( class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]], carbonLoadModel: CarbonLoadModel, context: TaskContext) extends JavaRddIterator[JavaRddIterator[Array[String]]] { + val serializer = 
SparkEnv.get.closureSerializer.newInstance() + var serializeBuffer: ByteBuffer = null def hasNext: Boolean = partitionIter.hasNext + def next: JavaRddIterator[Array[String]] = { val value = partitionIter.next - new RddIterator(value.rdd.iterator(value.partition, context), + // The RDD (which comes from a Hive table) does not support reading the dataframe concurrently. + // So a different RDD instance is created here for each thread. + val newInstance = { + if (serializeBuffer == null) { + serializeBuffer = serializer.serialize[RDD[Row]](value.rdd) + } + serializeBuffer.rewind() + serializer.deserialize[RDD[Row]](serializeBuffer) + } + new RddIterator(newInstance.iterator(value.partition, context), carbonLoadModel, context) }