You can indeed override the Hadoop configuration at a per-RDD level - though it is a little more verbose, as in the example below, and you effectively need to make a copy of the Hadoop Configuration:
val thisRDDConf = new Configuration(sc.hadoopConfiguration)
thisRDDConf.set("mapred.min.split.size", "500000000")

val rdd = sc.newAPIHadoopFile(path,
  classOf[SequenceFileInputFormat[IntWritable, Text]],
  classOf[IntWritable],
  classOf[Text],
  thisRDDConf
)
println(rdd.partitions.size)

val rdd2 = sc.newAPIHadoopFile(path,
  classOf[SequenceFileInputFormat[IntWritable, Text]],
  classOf[IntWritable],
  classOf[Text]
)
println(rdd2.partitions.size)

For example, if I run the above on the following directory (some files I have lying around):

-rw-r--r--  1 Nick  staff     0B Jul 11  2014 _SUCCESS
-rw-r--r--  1 Nick  staff   291M Sep 16  2014 part-00000
-rw-r--r--  1 Nick  staff   227M Sep 16  2014 part-00001
-rw-r--r--  1 Nick  staff   370M Sep 16  2014 part-00002
-rw-r--r--  1 Nick  staff   244M Sep 16  2014 part-00003
-rw-r--r--  1 Nick  staff   240M Sep 16  2014 part-00004

I get output:

15/03/24 20:43:12 INFO FileInputFormat: Total input paths to process : 5
*5*

... and then for the second RDD:

15/03/24 20:43:12 INFO SparkContext: Created broadcast 1 from newAPIHadoopFile at TestHash.scala:41
*45*

As expected. Though a more succinct way of passing in those conf options would be nice - but this should get you what you need. (I've put a rough sketch of such a helper below your quoted message.)

On Mon, Mar 23, 2015 at 10:36 PM, Koert Kuipers <ko...@tresata.com> wrote:

> currently it's pretty hard to control the Hadoop Input/Output formats used
> in Spark. The convention seems to be to add extra parameters to all
> methods and then somewhere deep inside the code (for example in
> PairRDDFunctions.saveAsHadoopFile) all these parameters get translated into
> settings on the Hadoop Configuration object.
>
> for example for compression i see "codec: Option[Class[_ <:
> CompressionCodec]] = None" added to a bunch of methods.
>
> how scalable is this solution really?
>
> for example i need to read from a hadoop dataset and i don't want the input
> (part) files to get split up. the way to do this is to set
> "mapred.min.split.size". now i don't want to set this at the level of the
> SparkContext (which can be done), since i don't want it to apply to input
> formats in general. i want it to apply to just this one specific input
> dataset i need to read. which leaves me with no options currently. i could
> go add yet another input parameter to all the methods
> (SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile,
> etc.). but that seems ineffective.
>
> why can we not expose a Map[String, String] or some other generic way to
> manipulate settings for hadoop input/output formats? it would require
> adding one more parameter to all methods to deal with hadoop input/output
> formats, but after that it's done. one parameter to rule them all....
>
> then i could do:
> val x = sc.textFile("/some/path", formatSettings =
>   Map("mapred.min.split.size" -> "12345"))
>
> or
> rdd.saveAsTextFile("/some/path", formatSettings =
>   Map("mapred.output.compress" -> "true",
>       "mapred.output.compression.codec" -> "somecodec"))
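For what it's worth, the copy-the-Configuration pattern above is easy to wrap in a small helper that takes roughly the Map[String, String] shape you describe. This is only a rough sketch against the existing API - the object name HadoopConfHelper, the method readSequenceFile and its formatSettings parameter are made up for illustration, not anything in Spark:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper: applies per-read Hadoop settings to a private copy of
// the Configuration, so they never touch sc.hadoopConfiguration or other reads.
object HadoopConfHelper {
  def readSequenceFile(sc: SparkContext,
                       path: String,
                       formatSettings: Map[String, String]): RDD[(IntWritable, Text)] = {
    val conf = new Configuration(sc.hadoopConfiguration)
    formatSettings.foreach { case (k, v) => conf.set(k, v) }
    sc.newAPIHadoopFile(path,
      classOf[SequenceFileInputFormat[IntWritable, Text]],
      classOf[IntWritable],
      classOf[Text],
      conf)
  }
}

// which would make the first read above:
// val rdd = HadoopConfHelper.readSequenceFile(sc, path,
//   Map("mapred.min.split.size" -> "500000000"))

It doesn't change the underlying API issue you raise, but it at least keeps the verbosity in one place.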