[CARBONDATA-2993] Fix random NPE during concurrent loading. This closes #2797
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fa088256 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fa088256 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fa088256 Branch: refs/heads/branch-1.5 Commit: fa0882569872d3280807a5a57f36c4c43f48cc99 Parents: ca30ad9 Author: kunal642 <kunalkapoor...@gmail.com> Authored: Fri Oct 5 10:13:05 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Fri Oct 5 15:31:33 2018 +0530 ---------------------------------------------------------------------- .../scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala | 9 +++++---- .../org/apache/carbondata/sdk/file/AvroCarbonWriter.java | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa088256/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala index 87d8f50..3a02f85 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala @@ -47,8 +47,10 @@ abstract class CarbonRDD[T: ClassTag]( info } + @transient val hadoopConf = SparkSQLUtil.sessionState(ss).newHadoopConf() + val config: Broadcast[SerializableConfiguration] = sparkContext - .broadcast(new SerializableConfiguration(SparkSQLUtil.sessionState(ss).newHadoopConf())) + .broadcast(new SerializableConfiguration(hadoopConf)) /** Construct an RDD with just a one-to-one dependency on one parent */ def this(@transient sparkSession: SparkSession, @transient oneParent: 
RDD[_]) = @@ -57,7 +59,7 @@ abstract class CarbonRDD[T: ClassTag]( protected def internalGetPartitions: Array[Partition] override def getPartitions: Array[Partition] = { - ThreadLocalSessionInfo.setConfigurationToCurrentThread(config.value.value) + ThreadLocalSessionInfo.setConfigurationToCurrentThread(hadoopConf) internalGetPartitions } @@ -66,8 +68,7 @@ abstract class CarbonRDD[T: ClassTag]( final def compute(split: Partition, context: TaskContext): Iterator[T] = { TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll()) - carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", config - .value.value) + carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", getConf) ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo) TaskMetricsMap.threadLocal.set(Thread.currentThread().getId) val carbonTaskInfo = new CarbonTaskInfo http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa088256/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java index d19a96d..e4a65c0 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java @@ -520,7 +520,7 @@ public class AvroCarbonWriter extends CarbonWriter { // recursively get the sub fields ArrayList<StructField> arraySubField = new ArrayList<>(); // array will have only one sub field. - StructField structField = prepareSubFields("val", childSchema.getElementType()); + StructField structField = prepareSubFields(fieldName, childSchema.getElementType()); if (structField != null) { arraySubField.add(structField); return new Field(fieldName, "array", arraySubField);