[Spark CSV]: Use Custom TextInputFormat to Prevent Exceptions

2017-03-15 Thread Nathan Case
Accidentally sent this to the dev mailing list, meant to send it here.

I have a Spark Java application that in the past used the hadoopFile
interface to specify a custom TextInputFormat when reading files.  This
custom class gracefully handled exceptions such as the EOF exceptions
caused by corrupt gzip files in the input data.  I recently switched to the
CSV parser built into Spark, but now I am faced with the problem that any
time a bad input file is encountered, my whole job fails.
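
For context, the custom format was roughly along these lines (a simplified
sketch of the idea rather than the exact class; the real one does a bit more):

import java.io.EOFException;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// A TextInputFormat whose record reader treats an EOFException (e.g. from a
// truncated gzip file) as end-of-input for that split instead of failing the task.
public class NewInputFormat extends TextInputFormat {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new LineRecordReader() {
            @Override
            public boolean nextKeyValue() throws IOException {
                try {
                    return super.nextKeyValue();
                } catch (EOFException e) {
                    // Corrupt/truncated input: stop reading this split gracefully.
                    return false;
                }
            }
        };
    }
}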

My code to load the data using csv is:

Dataset<Row> csv = sparkSession.read()
    .option("delimiter", parseSettings.getDelimiter().toString())
    .option("quote", parseSettings.getQuote())
    .option("parserLib", "UNIVOCITY")
    .csv(paths);

Previously I would load the data using:

JavaRDD<String> lines = sc.newAPIHadoopFile(filePaths, NewInputFormat.class,
        LongWritable.class, Text.class, sc.hadoopConfiguration())
    .values()
    .map(Text::toString);


Looking at the CSVFileFormat.scala class, it looks like if, in the private
readText method, I could replace the TextInputFormat it passes to hadoopFile
with my custom format, I would be able to achieve what I want.

private def readText(
    sparkSession: SparkSession,
    options: CSVOptions,
    location: String): RDD[String] = {
  if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
    sparkSession.sparkContext.textFile(location)
  } else {
    val charset = options.charset
    sparkSession.sparkContext
      // This is where I would want to be able to specify my
      // input format instead of TextInputFormat
      .hadoopFile[LongWritable, Text, TextInputFormat](location)
      .mapPartitions(_.map(pair =>
        new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
  }
}


Does anyone know of another way to prevent corrupt files from failing my
job, or could anyone help make the changes required to make the
TextInputFormat customizable?  I have only just started looking at Scala.

Thanks,
Nathan


Partitioning Data to Optimize combineByKey

2016-06-02 Thread Nathan Case
Hello,

I am trying to process a dataset of approximately 2 TB using a cluster
with 4.5 TB of RAM.  The data is in parquet format and is initially loaded
into a dataframe.  A subset of the data is then queried and converted
to an RDD for more complicated processing.  The first stage of that
processing is a mapToPair that uses each row's id as the key in a tuple.
The data then goes through a combineByKey operation to group all values
with the same key.  This operation always exceeds the maximum cluster
memory and the job eventually fails; while it is shuffling there are a lot
of "spilling in-memory map to disk" messages.  I am wondering whether, if
the data were initially partitioned so that all rows with the same id
resided in the same partition, the combineByKey would need less shuffling
and complete correctly.
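
To make the shape of that processing concrete, it is roughly the following
(simplified; LocationRecord and getId() are placeholders for the real row
type, and the combiners here just collect values into lists):

JavaRDD<LocationRecord> records = ...;  // subset queried from the dataframe

JavaPairRDD<String, LocationRecord> byId =
    records.mapToPair(rec -> new Tuple2<>(rec.getId(), rec));

JavaPairRDD<String, List<LocationRecord>> grouped = byId.combineByKey(
    rec -> { List<LocationRecord> list = new ArrayList<>(); list.add(rec); return list; },  // createCombiner
    (list, rec) -> { list.add(rec); return list; },                                         // mergeValue
    (l1, l2) -> { l1.addAll(l2); return l1; });                                             // mergeCombiners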

To do the initial load I am using:

sqlContext.read().parquet(inputPathArray)
    .repartition(1, new Column("id"));

I am not sure if this is the correct way to partition a dataframe, so my
first question is: is the above correct?

My next question is: when I go from the dataframe to an RDD using:

JavaRDD locationsForSpecificKey = sqlc.sql(
        "SELECT * FROM standardlocationrecords WHERE customerID = "
            + customerID + " AND partnerAppID = " + partnerAppID)
    .toJavaRDD()
    .map(new LocationRecordFromRow()::apply);

is the partitioning from the dataframe preserved, or do I need to
repartition after the mapToPair by calling rdd.partitionBy with a custom
HashPartitioner that uses the hash of the id field?
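
Something like the following is what I have in mind (a sketch; the partition
count is just a placeholder, and since the tuple key is already the id,
Spark's built-in HashPartitioner may be enough rather than a custom one):

JavaPairRDD<String, LocationRecord> partitioned =
    byId.partitionBy(new HashPartitioner(200));  // placeholder partition count

My understanding is that if the subsequent combineByKey uses the same
partitioner, it should not need another full shuffle, but I would like to
confirm that.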

My goal is to reduce the shuffling when doing the final combineByKey to
prevent the job from running out of memory and failing.  Any help would be
greatly appreciated.

Thanks,
Nathan