Ok, good luck!

On Mon, 2 Mar 2020 at 10:04, Hamish Whittal <ham...@cloud-fundis.co.za> wrote:
> Enrico, Wim (and privately Neil), thanks for the replies. I will give your
> suggestions a whirl.
>
> Basically Wim recommended a pre-processing step to weed out the problematic
> files, and I am going to build that into the pipeline [a sketch of such a
> pre-filter appears at the end of this thread]. I am not sure how the
> problems are creeping in, because this is a regular lift from a PGSQL
> db/table, and so some of these files are correct and some are patently
> wrong.
>
> I'm working around the problem by trying small subsets of the 3000+ files,
> but until I can weed out the problem files the processing is going to fail.
> I need something more bulletproof than what I'm doing, so this is what I'm
> going to try now.
>
> Hamish
>
> On Mon, Mar 2, 2020 at 10:15 AM Enrico Minack <m...@enrico.minack.dev> wrote:
>
>> Looks like the schema of some files is unexpected.
>>
>> You could either run parquet-tools on each of the files and extract the
>> schema to find the problematic files:
>>
>> hdfs dfs -ls -C hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet |
>> while read file
>> do
>>   echo -n "$file: "
>>   hadoop jar parquet-tools-1.9.0.jar schema "$file"
>> done
>>
>> https://confusedcoders.com/data-engineering/hadoop/how-to-view-content-of-parquet-files-on-s3hdfs-from-hadoop-cluster-using-parquet-tools
>>
>> Or you can use Spark to investigate the parquet files in parallel:
>>
>> import collection.JavaConverters._
>> import org.apache.hadoop.conf.Configuration
>> import org.apache.hadoop.fs.Path
>> import org.apache.parquet.hadoop.ParquetFileReader
>> import org.apache.parquet.hadoop.util.HadoopInputFile
>> import spark.implicits._   // needed for .toDF on an RDD
>>
>> spark.sparkContext
>>   .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
>>   .map { case (path, _) =>
>>     // open each file's footer and pull out its Parquet schema
>>     val file = HadoopInputFile.fromPath(new Path(path), new Configuration())
>>     val reader = ParquetFileReader.open(file)
>>     try {
>>       val schema = reader.getFileMetaData().getSchema
>>       (
>>         schema.getName,
>>         schema.getFields.asScala.map(f => (
>>           Option(f.getId).map(_.intValue()),
>>           f.getName,
>>           Option(f.getOriginalType).map(_.name()),
>>           Option(f.getRepetition).map(_.name()))
>>         ).toArray
>>       )
>>     } finally {
>>       reader.close()
>>     }
>>   }
>>   .toDF("schema name", "fields")
>>   .show(false)
>>
>> .binaryFiles provides you all filenames that match the given pattern as an
>> RDD, so the following .map is executed on the Spark executors. The map then
>> opens each parquet file via ParquetFileReader and provides access to its
>> schema and data.
>>
>> I hope this points you in the right direction.
>>
>> Enrico
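[Editor's note: a possible follow-up to Enrico's snippet, sketched here rather than taken from the thread. The same footer scan can be turned into a filter that separates out files whose price column is not stored as a plain DOUBLE, so the main job only reads the clean ones. The column name and expected type come from the error message quoted further down; the glob, the variable names and the final read are assumptions.]

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

val allFiles = spark.sparkContext
  .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
  .keys

// Paths whose "price" column exists but is not a DOUBLE primitive (checked on the executors)
val badFiles = allFiles.filter { path =>
  val reader = ParquetFileReader.open(
    HadoopInputFile.fromPath(new Path(path), new Configuration()))
  try {
    val schema = reader.getFileMetaData.getSchema
    schema.containsField("price") &&
      schema.getType("price").asPrimitiveType.getPrimitiveTypeName != PrimitiveTypeName.DOUBLE
  } finally {
    reader.close()
  }
}.collect().toSet

// Hand only the clean files to the main job
val goodFiles = allFiles.collect().filterNot(badFiles.contains)
val df = spark.read.parquet(goodFiles: _*)

[The badFiles set can also simply be printed or moved aside, which matches the pre-processing idea Hamish describes above.]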
>> On 01.03.20 at 22:56, Hamish Whittal wrote:
>>
>> Hi there,
>>
>> I have an HDFS directory with thousands of files. It seems that some of
>> them - and I don't know which ones - have a problem with their schema,
>> and it's causing my Spark application to fail with this error:
>>
>> Caused by: org.apache.spark.sql.execution.QueryExecutionException:
>> Parquet column cannot be converted in file
>> hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-00000-8b83989a-e387-4f64-8ac5-22b16770095e-c000.snappy.parquet.
>> Column: [price], Expected: double, Found: FIXED_LEN_BYTE_ARRAY
>>
>> The problem is not only that it's causing the application to fail, but
>> that every time it does fail, I have to copy that file out of the
>> directory and start the app again.
>>
>> I thought of trying to use try-except, but I can't seem to get that to
>> work.
>>
>> Is there any advice anyone can give me, because I really can't see myself
>> going through thousands of files trying to figure out which ones are
>> broken.
>>
>> Thanks in advance,
>>
>> hamish
>
> --
> Cloud-Fundis.co.za
> Cape Town, South Africa
> +27 79 614 4913
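[Editor's note: a likely reason the try-except attempt above does not help is that each file on its own is internally consistent, so the type mismatch only surfaces when a file is read against the schema the job expects, and then the whole multi-file read fails at once on the executors. Reading every file individually against the expected schema isolates the offenders. Below is a minimal sketch of the pre-processing step discussed in the thread, not code from it: it assumes spark-shell/Scala, reuses the paths from the messages above, and hard-codes a one-column expected schema with price as double, matching the error; that schema could instead be taken from a known-good file via spark.read.parquet(goodFile).schema. This is an alternative to the footer check sketched earlier.]

import scala.util.Try
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Expected schema: only the column known to be problematic (assumption, see note above)
val expected = StructType(Seq(StructField("price", DoubleType)))

// All candidate files, listed via the same glob Enrico used
val files = spark.sparkContext
  .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
  .keys
  .collect()

// Read each file on its own against the expected schema and force a small action;
// a file whose price column is not a double fails here instead of inside the main job.
val (good, bad) = files.partition { f =>
  Try(spark.read.schema(expected).parquet(f).foreach(_ => ())).isSuccess
}

bad.foreach(f => println(s"problem file: $f"))

// The main job then only ever sees the clean files
val df = spark.read.parquet(good: _*)

[This launches one small job per file, so it is slow for 3000+ files, but it only has to run once per batch and removes the copy-the-file-out-and-restart loop described above.]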