Enrico, Wim (and privately Neil), thanks for the replies. I will give your
suggestions a whirl.

Basically Wim recommended a pre-processing step to weed out the problematic
files, and I am going to build that into the pipeline. I am not sure how the
problems are creeping in, because this is a regular lift from a PGSQL
db/table, and yet some of these files are correct and some are patently
wrong.

I'm working around the problem by trying small subsets of the 3000+ files,
but until I can weed out the problem files the processing is going to keep
failing. I need something more bulletproof than what I'm doing, so this is
what I'm going to try now.
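
For the record, the pre-processing filter I have in mind looks roughly like
this (only a sketch, untested; the "price"/double check comes from the error
message quoted below, and the paths are the same origdata directory):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.types.DoubleType

// List the candidate part files up front.
val dir = new Path("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata")
val fs = FileSystem.get(dir.toUri, spark.sparkContext.hadoopConfiguration)
val files = fs.globStatus(new Path(dir, "part-*.parquet")).map(_.getPath.toString)

// Keep only the files whose "price" column really is a double, then read those.
val (good, bad) = files.partition { f =>
  spark.read.parquet(f).schema.fields
    .exists(c => c.name == "price" && c.dataType == DoubleType)
}
bad.foreach(f => println(s"skipping $f"))
val df = spark.read.parquet(good: _*)

If that turns out to be too slow over 3000+ files, I can always parallelise
the schema check along the lines of Enrico's snippet below.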

Hamish

On Mon, Mar 2, 2020 at 10:15 AM Enrico Minack <m...@enrico.minack.dev>
wrote:

> Looks like the schema of some files is unexpected.
>
> You could either run parquet-tools on each of the files and extract the
> schema to find the problematic files:
>
> hdfs dfs -ls -C hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet \
>   | while read file
> do
>    echo -n "$file: "
>    hadoop jar parquet-tools-1.9.0.jar schema "$file"
> done
>
>
> https://confusedcoders.com/data-engineering/hadoop/how-to-view-content-of-parquet-files-on-s3hdfs-from-hadoop-cluster-using-parquet-tools
>
>
> Or you can use Spark to investigate the parquet files in parallel:
>
> // Imports needed below (in the spark-shell, spark.implicits._ is already in scope):
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.Path
> import org.apache.parquet.hadoop.ParquetFileReader
> import org.apache.parquet.hadoop.util.HadoopInputFile
> import spark.implicits._
>
> spark.sparkContext
>   .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
>   .map { case (path, _) =>
>     import collection.JavaConverters._
>     // Read the schema from the parquet footer of each file.
>     val file = HadoopInputFile.fromPath(new Path(path), new Configuration())
>     val reader = ParquetFileReader.open(file)
>     try {
>       val schema = reader.getFileMetaData().getSchema
>       (
>         schema.getName,
>         schema.getFields.asScala.map(f => (
>           Option(f.getId).map(_.intValue()),
>           f.getName,
>           Option(f.getOriginalType).map(_.name()),
>           Option(f.getRepetition).map(_.name()))
>         ).toArray
>       )
>     } finally {
>       reader.close()
>     }
>   }
>   .toDF("schema name", "fields")
>   .show(false)
>
> .binaryFiles provides you with all filenames matching the given pattern as an
> RDD, so the subsequent .map is executed on the Spark executors.
> The map then opens each parquet file via ParquetFileReader and provides
> access to its schema and data.
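>
> If you are mainly after the offending column, a variant of the same idea
> (just a sketch, with the same imports as above and the "price" column name
> taken from your error message) keeps the file path and flags files whose
> price column is not stored as a plain DOUBLE:
>
> spark.sparkContext
>   .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
>   .map { case (path, _) =>
>     val file = HadoopInputFile.fromPath(new Path(path), new Configuration())
>     val reader = ParquetFileReader.open(file)
>     try {
>       // Look up the physical (primitive) type of the "price" column, if present.
>       val schema = reader.getFileMetaData().getSchema
>       val priceType =
>         if (schema.containsField("price"))
>           schema.getType("price").asPrimitiveType().getPrimitiveTypeName.name()
>         else "missing"
>       (path, priceType)
>     } finally {
>       reader.close()
>     }
>   }
>   .filter { case (_, priceType) => priceType != "DOUBLE" }
>   .toDF("file", "price type")
>   .show(false)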
>
> I hope this points you in the right direction.
>
> Enrico
>
>
> On 01.03.20 at 22:56, Hamish Whittal wrote:
>
> Hi there,
>
> I have an hdfs directory with thousands of files. It seems that some of
> them - and I don't know which ones - have a problem with their schema and
> it's causing my Spark application to fail with this error:
>
> Caused by: org.apache.spark.sql.execution.QueryExecutionException: Parquet
> column cannot be converted in file
> hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-00000-8b83989a-e387-4f64-8ac5-22b16770095e-c000.snappy.parquet.
> Column: [price], Expected: double, Found: FIXED_LEN_BYTE_ARRAY
>
> The problem is not only that it's causing the application to fail, but that
> every time it fails, I have to copy that file out of the directory and
> start the app again.
>
> I thought of trying to use try-except, but I can't seem to get that to
> work.
>
> Is there any advice anyone can give me? I really can't see myself going
> through thousands of files trying to figure out which ones are broken.
>
> Thanks in advance,
>
> hamish
>

-- 
Cloud-Fundis.co.za
Cape Town, South Africa
+27 79 614 4913
