Re: SparkSQL overwrite parquet file does not generate _common_metadata
I'm using 1.0.4. Thanks, -- Pei-Lun On Fri, Mar 27, 2015 at 2:32 PM, Cheng Lian lian.cs@gmail.com wrote: Hm, which version of Hadoop are you using? Actually there should also be a _metadata file together with _common_metadata. I was using Hadoop 2.4.1, btw. I'm not sure whether the Hadoop version matters here, but I did observe cases where Spark behaves differently because of semantic differences of the same API in different Hadoop versions. Cheng On 3/27/15 11:33 AM, Pei-Lun Lee wrote: Hi Cheng, on my computer, executing res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite) produces: peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx total 32 -rwxrwxrwx 1 peilunlee staff 0 Mar 27 11:29 _SUCCESS* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet* -rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet* while res0.save("xxx") produces: peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx total 40 -rwxrwxrwx 1 peilunlee staff 0 Mar 27 11:29 _SUCCESS* -rwxrwxrwx 1 peilunlee staff 250 Mar 27 11:29 _common_metadata* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet* -rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet* On Thu, Mar 26, 2015 at 7:26 PM, Cheng Lian lian.cs@gmail.com wrote: I couldn't reproduce this with the following spark-shell snippet: scala> import sqlContext.implicits._ scala> Seq((1, 2)).toDF("a", "b") scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite) scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite) The _common_metadata file is typically much smaller than _metadata because it doesn't contain row group information, and thus can be faster to read than _metadata. Cheng On 3/26/15 12:48 PM, Pei-Lun Lee wrote: Hi, When I save a parquet file with SaveMode.Overwrite, it never generates _common_metadata, whether it overwrites an existing dir or not. Is this expected behavior? And what is the benefit of _common_metadata? Will reading perform better when it is present? Thanks, -- Pei-Lun
Re: SparkSQL overwrite parquet file does not generate _common_metadata
JIRA ticket created at: https://issues.apache.org/jira/browse/SPARK-6581 Thanks, -- Pei-Lun On Fri, Mar 27, 2015 at 7:03 PM, Cheng Lian lian.cs@gmail.com wrote: Thanks for the information. Verified that the _common_metadata and _metadata files are missing in this case when using Hadoop 1.0.4. Would you mind opening a JIRA for this? Cheng On 3/27/15 2:40 PM, Pei-Lun Lee wrote: I'm using 1.0.4. Thanks, -- Pei-Lun On Fri, Mar 27, 2015 at 2:32 PM, Cheng Lian lian.cs@gmail.com wrote: Hm, which version of Hadoop are you using? Actually there should also be a _metadata file together with _common_metadata. I was using Hadoop 2.4.1, btw. I'm not sure whether the Hadoop version matters here, but I did observe cases where Spark behaves differently because of semantic differences of the same API in different Hadoop versions. Cheng On 3/27/15 11:33 AM, Pei-Lun Lee wrote: Hi Cheng, on my computer, executing res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite) produces: peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx total 32 -rwxrwxrwx 1 peilunlee staff 0 Mar 27 11:29 _SUCCESS* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet* -rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet* while res0.save("xxx") produces: peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx total 40 -rwxrwxrwx 1 peilunlee staff 0 Mar 27 11:29 _SUCCESS* -rwxrwxrwx 1 peilunlee staff 250 Mar 27 11:29 _common_metadata* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet* -rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet* On Thu, Mar 26, 2015 at 7:26 PM, Cheng Lian lian.cs@gmail.com wrote: I couldn't reproduce this with the following spark-shell snippet: scala> import sqlContext.implicits._ scala> Seq((1, 2)).toDF("a", "b") scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite) scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite) The _common_metadata file is typically much smaller than _metadata because it doesn't contain row group information, and thus can be faster to read than _metadata. Cheng On 3/26/15 12:48 PM, Pei-Lun Lee wrote: Hi, When I save a parquet file with SaveMode.Overwrite, it never generates _common_metadata, whether it overwrites an existing dir or not. Is this expected behavior? And what is the benefit of _common_metadata? Will reading perform better when it is present? Thanks, -- Pei-Lun
Re: SparkSQL overwrite parquet file does not generate _common_metadata
Hi Cheng, on my computer, executing res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite) produces: peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx total 32 -rwxrwxrwx 1 peilunlee staff 0 Mar 27 11:29 _SUCCESS* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet* -rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet* while res0.save("xxx") produces: peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx total 40 -rwxrwxrwx 1 peilunlee staff 0 Mar 27 11:29 _SUCCESS* -rwxrwxrwx 1 peilunlee staff 250 Mar 27 11:29 _common_metadata* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet* -rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet* On Thu, Mar 26, 2015 at 7:26 PM, Cheng Lian lian.cs@gmail.com wrote: I couldn't reproduce this with the following spark-shell snippet: scala> import sqlContext.implicits._ scala> Seq((1, 2)).toDF("a", "b") scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite) scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite) The _common_metadata file is typically much smaller than _metadata because it doesn't contain row group information, and thus can be faster to read than _metadata. Cheng On 3/26/15 12:48 PM, Pei-Lun Lee wrote: Hi, When I save a parquet file with SaveMode.Overwrite, it never generates _common_metadata, whether it overwrites an existing dir or not. Is this expected behavior? And what is the benefit of _common_metadata? Will reading perform better when it is present? Thanks, -- Pei-Lun
Re: Which OutputCommitter to use for S3?
I updated the PR for SPARK-6352 to be more like SPARK-3595. I added a new setting, spark.sql.parquet.output.committer.class, in the Hadoop configuration to allow a custom implementation of ParquetOutputCommitter. Can someone take a look at the PR? On Mon, Mar 16, 2015 at 5:23 PM, Pei-Lun Lee pl...@appier.com wrote: Hi, I created a JIRA and PR for supporting an S3-friendly output committer for saveAsParquetFile: https://issues.apache.org/jira/browse/SPARK-6352 https://github.com/apache/spark/pull/5042 My approach is to add a DirectParquetOutputCommitter class in the spark-sql package and use a boolean config variable, spark.sql.parquet.useDirectParquetOutputCommitter, to choose between it and the default output committer. This may not be the smartest solution but it works for me. Tested on Spark 1.1 and 1.3 with Hadoop 1.0.4. On Thu, Mar 5, 2015 at 4:32 PM, Aaron Davidson ilike...@gmail.com wrote: Yes, unfortunately that direct dependency makes this injection much more difficult for saveAsParquetFile. On Thu, Mar 5, 2015 at 12:28 AM, Pei-Lun Lee pl...@appier.com wrote: Thanks for the DirectOutputCommitter example. However, I found it only works for saveAsHadoopFile. What about saveAsParquetFile? It looks like SparkSQL is using ParquetOutputCommitter, which is a subclass of FileOutputCommitter. On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor thomas.dem...@amplidata.com wrote: FYI, we're currently addressing this at the Hadoop level in https://issues.apache.org/jira/browse/HADOOP-9565 Thomas Demoor On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: Just to close the loop in case anyone runs into the same problem I had: by setting --hadoop-major-version=2 when using the ec2 scripts, everything worked fine. Darin. - Original Message - From: Darin McBeath ddmcbe...@yahoo.com.INVALID To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com Cc: user@spark.apache.org user@spark.apache.org Sent: Monday, February 23, 2015 3:16 PM Subject: Re: Which OutputCommitter to use for S3? Thanks. I think my problem might actually be the other way around. I'm compiling with Hadoop 2, but when I start up Spark using the ec2 scripts I don't specify a --hadoop-major-version, and the default is 1. I'm guessing that if I make that a 2 it might work correctly. I'll try it and post a response. - Original Message - From: Mingyu Kim m...@palantir.com To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson ilike...@gmail.com Cc: user@spark.apache.org user@spark.apache.org Sent: Monday, February 23, 2015 3:06 PM Subject: Re: Which OutputCommitter to use for S3? Cool, we will start from there. Thanks Aaron and Josh! Darin, it's likely because the DirectOutputCommitter is compiled with Hadoop 1 classes and you're running it with Hadoop 2. org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it became an interface in Hadoop 2. Mingyu On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID wrote: Aaron. Thanks for the class. Since I'm currently writing Java-based Spark applications, I tried converting your class to Java (it seemed pretty straightforward). I set up the use of the class as follows: SparkConf conf = new SparkConf().set("spark.hadoop.mapred.output.committer.class", "com.elsevier.common.DirectOutputCommitter"); And I then try and save a file to S3 (which I believe should use the old Hadoop APIs). 
JavaPairRDD<Text, Text> newBaselineRDDWritable = reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes()); newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile, Text.class, Text.class, SequenceFileOutputFormat.class, org.apache.hadoop.io.compress.GzipCodec.class); But I get the following error message. Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapred.JobContext, but interface was expected at com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68) at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902) at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771) at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156) In my class, JobContext is an interface of type org.apache.hadoop.mapred.JobContext. Is there something obvious that I might be doing wrong (or messed up in the translation from Scala to Java) or something I should look into? I'm using Spark 1.2 with Hadoop 2.4. Thanks. Darin
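If the spark.sql.parquet.output.committer.class setting described at the top of the message above lands as proposed in the SPARK-6352 PR, usage from spark-shell might look like the sketch below. This is a hedged sketch only: the fully qualified committer class name and the S3 path are illustrative assumptions, not taken from the PR.

import sqlContext.implicits._
// Assumed setting name from the message above (SPARK-6352); the committer
// class FQN below is a guess at where such a class might live.
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
// With a direct committer, the write goes straight to the final S3 location
// instead of being renamed out of a _temporary directory on job commit.
Seq((1, 2)).toDF("a", "b").saveAsParquetFile("s3n://some-bucket/some-path")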
SparkSQL overwrite parquet file does not generate _common_metadata
Hi, When I save a parquet file with SaveMode.Overwrite, it never generates _common_metadata, whether it overwrites an existing dir or not. Is this expected behavior? And what is the benefit of _common_metadata? Will reading perform better when it is present? Thanks, -- Pei-Lun
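For background on what the summary files buy you, a minimal sketch follows. The parquet.enable.summary-metadata key is the parquet-mr job-level switch for writing the summary files; whether it applies here depends on the parquet-mr version bundled with the Spark build in use (an assumption, not confirmed in this thread).

// Job-level switch in parquet-mr for writing _metadata/_common_metadata
// summary files (assumption: supported by the bundled parquet-mr version).
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "true")
// _common_metadata carries only the merged schema (no row group info), so a
// reader can discover the schema without opening every part file's footer;
// _metadata additionally carries per-row-group metadata.
val df = sqlContext.parquetFile("xxx")
df.printSchema()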
Re: SparkSQL 1.3.0 JDBC data source issues
JIRA and PR for the first issue: https://issues.apache.org/jira/browse/SPARK-6408 https://github.com/apache/spark/pull/5087 On Thu, Mar 19, 2015 at 12:20 PM, Pei-Lun Lee pl...@appier.com wrote: Hi, I am trying the jdbc data source in spark sql 1.3.0 and found some issues. First, the syntax where str_col='value' gives an error for both postgresql and mysql: psql> create table foo(id int primary key, name text, age int); bash> SPARK_CLASSPATH=postgresql-9.4-1201-jdbc41.jar spark/bin/spark-shell scala> sqlContext.load("jdbc", Map("url" -> "jdbc:postgresql://XXX", "dbtable" -> "foo")).registerTempTable("foo") scala> sql("select * from foo where name='bar'").collect org.postgresql.util.PSQLException: ERROR: operator does not exist: text = bar Hint: No operator matches the given name and argument type(s). You might need to add explicit type casts. Position: 40 scala> sql("select * from foo where name like '%foo'").collect bash> SPARK_CLASSPATH=mysql-connector-java-5.1.34.jar spark/bin/spark-shell scala> sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://XXX", "dbtable" -> "foo")).registerTempTable("foo") scala> sql("select * from foo where name='bar'").collect com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'bar' in 'where clause' Second, a postgresql table with a json data type does not work: psql> create table foo(id int primary key, data json); scala> sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://XXX", "dbtable" -> "foo")).registerTempTable("foo") java.sql.SQLException: Unsupported type Not sure whether these are bugs in spark sql or jdbc. I can file a JIRA ticket if needed. Thanks, -- Pei-Lun
SparkSQL 1.3.0 JDBC data source issues
Hi, I am trying the jdbc data source in spark sql 1.3.0 and found some issues. First, the syntax where str_col='value' gives an error for both postgresql and mysql: psql> create table foo(id int primary key, name text, age int); bash> SPARK_CLASSPATH=postgresql-9.4-1201-jdbc41.jar spark/bin/spark-shell scala> sqlContext.load("jdbc", Map("url" -> "jdbc:postgresql://XXX", "dbtable" -> "foo")).registerTempTable("foo") scala> sql("select * from foo where name='bar'").collect org.postgresql.util.PSQLException: ERROR: operator does not exist: text = bar Hint: No operator matches the given name and argument type(s). You might need to add explicit type casts. Position: 40 scala> sql("select * from foo where name like '%foo'").collect bash> SPARK_CLASSPATH=mysql-connector-java-5.1.34.jar spark/bin/spark-shell scala> sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://XXX", "dbtable" -> "foo")).registerTempTable("foo") scala> sql("select * from foo where name='bar'").collect com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'bar' in 'where clause' Second, a postgresql table with a json data type does not work: psql> create table foo(id int primary key, data json); scala> sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://XXX", "dbtable" -> "foo")).registerTempTable("foo") java.sql.SQLException: Unsupported type Not sure whether these are bugs in spark sql or jdbc. I can file a JIRA ticket if needed. Thanks, -- Pei-Lun
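Until the literal-quoting issue is fixed, one hedged workaround (not from this thread) is to push the string predicate into the dbtable option as an aliased subquery, so the filter is executed verbatim by the database and Spark never has to compile the string literal itself. Table and column names below are the example ones above; the temp table name is made up.

// Sketch only: the whole predicate lives inside the subquery string.
sqlContext.load("jdbc", Map(
  "url"     -> "jdbc:postgresql://XXX",
  "dbtable" -> "(select * from foo where name = 'bar') as t"
)).registerTempTable("foo_bar")
sqlContext.sql("select * from foo_bar").collect()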
Re: Which OutputCommitter to use for S3?
Hi, I created a JIRA and PR for supporting an S3-friendly output committer for saveAsParquetFile: https://issues.apache.org/jira/browse/SPARK-6352 https://github.com/apache/spark/pull/5042 My approach is to add a DirectParquetOutputCommitter class in the spark-sql package and use a boolean config variable, spark.sql.parquet.useDirectParquetOutputCommitter, to choose between it and the default output committer. This may not be the smartest solution but it works for me. Tested on Spark 1.1 and 1.3 with Hadoop 1.0.4. On Thu, Mar 5, 2015 at 4:32 PM, Aaron Davidson ilike...@gmail.com wrote: Yes, unfortunately that direct dependency makes this injection much more difficult for saveAsParquetFile. On Thu, Mar 5, 2015 at 12:28 AM, Pei-Lun Lee pl...@appier.com wrote: Thanks for the DirectOutputCommitter example. However, I found it only works for saveAsHadoopFile. What about saveAsParquetFile? It looks like SparkSQL is using ParquetOutputCommitter, which is a subclass of FileOutputCommitter. On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor thomas.dem...@amplidata.com wrote: FYI, we're currently addressing this at the Hadoop level in https://issues.apache.org/jira/browse/HADOOP-9565 Thomas Demoor On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: Just to close the loop in case anyone runs into the same problem I had: by setting --hadoop-major-version=2 when using the ec2 scripts, everything worked fine. Darin. - Original Message - From: Darin McBeath ddmcbe...@yahoo.com.INVALID To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com Cc: user@spark.apache.org user@spark.apache.org Sent: Monday, February 23, 2015 3:16 PM Subject: Re: Which OutputCommitter to use for S3? Thanks. I think my problem might actually be the other way around. I'm compiling with Hadoop 2, but when I start up Spark using the ec2 scripts I don't specify a --hadoop-major-version, and the default is 1. I'm guessing that if I make that a 2 it might work correctly. I'll try it and post a response. - Original Message - From: Mingyu Kim m...@palantir.com To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson ilike...@gmail.com Cc: user@spark.apache.org user@spark.apache.org Sent: Monday, February 23, 2015 3:06 PM Subject: Re: Which OutputCommitter to use for S3? Cool, we will start from there. Thanks Aaron and Josh! Darin, it's likely because the DirectOutputCommitter is compiled with Hadoop 1 classes and you're running it with Hadoop 2. org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it became an interface in Hadoop 2. Mingyu On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID wrote: Aaron. Thanks for the class. Since I'm currently writing Java-based Spark applications, I tried converting your class to Java (it seemed pretty straightforward). I set up the use of the class as follows: SparkConf conf = new SparkConf().set("spark.hadoop.mapred.output.committer.class", "com.elsevier.common.DirectOutputCommitter"); And I then try and save a file to S3 (which I believe should use the old Hadoop APIs). JavaPairRDD<Text, Text> newBaselineRDDWritable = reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes()); newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile, Text.class, Text.class, SequenceFileOutputFormat.class, org.apache.hadoop.io.compress.GzipCodec.class); But I get the following error message. 
Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapred.JobContext, but interface was expected at com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68) at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902) at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771) at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156) In my class, JobContext is an interface of type org.apache.hadoop.mapred.JobContext. Is there something obvious that I might be doing wrong (or messed up in the translation from Scala to Java) or something I should look into? I'm using Spark 1.2 with Hadoop 2.4. Thanks. Darin. From: Aaron Davidson ilike...@gmail.com To: Andrew Ash and...@andrewash.com Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com; user@spark.apache.org user@spark.apache.org; Aaron Davidson aa...@databricks.com Sent: Saturday, February 21, 2015 7:01 PM
Re: Which OutputCommitter to use for S3?
Thanks for the DirectOutputCommitter example. However, I found it only works for saveAsHadoopFile. What about saveAsParquetFile? It looks like SparkSQL is using ParquetOutputCommitter, which is a subclass of FileOutputCommitter. On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor thomas.dem...@amplidata.com wrote: FYI, we're currently addressing this at the Hadoop level in https://issues.apache.org/jira/browse/HADOOP-9565 Thomas Demoor On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: Just to close the loop in case anyone runs into the same problem I had: by setting --hadoop-major-version=2 when using the ec2 scripts, everything worked fine. Darin. - Original Message - From: Darin McBeath ddmcbe...@yahoo.com.INVALID To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com Cc: user@spark.apache.org user@spark.apache.org Sent: Monday, February 23, 2015 3:16 PM Subject: Re: Which OutputCommitter to use for S3? Thanks. I think my problem might actually be the other way around. I'm compiling with Hadoop 2, but when I start up Spark using the ec2 scripts I don't specify a --hadoop-major-version, and the default is 1. I'm guessing that if I make that a 2 it might work correctly. I'll try it and post a response. - Original Message - From: Mingyu Kim m...@palantir.com To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson ilike...@gmail.com Cc: user@spark.apache.org user@spark.apache.org Sent: Monday, February 23, 2015 3:06 PM Subject: Re: Which OutputCommitter to use for S3? Cool, we will start from there. Thanks Aaron and Josh! Darin, it's likely because the DirectOutputCommitter is compiled with Hadoop 1 classes and you're running it with Hadoop 2. org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it became an interface in Hadoop 2. Mingyu On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID wrote: Aaron. Thanks for the class. Since I'm currently writing Java-based Spark applications, I tried converting your class to Java (it seemed pretty straightforward). I set up the use of the class as follows: SparkConf conf = new SparkConf().set("spark.hadoop.mapred.output.committer.class", "com.elsevier.common.DirectOutputCommitter"); And I then try and save a file to S3 (which I believe should use the old Hadoop APIs). JavaPairRDD<Text, Text> newBaselineRDDWritable = reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes()); newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile, Text.class, Text.class, SequenceFileOutputFormat.class, org.apache.hadoop.io.compress.GzipCodec.class); But I get the following error message. Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapred.JobContext, but interface was expected at com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68) at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902) at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771) at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156) In my class, JobContext is an interface of type org.apache.hadoop.mapred.JobContext. 
Is there something obvious that I might be doing wrong (or messed up in the translation from Scala to Java) or something I should look into? I'm using Spark 1.2 with Hadoop 2.4. Thanks. Darin. From: Aaron Davidson ilike...@gmail.com To: Andrew Ash and...@andrewash.com Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com; user@spark.apache.org user@spark.apache.org; Aaron Davidson aa...@databricks.com Sent: Saturday, February 21, 2015 7:01 PM Subject: Re: Which OutputCommitter to use for S3? Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec You can use it by setting mapred.output.committer.class in the Hadoop configuration (or spark.hadoop.mapred.output.committer.class in the Spark configuration). Note that this only works for the old Hadoop APIs; I believe the new Hadoop APIs strongly tie committer to input format (so FileInputFormat always uses FileOutputCommitter), which makes this fix more difficult to apply. On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com wrote: Josh is that class something you guys would consider
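For reference, a minimal Scala sketch in the spirit of the committer class Aaron links above: every commit step is a no-op, so tasks write straight to the final S3 location instead of being renamed out of a _temporary directory on commit. This is illustrative only and ignores speculation and task-failure handling.

import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = {}            // nothing to stage
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  // Tasks never need a commit step; their output is already in place.
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}

It would then be registered the way Aaron describes, e.g. conf.set("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getName), and as noted this only applies to the old mapred API paths such as saveAsHadoopFile.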
Re: Spark SQL - custom aggregation function (UDAF)
I created https://issues.apache.org/jira/browse/SPARK-3947 On Tue, Oct 14, 2014 at 3:54 AM, Michael Armbrust mich...@databricks.com wrote: It's not on the roadmap for 1.2. I'd suggest opening a JIRA. On Mon, Oct 13, 2014 at 4:28 AM, Pierre B pierre.borckm...@realimpactanalytics.com wrote: Is it planned in the near future? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-custom-aggregation-function-UDAF-tp15784p16275.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark sql union all is slow
Hi, You can merge them into one table with: sqlContext.table("table_1").unionAll(sqlContext.table("table_2")).unionAll(sqlContext.table("table_3")).registerTempTable("table_all") Or load them in one call with: sqlContext.parquetFile("table_1.parquet,table_2.parquet,table_3.parquet").registerTempTable("table_all") On Wed, Oct 15, 2014 at 2:51 AM, shuluster s...@turn.com wrote: I have many tables of the same schema, partitioned by time. For example, one id could be in many of those tables. I would like to find an aggregation over such ids. Originally these tables are located on HDFS as files. Once a table's schemaRDD is loaded, I call cacheTable on it. Each table is around 30m - 100m of serialized data. The SQL I composed looks like the following: Select id, sum(cost) as cost from (((select id, sum(cost) as cost from table_1 where id = 1 group by id) union all (select id, sum(cost) as cost from table_2 where id = 1 group by id)) union all (select id, sum(cost) as cost from table_3 where id = 1 group by id)) as temp_table group by id The call to sparkSqlContext.sql() takes a long time to return a schemaRDD, while the execution of collect on this RDD was not too slow. Is there something I am doing wrong here? Or any tips on how to debug? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-union-all-is-slow-tp16407.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
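When there are more than a few partitioned tables, the same merge can be written as a fold. A minimal sketch, assuming the table names above and that unionAll is called on the table returned by sqlContext.table:

// Union an arbitrary list of same-schema tables and register the result.
val names = Seq("table_1", "table_2", "table_3")
names.map(sqlContext.table).reduce(_ unionAll _).registerTempTable("table_all")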
Re: spark sql left join gives KryoException: Buffer overflow
Hi Michael, Thanks for the suggestion. In my query, both tables are too large to use a broadcast join. When SPARK-2211 is done, will spark sql automatically choose join algorithms? Is there some way to manually hint the optimizer? 2014-07-19 5:23 GMT+08:00 Michael Armbrust mich...@databricks.com: Unfortunately, this is a query where we just don't have an efficient implementation yet. You might try switching the table order. Here is the JIRA for doing something more efficient: https://issues.apache.org/jira/browse/SPARK-2212 On Fri, Jul 18, 2014 at 7:05 AM, Pei-Lun Lee pl...@appier.com wrote: Hi, We have a query with a left join and got this error: Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure in TID 5 on host ip-10-33-132-101.us-west-2.compute.internal: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1 It looks like spark sql tried to do a broadcast join, collecting one of the tables to the master, but it is too large. How do we explicitly control join behavior like this? -- Pei-Lun Lee
spark sql left join gives KryoException: Buffer overflow
Hi, We have a query with a left join and got this error: Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure in TID 5 on host ip-10-33-132-101.us-west-2.compute.internal: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1 It looks like spark sql tried to do a broadcast join, collecting one of the tables to the master, but it is too large. How do we explicitly control join behavior like this? -- Pei-Lun Lee
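For reference, in later Spark SQL releases the planner's decision to broadcast a side of a join is driven by a size threshold. The key below is from those later releases and may not exist under that name in the 1.0.x version discussed in this thread (an assumption, not confirmed here):

// Disable automatic broadcast joins so neither side is collected and broadcast;
// -1 turns the size-based conversion off (key name from later Spark releases).
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=-1")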
Re: Spark SQL 1.0.1 error on reading fixed length byte array
Hi Michael, Good to know it is being handled. I tried the master branch (9fe693b5) and got another error: scala> sqlContext.parquetFile("/tmp/foo") java.lang.RuntimeException: Unsupported parquet datatype optional fixed_len_byte_array(4) b at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.parquet.ParquetTypesConverter$.toPrimitiveDataType(ParquetTypes.scala:58) at org.apache.spark.sql.parquet.ParquetTypesConverter$.toDataType(ParquetTypes.scala:109) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:282) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:279) .. The avro schema I used is something like: protocol Test { fixed Bytes4(4); record User { string name; int age; union {null, int} i; union {null, int} j; union {null, Bytes4} b; union {null, bytes} c; union {null, int} d; } } Is this case included in SPARK-2446 https://issues.apache.org/jira/browse/SPARK-2446? 2014-07-15 3:54 GMT+08:00 Michael Armbrust mich...@databricks.com: This is not supported yet, but there is a PR open to fix it: https://issues.apache.org/jira/browse/SPARK-2446 On Mon, Jul 14, 2014 at 4:17 AM, Pei-Lun Lee pl...@appier.com wrote: Hi, I am using spark-sql 1.0.1 to load parquet files generated by the method described in: https://gist.github.com/massie/7224868 When I try to submit a select query with columns of type fixed length byte array, the following error pops up: 14/07/14 11:09:14 INFO scheduler.DAGScheduler: Failed to run take at basicOperators.scala:100 org.apache.spark.SparkDriverExecutionException: Execution error at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:581) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559) Caused by: parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3n://foo/bar/part-r-0.snappy.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989) at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574) ... 1 more Caused by: java.lang.ClassCastException: Expected instance of primitive converter but got org.apache.spark.sql.parquet.CatalystNativeArrayConverter at parquet.io.api.Converter.asPrimitiveConverter(Converter.java:30) at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:264) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74) at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172) ... 24 more Is fixed length byte array supposed to work in this version? I noticed that other array types like int or string already work. Thanks, -- Pei-Lun
Re: Spark SQL 1.0.1 error on reading fixed length byte array
Filed SPARK-2446 2014-07-15 16:17 GMT+08:00 Michael Armbrust mich...@databricks.com: Oh, maybe not. Please file another JIRA. On Tue, Jul 15, 2014 at 12:34 AM, Pei-Lun Lee pl...@appier.com wrote: Hi Michael, Good to know it is being handled. I tried the master branch (9fe693b5) and got another error: scala> sqlContext.parquetFile("/tmp/foo") java.lang.RuntimeException: Unsupported parquet datatype optional fixed_len_byte_array(4) b at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.parquet.ParquetTypesConverter$.toPrimitiveDataType(ParquetTypes.scala:58) at org.apache.spark.sql.parquet.ParquetTypesConverter$.toDataType(ParquetTypes.scala:109) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:282) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:279) .. The avro schema I used is something like: protocol Test { fixed Bytes4(4); record User { string name; int age; union {null, int} i; union {null, int} j; union {null, Bytes4} b; union {null, bytes} c; union {null, int} d; } } Is this case included in SPARK-2446 https://issues.apache.org/jira/browse/SPARK-2446? 2014-07-15 3:54 GMT+08:00 Michael Armbrust mich...@databricks.com: This is not supported yet, but there is a PR open to fix it: https://issues.apache.org/jira/browse/SPARK-2446 On Mon, Jul 14, 2014 at 4:17 AM, Pei-Lun Lee pl...@appier.com wrote: Hi, I am using spark-sql 1.0.1 to load parquet files generated by the method described in: https://gist.github.com/massie/7224868 When I try to submit a select query with columns of type fixed length byte array, the following error pops up: 14/07/14 11:09:14 INFO scheduler.DAGScheduler: Failed to run take at basicOperators.scala:100 org.apache.spark.SparkDriverExecutionException: Execution error at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:581) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559) Caused by: parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3n://foo/bar/part-r-0.snappy.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989) at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574) ... 1 more Caused by: java.lang.ClassCastException: Expected instance of primitive converter but got org.apache.spark.sql.parquet.CatalystNativeArrayConverter at parquet.io.api.Converter.asPrimitiveConverter(Converter.java:30) at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:264) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74) at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172) ... 24 more Is fixed length byte array
Re: Spark SQL 1.0.1 error on reading fixed length byte array
Sorry, should be SPARK-2489 2014-07-15 19:22 GMT+08:00 Pei-Lun Lee pl...@appier.com: Filed SPARK-2446 2014-07-15 16:17 GMT+08:00 Michael Armbrust mich...@databricks.com: Oh, maybe not. Please file another JIRA. On Tue, Jul 15, 2014 at 12:34 AM, Pei-Lun Lee pl...@appier.com wrote: Hi Michael, Good to know it is being handled. I tried the master branch (9fe693b5) and got another error: scala> sqlContext.parquetFile("/tmp/foo") java.lang.RuntimeException: Unsupported parquet datatype optional fixed_len_byte_array(4) b at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.parquet.ParquetTypesConverter$.toPrimitiveDataType(ParquetTypes.scala:58) at org.apache.spark.sql.parquet.ParquetTypesConverter$.toDataType(ParquetTypes.scala:109) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:282) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:279) .. The avro schema I used is something like: protocol Test { fixed Bytes4(4); record User { string name; int age; union {null, int} i; union {null, int} j; union {null, Bytes4} b; union {null, bytes} c; union {null, int} d; } } Is this case included in SPARK-2446 https://issues.apache.org/jira/browse/SPARK-2446? 2014-07-15 3:54 GMT+08:00 Michael Armbrust mich...@databricks.com: This is not supported yet, but there is a PR open to fix it: https://issues.apache.org/jira/browse/SPARK-2446 On Mon, Jul 14, 2014 at 4:17 AM, Pei-Lun Lee pl...@appier.com wrote: Hi, I am using spark-sql 1.0.1 to load parquet files generated by the method described in: https://gist.github.com/massie/7224868 When I try to submit a select query with columns of type fixed length byte array, the following error pops up: 14/07/14 11:09:14 INFO scheduler.DAGScheduler: Failed to run take at basicOperators.scala:100 org.apache.spark.SparkDriverExecutionException: Execution error at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:581) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559) Caused by: parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3n://foo/bar/part-r-0.snappy.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989) at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574) ... 1 more Caused by: java.lang.ClassCastException: Expected instance of primitive converter but got org.apache.spark.sql.parquet.CatalystNativeArrayConverter at parquet.io.api.Converter.asPrimitiveConverter(Converter.java:30) at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:264) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74) at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue
Spark SQL 1.0.1 error on reading fixed length byte array
Hi, I am using spark-sql 1.0.1 to load parquet files generated by the method described in: https://gist.github.com/massie/7224868 When I try to submit a select query with columns of type fixed length byte array, the following error pops up: 14/07/14 11:09:14 INFO scheduler.DAGScheduler: Failed to run take at basicOperators.scala:100 org.apache.spark.SparkDriverExecutionException: Execution error at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:581) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559) Caused by: parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3n://foo/bar/part-r-0.snappy.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989) at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574) ... 1 more Caused by: java.lang.ClassCastException: Expected instance of primitive converter but got org.apache.spark.sql.parquet.CatalystNativeArrayConverter at parquet.io.api.Converter.asPrimitiveConverter(Converter.java:30) at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:264) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74) at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172) ... 24 more Is fixed length byte array supposed to work in this version? I noticed that other array types like int or string already work. Thanks, -- Pei-Lun
Re: LiveListenerBus throws exception and weird web UI bug
Hi Baoxu, thanks for sharing. 2014-06-26 22:51 GMT+08:00 Baoxu Shi(Dash) b...@nd.edu: Hi Pei-Lun, I have the same problem here. The issue is SPARK-2228; someone also posted a pull request on that, but it only eliminates this exception, not the side effects. I think the problem may be due to the hard-coded private val EVENT_QUEUE_CAPACITY = 1 in core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala. There may be a chance that when the event queue is full, the system starts dropping events, causing "key not found" because those events were never submitted. Don't know if that can help. On Jun 26, 2014, at 6:41 AM, Pei-Lun Lee pl...@appier.com wrote: Hi, We have a long-running spark application that runs on a spark 1.0 standalone server, and after it runs for several hours the following exception shows up: 14/06/25 23:13:08 ERROR LiveListenerBus: Listener JobProgressListener threw an exception java.util.NoSuchElementException: key not found: 6375 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.ui.jobs.JobProgressListener.onStageCompleted(JobProgressListener.scala:78) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79) at org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:48) at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160) at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46) And then the web UI (driver:4040) starts showing weird results like: (see attached screenshots) 1. negative active task numbers 2. completed stages still in the active section or showing tasks incomplete 3. unpersisted RDDs still on the storage page with fraction cached 100% Eventually the application crashed, but this is usually the first exception that shows up. Any idea how to fix it? -- Pei-Lun Lee Attachments: Screen Shot 2014-06-26 at 12.52.38 PM.png, Screen Shot 2014-06-26 at 12.52.21 PM.png, Screen Shot 2014-06-26 at 12.52.07 PM.png, Screen Shot 2014-06-26 at 12.51.15 PM.png
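A small, self-contained sketch (not Spark's code) of the failure mode Baoxu describes: a bounded queue whose producer uses a non-blocking offer silently drops entries once the queue is full, so a consumer keyed on those entries later misses them.

import java.util.concurrent.LinkedBlockingQueue

val capacity = 4
val queue = new LinkedBlockingQueue[String](capacity)
// offer() returns false instead of blocking when the queue is full,
// which models listener events being dropped rather than delivered.
val accepted = (1 to 10).map(i => queue.offer(s"event-$i"))
println(s"dropped ${accepted.count(_ == false)} of 10 events")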
Re: Spark SQL incorrect result on GROUP BY query
I reran with master and it looks like it is fixed. 2014-06-12 1:26 GMT+08:00 Michael Armbrust mich...@databricks.com: I'd try rerunning with master. It is likely you are running into SPARK-1994 https://issues.apache.org/jira/browse/SPARK-1994. Michael On Wed, Jun 11, 2014 at 3:01 AM, Pei-Lun Lee pl...@appier.com wrote: Hi, I am using spark 1.0.0 and found that in spark sql some queries using GROUP BY give weird results. To reproduce, type the following commands in a spark-shell connected to a standalone server: case class Foo(k: String, v: Int) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++ List.fill(300)(Foo("c", 3)) sc.makeRDD(rows).registerAsTable("foo") sql("select k, count(*) from foo group by k").collect The result will be something random like: res1: Array[org.apache.spark.sql.Row] = Array([b,180], [3,18], [a,75], [c,270], [4,56], [1,1]) and if I run the same query again, the new result will be correct: sql("select k, count(*) from foo group by k").collect res2: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300]) Should I file a bug? -- Pei-Lun Lee
Spark SQL incorrect result on GROUP BY query
Hi, I am using spark 1.0.0 and found that in spark sql some queries using GROUP BY give weird results. To reproduce, type the following commands in a spark-shell connected to a standalone server: case class Foo(k: String, v: Int) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++ List.fill(300)(Foo("c", 3)) sc.makeRDD(rows).registerAsTable("foo") sql("select k, count(*) from foo group by k").collect The result will be something random like: res1: Array[org.apache.spark.sql.Row] = Array([b,180], [3,18], [a,75], [c,270], [4,56], [1,1]) and if I run the same query again, the new result will be correct: sql("select k, count(*) from foo group by k").collect res2: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300]) Should I file a bug? -- Pei-Lun Lee