SparkSQL 1.3.0 cannot read parquet files from different file systems

2015-03-16 Thread Pei-Lun Lee
Hi,

I am using Spark 1.3.0 and cannot load parquet files from more than one
file system, say one from s3n://... and another from hdfs://.... This worked
in older versions, and it still works in 1.3 if I set
spark.sql.parquet.useDataSourceApi=false.

One way to fix this is, instead of getting a single FileSystem from the
default configuration in ParquetRelation2, to call Path.getFileSystem for
each path.
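
Roughly, the fix looks like this (a simplified sketch for illustration, not
the actual patch; the method and names are made up):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileStatus, Path}

  // Resolve a FileSystem per path instead of reusing the FileSystem obtained
  // from the default configuration, so s3n:// and hdfs:// paths can be mixed.
  def listStatuses(paths: Seq[Path], conf: Configuration): Seq[FileStatus] =
    paths.map { path =>
      val fs = path.getFileSystem(conf)   // chosen by the path's URI scheme
      fs.getFileStatus(path)
    }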

Here's the JIRA link and pull request:
https://issues.apache.org/jira/browse/SPARK-6351
https://github.com/apache/spark/pull/5039

Thanks,
--
Pei-Lun


Re: Which OutputCommitter to use for S3?

2015-03-16 Thread Pei-Lun Lee
Hi,

I created a JIRA and PR for supporting an S3-friendly output committer for
saveAsParquetFile:
https://issues.apache.org/jira/browse/SPARK-6352
https://github.com/apache/spark/pull/5042

My approach is to add a DirectParquetOutputCommitter class to the spark-sql
package and use a boolean config variable,
spark.sql.parquet.useDirectParquetOutputCommitter, to choose between it and
the default output committer.
This may not be the smartest solution, but it works for me.
Tested on Spark 1.1 and 1.3 with Hadoop 1.0.4.
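
To illustrate the idea: a direct committer lets tasks write straight to the
final output location, so the commit steps become no-ops and there is nothing
to rename on S3. A minimal sketch of that idea (simplified for illustration,
not the actual class in the PR):

  import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}

  // Tasks write directly to the final output path, so there is nothing to
  // move or rename when a task or the job commits.
  class DirectOutputCommitterSketch extends OutputCommitter {
    override def setupJob(jobContext: JobContext): Unit = {}
    override def setupTask(taskContext: TaskAttemptContext): Unit = {}
    override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
    override def commitTask(taskContext: TaskAttemptContext): Unit = {}
    override def abortTask(taskContext: TaskAttemptContext): Unit = {}
  }

The class in the PR additionally hooks into the Parquet write path and is only
used when spark.sql.parquet.useDirectParquetOutputCommitter is set to true.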


On Thu, Mar 5, 2015 at 4:32 PM, Aaron Davidson ilike...@gmail.com wrote:

 Yes, unfortunately that direct dependency makes this injection much more
 difficult for saveAsParquetFile.

 On Thu, Mar 5, 2015 at 12:28 AM, Pei-Lun Lee pl...@appier.com wrote:

 Thanks for the DirectOutputCommitter example.
 However I found it only works for saveAsHadoopFile. What about
 saveAsParquetFile?
 It looks like SparkSQL is using ParquetOutputCommitter, which is a subclass
 of FileOutputCommitter.

  On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor thomas.dem...@amplidata.com
  wrote:

  FYI. We're currently addressing this at the Hadoop level in
  https://issues.apache.org/jira/browse/HADOOP-9565
 
 
  Thomas Demoor
 
  On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath 
  ddmcbe...@yahoo.com.invalid wrote:
 
  Just to close the loop in case anyone runs into the same problem I had.
 
  By setting --hadoop-major-version=2 when using the ec2 scripts,
  everything worked fine.
 
  Darin.
 
 
  - Original Message -
  From: Darin McBeath ddmcbe...@yahoo.com.INVALID
  To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com
 
  Cc: u...@spark.apache.org u...@spark.apache.org
  Sent: Monday, February 23, 2015 3:16 PM
  Subject: Re: Which OutputCommitter to use for S3?
 
  Thanks.  I think my problem might actually be the other way around.
 
   I'm compiling with Hadoop 2, but when I start up Spark using the ec2
   scripts, I don't specify a --hadoop-major-version, and the default is 1.
   I'm guessing that if I make that 2, it might work correctly. I'll try it
   and post a response.
 
 
  - Original Message -
  From: Mingyu Kim m...@palantir.com
  To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson 
  ilike...@gmail.com
  Cc: u...@spark.apache.org u...@spark.apache.org
  Sent: Monday, February 23, 2015 3:06 PM
  Subject: Re: Which OutputCommitter to use for S3?
 
  Cool, we will start from there. Thanks Aaron and Josh!
 
   Darin, it's likely because the DirectOutputCommitter is compiled with
   Hadoop 1 classes and you're running it with Hadoop 2.
   org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it
   became an interface in Hadoop 2.
 
  Mingyu
 
 
 
 
 
  On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID
  wrote:
 
  Aaron.  Thanks for the class. Since I'm currently writing Java based
  Spark applications, I tried converting your class to Java (it seemed
  pretty straightforward).
  
  I set up the use of the class as follows:
  
   SparkConf conf = new SparkConf()
   .set("spark.hadoop.mapred.output.committer.class",
   "com.elsevier.common.DirectOutputCommitter");
  
   And I then try and save a file to S3 (which I believe should use the old
   hadoop apis).
  
   JavaPairRDD<Text, Text> newBaselineRDDWritable =
   reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
   newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
   Text.class, Text.class, SequenceFileOutputFormat.class,
   org.apache.hadoop.io.compress.GzipCodec.class);
  
  But, I get the following error message.
  
   Exception in thread "main" java.lang.IncompatibleClassChangeError: Found
   class org.apache.hadoop.mapred.JobContext, but interface was expected
   at com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68)
   at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
   at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075)
   at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
   at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902)
   at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771)
   at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
  
   In my class, JobContext is an interface of type
   org.apache.hadoop.mapred.JobContext.
   
   Is there something obvious that I might be doing wrong (or messed up in
   the translation from Scala to Java) or something I should look into? I'm
   using Spark 1.2 with Hadoop 2.4.
  
  
  Thanks.
  
  Darin.
  
  
  
  
  
  From: Aaron Davidson ilike...@gmail.com
  To: Andrew Ash and...@andrewash.com
   Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com;
   u...@spark.apache.org u...@spark.apache.org; Aaron Davidson
   aa...@databricks.com
  Sent: Saturday, February 21, 2015 7:01 PM

Re: SparkSQL 1.3.0 (RC3) failed to read parquet file generated by 1.1.1

2015-03-15 Thread Pei-Lun Lee
Thanks!

On Sat, Mar 14, 2015 at 3:31 AM, Michael Armbrust mich...@databricks.com
wrote:

 Here is the JIRA: https://issues.apache.org/jira/browse/SPARK-6315

 On Thu, Mar 12, 2015 at 11:00 PM, Michael Armbrust mich...@databricks.com
 wrote:

  We are looking at the issue and will likely fix it for Spark 1.3.1.
 
  On Thu, Mar 12, 2015 at 8:25 PM, giive chen thegi...@gmail.com wrote:
 
  Hi all
 
   My team has the same issue. It looks like Spark 1.3's SparkSQL cannot read
   parquet files generated by Spark 1.1. It will cost a lot of migration work
   when we want to upgrade to Spark 1.3.
  
   Can anyone help?
 
 
  Thanks
 
  Wisely Chen
 
 
  On Tue, Mar 10, 2015 at 5:06 PM, Pei-Lun Lee pl...@appier.com wrote:
 
   Hi,
  
    I found that if I try to read a parquet file generated by Spark 1.1.1
    using 1.3.0-rc3 with default settings, I get this error:
  
    com.fasterxml.jackson.core.JsonParseException: Unrecognized token
    'StructType': was expecting ('true', 'false' or 'null')
     at [Source: StructType(List(StructField(a,IntegerType,false))); line: 1, column: 11]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1419)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:508)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2300)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1459)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:683)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3105)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3051)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2161)
    at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:19)
    at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:44)
    at org.apache.spark.sql.types.DataType$.fromJson(dataTypes.scala:41)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$readSchema$1$$anonfun$25.apply(newParquet.scala:675)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$readSchema$1$$anonfun$25.apply(newParquet.scala:675)
  
  
  
    this is how I saved the parquet file with 1.1.1:
   
    sql("select 1 as a").saveAsParquetFile("/tmp/foo")
  
  
  
   and this is the meta data of the 1.1.1 parquet file:
  
   creator: parquet-mr version 1.4.3
   extra:   org.apache.spark.sql.parquet.row.metadata =
   StructType(List(StructField(a,IntegerType,false)))
  
  
  
   by comparison, this is 1.3.0 meta:
  
   creator: parquet-mr version 1.6.0rc3
   extra:   org.apache.spark.sql.parquet.row.metadata =
    {"type":"struct","fields":[{"name":"a","type":"integer","nullable":t
   [more]...
  
  
  
    It looks like ParquetRelation2 is now used to load parquet files by
    default, and it only recognizes the JSON schema format, while the 1.1.1
    schema was written in the case-class string format.
   
    Setting spark.sql.parquet.useDataSourceApi to false works around it, but I
    don't know what else that setting changes.
    Is this considered a bug? We have a lot of parquet files from 1.1.1;
    should we disable the data source API in order to read them if we want to
    upgrade to 1.3?
  
   Thanks,
   --
   Pei-Lun
  
 
 
 



SparkSQL 1.3.0 (RC3) failed to read parquet file generated by 1.1.1

2015-03-10 Thread Pei-Lun Lee
Hi,

I found that if I try to read a parquet file generated by Spark 1.1.1 using
1.3.0-rc3 with default settings, I get this error:

com.fasterxml.jackson.core.JsonParseException: Unrecognized token
'StructType': was expecting ('true', 'false' or 'null')
 at [Source: StructType(List(StructField(a,IntegerType,false))); line: 1, column: 11]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1419)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:508)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2300)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1459)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:683)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3105)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3051)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2161)
at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:19)
at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:44)
at org.apache.spark.sql.types.DataType$.fromJson(dataTypes.scala:41)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$readSchema$1$$anonfun$25.apply(newParquet.scala:675)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$readSchema$1$$anonfun$25.apply(newParquet.scala:675)



this is how I saved the parquet file with 1.1.1:

sql("select 1 as a").saveAsParquetFile("/tmp/foo")



and this is the meta data of the 1.1.1 parquet file:

creator: parquet-mr version 1.4.3
extra:   org.apache.spark.sql.parquet.row.metadata =
StructType(List(StructField(a,IntegerType,false)))



by comparison, this is 1.3.0 meta:

creator: parquet-mr version 1.6.0rc3
extra:   org.apache.spark.sql.parquet.row.metadata =
{"type":"struct","fields":[{"name":"a","type":"integer","nullable":t
[more]...



It looks like ParquetRelation2 is now used to load parquet files by default,
and it only recognizes the JSON schema format, while the 1.1.1 schema was
written in the case-class string format.
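
This matches the stack trace: DataType.fromJson expects the JSON form, so
feeding it the old case-class string blows up. A rough illustration (the
schema strings are abbreviated from the metadata above, and I'm assuming
DataType.fromJson is callable from user code here):

  import org.apache.spark.sql.types.DataType

  // The 1.3.0 JSON form parses fine.
  val jsonSchema =
    """{"type":"struct","fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}}]}"""
  val parsed = DataType.fromJson(jsonSchema)

  // The 1.1.1 case-class string form is not JSON, so parsing it throws the
  // JsonParseException shown above.
  val legacySchema = "StructType(List(StructField(a,IntegerType,false)))"
  // DataType.fromJson(legacySchema)   // JsonParseException: Unrecognized token 'StructType'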

Setting spark.sql.parquet.useDataSourceApi to false works around it, but I
don't know what else that setting changes.
Is this considered a bug? We have a lot of parquet files from 1.1.1; should
we disable the data source API in order to read them if we want to upgrade to
1.3?
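
For reference, the workaround amounts to flipping that flag on the SQLContext
before reading, e.g.:

  // Fall back to the pre-1.3 parquet code path, which accepts the old schema format.
  sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")

  val df = sqlContext.parquetFile("/tmp/foo")   // now reads the 1.1.1 file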

Thanks,
--
Pei-Lun


Re: Which OutputCommitter to use for S3?

2015-03-05 Thread Pei-Lun Lee
Thanks for the DirectOutputCommitter example.
However I found it only works for saveAsHadoopFile. What about
saveAsParquetFile?
It looks like SparkSQL is using ParquetOutputCommitter, which is a subclass
of FileOutputCommitter.
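
For comparison, this is roughly how the committer gets overridden for the
saveAsHadoopFile path (same idea as the Java snippet quoted below; the
committer class name here is just a placeholder):

  import org.apache.spark.SparkConf

  // Only the old ("mapred") API consults this setting, i.e. saveAsHadoopFile;
  // the Parquet write path picks ParquetOutputCommitter on its own.
  val conf = new SparkConf()
    .set("spark.hadoop.mapred.output.committer.class",
         "com.example.DirectOutputCommitter")   // placeholder class name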

On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor thomas.dem...@amplidata.com
wrote:

 FYI. We're currently addressing this at the Hadoop level in
 https://issues.apache.org/jira/browse/HADOOP-9565


 Thomas Demoor

 On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath 
 ddmcbe...@yahoo.com.invalid wrote:

 Just to close the loop in case anyone runs into the same problem I had.

 By setting --hadoop-major-version=2 when using the ec2 scripts,
 everything worked fine.

 Darin.


 - Original Message -
 From: Darin McBeath ddmcbe...@yahoo.com.INVALID
 To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com
 Cc: u...@spark.apache.org u...@spark.apache.org
 Sent: Monday, February 23, 2015 3:16 PM
 Subject: Re: Which OutputCommitter to use for S3?

 Thanks.  I think my problem might actually be the other way around.

 I'm compiling with Hadoop 2, but when I start up Spark using the ec2
 scripts, I don't specify a --hadoop-major-version, and the default is 1.
 I'm guessing that if I make that 2, it might work correctly. I'll try it and
 post a response.


 - Original Message -
 From: Mingyu Kim m...@palantir.com
 To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson 
 ilike...@gmail.com
 Cc: u...@spark.apache.org u...@spark.apache.org
 Sent: Monday, February 23, 2015 3:06 PM
 Subject: Re: Which OutputCommitter to use for S3?

 Cool, we will start from there. Thanks Aaron and Josh!

 Darin, it's likely because the DirectOutputCommitter is compiled with
 Hadoop 1 classes and you're running it with Hadoop 2.
 org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it
 became an interface in Hadoop 2.

 Mingyu





 On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID
 wrote:

 Aaron.  Thanks for the class. Since I'm currently writing Java based
 Spark applications, I tried converting your class to Java (it seemed
 pretty straightforward).
 
 I set up the use of the class as follows:
 
 SparkConf conf = new SparkConf()
 .set("spark.hadoop.mapred.output.committer.class",
 "com.elsevier.common.DirectOutputCommitter");
 
 And I then try and save a file to S3 (which I believe should use the old
 hadoop apis).
 
 JavaPairRDD<Text, Text> newBaselineRDDWritable =
 reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
 newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
 Text.class, Text.class, SequenceFileOutputFormat.class,
 org.apache.hadoop.io.compress.GzipCodec.class);
 
 But, I get the following error message.
 
 Exception in thread "main" java.lang.IncompatibleClassChangeError: Found
 class org.apache.hadoop.mapred.JobContext, but interface was expected
 at com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68)
 at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
 at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075)
 at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
 at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902)
 at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771)
 at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
 
 In my class, JobContext is an interface of  type
 org.apache.hadoop.mapred.JobContext.
 
 Is there something obvious that I might be doing wrong (or messed up in
 the translation from Scala to Java) or something I should look into?  I'm
 using Spark 1.2 with hadoop 2.4.
 
 
 Thanks.
 
 Darin.
 
 
 
 
 
 From: Aaron Davidson ilike...@gmail.com
 To: Andrew Ash and...@andrewash.com
 Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com;
 u...@spark.apache.org u...@spark.apache.org; Aaron Davidson
 aa...@databricks.com
 Sent: Saturday, February 21, 2015 7:01 PM
 Subject: Re: Which OutputCommitter to use for S3?
 
 
 
 Here is the class:
 
 https://gist.github.com/aarondav/c513916e72101bbe14ec
 
 You can use it by setting mapred.output.committer.class in the Hadoop
 configuration (or spark.hadoop.mapred.output.committer.class in the
 Spark configuration). Note that this only works for the old Hadoop APIs;
 I believe the new Hadoop APIs strongly tie the committer to the output
 format (so FileOutputFormat always uses FileOutputCommitter), which makes
 this fix more difficult to apply.
 
 
 
 
 On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com
 wrote:
 
 Josh is that class something you guys would consider