Re: SparkSQL overwrite parquet file does not generate _common_metadata

2015-03-27 Thread Pei-Lun Lee
I'm using 1.0.4

Thanks,
--
Pei-Lun

On Fri, Mar 27, 2015 at 2:32 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hm, which version of Hadoop are you using? Actually there should also be
 a _metadata file together with _common_metadata. I was using Hadoop 2.4.1
 btw. I'm not sure whether Hadoop version matters here, but I did observe
 cases where Spark behaves differently because of semantic differences of
 the same API in different Hadoop versions.

 Cheng

 On 3/27/15 11:33 AM, Pei-Lun Lee wrote:

 Hi Cheng,

  on my computer, executing res0.save("xxx", org.apache.spark.sql.SaveMode.
 Overwrite) produces:

  peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
 total 32
 -rwxrwxrwx  1 peilunlee  staff    0 Mar 27 11:29 _SUCCESS*
 -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-1.parquet*
 -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-2.parquet*
 -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-3.parquet*
 -rwxrwxrwx  1 peilunlee  staff  488 Mar 27 11:29 part-r-4.parquet*

  while res0.save("xxx") produces:

  peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
 total 40
 -rwxrwxrwx  1 peilunlee  staff    0 Mar 27 11:29 _SUCCESS*
 -rwxrwxrwx  1 peilunlee  staff  250 Mar 27 11:29 _common_metadata*
 -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-1.parquet*
 -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-2.parquet*
 -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-3.parquet*
 -rwxrwxrwx  1 peilunlee  staff  488 Mar 27 11:29 part-r-4.parquet*

 On Thu, Mar 26, 2015 at 7:26 PM, Cheng Lian lian.cs@gmail.com wrote:

  I couldn’t reproduce this with the following spark-shell snippet:

 scala> import sqlContext.implicits._
 scala> Seq((1, 2)).toDF("a", "b")
 scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)
 scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)

 The _common_metadata file is typically much smaller than _metadata,
 because it doesn’t contain row group information, and thus can be faster to
 read than _metadata.
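
 For reference, a quick way to check which summary files a save produced is to
 list the output directory from the same spark-shell session. A minimal sketch,
 assuming "xxx" is the output path used above and relying only on the standard
 Hadoop FileSystem API:

 scala> import org.apache.hadoop.fs.{FileSystem, Path}
 scala> val fs = FileSystem.get(sc.hadoopConfiguration)
 scala> // prints _SUCCESS, the part-r-*.parquet files and, when they are
 scala> // written, _common_metadata and _metadata
 scala> fs.listStatus(new Path("xxx")).map(_.getPath.getName).sorted.foreach(println)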

 Cheng

 On 3/26/15 12:48 PM, Pei-Lun Lee wrote:

 Hi,

  When I save a parquet file with SaveMode.Overwrite, it never generates
 _common_metadata, whether it overwrites an existing dir or not.
 Is this expected behavior?
 And what is the benefit of _common_metadata? Will reading perform better
 when it is present?

  Thanks,
 --
 Pei-Lun

  ​






Re: SparkSQL overwrite parquet file does not generate _common_metadata

2015-03-27 Thread Pei-Lun Lee
JIRA ticket created at:
https://issues.apache.org/jira/browse/SPARK-6581

Thanks,
--
Pei-Lun

On Fri, Mar 27, 2015 at 7:03 PM, Cheng Lian lian.cs@gmail.com wrote:

  Thanks for the information. Verified that the _common_metadata and
 _metadata files are missing in this case when using Hadoop 1.0.4. Would you
 mind opening a JIRA for this?

 Cheng

 On 3/27/15 2:40 PM, Pei-Lun Lee wrote:

 I'm using 1.0.4

  Thanks,
 --
 Pei-Lun

 On Fri, Mar 27, 2015 at 2:32 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hm, which version of Hadoop are you using? Actually there should also
 be a _metadata file together with _common_metadata. I was using Hadoop
 2.4.1 btw. I'm not sure whether Hadoop version matters here, but I did
 observe cases where Spark behaves differently because of semantic
 differences of the same API in different Hadoop versions.

 Cheng

 On 3/27/15 11:33 AM, Pei-Lun Lee wrote:

 Hi Cheng,

  on my computer, executing res0.save("xxx", org.apache.spark.sql.SaveMode.
 Overwrite) produces:

  peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
 total 32
 -rwxrwxrwx  1 peilunlee  staff    0 Mar 27 11:29 _SUCCESS*
 -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-1.parquet*
 -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-2.parquet*
 -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-3.parquet*
 -rwxrwxrwx  1 peilunlee  staff  488 Mar 27 11:29 part-r-4.parquet*

  while res0.save("xxx") produces:

  peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
 total 40
 -rwxrwxrwx  1 peilunlee  staff    0 Mar 27 11:29 _SUCCESS*
 -rwxrwxrwx  1 peilunlee  staff  250 Mar 27 11:29 _common_metadata*
 -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-1.parquet*
 -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-2.parquet*
 -rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-3.parquet*
 -rwxrwxrwx  1 peilunlee  staff  488 Mar 27 11:29 part-r-4.parquet*

 On Thu, Mar 26, 2015 at 7:26 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  I couldn’t reproduce this with the following spark-shell snippet:

 scala> import sqlContext.implicits._
 scala> Seq((1, 2)).toDF("a", "b")
 scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)
 scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)

 The _common_metadata file is typically much smaller than _metadata,
 because it doesn’t contain row group information, and thus can be faster to
 read than _metadata.

 Cheng

 On 3/26/15 12:48 PM, Pei-Lun Lee wrote:

 Hi,

  When I save a parquet file with SaveMode.Overwrite, it never generates
 _common_metadata, whether it overwrites an existing dir or not.
 Is this expected behavior?
 And what is the benefit of _common_metadata? Will reading perform
 better when it is present?

  Thanks,
 --
 Pei-Lun

  ​








Re: SparkSQL overwrite parquet file does not generate _common_metadata

2015-03-26 Thread Pei-Lun Lee
Hi Cheng,

on my computer, executing res0.save("xxx", org.apache.spark.sql.SaveMode.
Overwrite) produces:

peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
total 32
-rwxrwxrwx  1 peilunlee  staff    0 Mar 27 11:29 _SUCCESS*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-1.parquet*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-2.parquet*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-3.parquet*
-rwxrwxrwx  1 peilunlee  staff  488 Mar 27 11:29 part-r-4.parquet*

while res0.save("xxx") produces:

peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
total 40
-rwxrwxrwx  1 peilunlee  staff    0 Mar 27 11:29 _SUCCESS*
-rwxrwxrwx  1 peilunlee  staff  250 Mar 27 11:29 _common_metadata*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-1.parquet*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-2.parquet*
-rwxrwxrwx  1 peilunlee  staff  272 Mar 27 11:29 part-r-3.parquet*
-rwxrwxrwx  1 peilunlee  staff  488 Mar 27 11:29 part-r-4.parquet*

On Thu, Mar 26, 2015 at 7:26 PM, Cheng Lian lian.cs@gmail.com wrote:

  I couldn’t reproduce this with the following spark-shell snippet:

 scala> import sqlContext.implicits._
 scala> Seq((1, 2)).toDF("a", "b")
 scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)
 scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)

 The _common_metadata file is typically much smaller than _metadata,
 because it doesn’t contain row group information, and thus can be faster to
 read than _metadata.

 Cheng

 On 3/26/15 12:48 PM, Pei-Lun Lee wrote:

   Hi,

  When I save a parquet file with SaveMode.Overwrite, it never generates
 _common_metadata, whether it overwrites an existing dir or not.
 Is this expected behavior?
 And what is the benefit of _common_metadata? Will reading perform better
 when it is present?

  Thanks,
 --
 Pei-Lun

   ​



Re: Which OutputCommitter to use for S3?

2015-03-25 Thread Pei-Lun Lee
I updated the PR for SPARK-6352 to be more like SPARK-3595.
I added a new setting, spark.sql.parquet.output.committer.class, to the Hadoop
configuration to allow custom implementations of ParquetOutputCommitter.
Can someone take a look at the PR?
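
For readers following along, a minimal sketch of how the setting described
above would be used once the PR lands; the property name is taken from this
message, while the committer class name, table name and output path below are
illustrative:

// point the Parquet writer at a custom output committer implementation
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sqlContext.table("foo").saveAsParquetFile("s3n://bucket/path")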

On Mon, Mar 16, 2015 at 5:23 PM, Pei-Lun Lee pl...@appier.com wrote:

 Hi,

 I created a JIRA and PR for supporting an S3-friendly output committer for
 saveAsParquetFile:
 https://issues.apache.org/jira/browse/SPARK-6352
 https://github.com/apache/spark/pull/5042

 My approach is to add a DirectParquetOutputCommitter class to the spark-sql
 package and use a boolean config variable,
 spark.sql.parquet.useDirectParquetOutputCommitter, to choose between it and
 the default output committer.
 This may not be the smartest solution but it works for me.
 Tested on Spark 1.1 and 1.3 with Hadoop 1.0.4.


 On Thu, Mar 5, 2015 at 4:32 PM, Aaron Davidson ilike...@gmail.com wrote:

 Yes, unfortunately that direct dependency makes this injection much more
 difficult for saveAsParquetFile.

 On Thu, Mar 5, 2015 at 12:28 AM, Pei-Lun Lee pl...@appier.com wrote:

 Thanks for the DirectOutputCommitter example.
 However I found it only works for saveAsHadoopFile. What about
 saveAsParquetFile?
 It looks like SparkSQL is using ParquetOutputCommitter, which is a subclass
 of FileOutputCommitter.

 On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor 
 thomas.dem...@amplidata.com
 wrote:

  FYI. We're currently addressing this at the Hadoop level in
  https://issues.apache.org/jira/browse/HADOOP-9565
 
 
  Thomas Demoor
 
  On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath 
  ddmcbe...@yahoo.com.invalid wrote:
 
  Just to close the loop in case anyone runs into the same problem I
 had.
 
  By setting --hadoop-major-version=2 when using the ec2 scripts,
  everything worked fine.
 
  Darin.
 
 
  - Original Message -
  From: Darin McBeath ddmcbe...@yahoo.com.INVALID
  To: Mingyu Kim m...@palantir.com; Aaron Davidson 
 ilike...@gmail.com
  Cc: user@spark.apache.org user@spark.apache.org
  Sent: Monday, February 23, 2015 3:16 PM
  Subject: Re: Which OutputCommitter to use for S3?
 
  Thanks.  I think my problem might actually be the other way around.
 
  I'm compiling with hadoop 2,  but when I startup Spark, using the ec2
  scripts, I don't specify a
  -hadoop-major-version and the default is 1.   I'm guessing that if I
 make
  that a 2 that it might work correctly.  I'll try it and post a
 response.
 
 
  - Original Message -
  From: Mingyu Kim m...@palantir.com
  To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson 
  ilike...@gmail.com
  Cc: user@spark.apache.org user@spark.apache.org
  Sent: Monday, February 23, 2015 3:06 PM
  Subject: Re: Which OutputCommitter to use for S3?
 
  Cool, we will start from there. Thanks Aaron and Josh!
 
   Darin, it's likely because the DirectOutputCommitter is compiled with
   Hadoop 1 classes and you're running it with Hadoop 2.
  org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1,
 and it
  became an interface in Hadoop 2.
 
  Mingyu
 
 
 
 
 
  On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID
  wrote:
 
  Aaron.  Thanks for the class. Since I'm currently writing Java based
  Spark applications, I tried converting your class to Java (it seemed
  pretty straightforward).
  
  I set up the use of the class as follows:
  
   SparkConf conf = new SparkConf()
   .set("spark.hadoop.mapred.output.committer.class",
   "com.elsevier.common.DirectOutputCommitter");
  
  And I then try and save a file to S3 (which I believe should use the
 old
  hadoop apis).
  
   JavaPairRDD<Text, Text> newBaselineRDDWritable =
  reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
  newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
  Text.class, Text.class, SequenceFileOutputFormat.class,
  org.apache.hadoop.io.compress.GzipCodec.class);
  
  But, I get the following error message.
  
   Exception in thread "main" java.lang.IncompatibleClassChangeError: Found
  class org.apache.hadoop.mapred.JobContext, but interface was expected
  at
 
 
 com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
  java:68)
  at
 
 org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
  at
 
 
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
  .scala:1075)
  at
 
 
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
  ala:940)
  at
 
 
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
  ala:902)
  at
 
 
 org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
  71)
  at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
  
  In my class, JobContext is an interface of  type
  org.apache.hadoop.mapred.JobContext.
  
  Is there something obvious that I might be doing wrong (or messed up
 in
  the translation from Scala to Java) or something I should look
 into?  I'm
  using Spark 1.2 with hadoop 2.4.
  
  
  Thanks.
  
  Darin

SparkSQL overwrite parquet file does not generate _common_metadata

2015-03-25 Thread Pei-Lun Lee
Hi,

When I save a parquet file with SaveMode.Overwrite, it never generates
_common_metadata, whether it overwrites an existing dir or not.
Is this expected behavior?
And what is the benefit of _common_metadata? Will reading perform better
when it is present?

Thanks,
--
Pei-Lun


Re: SparkSQL 1.3.0 JDBC data source issues

2015-03-19 Thread Pei-Lun Lee
JIRA and PR for first issue:
https://issues.apache.org/jira/browse/SPARK-6408
https://github.com/apache/spark/pull/5087

On Thu, Mar 19, 2015 at 12:20 PM, Pei-Lun Lee pl...@appier.com wrote:

 Hi,

 I am trying the JDBC data source in Spark SQL 1.3.0 and found some issues.

 First, the syntax where str_col='value' gives an error for both
 PostgreSQL and MySQL:

 psql> create table foo(id int primary key, name text, age int);
 bash> SPARK_CLASSPATH=postgresql-9.4-1201-jdbc41.jar spark/bin/spark-shell
 scala> sqlContext.load("jdbc", Map("url" -> "jdbc:postgresql://XXX",
 "dbtable" -> "foo")).registerTempTable("foo")
 scala> sql("select * from foo where name='bar'").collect
 org.postgresql.util.PSQLException: ERROR: operator does not exist: text =
 bar
   Hint: No operator matches the given name and argument type(s). You might
 need to add explicit type casts.
   Position: 40
 scala> sql("select * from foo where name like '%foo'").collect

 bash> SPARK_CLASSPATH=mysql-connector-java-5.1.34.jar spark/bin/spark-shell
 scala> sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://XXX",
 "dbtable" -> "foo")).registerTempTable("foo")
 scala> sql("select * from foo where name='bar'").collect
 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column
 'bar' in 'where clause'



 Second, a PostgreSQL table with a json column does not work:

 psql> create table foo(id int primary key, data json);
 scala> sqlContext.load("jdbc", Map("url" -> "jdbc:postgresql://XXX",
 "dbtable" -> "foo")).registerTempTable("foo")
 java.sql.SQLException: Unsupported type 



 Not sure whether these are bugs in Spark SQL or the JDBC data source. I can
 file JIRA tickets if needed.

 Thanks,
 --
 Pei-Lun




SparkSQL 1.3.0 JDBC data source issues

2015-03-18 Thread Pei-Lun Lee
Hi,

I am trying the JDBC data source in Spark SQL 1.3.0 and found some issues.

First, the syntax where str_col='value' gives an error for both
PostgreSQL and MySQL:

psql> create table foo(id int primary key, name text, age int);
bash> SPARK_CLASSPATH=postgresql-9.4-1201-jdbc41.jar spark/bin/spark-shell
scala> sqlContext.load("jdbc", Map("url" -> "jdbc:postgresql://XXX",
"dbtable" -> "foo")).registerTempTable("foo")
scala> sql("select * from foo where name='bar'").collect
org.postgresql.util.PSQLException: ERROR: operator does not exist: text =
bar
  Hint: No operator matches the given name and argument type(s). You might
need to add explicit type casts.
  Position: 40
scala> sql("select * from foo where name like '%foo'").collect

bash> SPARK_CLASSPATH=mysql-connector-java-5.1.34.jar spark/bin/spark-shell
scala> sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://XXX",
"dbtable" -> "foo")).registerTempTable("foo")
scala> sql("select * from foo where name='bar'").collect
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column
'bar' in 'where clause'



Second, a PostgreSQL table with a json column does not work:

psql> create table foo(id int primary key, data json);
scala> sqlContext.load("jdbc", Map("url" -> "jdbc:postgresql://XXX",
"dbtable" -> "foo")).registerTempTable("foo")
java.sql.SQLException: Unsupported type 
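
Until these are fixed, one workaround that may cover both cases is to push the
SQL down to the database through the dbtable option, which the JDBC source uses
as its FROM clause. A sketch only; the subquery-with-alias form and the
json-to-text cast are assumptions of this example, not something verified in
this thread:

// 1) do the string comparison on the database side instead of relying on
//    Spark's filter pushdown
sqlContext.load("jdbc", Map(
  "url" -> "jdbc:postgresql://XXX",
  "dbtable" -> "(select * from foo where name = 'bar') tmp"
)).registerTempTable("foo_bar")

// 2) cast the json column to text so the JDBC source only sees supported types
sqlContext.load("jdbc", Map(
  "url" -> "jdbc:postgresql://XXX",
  "dbtable" -> "(select id, data::text as data from foo) tmp"
)).registerTempTable("foo_json")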



Not sure whether these are bugs in Spark SQL or the JDBC data source. I can
file JIRA tickets if needed.

Thanks,
--
Pei-Lun


Re: Which OutputCommitter to use for S3?

2015-03-16 Thread Pei-Lun Lee
Hi,

I created a JIRA and PR for supporting an S3-friendly output committer for
saveAsParquetFile:
https://issues.apache.org/jira/browse/SPARK-6352
https://github.com/apache/spark/pull/5042

My approach is to add a DirectParquetOutputCommitter class to the spark-sql
package and use a boolean config variable,
spark.sql.parquet.useDirectParquetOutputCommitter, to choose between it and
the default output committer.
This may not be the smartest solution but it works for me.
Tested on Spark 1.1 and 1.3 with Hadoop 1.0.4.
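
A minimal sketch of how the switch described above would be flipped; the
property name is taken from this message, while the way it is read (SQLConf
vs. the Hadoop configuration) and the data being saved are assumptions of the
example:

// opt in to the direct (no temporary directory) Parquet output committer
sqlContext.setConf("spark.sql.parquet.useDirectParquetOutputCommitter", "true")
// then write to S3 as usual; `data` stands for whatever SchemaRDD/DataFrame
// is being saved
data.saveAsParquetFile("s3n://bucket/path")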


On Thu, Mar 5, 2015 at 4:32 PM, Aaron Davidson ilike...@gmail.com wrote:

 Yes, unfortunately that direct dependency makes this injection much more
 difficult for saveAsParquetFile.

 On Thu, Mar 5, 2015 at 12:28 AM, Pei-Lun Lee pl...@appier.com wrote:

 Thanks for the DirectOutputCommitter example.
 However I found it only works for saveAsHadoopFile. What about
 saveAsParquetFile?
 It looks like SparkSQL is using ParquetOutputCommitter, which is a subclass
 of FileOutputCommitter.

 On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor 
 thomas.dem...@amplidata.com
 wrote:

  FYI. We're currently addressing this at the Hadoop level in
  https://issues.apache.org/jira/browse/HADOOP-9565
 
 
  Thomas Demoor
 
  On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath 
  ddmcbe...@yahoo.com.invalid wrote:
 
  Just to close the loop in case anyone runs into the same problem I had.
 
  By setting --hadoop-major-version=2 when using the ec2 scripts,
  everything worked fine.
 
  Darin.
 
 
  - Original Message -
  From: Darin McBeath ddmcbe...@yahoo.com.INVALID
  To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com
 
  Cc: user@spark.apache.org user@spark.apache.org
  Sent: Monday, February 23, 2015 3:16 PM
  Subject: Re: Which OutputCommitter to use for S3?
 
  Thanks.  I think my problem might actually be the other way around.
 
  I'm compiling with hadoop 2,  but when I startup Spark, using the ec2
  scripts, I don't specify a
  -hadoop-major-version and the default is 1.   I'm guessing that if I
 make
  that a 2 that it might work correctly.  I'll try it and post a
 response.
 
 
  - Original Message -
  From: Mingyu Kim m...@palantir.com
  To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson 
  ilike...@gmail.com
  Cc: user@spark.apache.org user@spark.apache.org
  Sent: Monday, February 23, 2015 3:06 PM
  Subject: Re: Which OutputCommitter to use for S3?
 
  Cool, we will start from there. Thanks Aaron and Josh!
 
   Darin, it's likely because the DirectOutputCommitter is compiled with
   Hadoop 1 classes and you're running it with Hadoop 2.
  org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1,
 and it
  became an interface in Hadoop 2.
 
  Mingyu
 
 
 
 
 
  On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID
  wrote:
 
  Aaron.  Thanks for the class. Since I'm currently writing Java based
  Spark applications, I tried converting your class to Java (it seemed
  pretty straightforward).
  
  I set up the use of the class as follows:
  
   SparkConf conf = new SparkConf()
   .set("spark.hadoop.mapred.output.committer.class",
   "com.elsevier.common.DirectOutputCommitter");
  
  And I then try and save a file to S3 (which I believe should use the
 old
  hadoop apis).
  
   JavaPairRDD<Text, Text> newBaselineRDDWritable =
  reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
  newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
  Text.class, Text.class, SequenceFileOutputFormat.class,
  org.apache.hadoop.io.compress.GzipCodec.class);
  
  But, I get the following error message.
  
   Exception in thread "main" java.lang.IncompatibleClassChangeError: Found
  class org.apache.hadoop.mapred.JobContext, but interface was expected
  at
 
 
 com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
  java:68)
  at
 
 org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
  at
 
 
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
  .scala:1075)
  at
 
 
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
  ala:940)
  at
 
 
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
  ala:902)
  at
 
 
 org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
  71)
  at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
  
  In my class, JobContext is an interface of  type
  org.apache.hadoop.mapred.JobContext.
  
  Is there something obvious that I might be doing wrong (or messed up
 in
  the translation from Scala to Java) or something I should look into?
 I'm
  using Spark 1.2 with hadoop 2.4.
  
  
  Thanks.
  
  Darin.
  
  
  
  
  
  From: Aaron Davidson ilike...@gmail.com
  To: Andrew Ash and...@andrewash.com
  Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com
 ;
  user@spark.apache.org user@spark.apache.org; Aaron Davidson
  aa...@databricks.com
  Sent: Saturday, February 21, 2015 7:01 PM

Re: Which OutputCommitter to use for S3?

2015-03-05 Thread Pei-Lun Lee
Thanks for the DirectOutputCommitter example.
However I found it only works for saveAsHadoopFile. What about
saveAsParquetFile?
It looks like SparkSQL is using ParquetOutputCommitter, which is a subclass
of FileOutputCommitter.

On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor thomas.dem...@amplidata.com
wrote:

 FYI. We're currently addressing this at the Hadoop level in
 https://issues.apache.org/jira/browse/HADOOP-9565


 Thomas Demoor

 On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath 
 ddmcbe...@yahoo.com.invalid wrote:

 Just to close the loop in case anyone runs into the same problem I had.

 By setting --hadoop-major-version=2 when using the ec2 scripts,
 everything worked fine.

 Darin.


 - Original Message -
 From: Darin McBeath ddmcbe...@yahoo.com.INVALID
 To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com
 Cc: user@spark.apache.org user@spark.apache.org
 Sent: Monday, February 23, 2015 3:16 PM
 Subject: Re: Which OutputCommitter to use for S3?

 Thanks.  I think my problem might actually be the other way around.

 I'm compiling with hadoop 2,  but when I startup Spark, using the ec2
 scripts, I don't specify a
 -hadoop-major-version and the default is 1.   I'm guessing that if I make
 that a 2 that it might work correctly.  I'll try it and post a response.


 - Original Message -
 From: Mingyu Kim m...@palantir.com
 To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson 
 ilike...@gmail.com
 Cc: user@spark.apache.org user@spark.apache.org
 Sent: Monday, February 23, 2015 3:06 PM
 Subject: Re: Which OutputCommitter to use for S3?

 Cool, we will start from there. Thanks Aaron and Josh!

 Darin, it's likely because the DirectOutputCommitter is compiled with
 Hadoop 1 classes and you're running it with Hadoop 2.
 org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it
 became an interface in Hadoop 2.

 Mingyu





 On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID
 wrote:

 Aaron.  Thanks for the class. Since I'm currently writing Java based
 Spark applications, I tried converting your class to Java (it seemed
 pretty straightforward).
 
 I set up the use of the class as follows:
 
 SparkConf conf = new SparkConf()
 .set("spark.hadoop.mapred.output.committer.class",
 "com.elsevier.common.DirectOutputCommitter");
 
 And I then try and save a file to S3 (which I believe should use the old
 hadoop apis).
 
 JavaPairRDD<Text, Text> newBaselineRDDWritable =
 reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
 newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
 Text.class, Text.class, SequenceFileOutputFormat.class,
 org.apache.hadoop.io.compress.GzipCodec.class);
 
 But, I get the following error message.
 
 Exception in thread "main" java.lang.IncompatibleClassChangeError: Found
 class org.apache.hadoop.mapred.JobContext, but interface was expected
 at

 com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
 java:68)
 at
 org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
 at

 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
 .scala:1075)
 at

 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
 ala:940)
 at

 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
 ala:902)
 at

 org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
 71)
 at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
 
 In my class, JobContext is an interface of  type
 org.apache.hadoop.mapred.JobContext.
 
 Is there something obvious that I might be doing wrong (or messed up in
 the translation from Scala to Java) or something I should look into?  I'm
 using Spark 1.2 with hadoop 2.4.
 
 
 Thanks.
 
 Darin.
 
 
 
 
 
 From: Aaron Davidson ilike...@gmail.com
 To: Andrew Ash and...@andrewash.com
 Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com;
 user@spark.apache.org user@spark.apache.org; Aaron Davidson
 aa...@databricks.com
 Sent: Saturday, February 21, 2015 7:01 PM
 Subject: Re: Which OutputCommitter to use for S3?
 
 
 
 Here is the class:
 
  https://gist.github.com/aarondav/c513916e72101bbe14ec
 
 You can use it by setting mapred.output.committer.class in the Hadoop
 configuration (or spark.hadoop.mapred.output.committer.class in the
 Spark configuration). Note that this only works for the old Hadoop APIs,
 I believe the new Hadoop APIs strongly tie committer to input format (so
 FileInputFormat always uses FileOutputCommitter), which makes this fix
 more difficult to apply.
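
 For reference, the Spark-side form of that configuration in Scala, mirroring
 Darin's Java snippet earlier in this thread; the committer class name is
 whatever DirectOutputCommitter implementation is on the classpath
 (illustrative here):

 // applies to the old (mapred) Hadoop API only, per the note above
 val conf = new org.apache.spark.SparkConf()
   .set("spark.hadoop.mapred.output.committer.class",
        "com.example.DirectOutputCommitter")
 val sc = new org.apache.spark.SparkContext(conf)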
 
 
 
 
 On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com
 wrote:
 
 Josh is that class something you guys would consider 

Re: Spark SQL - custom aggregation function (UDAF)

2014-10-14 Thread Pei-Lun Lee
I created https://issues.apache.org/jira/browse/SPARK-3947
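
Until SPARK-3947 (native UDAF support) lands, one common stopgap is to drop to
the RDD API for the custom aggregate. A sketch only, assuming a registered
table foo with a string column k and a positive int column v; the geometric
mean is just an example of an aggregate that plain SQL cannot express directly:

import org.apache.spark.SparkContext._  // pair-RDD functions such as aggregateByKey

// group by k and fold each v into (sum of logs, count), then combine
val pairs = sqlContext.sql("select k, v from foo")
  .map(row => (row.getString(0), row.getInt(1).toDouble))
val geoMean = pairs
  .aggregateByKey((0.0, 0L))(
    (acc, v) => (acc._1 + math.log(v), acc._2 + 1L),
    (a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (logSum, n) => math.exp(logSum / n) }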

On Tue, Oct 14, 2014 at 3:54 AM, Michael Armbrust mich...@databricks.com
wrote:

 It's not on the roadmap for 1.2.  I'd suggest opening a JIRA.

 On Mon, Oct 13, 2014 at 4:28 AM, Pierre B 
 pierre.borckm...@realimpactanalytics.com wrote:

 Is it planned in a near future ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-custom-aggregation-function-UDAF-tp15784p16275.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: spark sql union all is slow

2014-10-14 Thread Pei-Lun Lee
Hi,

You can merge them into one table by:

sqlContext.table("table_1")
  .unionAll(sqlContext.table("table_2"))
  .unionAll(sqlContext.table("table_3"))
  .registerTempTable("table_all")

Or load them in one call by:

sqlContext.parquetFile("table_1.parquet", "table_2.parquet", "table_3.parquet").registerTempTable("table_all")
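
Once the pieces are registered under a single name, the aggregation from the
question below can be written as one query over that table; a small sketch
using the id and cost columns from that query:

sqlContext.sql(
  "select id, sum(cost) as cost from table_all where id = 1 group by id"
).collect()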

On Wed, Oct 15, 2014 at 2:51 AM, shuluster s...@turn.com wrote:

 I have many tables of the same schema, partitioned by time. For example,
 one id could be in many of those tables. I would like to find the aggregation
 of such ids. Originally these tables are located on HDFS as files. Once a
 table's schemaRDD is loaded, I cacheTable on it. Each table is around 30m -
 100m of serialized data.

 The SQL I composed looks like the following:

 Select id, sum(cost) as cost from (
   (select id, sum(cost) as cost from table_1 where id = 1 group by id)
   union all
   (select id, sum(cost) as cost from table_2 where id = 1 group by id)
   union all
   (select id, sum(cost) as cost from table_3 where id = 1 group by id)
 ) as temp_table
 group by id


 The call to sparkSqlContext.sql() takes a long time to return a schemaRDD;
 the execution of collect on this RDD was not too slow.

 Is there something I am doing wrong here? Or any tips on how to debug?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-union-all-is-slow-tp16407.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: spark sql left join gives KryoException: Buffer overflow

2014-07-21 Thread Pei-Lun Lee
Hi Michael,

Thanks for the suggestion. In my query, both tables are too large to use a
broadcast join.

When SPARK-2211 is done, will Spark SQL automatically choose join
algorithms?
Is there some way to manually hint the optimizer?


2014-07-19 5:23 GMT+08:00 Michael Armbrust mich...@databricks.com:

 Unfortunately, this is a query where we just don't have an efficient
 implementation yet.  You might try switching the table order.

 Here is the JIRA for doing something more efficient:
 https://issues.apache.org/jira/browse/SPARK-2212


 On Fri, Jul 18, 2014 at 7:05 AM, Pei-Lun Lee pl...@appier.com wrote:

 Hi,

 We have a query with left joining and got this error:

 Caused by: org.apache.spark.SparkException: Job aborted due to stage
 failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure
 in TID 5 on host ip-10-33-132-101.us-west-2.compute.internal:
 com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
 required: 1

 Looks like Spark SQL tried to do a broadcast join, collecting one of
 the tables to the master, but it is too large.

 How do we explicitly control the join behavior like this?

 --
 Pei-Lun Lee





spark sql left join gives KryoException: Buffer overflow

2014-07-18 Thread Pei-Lun Lee
Hi,

We have a query with left joining and got this error:

Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure
in TID 5 on host ip-10-33-132-101.us-west-2.compute.internal:
com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
required: 1

Looks like Spark SQL tried to do a broadcast join, collecting one of the
tables to the master, but it is too large.

How do we explicitly control the join behavior like this?

--
Pei-Lun Lee


Re: Spark SQL 1.0.1 error on reading fixed length byte array

2014-07-15 Thread Pei-Lun Lee
Hi Michael,

Good to know it is being handled. I tried master branch (9fe693b5) and got
another error:

scala> sqlContext.parquetFile("/tmp/foo")
java.lang.RuntimeException: Unsupported parquet datatype optional
fixed_len_byte_array(4) b
at scala.sys.package$.error(package.scala:27)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.toPrimitiveDataType(ParquetTypes.scala:58)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.toDataType(ParquetTypes.scala:109)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:282)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:279)
..

The avro schema I used is something like:

protocol Test {
fixed Bytes4(4);

record User {
string name;
int age;
union {null, int} i;
union {null, int} j;
union {null, Bytes4} b;
union {null, bytes} c;
union {null, int} d;
}
}

Is this case included in SPARK-2446
https://issues.apache.org/jira/browse/SPARK-2446?


2014-07-15 3:54 GMT+08:00 Michael Armbrust mich...@databricks.com:

 This is not supported yet, but there is a PR open to fix it:
 https://issues.apache.org/jira/browse/SPARK-2446


 On Mon, Jul 14, 2014 at 4:17 AM, Pei-Lun Lee pl...@appier.com wrote:

 Hi,

 I am using spark-sql 1.0.1 to load parquet files generated from the method
 described in:

 https://gist.github.com/massie/7224868


 When I try to submit a select query with columns of type fixed length
 byte array, the following error pops up:


 14/07/14 11:09:14 INFO scheduler.DAGScheduler: Failed to run take at
 basicOperators.scala:100
 org.apache.spark.SparkDriverExecutionException: Execution error
 at
 org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:581)
 at
 org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559)
 Caused by: parquet.io.ParquetDecodingException: Can not read value at 0
 in block -1 in file s3n://foo/bar/part-r-0.snappy.parquet
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
 at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
 at
 org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574)
 ... 1 more
 Caused by: java.lang.ClassCastException: Expected instance of primitive
 converter but got
 org.apache.spark.sql.parquet.CatalystNativeArrayConverter
 at
 parquet.io.api.Converter.asPrimitiveConverter(Converter.java:30)
 at
 parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:264)
 at
 parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60)
 at
 parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74)
 at
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110)
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
 ... 24 more


 Is fixed length byte array supposed to work in this version? I noticed
 that other array types like int or string already work.

 Thanks,
 --
 Pei-Lun





Re: Spark SQL 1.0.1 error on reading fixed length byte array

2014-07-15 Thread Pei-Lun Lee
Filed SPARK-2446



2014-07-15 16:17 GMT+08:00 Michael Armbrust mich...@databricks.com:

 Oh, maybe not.  Please file another JIRA.


 On Tue, Jul 15, 2014 at 12:34 AM, Pei-Lun Lee pl...@appier.com wrote:

 Hi Michael,

 Good to know it is being handled. I tried master branch (9fe693b5) and
 got another error:

 scala> sqlContext.parquetFile("/tmp/foo")
 java.lang.RuntimeException: Unsupported parquet datatype optional
 fixed_len_byte_array(4) b
 at scala.sys.package$.error(package.scala:27)
 at
 org.apache.spark.sql.parquet.ParquetTypesConverter$.toPrimitiveDataType(ParquetTypes.scala:58)
  at
 org.apache.spark.sql.parquet.ParquetTypesConverter$.toDataType(ParquetTypes.scala:109)
 at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:282)
  at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:279)
 ..

 The avro schema I used is something like:

 protocol Test {
 fixed Bytes4(4);

 record User {
 string name;
 int age;
 union {null, int} i;
 union {null, int} j;
 union {null, Bytes4} b;
 union {null, bytes} c;
 union {null, int} d;
 }
 }

 Is this case included in SPARK-2446
 https://issues.apache.org/jira/browse/SPARK-2446?


 2014-07-15 3:54 GMT+08:00 Michael Armbrust mich...@databricks.com:

 This is not supported yet, but there is a PR open to fix it:
 https://issues.apache.org/jira/browse/SPARK-2446


 On Mon, Jul 14, 2014 at 4:17 AM, Pei-Lun Lee pl...@appier.com wrote:

 Hi,

 I am using spark-sql 1.0.1 to load parquet files generated from the method
 described in:

 https://gist.github.com/massie/7224868


 When I try to submit a select query with columns of type fixed length
 byte array, the following error pops up:


 14/07/14 11:09:14 INFO scheduler.DAGScheduler: Failed to run take at
 basicOperators.scala:100
 org.apache.spark.SparkDriverExecutionException: Execution error
 at
 org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:581)
 at
 org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559)
 Caused by: parquet.io.ParquetDecodingException: Can not read value at 0
 in block -1 in file s3n://foo/bar/part-r-0.snappy.parquet
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at
 scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
 at
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at
 scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at
 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at
 scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
 at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
 at
 org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574)
 ... 1 more
 Caused by: java.lang.ClassCastException: Expected instance of primitive
 converter but got
 org.apache.spark.sql.parquet.CatalystNativeArrayConverter
 at
 parquet.io.api.Converter.asPrimitiveConverter(Converter.java:30)
 at
 parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:264)
 at
 parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60)
 at
 parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74)
 at
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110)
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
 ... 24 more


 Is fixed length byte array

Re: Spark SQL 1.0.1 error on reading fixed length byte array

2014-07-15 Thread Pei-Lun Lee
Sorry, should be SPARK-2489


2014-07-15 19:22 GMT+08:00 Pei-Lun Lee pl...@appier.com:

 Filed SPARK-2446



 2014-07-15 16:17 GMT+08:00 Michael Armbrust mich...@databricks.com:

 Oh, maybe not.  Please file another JIRA.


 On Tue, Jul 15, 2014 at 12:34 AM, Pei-Lun Lee pl...@appier.com wrote:

 Hi Michael,

 Good to know it is being handled. I tried master branch (9fe693b5) and
 got another error:

 scala> sqlContext.parquetFile("/tmp/foo")
 java.lang.RuntimeException: Unsupported parquet datatype optional
 fixed_len_byte_array(4) b
 at scala.sys.package$.error(package.scala:27)
 at
 org.apache.spark.sql.parquet.ParquetTypesConverter$.toPrimitiveDataType(ParquetTypes.scala:58)
  at
 org.apache.spark.sql.parquet.ParquetTypesConverter$.toDataType(ParquetTypes.scala:109)
 at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:282)
  at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:279)
 ..

 The avro schema I used is something like:

 protocol Test {
 fixed Bytes4(4);

 record User {
 string name;
 int age;
 union {null, int} i;
 union {null, int} j;
 union {null, Bytes4} b;
 union {null, bytes} c;
 union {null, int} d;
 }
 }

 Is this case included in SPARK-2446
 https://issues.apache.org/jira/browse/SPARK-2446?


 2014-07-15 3:54 GMT+08:00 Michael Armbrust mich...@databricks.com:

 This is not supported yet, but there is a PR open to fix it:
 https://issues.apache.org/jira/browse/SPARK-2446


 On Mon, Jul 14, 2014 at 4:17 AM, Pei-Lun Lee pl...@appier.com wrote:

 Hi,

 I am using spark-sql 1.0.1 to load parquet files generated from the method
 described in:

 https://gist.github.com/massie/7224868


 When I try to submit a select query with columns of type fixed length
 byte array, the following error pops up:


 14/07/14 11:09:14 INFO scheduler.DAGScheduler: Failed to run take at
 basicOperators.scala:100
 org.apache.spark.SparkDriverExecutionException: Execution error
 at
 org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:581)
 at
 org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559)
 Caused by: parquet.io.ParquetDecodingException: Can not read value at
 0 in block -1 in file s3n://foo/bar/part-r-0.snappy.parquet
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at
 scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
 at
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at
 scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at
 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at
 scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
 at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
 at
 org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574)
 ... 1 more
 Caused by: java.lang.ClassCastException: Expected instance of
 primitive converter but got
 org.apache.spark.sql.parquet.CatalystNativeArrayConverter
 at
 parquet.io.api.Converter.asPrimitiveConverter(Converter.java:30)
 at
 parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:264)
 at
 parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60)
 at
 parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74)
 at
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110)
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue

Spark SQL 1.0.1 error on reading fixed length byte array

2014-07-14 Thread Pei-Lun Lee
Hi,

I am using spark-sql 1.0.1 to load parquet files generated from the method
described in:

https://gist.github.com/massie/7224868


When I try to submit a select query with columns of type fixed length byte
array, the following error pops up:


14/07/14 11:09:14 INFO scheduler.DAGScheduler: Failed to run take at
basicOperators.scala:100
org.apache.spark.SparkDriverExecutionException: Execution error
at
org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:581)
at
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559)
Caused by: parquet.io.ParquetDecodingException: Can not read value at 0 in
block -1 in file s3n://foo/bar/part-r-0.snappy.parquet
at
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177)
at
parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to
(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
at
org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574)
... 1 more
Caused by: java.lang.ClassCastException: Expected instance of primitive
converter but got
org.apache.spark.sql.parquet.CatalystNativeArrayConverter
at parquet.io.api.Converter.asPrimitiveConverter(Converter.java:30)
at
parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:264)
at
parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60)
at
parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74)
at
parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110)
at
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
... 24 more


Is fixed length byte array supposed to work in this version? I noticed that
other array types like int or string already work.

Thanks,
--
Pei-Lun


Re: LiveListenerBus throws exception and weird web UI bug

2014-06-26 Thread Pei-Lun Lee
Hi Baoxu, thanks for sharing.


2014-06-26 22:51 GMT+08:00 Baoxu Shi(Dash) b...@nd.edu:

 Hi Pei-Lun,

 I have the same problem here. The issue is SPARK-2228; someone also posted a
 pull request for it, but it only eliminates this exception, not the side
 effects.

 I think the problem may be due to the hard-coded   private val
 EVENT_QUEUE_CAPACITY = 1

 in core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala.
 There is a chance that when the event queue is full, the system starts
 dropping events, causing "key not found" errors because those events were
 never submitted.

 Don’t know if that can help.

 On Jun 26, 2014, at 6:41 AM, Pei-Lun Lee pl...@appier.com wrote:

 
  Hi,
 
  We have a long-running Spark application that runs on a Spark 1.0 standalone
 server, and after it runs for several hours the following exception shows up:
 
 
  14/06/25 23:13:08 ERROR LiveListenerBus: Listener JobProgressListener
 threw an exception
  java.util.NoSuchElementException: key not found: 6375
  at scala.collection.MapLike$class.default(MapLike.scala:228)
  at scala.collection.AbstractMap.default(Map.scala:58)
  at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
  at
 org.apache.spark.ui.jobs.JobProgressListener.onStageCompleted(JobProgressListener.scala:78)
  at
 org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48)
  at
 org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48)
  at
 org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81)
  at
 org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79)
  at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
 org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79)
  at
 org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:48)
  at
 org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32)
  at
 org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
  at
 org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
  at scala.Option.foreach(Option.scala:236)
  at
 org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56)
  at
 org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
  at
 org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
  at
 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
  at
 org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46)
 
 
  And then the web UI (driver:4040) starts showing weird results like these
 (see attached screenshots):
  1. negative active task counts
  2. completed stages still in the active section, or shown with incomplete tasks
  3. unpersisted RDDs still in the storage page with fraction cached > 100%
 
  Eventually the application crashed, but this is usually the first
 exception that shows up.
  Any idea how to fix it?
 
  --
  Pei-Lun Lee
 
 
  [Attachments: four web UI screenshots]




Re: Spark SQL incorrect result on GROUP BY query

2014-06-12 Thread Pei-Lun Lee
I reran with master and it looks like it is fixed.



2014-06-12 1:26 GMT+08:00 Michael Armbrust mich...@databricks.com:

 I'd try rerunning with master.  It is likely you are running into
 SPARK-1994 https://issues.apache.org/jira/browse/SPARK-1994.

 Michael


 On Wed, Jun 11, 2014 at 3:01 AM, Pei-Lun Lee pl...@appier.com wrote:

 Hi,

 I am using Spark 1.0.0 and found that in Spark SQL some queries using GROUP
 BY give weird results.
 To reproduce, type the following commands in spark-shell connecting to a
 standalone server:

 case class Foo(k: String, v: Int)
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext._
 val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++
 List.fill(300)(Foo("c", 3))
 sc.makeRDD(rows).registerAsTable("foo")
 sql("select k, count(*) from foo group by k").collect

 the result will be something random like:
 res1: Array[org.apache.spark.sql.Row] = Array([b,180], [3,18], [a,75],
 [c,270], [4,56], [1,1])

 and if I run the same query again, the new result will be correct:
 sql("select k, count(*) from foo group by k").collect
 res2: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300])

 Should I file a bug?

 --
 Pei-Lun Lee





Spark SQL incorrect result on GROUP BY query

2014-06-11 Thread Pei-Lun Lee
Hi,

I am using Spark 1.0.0 and found that in Spark SQL some queries using GROUP BY
give weird results.
To reproduce, type the following commands in spark-shell connecting to a
standalone server:

case class Foo(k: String, v: Int)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++
List.fill(300)(Foo("c", 3))
sc.makeRDD(rows).registerAsTable("foo")
sql("select k, count(*) from foo group by k").collect

the result will be something random like:
res1: Array[org.apache.spark.sql.Row] = Array([b,180], [3,18], [a,75],
[c,270], [4,56], [1,1])

and if I run the same query again, the new result will be correct:
sql("select k, count(*) from foo group by k").collect
res2: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300])

Should I file a bug?

--
Pei-Lun Lee