Other options might be:

- the "spark.sql.files.ignoreCorruptFiles" option
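  A minimal sketch of setting that flag, assuming Spark 2.1+ (where the option
  is available) and a SparkSession named spark; the path is just a placeholder:

  spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
  // or at session creation:
  // SparkSession.builder().config("spark.sql.files.ignoreCorruptFiles", "true").getOrCreate()
  val df = spark.read.csv("/tmp/abcd")
  // corrupted/truncated files no longer abort the job; whatever was read
  // before the error is still returned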
- DataFrameReader.csv(csvDataset: Dataset[String]) with a custom input format
  (this is available from Spark 2.2.0). For example:

  import spark.implicits._  // for .toDS

  val rdd = spark.sparkContext.newAPIHadoopFile("/tmp/abcd",
    classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat],
    classOf[org.apache.hadoop.io.LongWritable],
    classOf[org.apache.hadoop.io.Text])
  val stringRdd = rdd.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength))
  spark.read.csv(stringRdd.toDS)

2017-03-16 2:11 GMT+09:00 Jörn Franke <jornfra...@gmail.com>:

> Hi,
>
> The Spark CSV parser has different parsing modes:
> * permissive (default) tries to read everything; missing tokens are
>   interpreted as null and extra tokens are ignored
> * dropmalformed drops lines which have more or fewer tokens than expected
> * failfast - RuntimeException if there is a malformed line
> Obviously this does not handle malformed gzip (you may ask the sender of
> the gzip to improve their application).
>
> You can adapt the line you mentioned (not sure which Spark version this
> is), but you may not want to do that, because it would mean maintaining
> your own Spark version.
>
> You can write your own data source (i.e. in a different namespace than
> Spark CSV). Then you can also think about a lot of optimisations compared
> to the Spark CSV parser, which - depending on the CSV and your analysis
> needs - can make processing much faster.
>
> You could also add a new compression codec that ignores broken gzips. In
> that case you would not need your own data source.
>
> Best regards
>
> On 15 Mar 2017, at 16:56, Nathan Case <nc...@gravyanalytics.com> wrote:
>
> Accidentally sent this to the dev mailing list, meant to send it here.
>
> I have a Spark Java application that in the past has used the hadoopFile
> interface to specify a custom TextInputFormat to be used when reading
> files. This custom class would gracefully handle exceptions like EOF
> exceptions caused by corrupt gzip files in the input data. I recently
> switched to the CSV parser built into Spark, but am now faced with the
> problem that any time a bad input file is encountered my whole job fails.
>
> My code to load the data using csv is:
>
> Dataset<Row> csv = sparkSession.read()
>     .option("delimiter", parseSettings.getDelimiter().toString())
>     .option("quote", parseSettings.getQuote())
>     .option("parserLib", "UNIVOCITY")
>     .csv(paths);
>
> Previously I would load the data using:
>
> JavaRDD<String> lines = sc.newAPIHadoopFile(filePaths, NewInputFormat.class,
>     LongWritable.class, Text.class, sc.hadoopConfiguration())
>     .values()
>     .map(Text::toString);
>
> Looking at the CSVFileFormat.scala class, it looks like if I could override
> the spot in the private readText method where TextInputFormat is passed to
> hadoopFile, substituting my custom format, I would be able to achieve what
> I want.
>
> private def readText(
>     sparkSession: SparkSession,
>     options: CSVOptions,
>     location: String): RDD[String] = {
>   if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
>     sparkSession.sparkContext.textFile(location)
>   } else {
>     val charset = options.charset
>     sparkSession.sparkContext
>       // This is where I would want to be able to specify my
>       // input format instead of TextInputFormat
>       .hadoopFile[LongWritable, Text, TextInputFormat](location)
>       .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0,
>         pair._2.getLength, charset)))
>   }
> }
>
> Does anyone know if there is another way to prevent the corrupt files from
> failing my job, or could someone help make the changes required to make the
> TextInputFormat customizable? I have only just started looking at Scala.
>
> Thanks,
> Nathan
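For reference, the parsing modes described above are selected through the
reader's "mode" option; a minimal sketch (Spark 2.x option names, same
placeholder path as earlier):

  spark.read
    .option("mode", "PERMISSIVE")       // default: missing tokens become null, extra tokens are ignored
    // .option("mode", "DROPMALFORMED") // drop lines with too few/too many tokens
    // .option("mode", "FAILFAST")      // throw on the first malformed line
    .csv("/tmp/abcd")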