Enrico, Wim (and privately Neil), thanks for the replies. I will give your suggestions a whirl.
Basically Wim recommended a pre-processing step to weed out the problematic
files, and I am going to build that into the pipeline. I am not sure how the
problems are creeping in, because this is a regular lift from a PostgreSQL
db/table, and yet some of these files are correct and some are patently
wrong. I'm working around the problem by trying small subsets of the 3000+
files, but until I can weed out the problem files the processing is going to
fail. I need something more bulletproof than what I'm doing, so this is what
I'm going to try now (a rough sketch of the filtering step is at the end of
this message).

Hamish

On Mon, Mar 2, 2020 at 10:15 AM Enrico Minack <m...@enrico.minack.dev> wrote:

> Looks like the schema of some files is unexpected.
>
> You could either run parquet-tools on each of the files and extract the
> schema to find the problematic files:
>
>   hdfs dfs -stat "%n" hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet |
>   while read file
>   do
>     echo -n "$file: "
>     hadoop jar parquet-tools-1.9.0.jar schema $file
>   done
>
> https://confusedcoders.com/data-engineering/hadoop/how-to-view-content-of-parquet-files-on-s3hdfs-from-hadoop-cluster-using-parquet-tools
>
> Or you can use Spark to investigate the parquet files in parallel:
>
>   import org.apache.hadoop.conf.Configuration
>   import org.apache.hadoop.fs.Path
>   import org.apache.parquet.hadoop.ParquetFileReader
>   import org.apache.parquet.hadoop.util.HadoopInputFile
>
>   spark.sparkContext
>     .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
>     .map { case (path, _) =>
>       import collection.JavaConverters._
>       val file = HadoopInputFile.fromPath(new Path(path), new Configuration())
>       val reader = ParquetFileReader.open(file)
>       try {
>         val schema = reader.getFileMetaData().getSchema
>         (
>           schema.getName,
>           schema.getFields.asScala.map(f => (
>             Option(f.getId).map(_.intValue()),
>             f.getName,
>             Option(f.getOriginalType).map(_.name()),
>             Option(f.getRepetition).map(_.name()))
>           ).toArray
>         )
>       } finally {
>         reader.close()
>       }
>     }
>     .toDF("schema name", "fields")
>     .show(false)
>
> .binaryFiles provides you all filenames that match the given pattern as an
> RDD, so the following .map is executed on the Spark executors. The map then
> opens each parquet file via ParquetFileReader and provides access to its
> schema and data.
>
> I hope this points you in the right direction.
>
> Enrico
>
>
> On 01.03.20 at 22:56, Hamish Whittal wrote:
>
> Hi there,
>
> I have an hdfs directory with thousands of files. It seems that some of
> them - and I don't know which ones - have a problem with their schema, and
> it's causing my Spark application to fail with this error:
>
> Caused by: org.apache.spark.sql.execution.QueryExecutionException: Parquet
> column cannot be converted in file
> hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-00000-8b83989a-e387-4f64-8ac5-22b16770095e-c000.snappy.parquet.
> Column: [price], Expected: double, Found: FIXED_LEN_BYTE_ARRAY
>
> The problem is not only that it's causing the application to fail, but
> that every time it does fail I have to copy that file out of the directory
> and start the app again.
>
> I thought of trying to use try-except, but I can't seem to get that to
> work.
>
> Is there any advice anyone can give me? I really can't see myself going
> through thousands of files trying to figure out which ones are broken.
>
> Thanks in advance,
>
> hamish

--
Cloud-Fundis.co.za
Cape Town, South Africa
+27 79 614 4913
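
P.S. For reference, this is the kind of pre-processing filter I have in mind,
building on Enrico's footer-reading snippet. It is only a rough sketch for
spark-shell: it assumes the files sit directly under
hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata, and it only
checks the one problem visible in the error above, i.e. that the "price"
column is physically a DOUBLE rather than FIXED_LEN_BYTE_ARRAY. Any other
kind of schema drift would need extra checks.

  // Rough sketch only: path and the "price must be DOUBLE" rule are taken
  // from the error message above; adjust both for the real layout.
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.parquet.hadoop.ParquetFileReader
  import org.apache.parquet.hadoop.util.HadoopInputFile
  import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

  val conf = new Configuration()
  val dir  = new Path("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata")
  val fs   = dir.getFileSystem(conf)

  // Read each file's footer and split the listing into clean and broken files.
  val (good, bad) = fs.listStatus(dir)
    .map(_.getPath)
    .filter(_.getName.endsWith(".parquet"))
    .partition { path =>
      val reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))
      try {
        val schema = reader.getFileMetaData.getSchema
        schema.containsField("price") &&
          schema.getType("price").asPrimitiveType.getPrimitiveTypeName ==
            PrimitiveTypeName.DOUBLE
      } finally {
        reader.close()
      }
    }

  bad.foreach(p => println(s"bad schema, skipping: $p"))  // or move them aside
  val df = spark.read.parquet(good.map(_.toString): _*)   // load only the clean files

This checks the footers one by one on the driver, which should be bearable
for 3000+ files; if it turns out to be too slow, the binaryFiles approach
Enrico showed does the same footer inspection in parallel on the executors.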