Ok, good luck!

On Mon, 2 Mar 2020 at 10:04, Hamish Whittal <ham...@cloud-fundis.co.za>
wrote:

> Enrico, Wim (and privately Neil), thanks for the replies. I will give your
> suggestions a whirl.
>
> Basically Wim recommended a pre-processing step to weed out the
> problematic files. I am going to build that into the pipeline. I am not
> sure how the problems are creeping in because this is a regular lift from a
> PGSQL db/table. And so some of these files are correct and some are
> patently wrong.
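>
> Roughly what I have in mind for that step is the PySpark sketch below. The
> paths are placeholders, and the only check is that price comes through as a
> double (based on the error further down); the real pipeline would compare
> the full expected schema:
>
> from pyspark.sql import SparkSession
> from pyspark.sql.types import DoubleType
>
> spark = SparkSession.builder.getOrCreate()
>
> src = "hdfs:///user/hadoop/origdata"  # placeholder for the real directory
>
> # List the part files through the Hadoop FileSystem API exposed via py4j.
> Path = spark._jvm.org.apache.hadoop.fs.Path
> fs = Path(src).getFileSystem(spark._jsc.hadoopConfiguration())
>
> good, bad = [], []
> for status in fs.listStatus(Path(src)):
>     path = status.getPath().toString()
>     if not path.endswith(".parquet"):
>         continue
>     # Reading a single file only needs its footer, so this stays cheap.
>     schema = spark.read.parquet(path).schema
>     ok = any(f.name == "price" and isinstance(f.dataType, DoubleType)
>              for f in schema.fields)
>     (good if ok else bad).append(path)
>
> print("files with an unexpected schema:", bad)
>
> If that looks sane, the pipeline will move the bad files aside before the
> main read.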
>
> I'm working around the problem by trying small subsets of the 3000+ files,
> but until I can weed out the problem files the processing is going to fail.
> I need something more bulletproof than what I'm doing. So this is what I'm
> going to try now.
>
> Hamish
>
> On Mon, Mar 2, 2020 at 10:15 AM Enrico Minack <m...@enrico.minack.dev>
> wrote:
>
>> Looks like the schema of some files is unexpected.
>>
>> You could either run parquet-tools on each of the files and extract the
>> schema to find the problematic files:
>>
>> dir=hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata
>> hdfs dfs -stat "%n" "$dir/part-*.parquet" | while read file
>> do
>>    echo -n "$file: "
>>    hadoop jar parquet-tools-1.9.0.jar schema "$dir/$file"
>> done
>>
>>
>> https://confusedcoders.com/data-engineering/hadoop/how-to-view-content-of-parquet-files-on-s3hdfs-from-hadoop-cluster-using-parquet-tools
>>
>>
>> Or you can use Spark to investigate the parquet files in parallel:
>>
>> import scala.collection.JavaConverters._
>> import org.apache.hadoop.conf.Configuration
>> import org.apache.hadoop.fs.Path
>> import org.apache.parquet.hadoop.ParquetFileReader
>> import org.apache.parquet.hadoop.util.HadoopInputFile
>> import spark.implicits._
>>
>> spark.sparkContext
>>   .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
>>   .map { case (path, _) =>
>>     // open each parquet footer and pull out its schema
>>     val file = HadoopInputFile.fromPath(new Path(path), new Configuration())
>>     val reader = ParquetFileReader.open(file)
>>     try {
>>       val schema = reader.getFileMetaData().getSchema
>>       (
>>         schema.getName,
>>         schema.getFields.asScala.map(f => (
>>           Option(f.getId).map(_.intValue()),
>>           f.getName,
>>           Option(f.getOriginalType).map(_.name()),
>>           Option(f.getRepetition).map(_.name()))
>>         ).toArray
>>       )
>>     } finally {
>>       reader.close()
>>     }
>>   }
>>   .toDF("schema name", "fields")
>>   .show(false)
>>
>> .binaryFiles gives you all filenames that match the given pattern as an
>> RDD, so the subsequent .map runs on the Spark executors. The map then
>> opens each parquet file via ParquetFileReader, which gives access to its
>> schema and data.
>>
>> I hope this points you in the right direction.
>>
>> Enrico
>>
>>
>> On 01.03.20 at 22:56, Hamish Whittal wrote:
>>
>> Hi there,
>>
>> I have an hdfs directory with thousands of files. It seems that some of
>> them - and I don't know which ones - have a problem with their schema and
>> it's causing my Spark application to fail with this error:
>>
>> Caused by: org.apache.spark.sql.execution.QueryExecutionException:
>> Parquet column cannot be converted in file
>> hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-00000-8b83989a-e387-4f64-8ac5-22b16770095e-c000.snappy.parquet.
>> Column: [price], Expected: double, Found: FIXED_LEN_BYTE_ARRAY
>>
>> The problem is not only that the application fails, but that every time it
>> does fail I have to copy the offending file out of the directory and start
>> the app again.
>>
>> I thought of trying to use try-except, but I can't seem to get that to
>> work.
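>>
>> Something like the sketch below is the shape of what I was attempting (the
>> paths are placeholders, and the real job does more than a count):
>>
>> from pyspark.sql import SparkSession
>>
>> spark = SparkSession.builder.getOrCreate()
>>
>> src = "hdfs:///user/hadoop/origdata"  # placeholder for the real directory
>>
>> # List the part files through the Hadoop FileSystem API exposed via py4j.
>> Path = spark._jvm.org.apache.hadoop.fs.Path
>> fs = Path(src).getFileSystem(spark._jsc.hadoopConfiguration())
>> files = [s.getPath().toString() for s in fs.listStatus(Path(src))
>>          if s.getPath().getName().endswith(".parquet")]
>>
>> readable, broken = [], []
>> for path in files:
>>     try:
>>         # Force a full read so the column-conversion error surfaces here
>>         # rather than later in the main job.
>>         spark.read.parquet(path).count()
>>         readable.append(path)
>>     except Exception:
>>         # The error comes back wrapped in a Py4J/Spark exception, so a
>>         # broad except is the simplest way to trap it per file.
>>         broken.append(path)
>>
>> print("broken files:", broken)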
>>
>> Is there any advice anyone can give me? I really can't see myself going
>> through thousands of files trying to figure out which ones are broken.
>>
>> Thanks in advance,
>>
>> hamish
>>
>>
>>
>
> --
> Cloud-Fundis.co.za
> Cape Town, South Africa
> +27 79 614 4913
>
