You can indeed override the Hadoop configuration at a per-RDD level -
though it is a little more verbose, as in the example below, and you
effectively need to make a copy of the Hadoop Configuration:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

// copy the SparkContext-level Hadoop conf and override the split size for this RDD only
val thisRDDConf = new Configuration(sc.hadoopConfiguration)
thisRDDConf.set("mapred.min.split.size", "500000000")
val rdd = sc.newAPIHadoopFile(path,
  classOf[SequenceFileInputFormat[IntWritable, Text]],
  classOf[IntWritable],
  classOf[Text],
  thisRDDConf
)
println(rdd.partitions.size)

// for comparison, the same path read with the unmodified SparkContext configuration
val rdd2 = sc.newAPIHadoopFile(path,
  classOf[SequenceFileInputFormat[IntWritable, Text]],
  classOf[IntWritable],
  classOf[Text]
)
println(rdd2.partitions.size)


For example, if I run the above on the following directory (some files I
have lying around):

-rw-r--r--  1 Nick  staff     0B Jul 11  2014 _SUCCESS
-rw-r--r--  1 Nick  staff   291M Sep 16  2014 part-00000
-rw-r--r--  1 Nick  staff   227M Sep 16  2014 part-00001
-rw-r--r--  1 Nick  staff   370M Sep 16  2014 part-00002
-rw-r--r--  1 Nick  staff   244M Sep 16  2014 part-00003
-rw-r--r--  1 Nick  staff   240M Sep 16  2014 part-00004

I get output:

15/03/24 20:43:12 INFO FileInputFormat: Total input paths to process : 5
*5*

... and then for the second RDD:

15/03/24 20:43:12 INFO SparkContext: Created broadcast 1 from
newAPIHadoopFile at TestHash.scala:41
*45*

As expected: with the 500MB minimum split size each part file yields a
single partition (5 in total), while the default configuration splits the
same files into 45 partitions.
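The same trick should work on the write side for the compression settings
you mention, via saveAsNewAPIHadoopFile - an untested sketch, where the
output path and codec class are just placeholders:

import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

val outConf = new Configuration(sc.hadoopConfiguration)
outConf.set("mapred.output.compress", "true")
outConf.set("mapred.output.compression.codec",
  "org.apache.hadoop.io.compress.GzipCodec")
rdd.saveAsNewAPIHadoopFile("/some/output/path",
  classOf[IntWritable],
  classOf[Text],
  classOf[SequenceFileOutputFormat[IntWritable, Text]],
  outConf
)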

A more succinct way of passing in those conf options would be nice, but
this should get you what you need.
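
And if you want something closer to the Map[String, String] style you
describe, a small helper can hide the copy-and-set boilerplate (just a
sketch; the helper name is made up):

def withHadoopConf(base: Configuration, settings: Map[String, String]): Configuration = {
  // copy so the SparkContext-level configuration is left untouched
  val conf = new Configuration(base)
  settings.foreach { case (k, v) => conf.set(k, v) }
  conf
}

val rdd3 = sc.newAPIHadoopFile(path,
  classOf[SequenceFileInputFormat[IntWritable, Text]],
  classOf[IntWritable],
  classOf[Text],
  withHadoopConf(sc.hadoopConfiguration,
    Map("mapred.min.split.size" -> "500000000"))
)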



On Mon, Mar 23, 2015 at 10:36 PM, Koert Kuipers <ko...@tresata.com> wrote:

> currently it's pretty hard to control the Hadoop Input/Output formats used
> in Spark. The convention seems to be to add extra parameters to all
> methods, and then somewhere deep inside the code (for example in
> PairRDDFunctions.saveAsHadoopFile) all these parameters get translated into
> settings on the Hadoop Configuration object.
>
> for example, for compression I see "codec: Option[Class[_ <:
> CompressionCodec]] = None" added to a bunch of methods.
>
> how scalable is this solution, really?
>
> for example, I need to read from a Hadoop dataset and I don't want the
> input (part) files to get split up. The way to do this is to set
> "mapred.min.split.size". Now, I don't want to set this at the level of the
> SparkContext (which can be done), since I don't want it to apply to input
> formats in general; I want it to apply to just this one specific input
> dataset I need to read, which leaves me with no options currently. I could
> go add yet another input parameter to all the methods
> (SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile,
> etc.), but that seems ineffective.
>
> why can we not expose a Map[String, String] or some other generic way to
> manipulate settings for Hadoop input/output formats? It would require
> adding one more parameter to all methods that deal with Hadoop input/output
> formats, but after that it's done. One parameter to rule them all....
>
> then I could do:
> val x = sc.textFile("/some/path", formatSettings =
> Map("mapred.min.split.size" -> "12345"))
>
> or
> rdd.saveAsTextFile("/some/path", formatSettings =
> Map("mapred.output.compress" -> "true", "mapred.output.compression.codec" ->
> "somecodec"))
>
