Other options are maybe :

- "spark.sql.files.ignoreCorruptFiles" option

- DataFrameReader.csv(csvDataset: Dataset[String]) with custom inputformat
(this is available from Spark 2.2.0).

For example,

val rdd = spark.sparkContext.newAPIHadoopFile("/tmp/abcd",
  classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat],
  classOf[org.apache.hadoop.io.LongWritable],
  classOf[org.apache.hadoop.io.Text])
val stringRdd = rdd.map(pair => new String(pair._2.getBytes, 0,
pair._2.getLength))

spark.read.csv(stringRdd.toDS)

​


2017-03-16 2:11 GMT+09:00 Jörn Franke <jornfra...@gmail.com>:

> Hi,
>
> The Spark CSV parser has different parsing modes:
> * permissive (default) tries to read everything and missing tokens are
> interpreted as null and extra tokens are ignored
> * dropmalformed drops lines which have more or less tokens
> * failfast - runtimexception if there is a malformed line
> Obvious this does not handle malformed gzip (you may ask the sender of the
> gzip to improve their application).
>
> You can adapt the line you mentioned (not sure which Spark version this
> is), but you may not want to do it, because this would mean to maintain an
> own Spark version.
>
> You can write your own datasource (i.e. different namespace than Spark
> CSV) Then, you can also think about a lot of optimisations compared to the
> Spark csv parser, which - depending on the csv and your analysis needs -
> can make processing much more faster.
>
> You could also add a new compressioncodec that ignores broken gzips. In
> this case you would not need an own data source.
>
> Best regards
>
> On 15 Mar 2017, at 16:56, Nathan Case <nc...@gravyanalytics.com> wrote:
>
> Accidentally sent this to the dev mailing list, meant to send it here.
>
> I have a spark java application that in the past has used the hadoopFile
> interface to specify a custom TextInputFormat to be used when reading
> files.  This custom class would gracefully handle exceptions like EOF
> exceptions caused by corrupt gzip files in the input data.  I recently
> switched to using the csv parser built into spark but am now faced with the
> problem that anytime a bad input file is encountered my whole job fails.
>
> My code to load the data using csv is:
>
> Dataset<Row> csv = sparkSession.read()
>         .option("delimiter", parseSettings.getDelimiter().toString())
>         .option("quote", parseSettings.getQuote())
>         .option("parserLib", "UNIVOCITY")
>         .csv(paths);
>
> Previously I would load the data using:
>
> JavaRDD<String> lines = sc.newAPIHadoopFile(filePaths, NewInputFormat.class,
>         LongWritable.class, Text.class, sc.hadoopConfiguration())
>         .values()
>         .map(Text::toString);
>
>
> Looking at the CSVFileFormat.scala class it looks like in the private
> readText method if I would overwrite where it passes TextInputFormat to the
> hadoopFile method with my customer format I would be able to achieve what I
> want.
>
> private def readText(
>     sparkSession: SparkSession,
>     options: CSVOptions,
>     location: String): RDD[String] = {
>   if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
>     sparkSession.sparkContext.textFile(location)
>   } else {
>     val charset = options.charset
>     sparkSession.sparkContext
>
>        // This is where I would want to be able to specify my
>
>        // input format instead of TextInputFormat
>
>       .hadoopFile[LongWritable, Text, TextInputFormat](location)
>       .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, 
> pair._2.getLength, charset)))
>   }
> }
>
>
> Does anyone know if there is another way to prevent the corrupt files from
> failing my job or could help to make the required changes to make the
> TextInputFormat customizable as I have only just started looking at scala.
>
> Thanks,
> Nathan
>
>

Reply via email to