SparkSQL 1.3.0 cannot read parquet files from different file system
Hi,

I am using Spark 1.3.0 and cannot load parquet files from more than one file system, say one s3n://... and another hdfs://... This worked in older versions, and it still works in 1.3 if I set spark.sql.parquet.useDataSourceApi=false. One way to fix this is, instead of getting a single FileSystem from the default configuration in ParquetRelation2, to call Path.getFileSystem for each path. Here are the JIRA issue and pull request: https://issues.apache.org/jira/browse/SPARK-6351 https://github.com/apache/spark/pull/5039

Thanks,
--
Pei-Lun
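A minimal sketch of the per-path idea, assuming Hadoop's FileSystem API; the helper name is made up for illustration and this is not the actual patch in the pull request above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

// Resolve a FileSystem per path instead of reusing one obtained from the default
// configuration, so s3n:// and hdfs:// paths can be mixed in the same query.
def listStatusPerPath(paths: Seq[Path], conf: Configuration): Seq[FileStatus] =
  paths.flatMap { path =>
    val fs = path.getFileSystem(conf)      // picks the right FileSystem for each URI scheme
    fs.listStatus(fs.makeQualified(path))
  }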
Re: Which OutputCommitter to use for S3?
Hi,

I created a JIRA and PR for supporting an S3-friendly output committer for saveAsParquetFile:
https://issues.apache.org/jira/browse/SPARK-6352
https://github.com/apache/spark/pull/5042

My approach is to add a DirectParquetOutputCommitter class in the spark-sql package and use a boolean config variable, spark.sql.parquet.useDirectParquetOutputCommitter, to choose between it and the default output committer (see the configuration sketch after this thread). This may not be the smartest solution, but it works for me. Tested on Spark 1.1 and 1.3 with Hadoop 1.0.4.

On Thu, Mar 5, 2015 at 4:32 PM, Aaron Davidson ilike...@gmail.com wrote:
Yes, unfortunately that direct dependency makes this injection much more difficult for saveAsParquetFile.

On Thu, Mar 5, 2015 at 12:28 AM, Pei-Lun Lee pl...@appier.com wrote:
Thanks for the DirectOutputCommitter example. However, I found it only works for saveAsHadoopFile. What about saveAsParquetFile? It looks like SparkSQL is using ParquetOutputCommitter, which is a subclass of FileOutputCommitter.

On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor thomas.dem...@amplidata.com wrote:
FYI. We're currently addressing this at the Hadoop level in https://issues.apache.org/jira/browse/HADOOP-9565
Thomas Demoor

On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote:
Just to close the loop in case anyone runs into the same problem I had: by setting --hadoop-major-version=2 when using the ec2 scripts, everything worked fine.
Darin.

- Original Message -
From: Darin McBeath ddmcbe...@yahoo.com.INVALID
To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com
Cc: u...@spark.apache.org u...@spark.apache.org
Sent: Monday, February 23, 2015 3:16 PM
Subject: Re: Which OutputCommitter to use for S3?

Thanks. I think my problem might actually be the other way around. I'm compiling with Hadoop 2, but when I start up Spark using the ec2 scripts, I don't specify --hadoop-major-version, and the default is 1. I'm guessing that if I make that a 2, it might work correctly. I'll try it and post a response.

- Original Message -
From: Mingyu Kim m...@palantir.com
To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson ilike...@gmail.com
Cc: u...@spark.apache.org u...@spark.apache.org
Sent: Monday, February 23, 2015 3:06 PM
Subject: Re: Which OutputCommitter to use for S3?

Cool, we will start from there. Thanks Aaron and Josh!

Darin, it's likely because the DirectOutputCommitter is compiled with Hadoop 1 classes and you're running it with Hadoop 2. org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it became an interface in Hadoop 2.

Mingyu

On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID wrote:
Aaron, thanks for the class. Since I'm currently writing Java-based Spark applications, I tried converting your class to Java (it seemed pretty straightforward).

I set up the use of the class as follows:

SparkConf conf = new SparkConf()
  .set("spark.hadoop.mapred.output.committer.class", "com.elsevier.common.DirectOutputCommitter");

And I then try to save a file to S3 (which I believe should use the old Hadoop APIs):

JavaPairRDD<Text, Text> newBaselineRDDWritable = reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile, Text.class, Text.class, SequenceFileOutputFormat.class, org.apache.hadoop.io.compress.GzipCodec.class);

But I get the following error message.
Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapred.JobContext, but interface was expected
at com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902)
at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771)
at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)

In my class, JobContext is an interface of type org.apache.hadoop.mapred.JobContext. Is there something obvious that I might be doing wrong (or messed up in the translation from Scala to Java) or something I should look into? I'm using Spark 1.2 with hadoop 2.4.

Thanks.
Darin.

From: Aaron Davidson ilike...@gmail.com
To: Andrew Ash and...@andrewash.com
Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com; u...@spark.apache.org u...@spark.apache.org; Aaron Davidson aa...@databricks.com
Sent: Saturday, February 21, 2015 7:01 PM
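Regarding Pei-Lun's proposal at the top of this thread: a minimal usage sketch, assuming the SPARK-6352 / PR 5042 patch is applied (the config name comes from that proposal and has no effect on a stock build; the paths are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("parquet-direct-commit"))
val sqlContext = new SQLContext(sc)

// Opt in to the proposed DirectParquetOutputCommitter; when false (the default),
// the usual ParquetOutputCommitter / FileOutputCommitter path is used.
sqlContext.setConf("spark.sql.parquet.useDirectParquetOutputCommitter", "true")

sqlContext.parquetFile("hdfs:///tmp/input")          // illustrative input path
  .saveAsParquetFile("s3n://my-bucket/output/foo")   // illustrative S3 output path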
Re: SparkSQL 1.3.0 (RC3) failed to read parquet file generated by 1.1.1
Thanks!

On Sat, Mar 14, 2015 at 3:31 AM, Michael Armbrust mich...@databricks.com wrote:
Here is the JIRA: https://issues.apache.org/jira/browse/SPARK-6315

On Thu, Mar 12, 2015 at 11:00 PM, Michael Armbrust mich...@databricks.com wrote:
We are looking at the issue and will likely fix it for Spark 1.3.1.

On Thu, Mar 12, 2015 at 8:25 PM, giive chen thegi...@gmail.com wrote:
Hi all,
My team has the same issue. It looks like Spark 1.3's SparkSQL cannot read parquet files generated by Spark 1.1. This will cost us a lot of migration work when we want to upgrade to Spark 1.3. Can anyone help?
Thanks
Wisely Chen

On Tue, Mar 10, 2015 at 5:06 PM, Pei-Lun Lee pl...@appier.com wrote:
Hi,

I found that if I try to read a parquet file generated by Spark 1.1.1 using 1.3.0-rc3 with default settings, I get this error:

com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'StructType': was expecting ('true', 'false' or 'null')
at [Source: StructType(List(StructField(a,IntegerType,false))); line: 1, column: 11]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1419)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:508)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2300)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1459)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:683)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3105)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3051)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2161)
at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:19)
at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:44)
at org.apache.spark.sql.types.DataType$.fromJson(dataTypes.scala:41)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$readSchema$1$$anonfun$25.apply(newParquet.scala:675)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$readSchema$1$$anonfun$25.apply(newParquet.scala:675)

This is how I save the parquet file with 1.1.1:

sql("select 1 as a").saveAsParquetFile("/tmp/foo")

and this is the metadata of the 1.1.1 parquet file:

creator: parquet-mr version 1.4.3
extra: org.apache.spark.sql.parquet.row.metadata = StructType(List(StructField(a,IntegerType,false)))

By comparison, this is the 1.3.0 metadata:

creator: parquet-mr version 1.6.0rc3
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"a","type":"integer","nullable":t [more]...

It looks like ParquetRelation2 is now used to load parquet files by default, and it only recognizes the JSON schema format, while the 1.1.1 schema was in the case-class string format. Setting spark.sql.parquet.useDataSourceApi to false fixes it, but I don't know what the differences are. Is this considered a bug? We have a lot of parquet files from 1.1.1; should we disable the data source API in order to read them if we want to upgrade to 1.3?

Thanks,
--
Pei-Lun
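A minimal sketch of the workaround mentioned above, assuming Spark 1.3's SQLContext; the input path is illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("read-legacy-parquet"))
val sqlContext = new SQLContext(sc)

// Fall back to the pre-1.3 Parquet code path so the old case-class-string schema
// metadata written by Spark 1.1.x can still be read.
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")

val df = sqlContext.parquetFile("/tmp/foo")   // illustrative path to a file written by 1.1.1
df.printSchema()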
SparkSQL 1.3.0 (RC3) failed to read parquet file generated by 1.1.1
Hi,

I found that if I try to read a parquet file generated by Spark 1.1.1 using 1.3.0-rc3 with default settings, I get this error:

com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'StructType': was expecting ('true', 'false' or 'null')
at [Source: StructType(List(StructField(a,IntegerType,false))); line: 1, column: 11]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1419)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:508)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2300)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1459)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:683)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3105)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3051)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2161)
at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:19)
at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:44)
at org.apache.spark.sql.types.DataType$.fromJson(dataTypes.scala:41)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$readSchema$1$$anonfun$25.apply(newParquet.scala:675)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$readSchema$1$$anonfun$25.apply(newParquet.scala:675)

This is how I save the parquet file with 1.1.1:

sql("select 1 as a").saveAsParquetFile("/tmp/foo")

and this is the metadata of the 1.1.1 parquet file:

creator: parquet-mr version 1.4.3
extra: org.apache.spark.sql.parquet.row.metadata = StructType(List(StructField(a,IntegerType,false)))

By comparison, this is the 1.3.0 metadata:

creator: parquet-mr version 1.6.0rc3
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"a","type":"integer","nullable":t [more]...

It looks like ParquetRelation2 is now used to load parquet files by default, and it only recognizes the JSON schema format, while the 1.1.1 schema was in the case-class string format. Setting spark.sql.parquet.useDataSourceApi to false fixes it, but I don't know what the differences are. Is this considered a bug? We have a lot of parquet files from 1.1.1; should we disable the data source API in order to read them if we want to upgrade to 1.3?

Thanks,
--
Pei-Lun
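A small illustration of the mismatch, assuming Spark 1.3's public DataType.fromJson; the two literal strings below mirror the metadata formats quoted above (the JSON one is reconstructed for illustration, with the truncated nullable/metadata fields filled in):

import org.apache.spark.sql.types.DataType

// Schema metadata written by Spark 1.3 (JSON) vs. Spark 1.1.1 (case-class toString).
val json13   = """{"type":"struct","fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}}]}"""
val legacy11 = "StructType(List(StructField(a,IntegerType,false)))"

DataType.fromJson(json13)      // parses fine into a StructType
// DataType.fromJson(legacy11) // throws JsonParseException: Unrecognized token 'StructType'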
Re: Which OutputCommitter to use for S3?
Thanks for the DirectOutputCommitter example. However, I found it only works for saveAsHadoopFile. What about saveAsParquetFile? It looks like SparkSQL is using ParquetOutputCommitter, which is a subclass of FileOutputCommitter.

On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor thomas.dem...@amplidata.com wrote:
FYI. We're currently addressing this at the Hadoop level in https://issues.apache.org/jira/browse/HADOOP-9565
Thomas Demoor

On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote:
Just to close the loop in case anyone runs into the same problem I had: by setting --hadoop-major-version=2 when using the ec2 scripts, everything worked fine.
Darin.

- Original Message -
From: Darin McBeath ddmcbe...@yahoo.com.INVALID
To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com
Cc: u...@spark.apache.org u...@spark.apache.org
Sent: Monday, February 23, 2015 3:16 PM
Subject: Re: Which OutputCommitter to use for S3?

Thanks. I think my problem might actually be the other way around. I'm compiling with Hadoop 2, but when I start up Spark using the ec2 scripts, I don't specify --hadoop-major-version, and the default is 1. I'm guessing that if I make that a 2, it might work correctly. I'll try it and post a response.

- Original Message -
From: Mingyu Kim m...@palantir.com
To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson ilike...@gmail.com
Cc: u...@spark.apache.org u...@spark.apache.org
Sent: Monday, February 23, 2015 3:06 PM
Subject: Re: Which OutputCommitter to use for S3?

Cool, we will start from there. Thanks Aaron and Josh!

Darin, it's likely because the DirectOutputCommitter is compiled with Hadoop 1 classes and you're running it with Hadoop 2. org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it became an interface in Hadoop 2.

Mingyu

On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID wrote:
Aaron, thanks for the class. Since I'm currently writing Java-based Spark applications, I tried converting your class to Java (it seemed pretty straightforward).

I set up the use of the class as follows:

SparkConf conf = new SparkConf()
  .set("spark.hadoop.mapred.output.committer.class", "com.elsevier.common.DirectOutputCommitter");

And I then try to save a file to S3 (which I believe should use the old Hadoop APIs):

JavaPairRDD<Text, Text> newBaselineRDDWritable = reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile, Text.class, Text.class, SequenceFileOutputFormat.class, org.apache.hadoop.io.compress.GzipCodec.class);

But I get the following error message.

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapred.JobContext, but interface was expected
at com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902)
at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771)
at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)

In my class, JobContext is an interface of type org.apache.hadoop.mapred.JobContext.
Is there something obvious that I might be doing wrong (or messed up in the translation from Scala to Java), or something I should look into? I'm using Spark 1.2 with Hadoop 2.4.

Thanks.
Darin.

From: Aaron Davidson ilike...@gmail.com
To: Andrew Ash and...@andrewash.com
Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com; u...@spark.apache.org u...@spark.apache.org; Aaron Davidson aa...@databricks.com
Sent: Saturday, February 21, 2015 7:01 PM
Subject: Re: Which OutputCommitter to use for S3?

Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec

You can use it by setting mapred.output.committer.class in the Hadoop configuration (or spark.hadoop.mapred.output.committer.class in the Spark configuration). Note that this only works for the old Hadoop APIs; I believe the new Hadoop APIs strongly tie the committer to the output format (so FileOutputFormat always uses FileOutputCommitter), which makes this fix more difficult to apply.

On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com wrote:
Josh is that class something you guys would consider
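A minimal no-op committer sketch for the old mapred API, in the spirit of the gist linked above but not a copy of it: because it is not a FileOutputCommitter, the old-API FileOutputFormat writes task output directly to the final location and there is nothing to move (no slow S3 "rename") at commit time. The package/class name in the wiring comment is an assumption.

import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

// No-op committer: nothing to set up, nothing to commit, nothing to abort.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}

// Wiring, as described above (assumes the class is deployed as com.example.DirectOutputCommitter
// on the executor classpath):
//   sparkConf.set("spark.hadoop.mapred.output.committer.class",
//                 "com.example.DirectOutputCommitter")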