Repository: spark Updated Branches: refs/heads/branch-1.6 355bd72e0 -> 9e8a8f71f
[SPARK-12056][CORE] Create a TaskAttemptContext only after calling setConf. TaskAttemptContext's constructor will clone the configuration instead of referencing it. Calling setConf after creating TaskAttemptContext makes any changes to the configuration made inside setConf unperceived by RecordReader instances. As an example, Titan's InputFormat will change conf when calling setConf. They wrap their InputFormat around Cassandra's ColumnFamilyInputFormat, and append Cassandra's configuration. This change fixes the following error when using Titan's CassandraInputFormat with Spark: *java.lang.RuntimeException: org.apache.thrift.protocol.TProtocolException: Required field 'keyspace' was not present! Struct: set_key space_args(keyspace:null)* There's a discussion of this error here: https://groups.google.com/forum/#!topic/aureliusgraphs/4zpwyrYbGAE Author: Anderson de Andrade <adeandr...@verticalscope.com> Closes #10046 from adeandrade/newhadooprdd-fix. (cherry picked from commit f434f36d508eb4dcade70871611fc022ae0feb56) Signed-off-by: Marcelo Vanzin <van...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e8a8f71 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e8a8f71 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e8a8f71 Branch: refs/heads/branch-1.6 Commit: 9e8a8f71fb62f55d8fcd8f319c8c25407a8d0010 Parents: 355bd72 Author: Anderson de Andrade <adeandr...@verticalscope.com> Authored: Thu Dec 3 16:37:00 2015 -0800 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Thu Dec 3 16:37:11 2015 -0800 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9e8a8f71/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index d196099..86f38ae 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -138,14 +138,14 @@ class NewHadoopRDD[K, V]( } inputMetrics.setBytesReadCallback(bytesReadCallback) - val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) - val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) val format = inputFormatClass.newInstance format match { case configurable: Configurable => configurable.setConf(conf) case _ => } + val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) + val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) private var reader = format.createRecordReader( split.serializableHadoopSplit.value, hadoopAttemptContext) reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org