Hi,

The Spark CSV parser has different parsing modes (selected via the "mode" 
option, see the sketch below):
* permissive (default) - tries to read everything; missing tokens are 
interpreted as null and extra tokens are ignored
* dropmalformed - drops lines that have fewer or more tokens than expected
* failfast - throws a RuntimeException if there is a malformed line
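For reference, a minimal sketch of how the mode is selected (Spark 2.x option 
name; sparkSession and the input path are just placeholders):

    Dataset<Row> df = sparkSession.read()
            .option("mode", "DROPMALFORMED")  // or "PERMISSIVE" (default) or "FAILFAST"
            .csv("/path/to/input");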
Obviously, none of these modes handles a malformed gzip file (you may want to 
ask the sender of the gzip files to fix their application).

You can adapt the line you mentioned (I am not sure which Spark version this 
is), but you may not want to do that, because it would mean maintaining your 
own Spark build.

You can write your own data source (i.e. in a different namespace than the 
Spark CSV one). You can then also think about a lot of optimisations compared 
to the Spark CSV parser, which - depending on the csv and your analysis 
needs - can make processing much faster.
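
A rough skeleton of what such a data source could look like in Java (DataSource 
V1 API; all class names here, including MyTolerantTextInputFormat, are made up, 
and a real implementation would also parse the CSV and provide a proper schema):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.rdd.RDD;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.sources.BaseRelation;
    import org.apache.spark.sql.sources.DataSourceRegister;
    import org.apache.spark.sql.sources.RelationProvider;
    import org.apache.spark.sql.sources.TableScan;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    // Skeleton only: exposes the raw lines of each file as a single string column.
    public class LenientTextSource implements RelationProvider, DataSourceRegister {

        @Override
        public String shortName() {
            // usable as spark.read().format("lenienttext") once registered via
            // META-INF/services; the fully qualified class name works without it
            return "lenienttext";
        }

        @Override
        public BaseRelation createRelation(SQLContext sqlContext,
                scala.collection.immutable.Map<String, String> parameters) {
            String path = parameters.get("path").get();  // Scala Option
            return new LenientTextRelation(sqlContext, path);
        }

        static class LenientTextRelation extends BaseRelation implements TableScan {
            private final SQLContext sqlContext;
            private final String path;

            LenientTextRelation(SQLContext sqlContext, String path) {
                this.sqlContext = sqlContext;
                this.path = path;
            }

            @Override
            public SQLContext sqlContext() { return sqlContext; }

            @Override
            public StructType schema() {
                // one raw string column; a real CSV source would derive the
                // schema from options or by sampling the data
                return new StructType(new StructField[] {
                    new StructField("value", DataTypes.StringType, true, Metadata.empty()) });
            }

            @Override
            public RDD<Row> buildScan() {
                JavaSparkContext jsc = new JavaSparkContext(sqlContext.sparkContext());
                // MyTolerantTextInputFormat stands for the custom input format
                // that swallows EOF exceptions from truncated gzip files
                return jsc.newAPIHadoopFile(path, MyTolerantTextInputFormat.class,
                            LongWritable.class, Text.class, jsc.hadoopConfiguration())
                        .values()
                        .map(t -> RowFactory.create(t.toString()))
                        .rdd();
            }
        }
    }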

You could also add a new CompressionCodec that ignores broken gzips. In that 
case you would not need your own data source.
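
A rough, untested sketch of such a codec (package and class name are made up; 
depending on how your files are broken you may need to catch more than 
EOFException, e.g. java.util.zip.ZipException):

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.Decompressor;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class LenientGzipCodec extends GzipCodec {

        @Override
        public CompressionInputStream createInputStream(InputStream in) throws IOException {
            return new LenientStream(super.createInputStream(in));
        }

        @Override
        public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor)
                throws IOException {
            return new LenientStream(super.createInputStream(in, decompressor));
        }

        // Delegates to the real gzip stream but turns a truncated archive into a normal EOF.
        private static final class LenientStream extends CompressionInputStream {
            private final CompressionInputStream delegate;

            LenientStream(CompressionInputStream delegate) throws IOException {
                super(delegate);
                this.delegate = delegate;
            }

            @Override
            public int read() throws IOException {
                try {
                    return delegate.read();
                } catch (EOFException e) {
                    return -1;  // pretend the file simply ended here
                }
            }

            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                try {
                    return delegate.read(b, off, len);
                } catch (EOFException e) {
                    return -1;  // truncated gzip: stop reading instead of failing the task
                }
            }

            @Override
            public void resetState() throws IOException {
                delegate.resetState();
            }

            @Override
            public void close() throws IOException {
                delegate.close();
            }
        }
    }

You would then have to make Hadoop pick it up for the .gz extension, e.g. via 
the io.compression.codecs property in the Hadoop configuration; how exactly it 
takes precedence over the built-in GzipCodec depends on your Hadoop version, 
so that part needs checking.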

Best regards

> On 15 Mar 2017, at 16:56, Nathan Case <nc...@gravyanalytics.com> wrote:
> 
> Accidentally sent this to the dev mailing list, meant to send it here. 
> 
> I have a spark java application that in the past has used the hadoopFile 
> interface to specify a custom TextInputFormat to be used when reading files.  
> This custom class would gracefully handle exceptions like EOF exceptions 
> caused by corrupt gzip files in the input data.  I recently switched to using 
> the csv parser built into spark but am now faced with the problem that 
> anytime a bad input file is encountered my whole job fails.
> 
> My code to load the data using csv is:
> Dataset<Row> csv = sparkSession.read()
>         .option("delimiter", parseSettings.getDelimiter().toString())
>         .option("quote", parseSettings.getQuote())
>         .option("parserLib", "UNIVOCITY")
>         .csv(paths);
> Previously I would load the data using:
> JavaRDD<String> lines = sc.newAPIHadoopFile(filePaths, NewInputFormat.class,
>         LongWritable.class, Text.class, sc.hadoopConfiguration())
>         .values()
>         .map(Text::toString);
> 
> Looking at the CSVFileFormat.scala class, it looks like if I could change the 
> private readText method where it passes TextInputFormat to the hadoopFile 
> call, replacing it with my custom format, I would be able to achieve what I 
> want.
> 
> private def readText(
>     sparkSession: SparkSession,
>     options: CSVOptions,
>     location: String): RDD[String] = {
>   if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
>     sparkSession.sparkContext.textFile(location)
>   } else {
>     val charset = options.charset
>     sparkSession.sparkContext
>        // This is where I would want to be able to specify my
>        // input format instead of TextInputFormat
>       .hadoopFile[LongWritable, Text, TextInputFormat](location)
>       .mapPartitions(_.map(pair =>
>         new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
>   }
> }
> 
> Does anyone know of another way to prevent the corrupt files from failing my 
> job, or could anyone help me make the required changes so that the 
> TextInputFormat is customizable? I have only just started looking at Scala.
> 
> Thanks,
> Nathan
> 
