Spark 1.4 - memory bloat in group by/aggregate???
Hi,

- Spark 1.4 on a single-node machine. Run spark-shell.
- Reading from a Parquet file with a bunch of text columns and a couple of amounts in decimal(14,4). On-disk size of the file is 376M. It has ~100 million rows.
- rdd1 = sqlContext.read.parquet(...)
- rdd1.cache
- group_by_df = rdd1.groupBy("a").agg(sum(rdd1("amount1")), sum(rdd1("amount2")))
- group_by_df.cache
- group_by_df.count // trigger action - results in 725 rows
- Run top on the machine.
- In the Spark UI, the Storage tab shows the base Parquet RDD size as 2.3GB (a multiple of the 376M on-disk size); the size of group_by_df is 43.2 KB. This seems OK.
- However, the top command shows the process RES memory jumping from 2g at start to 31g after the count. This seems excessive for one group-by operation and will lead to trouble for repeated similar operations on the data ...

Any thoughts? Thanks,
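[Editor's note] For reference, a minimal sketch of the pipeline described above (Spark 1.4 DataFrame API in spark-shell; the path and the column names a, amount1, amount2 are placeholders from the post):

import org.apache.spark.sql.functions.sum

val rdd1 = sqlContext.read.parquet("/path/to/file.parquet")
rdd1.cache()
val group_by_df = rdd1.groupBy("a").agg(sum(rdd1("amount1")), sum(rdd1("amount2")))
group_by_df.cache()
group_by_df.count() // triggers the aggregation and materializes both caches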
Spark 1.3 saveAsTextFile with codec gives error - works with Spark 1.2
Env - Spark 1.3, Hadoop 2.3, Kerberos.

xx.saveAsTextFile(path, codec) gives the following trace. The same works with Spark 1.2 in the same environment:

val codec = classOf[some codec class]
val a = sc.textFile(/some_hdfs_file)
a.saveAsTextFile(/some_other_hdfs_file, codec)

This fails with the following trace in Spark 1.3, and works in Spark 1.2 in the same env:

15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage 2.0 (TID 17) on executor XYZ: java.lang.SecurityException (JCE cannot authenticate the provider BC) [duplicate 7]
15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 16, nodeXYZ): java.lang.SecurityException: JCE cannot authenticate the provider BC
    at javax.crypto.Cipher.getInstance(Cipher.java:642)
    at javax.crypto.Cipher.getInstance(Cipher.java:580)
    ... some codec calls ...
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.jar.JarException: file:/abc/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned entries - org/apache/spark/SparkHadoopWriter$.class
    at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462)
    at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322)
    at javax.crypto.JarVerifier.verify(JarVerifier.java:250)
    at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161)
    at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187)
    at javax.crypto.Cipher.getInstance(Cipher.java:638)
    ... 16 more
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
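[Editor's note] For reference, the call pattern in question, sketched with the standard Hadoop GzipCodec standing in for the poster's custom codec (Spark 1.x Scala API):

import org.apache.hadoop.io.compress.GzipCodec

val a = sc.textFile("/some_hdfs_file")
// saveAsTextFile(path, codecClass) hands the codec to the Hadoop TextOutputFormat
a.saveAsTextFile("/some_other_hdfs_file", classOf[GzipCodec])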
spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned entries - org/apache/spark/SparkHadoopWriter$.class
With Spark 1.3, xx.saveAsTextFile(path, codec) gives the following trace. The same works with Spark 1.2. Config is CDH 5.3.0 (Hadoop 2.3) with Kerberos.

15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage 2.0 (TID 17) on executor node1078.svc.devpg.pdx.wd: java.lang.SecurityException (JCE cannot authenticate the provider BC) [duplicate 7]
15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 16, node1080.svc.devpg.pdx.wd): java.lang.SecurityException: JCE cannot authenticate the provider BC
    at javax.crypto.Cipher.getInstance(Cipher.java:642)
    at javax.crypto.Cipher.getInstance(Cipher.java:580)
    at com.workday.mrcodec.CryptoAESHelper.setupCrypto(CryptoAESHelper.java:61)
    at com.workday.mrcodec.CryptoAESHelper.init(CryptoAESHelper.java:48)
    at com.workday.mrcodec.CryptoAESCompressor.init(CryptoAESCompressor.java:48)
    at com.workday.mrcodec.CryptoAESCodec.createCompressor(CryptoAESCodec.java:52)
    at com.workday.mrcodec.CryptoAESCodec.createOutputStream(CryptoAESCodec.java:148)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.jar.JarException: file:/hadoop/disk7/yarn/local/usercache/dev.baseline/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned entries - org/apache/spark/SparkHadoopWriter$.class
    at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462)
    at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322)
    at javax.crypto.JarVerifier.verify(JarVerifier.java:250)
    at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161)
    at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187)
    at javax.crypto.Cipher.getInstance(Cipher.java:638)
    ... 16 more
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Re: How to specify the port for AM Actor ...
Filed https://issues.apache.org/jira/browse/SPARK-6653

On Sun, Mar 29, 2015 at 8:18 PM, Shixiong Zhu zsxw...@gmail.com wrote:

LGTM. Could you open a JIRA and send a PR? Thanks. Best Regards, Shixiong Zhu

2015-03-28 7:14 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

I looked at the 1.3.0 code and figured where this can be added. In org.apache.spark.deploy.yarn, ApplicationMaster.scala:282 is

actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, conf = sparkConf, securityManager = securityMgr)._1

If I change it to the below, then I can start it on the port I want:

val port = sparkConf.getInt("spark.am.actor.port", 0) // New property
...
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, port, conf = sparkConf, securityManager = securityMgr)._1

Thoughts? Any other place where any change is needed?

On Wed, Mar 25, 2015 at 4:44 PM, Shixiong Zhu zsxw...@gmail.com wrote:

There is no configuration for it now. Best Regards, Shixiong Zhu

2015-03-26 7:13 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

There may be firewall rules limiting the ports between the host running Spark and the Hadoop cluster. In that case, not all ports are allowed. Can a range of allowed ports be specified?

On Wed, Mar 25, 2015 at 4:06 PM, Shixiong Zhu zsxw...@gmail.com wrote:

It's a random port to avoid port conflicts, since multiple AMs can run on the same machine. Why do you need a fixed port? Best Regards, Shixiong Zhu

2015-03-26 6:49 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

Spark 1.3, Hadoop 2.5, Kerberos. When running spark-shell in yarn client mode, it shows the following message with a random port every time (44071 in the example below). Is there a way to pin that port to a specific value? It does not seem to be among the spark.xxx.port settings listed in http://spark.apache.org/docs/latest/configuration.html ... Thanks,

15/03/25 22:27:10 INFO Client: Application report for application_1427316153428_0014 (state: ACCEPTED)
15/03/25 22:27:10 INFO YarnClientSchedulerBackend: ApplicationMaster registered as Actor[akka.tcp://sparkYarnAM@xyz:44071/user/YarnAM#-1989273896]
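[Editor's note] For clarity, the change proposed in the message above as a before/after sketch against ApplicationMaster.scala:282 in Spark 1.3.0 (spark.am.actor.port is the poster's proposed name, not an existing setting; the fix eventually landed via the SPARK-6653 JIRA filed above):

// Before: port 0 lets Akka pick a random port for the AM actor system
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
  conf = sparkConf, securityManager = securityMgr)._1

// After: allow a fixed port via configuration, defaulting to 0 (random)
val port = sparkConf.getInt("spark.am.actor.port", 0) // proposed new property
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, port,
  conf = sparkConf, securityManager = securityMgr)._1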
Spark 1.3 Source - Github and source tar does not seem to match
While looking into an issue, I noticed that the source displayed on the GitHub site does not match the downloaded tar for 1.3. Thoughts?
Re: How to specify the port for AM Actor ...
I looked at the 1.3.0 code and figured where this can be added. In org.apache.spark.deploy.yarn, ApplicationMaster.scala:282 is

actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, conf = sparkConf, securityManager = securityMgr)._1

If I change it to the below, then I can start it on the port I want:

val port = sparkConf.getInt("spark.am.actor.port", 0) // New property
...
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, port, conf = sparkConf, securityManager = securityMgr)._1

Thoughts? Any other place where any change is needed?

On Wed, Mar 25, 2015 at 4:44 PM, Shixiong Zhu zsxw...@gmail.com wrote:

There is no configuration for it now. Best Regards, Shixiong Zhu

2015-03-26 7:13 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

There may be firewall rules limiting the ports between the host running Spark and the Hadoop cluster. In that case, not all ports are allowed. Can a range of allowed ports be specified?

On Wed, Mar 25, 2015 at 4:06 PM, Shixiong Zhu zsxw...@gmail.com wrote:

It's a random port to avoid port conflicts, since multiple AMs can run on the same machine. Why do you need a fixed port? Best Regards, Shixiong Zhu

2015-03-26 6:49 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

Spark 1.3, Hadoop 2.5, Kerberos. When running spark-shell in yarn client mode, it shows the following message with a random port every time (44071 in the example below). Is there a way to pin that port to a specific value? It does not seem to be among the spark.xxx.port settings listed in http://spark.apache.org/docs/latest/configuration.html ... Thanks,

15/03/25 22:27:10 INFO Client: Application report for application_1427316153428_0014 (state: ACCEPTED)
15/03/25 22:27:10 INFO YarnClientSchedulerBackend: ApplicationMaster registered as Actor[akka.tcp://sparkYarnAM@xyz:44071/user/YarnAM#-1989273896]
How to specify the port for AM Actor ...
Spark 1.3, Hadoop 2.5, Kerberos. When running spark-shell in yarn client mode, it shows the following message with a random port every time (44071 in the example below). Is there a way to pin that port to a specific value? It does not seem to be among the spark.xxx.port settings listed in http://spark.apache.org/docs/latest/configuration.html ... Thanks,

15/03/25 22:27:10 INFO Client: Application report for application_1427316153428_0014 (state: ACCEPTED)
15/03/25 22:27:10 INFO YarnClientSchedulerBackend: ApplicationMaster registered as Actor[akka.tcp://sparkYarnAM@xyz:44071/user/YarnAM#-1989273896]
Re: How to specify the port for AM Actor ...
There may be firewall rules limiting the ports between the host running Spark and the Hadoop cluster. In that case, not all ports are allowed. Can a range of allowed ports be specified?

On Wed, Mar 25, 2015 at 4:06 PM, Shixiong Zhu zsxw...@gmail.com wrote:

It's a random port to avoid port conflicts, since multiple AMs can run on the same machine. Why do you need a fixed port? Best Regards, Shixiong Zhu

2015-03-26 6:49 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

Spark 1.3, Hadoop 2.5, Kerberos. When running spark-shell in yarn client mode, it shows the following message with a random port every time (44071 in the example below). Is there a way to pin that port to a specific value? It does not seem to be among the spark.xxx.port settings listed in http://spark.apache.org/docs/latest/configuration.html ... Thanks,

15/03/25 22:27:10 INFO Client: Application report for application_1427316153428_0014 (state: ACCEPTED)
15/03/25 22:27:10 INFO YarnClientSchedulerBackend: ApplicationMaster registered as Actor[akka.tcp://sparkYarnAM@xyz:44071/user/YarnAM#-1989273896]
Re: Invalid ContainerId ... Caused by: java.lang.NumberFormatException: For input string: e04
Thanks Marcelo - I was using the SBT-built Spark per the earlier thread. I have now switched to the distro (with the conf changes putting the CDH path in front) and the guava issue is gone. Thanks,

On Tue, Mar 24, 2015 at 1:50 PM, Marcelo Vanzin van...@cloudera.com wrote:

Hi there,

On Tue, Mar 24, 2015 at 1:40 PM, Manoj Samel manojsamelt...@gmail.com wrote:

When I run any query, it gives java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;

Are you running a custom-compiled Spark by any chance? Specifically, one you built with sbt? That would hit this problem, because the path I suggested (/usr/lib/hadoop/client/*) contains an older guava library, which would override the one shipped with the sbt-built Spark. If you build Spark with maven, or use the pre-built Spark distro, or specifically filter out the guava jar from your classpath when setting up the Spark job, things should work. -- Marcelo
Hadoop 2.5 not listed in Spark 1.4 build page
http://spark.apache.org/docs/latest/building-spark.html#packaging-without-hadoop-dependencies-for-yarn does not list Hadoop 2.5 in the Hadoop version table. I assume it is still OK to compile with -Pyarn -Phadoop-2.5 for use with Hadoop 2.5 (CDH 5.3.2)? Thanks,
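[Editor's note] For what it's worth, the build page's pattern for Hadoop versions without a dedicated profile is to pick the closest listed profile and override hadoop.version. A hedged example for the CDH release mentioned (the profile name and version string should be checked against the build page for your Spark version):

build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.2 -DskipTests clean package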
Re: Invalid ContainerId ... Caused by: java.lang.NumberFormatException: For input string: e04
Thanks All - perhaps I misread the earlier posts as dependencies on the Hadoop version, but the key is also CDH 5.3.2 (not just Hadoop 2.5 vs. 2.4). After adding the classpath as Marcelo/Harsh suggested (loading CDH libs in front), I am able to get spark-shell started without the invalid container error, so that issue is solved. When I run any query, it gives

java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;

This seems to be the known Guava library version issue ... I will look into it. Thanks again!

On Tue, Mar 24, 2015 at 12:50 PM, Harsh J ha...@cloudera.com wrote:

My comment's still the same: runtime-link-via-classpath Spark to use CDH 5.3.2 libraries, just like your cluster does, not Apache Hadoop 2.5.0 (which CDH is merely based on, but carries several backports on top that aren't in Apache Hadoop 2.5.0, one of which addresses this parsing trouble). You do not need to recompile Spark, just alter the hadoop libraries in its classpath to be those of the CDH server version (overwrite from parcels, etc.).

On Wed, Mar 25, 2015 at 1:06 AM, Manoj Samel manojsamelt...@gmail.com wrote:

I recompiled Spark 1.3 with Hadoop 2.5; it still gives the same stack trace. A quick browse into the stack trace with Hadoop 2.5.0 org.apache.hadoop.yarn.util.ConverterUtils:

1. toContainerId gets parameter containerId, which I assume is container_e06_1427223073530_0001_01_01
2. It splits it using public static final Splitter _SPLITTER = Splitter.on('_').trimResults();
3. Line 172 checks the container prefix against CONTAINER_PREFIX, which is valid (container)
4. It calls toApplicationAttemptId
5. toApplicationAttemptId tries Long.parseLong(it.next()) on e06 and dies

It seems it is not expecting a non-numeric component. Is this a Yarn issue? Thanks,

On Tue, Mar 24, 2015 at 8:25 AM, Manoj Samel manoj.sa...@gmail.com wrote:

I'll compile Spark with the Hadoop libraries and try again ... Thanks, Manoj

On Mar 23, 2015, at 10:34 PM, Harsh J ha...@cloudera.com wrote:

This may happen if you are using different versions of CDH5 jars between Spark and the cluster. Can you ensure your Spark's Hadoop CDH jars match the cluster version exactly, since you seem to be using a custom version of Spark (out of CDH) here?

On Tue, Mar 24, 2015 at 7:32 AM, Manoj Samel manojsamelt...@gmail.com wrote:

x-post to CDH list for any insight ... Thanks,

-- Forwarded message --
From: Manoj Samel manojsamelt...@gmail.com
Date: Mon, Mar 23, 2015 at 6:32 PM
Subject: Invalid ContainerId ... Caused by: java.lang.NumberFormatException: For input string: e04
To: user@spark.apache.org

Spark 1.3, CDH 5.3.2, Kerberos. Setup works fine with base configuration; spark-shell can be used in yarn client mode etc.

When the work-preserving recovery feature is enabled via http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/admin_ha_yarn_work_preserving_recovery.html, spark-shell fails with the following log:

15/03/24 01:20:16 ERROR yarn.ApplicationMaster: Uncaught exception: java.lang.IllegalArgumentException: Invalid ContainerId: container_e04_1427159778706_0002_01_01
    at org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)
    at org.apache.spark.deploy.yarn.YarnRMClient.getAttemptId(YarnRMClient.scala:93)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:83)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:576)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:574)
    at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:597)
    at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
Caused by: java.lang.NumberFormatException: For input string: e04
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137)
    at org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177)
    ... 12 more
15/03/24 01:20:16 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 10, (reason: Uncaught exception: Invalid ContainerId: container_e04_1427159778706_0002_01_01)
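[Editor's note] For readers with the same setup: one way to load the CDH libs in front, as suggested above, is via Spark's extraClassPath properties in spark-defaults.conf (a sketch; the path is the one Marcelo mentions elsewhere in this thread and varies by install):

spark.driver.extraClassPath   /usr/lib/hadoop/client/*
spark.executor.extraClassPath /usr/lib/hadoop/client/*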
Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged
Log shows stack traces that seem to match the assert in the JIRA, so it seems I am hitting the issue. Thanks for the heads up ...

15/03/23 20:29:50 ERROR actor.OneForOneStrategy: assertion failed: Allocator killed more executors than are allocated!
java.lang.AssertionError: assertion failed: Allocator killed more executors than are allocated!
    at scala.Predef$.assert(Predef.scala:179)
    at org.apache.spark.deploy.yarn.YarnAllocator.killExecutor(YarnAllocator.scala:152)
    at org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1$$anonfun$applyOrElse$6.apply(ApplicationMaster.scala:547)
    at org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1$$anonfun$applyOrElse$6.apply(ApplicationMaster.scala:547)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1.applyOrElse(ApplicationMaster.scala:547)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.deploy.yarn.ApplicationMaster$AMActor.aroundReceive(ApplicationMaster.scala:506)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

On Mon, Mar 23, 2015 at 2:25 PM, Marcelo Vanzin van...@cloudera.com wrote:

On Mon, Mar 23, 2015 at 2:15 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Found the issue with the above error - the setting for spark_shuffle was incomplete. Now it is able to ask for and get additional executors. The issue is that once they are released, it is not able to proceed with the next query.

That looks like SPARK-6325, which unfortunately was not fixed in time for 1.3.0... -- Marcelo
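[Editor's note] Since the root cause mentioned above was an incomplete spark_shuffle setting: for reference, this is the shape of the NodeManager configuration for Spark's external shuffle service in yarn-site.xml, per the dynamic-allocation docs (a sketch; verify the values against the docs for your Spark version):

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>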
Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged
    ...
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at scala.collection.mutable.MapLike$class.retain(MapLike.scala:212)
    at scala.collection.mutable.AbstractMap.retain(Map.scala:91)
    at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:234)
    at org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:191)

The next query hangs with the following output:

15/03/23 20:57:27 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 4.5 KB, free 264.9 MB)
15/03/23 20:57:27 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on xxx:45903 (size: 4.5 KB, free: 265.1 MB)
15/03/23 20:57:27 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0
15/03/23 20:57:27 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:839
15/03/23 20:57:27 INFO DAGScheduler: Submitting 10 missing tasks from Stage 2 (MapPartitionsRDD[11] at mapPartitions at Exchange.scala:64)
15/03/23 20:57:27 INFO YarnScheduler: Adding task set 2.0 with 10 tasks
15/03/23 20:57:56 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(2, yyy, 37231) with no recent heart beats: 131006ms exceeds 120000ms
15/03/23 20:57:56 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(5, zzz, 34437) with no recent heart beats: 131329ms exceeds 120000ms
15/03/23 20:57:56 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(1, bbb, 52747) with no recent heart beats: 128199ms exceeds 120000ms
15/03/23 20:57:56 INFO BlockManagerMasterActor: Removing block manager BlockManagerId(2, yyy, 37231)
15/03/23 20:57:56 INFO BlockManagerMasterActor: Removing block manager BlockManagerId(1, bbb, 52747)
15/03/23 20:57:56 INFO BlockManagerMasterActor: Removing block manager BlockManagerId(5, zzz, 34437)

On Sat, Mar 21, 2015 at 6:51 AM, Ted Yu yuzhih...@gmail.com wrote:

bq. Requesting 1 new executor(s) because tasks are backlogged

1 executor was requested. Which hadoop release are you using? Can you check the resource manager log to see if there is some clue? Thanks

On Fri, Mar 20, 2015 at 4:17 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Forgot to add - the cluster is idle otherwise, so there should be no resource issues. Also, the configuration works when not using Dynamic Allocation.

On Fri, Mar 20, 2015 at 4:15 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Hi, Running Spark 1.3 with secured Hadoop. Spark-shell with Yarn client mode runs without issue when not using Dynamic Allocation. When Dynamic Allocation is turned on, the shell comes up, but the same SQL etc. causes it to loop.

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.maxExecutors=10
# Set IdleTime low for testing
spark.dynamicAllocation.executorIdleTimeout=60
spark.shuffle.service.enabled=true

Following is the start of the messages; it then keeps looping with "Requesting 0 new executors":

15/03/20 22:52:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/20 22:52:42 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/03/20 22:52:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[3] at mapPartitions at Exchange.scala:100)
15/03/20 22:52:42 INFO cluster.YarnScheduler: Adding task set 0.0 with 1 tasks
15/03/20 22:52:47 INFO spark.ExecutorAllocationManager: Requesting 1 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:52:52 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:52:57 WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/03/20 22:52:57 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:02 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:07 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:12 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:12 WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/03/20 22:53:17 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:22 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
Invalid ContainerId ... Caused by: java.lang.NumberFormatException: For input string: e04
Spark 1.3, CDH 5.3.2, Kerberos. Setup works fine with base configuration; spark-shell can be used in yarn client mode etc.

When the work recovery feature is enabled via http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/admin_ha_yarn_work_preserving_recovery.html, spark-shell fails with the following log:

15/03/24 01:20:16 ERROR yarn.ApplicationMaster: Uncaught exception: java.lang.IllegalArgumentException: Invalid ContainerId: container_e04_1427159778706_0002_01_01
    at org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)
    at org.apache.spark.deploy.yarn.YarnRMClient.getAttemptId(YarnRMClient.scala:93)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:83)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:576)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:574)
    at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:597)
    at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
Caused by: java.lang.NumberFormatException: For input string: e04
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137)
    at org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177)
    ... 12 more
15/03/24 01:20:16 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 10, (reason: Uncaught exception: Invalid ContainerId: container_e04_1427159778706_0002_01_01)
Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged
Hi, Running Spark 1.3 with secured Hadoop. Spark-shell with Yarn client mode runs without issue when not using Dynamic Allocation. When Dynamic Allocation is turned on, the shell comes up, but the same SQL etc. causes it to loop.

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.maxExecutors=10
# Set IdleTime low for testing
spark.dynamicAllocation.executorIdleTimeout=60
spark.shuffle.service.enabled=true

Following is the start of the messages; it then keeps looping with "Requesting 0 new executors":

15/03/20 22:52:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/20 22:52:42 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/03/20 22:52:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[3] at mapPartitions at Exchange.scala:100)
15/03/20 22:52:42 INFO cluster.YarnScheduler: Adding task set 0.0 with 1 tasks
15/03/20 22:52:47 INFO spark.ExecutorAllocationManager: Requesting 1 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:52:52 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:52:57 WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/03/20 22:52:57 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:02 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:07 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:12 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:12 WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/03/20 22:53:17 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:22 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:27 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged
Forgot to add - the cluster is idle otherwise, so there should be no resource issues. Also, the configuration works when not using Dynamic Allocation.

On Fri, Mar 20, 2015 at 4:15 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Hi, Running Spark 1.3 with secured Hadoop. Spark-shell with Yarn client mode runs without issue when not using Dynamic Allocation. When Dynamic Allocation is turned on, the shell comes up, but the same SQL etc. causes it to loop.

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.maxExecutors=10
# Set IdleTime low for testing
spark.dynamicAllocation.executorIdleTimeout=60
spark.shuffle.service.enabled=true

Following is the start of the messages; it then keeps looping with "Requesting 0 new executors":

15/03/20 22:52:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/20 22:52:42 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/03/20 22:52:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[3] at mapPartitions at Exchange.scala:100)
15/03/20 22:52:42 INFO cluster.YarnScheduler: Adding task set 0.0 with 1 tasks
15/03/20 22:52:47 INFO spark.ExecutorAllocationManager: Requesting 1 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:52:52 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:52:57 WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/03/20 22:52:57 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:02 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:07 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:12 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:12 WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/03/20 22:53:17 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:22 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:27 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
Dataframe v/s SparkSQL
Is it correct to say that the Spark DataFrame API is implemented using the same execution engine as SparkSQL? In other words, while the DataFrame API differs from SparkSQL, the runtime performance of equivalent constructs in DataFrames and SparkSQL should be the same. So one should be able to choose whichever of the two (DF vs. SQL) suits the use case and not worry about runtime performance. Please comment ... Thanks,
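[Editor's note] One way to check this for a specific query is to compare the plans the two front ends produce; a minimal sketch (Spark 1.3+; the table and column names are placeholders). Both go through the Catalyst optimizer, so the printed physical plans should match:

import org.apache.spark.sql.functions.sum

val df = sqlContext.table("t")
df.groupBy("a").agg(sum("b")).explain()           // DataFrame version
sqlContext.sql("select a, sum(b) from t group by a").explain() // SQL version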
New ColumnType For Decimal Caching
Thanks Michael for the pointer. Sorry for the delayed reply. Taking a quick inventory of the scope of change - is the column type for Decimal caching needed only in the caching layer (4 files in org.apache.spark.sql.columnar - ColumnAccessor.scala, ColumnBuilder.scala, ColumnStats.scala, ColumnType.scala), or do other SQL components also need to be touched? Hoping for quick feedback off the top of your head ... Thanks,

On Mon, Feb 9, 2015 at 3:16 PM, Michael Armbrust mich...@databricks.com wrote:

You could add a new ColumnType: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala . PRs welcome :)

On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Hi Michael, As a test, I have the same data loaded as another parquet file - except with the 2 decimal(14,4) columns replaced by double. With this, the on-disk size is ~345MB, the in-memory size is 2GB (vs. 12 GB), and the cached query runs in half the time of the uncached query. Would it be possible for Spark to store in-memory decimals in some form of long with decoration? For the immediate future, is there any hook that we can use to provide custom caching / processing for the decimal type in the RDD so that other semantics do not change? Thanks,

On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Could you share which data types are optimized in the in-memory storage and how they are optimized?

On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com wrote:

You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are likely paying for expensive serialization there.

On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Flat data of types String, Int and a couple of decimal(14,4).

On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote:

Is this nested data or flat data?

On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Hi Michael, The Storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for a subsequent select on this cached table show minimal overheads (GC, queueing, shuffle write, etc.), so overhead is not the issue. However, it is still twice as slow as reading the uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer. Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. The mapPartitions phase of the query on the uncached table shows an input size of 351 MB. However, after the table is cached, the Storage tab shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this? The mapPartitions phase of the same query on the cached table shows an input size of 12GB (the full size of the cached table) and takes twice the time of the mapPartitions for the uncached query. Thanks,

On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote:

Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk.

On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Spark 1.2. Data stored in a parquet table (large number of rows).

Test 1: select a, sum(b), sum(c) from table

Test 2:
sqlContext.cacheTable()
select a, sum(b), sum(c) from table - seed cache; first time slow since loading cache?
select a, sum(b), sum(c) from table - second time it should be faster as it should be reading from cache, not HDFS. But it is slower than Test 1.

Any thoughts? Should a different query be used to seed the cache? Thanks,
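[Editor's note] To make the "long with decoration" idea above concrete at the application level, a hedged sketch of a manual workaround (Spark 1.2-era SQL; the table/column names and the 10^4 scale factor are placeholders matching decimal(14,4); CAST syntax support may differ between the plain SQLContext parser and HiveContext): scale amounts into longs before caching, and scale back on read.

// Cache amounts as longs scaled by 10^4 instead of decimals, assuming the
// values fit in a long; dividing by 10000.0 on the way out recovers the amount.
val scaled = sqlContext.sql("select a, cast(b * 10000 as bigint) as b_scaled from t")
scaled.registerTempTable("t_scaled")
sqlContext.cacheTable("t_scaled")
val result = sqlContext.sql("select a, sum(b_scaled) / 10000.0 as sum_b from t_scaled group by a")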
Is there a separate mailing list for Spark Developers ?
d...@spark.apache.org (archive: http://apache-spark-developers-list.1001551.n3.nabble.com/), mentioned on http://spark.apache.org/community.html, seems to be bouncing. Is there another one?
Re: SQL group by on Parquet table slower when table cached
Hi Michael,

The Storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for a subsequent select on this cached table show minimal overheads (GC, queueing, shuffle write, etc.), so overhead is not the issue. However, it is still twice as slow as reading the uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer.

Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. The mapPartitions phase of the query on the uncached table shows an input size of 351 MB. However, after the table is cached, the Storage tab shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this? The mapPartitions phase of the same query on the cached table shows an input size of 12GB (the full size of the cached table) and takes twice the time of the mapPartitions for the uncached query.

Thanks,

On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote:

Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk.

On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Spark 1.2. Data stored in a parquet table (large number of rows).

Test 1: select a, sum(b), sum(c) from table

Test 2:
sqlContext.cacheTable()
select a, sum(b), sum(c) from table - seed cache; first time slow since loading cache?
select a, sum(b), sum(c) from table - second time it should be faster as it should be reading from cache, not HDFS. But it is slower than Test 1.

Any thoughts? Should a different query be used to seed the cache? Thanks,
Re: SQL group by on Parquet table slower when table cached
Could you share which data types are optimized in the in-memory storage and how they are optimized?

On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com wrote:

You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are likely paying for expensive serialization there.

On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Flat data of types String, Int and a couple of decimal(14,4).

On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote:

Is this nested data or flat data?

On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Hi Michael, The Storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for a subsequent select on this cached table show minimal overheads (GC, queueing, shuffle write, etc.), so overhead is not the issue. However, it is still twice as slow as reading the uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer. Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. The mapPartitions phase of the query on the uncached table shows an input size of 351 MB. However, after the table is cached, the Storage tab shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this? The mapPartitions phase of the same query on the cached table shows an input size of 12GB (the full size of the cached table) and takes twice the time of the mapPartitions for the uncached query. Thanks,

On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote:

Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk.

On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Spark 1.2. Data stored in a parquet table (large number of rows).

Test 1: select a, sum(b), sum(c) from table

Test 2:
sqlContext.cacheTable()
select a, sum(b), sum(c) from table - seed cache; first time slow since loading cache?
select a, sum(b), sum(c) from table - second time it should be faster as it should be reading from cache, not HDFS. But it is slower than Test 1.

Any thoughts? Should a different query be used to seed the cache? Thanks,
Re: SQL group by on Parquet table slower when table cached
Hi Michael,

As a test, I have the same data loaded as another parquet file - except with the 2 decimal(14,4) columns replaced by double. With this, the on-disk size is ~345MB, the in-memory size is 2GB (vs. 12 GB), and the cached query runs in half the time of the uncached query. Would it be possible for Spark to store in-memory decimals in some form of long with decoration? For the immediate future, is there any hook that we can use to provide custom caching / processing for the decimal type in the RDD so that other semantics do not change?

Thanks,

On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Could you share which data types are optimized in the in-memory storage and how they are optimized?

On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com wrote:

You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are likely paying for expensive serialization there.

On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Flat data of types String, Int and a couple of decimal(14,4).

On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote:

Is this nested data or flat data?

On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Hi Michael, The Storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for a subsequent select on this cached table show minimal overheads (GC, queueing, shuffle write, etc.), so overhead is not the issue. However, it is still twice as slow as reading the uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer. Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. The mapPartitions phase of the query on the uncached table shows an input size of 351 MB. However, after the table is cached, the Storage tab shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this? The mapPartitions phase of the same query on the cached table shows an input size of 12GB (the full size of the cached table) and takes twice the time of the mapPartitions for the uncached query. Thanks,

On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote:

Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk.

On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Spark 1.2. Data stored in a parquet table (large number of rows).

Test 1: select a, sum(b), sum(c) from table

Test 2:
sqlContext.cacheTable()
select a, sum(b), sum(c) from table - seed cache; first time slow since loading cache?
select a, sum(b), sum(c) from table - second time it should be faster as it should be reading from cache, not HDFS. But it is slower than Test 1.

Any thoughts? Should a different query be used to seed the cache? Thanks,
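[Editor's note] A hedged sketch of the double workaround described above, done at query time instead of rewriting the parquet file (Spark 1.2-era SQL; names are placeholders). Note that doubles trade exactness for speed, which may not be acceptable for monetary amounts:

// Cast the decimal columns to double once, then cache the casted table
val asDouble = sqlContext.sql("select a, cast(b as double) as b_d, cast(c as double) as c_d from t")
asDouble.registerTempTable("t_double")
sqlContext.cacheTable("t_double")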
SQL group by on Parquet table slower when table cached
Spark 1.2. Data stored in a parquet table (large number of rows).

Test 1: select a, sum(b), sum(c) from table

Test 2:
sqlContext.cacheTable()
select a, sum(b), sum(c) from table - seed cache; first time slow since loading cache?
select a, sum(b), sum(c) from table - second time it should be faster as it should be reading from cache, not HDFS. But it is slower than Test 1.

Any thoughts? Should a different query be used to seed the cache? Thanks,
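[Editor's note] For reference, a minimal sketch of the cache-seeding pattern being tested (Spark 1.2; "table" stands for the registered temp table name). The cache is built lazily, so a full scan materializes it before timing the real query:

sqlContext.cacheTable("table")
// Seed the cache: the first scan after cacheTable builds the column buffers
sqlContext.sql("select count(*) from table").collect()
// Subsequent aggregations should now read from the in-memory columnar cache
sqlContext.sql("select a, sum(b), sum(c) from table").collect()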
Large # of tasks in groupby on single table
Spark 1.2. Data is read from parquet with 2 partitions and is cached as a table with 2 partitions. Verified in the UI that it shows the RDD with 2 partitions, fully cached in memory.

Cached data contains columns a, b, c. Column a has ~150 distinct values. Next, run SQL on this table: select a, sum(b), sum(c) from table x. The query creates 200 tasks. Further, the advanced metric "scheduler delay" is a significant % for most of these tasks. This seems like very high overhead for a query on an RDD with 2 partitions. It seems that if this were run with fewer tasks, the query would run faster. Any thoughts on how to control the # of partitions for the group by (or other SQLs)? Thanks,
Re: Large # of tasks in groupby on single table
Follow up for closure on the thread ...

1. spark.sql.shuffle.partitions is not on the config page but is mentioned on http://spark.apache.org/docs/1.2.0/sql-programming-guide.html. It would be better to have it on the config page as well for the sake of completeness. Should I file a doc bug?

2. Regarding my #2 above (Spark should auto-determine the # of tasks), there is already a write-up on the SQL Programming page under "Hive optimizations not in Spark": "Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using 'SET spark.sql.shuffle.partitions=[num_tasks];'." Any idea if and when this is scheduled? Even a rudimentary implementation (e.g. based on the # of partitions of the underlying RDD, which is available now) would be an improvement over the current fixed 200 and would be a critical feature for SparkSQL feasibility.

Thanks

On Wed, Feb 4, 2015 at 4:09 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Awesome! By setting this, I could minimize the collect overhead, e.g. by setting it to the # of partitions of the RDD. Two questions:

1. I had looked for such an option in http://spark.apache.org/docs/latest/configuration.html but it is not documented. Seems this is a doc bug?
2. Ideally the shuffle partitions should be derived from the underlying table(s) and an optimal number should be set for each query. Having one number across all queries is not ideal, nor does the consumer want to set it to a different # before each query. Any thoughts?

Thanks!

On Wed, Feb 4, 2015 at 3:50 PM, Ankur Srivastava ankur.srivast...@gmail.com wrote:

Hi Manoj, You can set the number of partitions you want your sql query to use. By default it is 200 and thus you see that number. You can update it using the spark.sql.shuffle.partitions property:

spark.sql.shuffle.partitions (default: 200) - Configures the number of partitions to use when shuffling data for joins or aggregations.

Thanks, Ankur

On Wed, Feb 4, 2015 at 3:41 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Hi, Any thoughts on this? Most of the 200 tasks in the collect phase take less than 20 ms, and a lot of time is spent scheduling them. I suspect the overall time will drop if the # of tasks is reduced to a much smaller # (close to the # of partitions?). Any help is appreciated. Thanks

On Wed, Feb 4, 2015 at 12:38 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Spark 1.2. Data is read from parquet with 2 partitions and is cached as a table with 2 partitions. Verified in the UI that it shows the RDD with 2 partitions, fully cached in memory. Cached data contains columns a, b, c. Column a has ~150 distinct values. Next, run SQL on this table: select a, sum(b), sum(c) from table x. The query creates 200 tasks. Further, the advanced metric "scheduler delay" is a significant % for most of these tasks. This seems like very high overhead for a query on an RDD with 2 partitions. It seems that if this were run with fewer tasks, the query would run faster. Any thoughts on how to control the # of partitions for the group by (or other SQLs)? Thanks,
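[Editor's note] For completeness, the property can be set from code as well as via SQL; a minimal sketch (Spark 1.2; the value 2 matches the cached table's partition count in this thread):

// Match the shuffle parallelism to the small cached table
sqlContext.setConf("spark.sql.shuffle.partitions", "2")
// or equivalently from SQL:
sqlContext.sql("SET spark.sql.shuffle.partitions=2")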
Re: Large # of tasks in groupby on single table
Hi, Any thoughts on this? Most of the 200 tasks in the collect phase take less than 20 ms, and a lot of time is spent scheduling them. I suspect the overall time will drop if the # of tasks is reduced to a much smaller # (close to the # of partitions?). Any help is appreciated. Thanks

On Wed, Feb 4, 2015 at 12:38 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Spark 1.2. Data is read from parquet with 2 partitions and is cached as a table with 2 partitions. Verified in the UI that it shows the RDD with 2 partitions, fully cached in memory. Cached data contains columns a, b, c. Column a has ~150 distinct values. Next, run SQL on this table: select a, sum(b), sum(c) from table x. The query creates 200 tasks. Further, the advanced metric "scheduler delay" is a significant % for most of these tasks. This seems like very high overhead for a query on an RDD with 2 partitions. It seems that if this were run with fewer tasks, the query would run faster. Any thoughts on how to control the # of partitions for the group by (or other SQLs)? Thanks,
Re: Error in saving schemaRDD with Decimal as Parquet
Hi, Any thoughts? Thanks,

On Sun, Feb 1, 2015 at 12:26 PM, Manoj Samel manojsamelt...@gmail.com wrote:

Spark 1.2. A SchemaRDD has a schema with decimal columns created like:

x1 = new StructField("a", DecimalType(14,4), true)
x2 = new StructField("b", DecimalType(14,4), true)

Registering it as a SQL temp table and doing SQL queries on these columns, including SUM etc., works fine, so the schema's Decimal does not seem to be the issue. When doing saveAsParquetFile on the RDD, it gives the following error. Not sure why the DecimalType in the SchemaRDD is not seen by Parquet, which seems to see it as scala.math.BigDecimal:

java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314)
    at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:308)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Error in saving schemaRDD with Decimal as Parquet
Spark 1.2. A SchemaRDD has a schema with decimal columns created like:

x1 = new StructField("a", DecimalType(14,4), true)
x2 = new StructField("b", DecimalType(14,4), true)

Registering it as a SQL temp table and doing SQL queries on these columns, including SUM etc., works fine, so the schema's Decimal does not seem to be the issue. When doing saveAsParquetFile on the RDD, it gives the following error. Not sure why the DecimalType in the SchemaRDD is not seen by Parquet, which seems to see it as scala.math.BigDecimal:

java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314)
    at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:308)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Re: Error in saving schemaRDD with Decimal as Parquet
I think I found the issue causing it. I was calling schemaRDD.coalesce(n).saveAsParquetFile to reduce the number of partitions in the Parquet file - that is when the stack trace happens. If I coalesce the partitions before creating the schemaRDD, then the schemaRDD.saveAsParquetFile call works for decimal. So it seems schemaRDD.coalesce returns an RDD whose schema does not match the source RDD, in that the decimal type seems to get changed. Any thoughts? Is this a bug? Thanks,
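For reference, a minimal sketch of the workaround described above, using the Spark 1.2 applySchema API (rowRDD, schema and the output path are hypothetical placeholders standing in for whatever produced the original schemaRDD):
// Coalesce the underlying row RDD *before* applying the schema, so the
// resulting SchemaRDD keeps its DecimalType metadata when written out.
val schemaRdd = sqlContext.applySchema(rowRDD.coalesce(4), schema) // 4 is an arbitrary example count
schemaRdd.saveAsParquetFile("/hypothetical/out.parquet")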
Why is DecimalType separate from DataType?
Spark 1.2 While building a schemaRDD using StructType, xxx = new StructField(credit_amount, DecimalType, true) gives the error: type mismatch; found : org.apache.spark.sql.catalyst.types.DecimalType.type required: org.apache.spark.sql.catalyst.types.DataType From https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.package, it seems DecimalType = sql.catalyst.types.DecimalType is separate from DataType = sql.catalyst.types.DataType. Not sure why that is the case? How does one use Decimal and other such types in a StructField? Thanks,
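A sketch of the distinction (assuming the 1.2 catalyst types; the field name is hypothetical): unlike StringType or IntegerType, which are case objects and therefore directly usable as DataType values, DecimalType is a case class carrying precision and scale, so it must be instantiated:
import org.apache.spark.sql.catalyst.types.{DecimalType, StructField}
// new StructField("credit_amount", DecimalType, true) // fails: DecimalType.type is not a DataType
val fixed = new StructField("credit_amount", DecimalType(14, 4), true) // fixed precision/scale
val unlimited = new StructField("credit_amount", DecimalType.Unlimited, true) // unbounded decimal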
schemaRDD.saveAsParquetFile creates a large number of small parquet files ...
Spark 1.2 on Hadoop 2.3 Read one big CSV file, create a schemaRDD on it, and saveAsParquetFile. It creates a large number of small (~1MB) parquet part-x- files. Is there any way to control this so that a smaller number of larger files is created? Thanks,
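One approach that may help (a sketch; the partition count and path are hypothetical): the number of output part files tracks the number of partitions, so shrinking the partition count before the save shrinks the file count. Given the coalesce/decimal issue in the thread above, coalescing the underlying row RDD before applySchema may be safer than coalescing the SchemaRDD itself.
// Assuming rowRDD: RDD[Row] and schema: StructType as elsewhere in this thread.
// Fewer partitions in => fewer part files out.
val schemaRdd = sqlContext.applySchema(rowRDD.coalesce(16), schema) // 16 is an arbitrary target
schemaRdd.saveAsParquetFile("/hypothetical/out.parquet")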
SparkSQL Performance Tuning Options
Spark 1.2, no Hive; prefer not to use HiveContext to avoid metastore_db. The use case is a Spark YARN app that will start and serve as a query server for multiple users, i.e. always up and running. At startup, there is the option to cache data and also pre-compute some result sets, hash maps etc. that are likely to be asked for by client APIs. I.e. there is some room to use startup time to precompute/cache (see the sketch after this message) - but the query response time requirement on a large data set is very stringent. Hoping to use SparkSQL (but a combination of SQL and RDD APIs is also OK). * Does SparkSQL execution use the underlying partition information? (Data is from HDFS) * Are there any ways to give hints to SparkSQL execution about any precomputed/pre-cached RDDs? * Packages spark.sql.execution, spark.sql.execution.joins and other sql.xxx packages - is using these to tune the query plan recommended? Would like to keep this as-needed if possible. * Features not in the current release but scheduled for an upcoming release would also be good to know. Thanks, PS: This is not a small topic, so if someone prefers to start an offline thread on details, I can do that and summarize the conclusions back to this thread.
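On the pre-caching part, a minimal sketch of what startup could do with the plain SQLContext (the table name and path are hypothetical):
// Load once at startup, register, and pin in the in-memory columnar cache.
val facts = sqlContext.parquetFile("/hypothetical/facts.parquet")
facts.registerTempTable("facts")
sqlContext.cacheTable("facts") // cached lazily in columnar format
sqlContext.sql("SELECT COUNT(*) FROM facts").collect() // first action materializes the cache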
Re: Spark 1.2 - Writing parquet fails for timestamp with Unsupported datatype TimestampType
Awesome! That would be great!! On Mon, Jan 26, 2015 at 3:18 PM, Michael Armbrust mich...@databricks.com wrote: I'm aiming for 1.3.
Re: Spark 1.2 - Writing parquet fails for timestamp with Unsupported datatype TimestampType
Thanks Michael. I am sure there have been many requests for this support. Any release targeted for this? Thanks, On Sat, Jan 24, 2015 at 11:47 AM, Michael Armbrust mich...@databricks.com wrote: Those annotations actually don't work because the timestamp in SQL has optional nanosecond precision. However, there is a PR to add support using Parquet's INT96 type: https://github.com/apache/spark/pull/3820 On Fri, Jan 23, 2015 at 12:08 PM, Manoj Samel manojsamelt...@gmail.com wrote: Looking further at the trace and ParquetTypes.scala, it seems there is no support for Timestamp and Date in fromPrimitiveDataType(ctype: DataType): Option[ParquetTypeInfo]. Since Parquet supports these types with some decoration over Int (https://github.com/Parquet/parquet-format/blob/master/LogicalTypes.md), is there any reason why Date / Timestamp are not supported right now? Thanks, Manoj
Spark 1.2 - Writing parquet fails for timestamp with Unsupported datatype TimestampType
Using Spark 1.2 Read a CSV file, apply schema to convert to SchemaRDD and then schemaRdd.saveAsParquetFile If the schema includes Timestamptype, it gives following trace when doing the save Exception in thread main java.lang.RuntimeException: Unsupported datatype TimestampType at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply( ParquetTypes.scala:343) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply( ParquetTypes.scala:292) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType( ParquetTypes.scala:291) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply( ParquetTypes.scala:363) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply( ParquetTypes.scala:362) at scala.collection.TraversableLike$$anonfun$map$1.apply( TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply( TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach( ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes( ParquetTypes.scala:361) at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData( ParquetTypes.scala:407) at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty( ParquetRelation.scala:166) at org.apache.spark.sql.parquet.ParquetRelation$.create( ParquetRelation.scala:145) at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply( SparkStrategies.scala:204) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply( QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply( QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply( QueryPlanner.scala:59) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute( SQLContext.scala:418) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan( SQLContext.scala:416) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute( SQLContext.scala:422) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan( SQLContext.scala:422) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute( SQLContext.scala:425) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425 ) at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile( SchemaRDDLike.scala:76) at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:108) at bdrt.MyTest$.createParquetWithDate(MyTest.scala:88) at bdrt.MyTest$delayedInit$body.apply(MyTest.scala:54) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach( TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at bdrt.MyTest$.main(MyTest.scala:10)
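Until the INT96 support mentioned above lands, a possible workaround (a sketch under the assumption that downstream readers can handle the conversion; field names, the input format, and paths are hypothetical): carry the timestamp as epoch milliseconds in a LongType column, which 1.2 can write to Parquet.
import java.sql.Timestamp
import org.apache.spark.sql._
// Schema uses LongType in place of the unsupported TimestampType.
val schema = StructType(Seq(
  StructField("event", StringType, true),
  StructField("ts_millis", LongType, true)))
val rows = sc.textFile("/hypothetical/events.csv").map(_.split(",")).map(p =>
  Row(p(0), Timestamp.valueOf(p(1)).getTime)) // assumes "yyyy-mm-dd hh:mm:ss"; Timestamp -> epoch millis
sqlContext.applySchema(rows, schema).saveAsParquetFile("/hypothetical/events.parquet")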
Error when running SparkPi on Secure HA Hadoop cluster
Hi, The setup is as follows. Hadoop cluster 2.3.0 (CDH5.0) - Namenode HA - Resource Manager HA - secured with Kerberos. Spark 1.2. Run SparkPi as follows: - conf/spark-defaults.conf has the following entries: spark.yarn.queue myqueue spark.yarn.access.namenodes hdfs://namespace (remember this is namenode HA) - Do kinit with some user keytab - Submit SparkPi as follows: spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 --queue thequeue $MY_SPARK_DIR/lib/spark-examples*.jar 10 This gives the following trace (not sure why it shows an unknown queue when the queue name is specified in the spark-defaults.conf above): 15/01/15 19:18:27 INFO impl.YarnClientImpl: Submitted application application_1415648563285_31469 15/01/15 19:18:28 INFO yarn.Client: Application report for application_1415648563285_31469 (state: FAILED) 15/01/15 19:18:28 INFO yarn.Client: client token: N/A diagnostics: Application application_1415648563285_31469 submitted by user XYZ to unknown queue: thequeue --- WHY UNKNOWN QUEUE ??? ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: thequeue --- WHY UNKNOWN QUEUE ??? start time: 1421349507652 final status: FAILED tracking URL: N/A user: XYZ Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master. at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:102) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:58)
Running spark 1.2 on Hadoop + Kerberos
Hi, For running Spark 1.2 on a Hadoop cluster with Kerberos, what Spark configurations are required? Using an existing keytab, can any of the examples be submitted to the secured cluster? How? Thanks,
Re: Running spark 1.2 on Hadoop + Kerberos
Please ignore the keytab question for now; it wasn't fully described. Some old communication (Oct 14) says Spark is not certified with Kerberos. Can someone comment on this aspect? On Thu, Jan 8, 2015 at 3:53 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Manoj, As long as you're logged in (i.e. you've run kinit), everything should just work. You can run klist to make sure you're logged in. -- Marcelo
Cannot see RDDs in Spark UI
Hi, I create a bunch of RDDs, including schema RDDs. When I run the program and go to the UI on xxx:4040, the Storage tab does not show any RDDs. Spark version is 1.1.1 (Hadoop 2.3). Any thoughts? Thanks,
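One thing worth checking (general Spark behavior, not confirmed as the cause here): the Storage tab lists only RDDs that are both marked for caching and materialized by an action, as in this sketch with a hypothetical path:
val rdd = sc.textFile("/hypothetical/data.txt")
rdd.cache() // marks the RDD; nothing appears in the Storage tab yet
rdd.count() // first action computes and stores the blocks; the RDD now shows up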
Sharing sqlContext between Akka router and routee actors ...
Hi, An Akka router creates a sqlContext and creates a bunch of routee actors with the sqlContext as a parameter. The actors then execute queries on that sqlContext. Would this pattern be an issue? Is there any other way sparkContext etc. should be shared cleanly between Akka routers/routees? Thanks,
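For concreteness, a minimal sketch of the pattern in question (the Akka details and names are assumptions, not from the thread); since a single SparkContext/SQLContext is designed to have jobs submitted from multiple threads, sharing one instance this way should be workable:
import akka.actor.{Actor, ActorSystem, Props}
import org.apache.spark.sql.SQLContext

// Each routee holds a reference to the single shared SQLContext.
class QueryActor(sqlContext: SQLContext) extends Actor {
  def receive = {
    case query: String => sender ! sqlContext.sql(query).collect()
  }
}

val system = ActorSystem("spark-queries")
// The router/parent creates routees, passing the shared context along.
val routee = system.actorOf(Props(new QueryActor(sqlContext)), "query-routee-1")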
Re: Spark Server - How to implement
Thanks Marcelo. Spark Gurus/Databricks team - do you have something in the roadmap for such a Spark server? Thanks, On Thu, Dec 11, 2014 at 5:43 PM, Marcelo Vanzin van...@cloudera.com wrote: Oops, sorry, fat fingers. We've been playing with something like that inside Hive: https://github.com/apache/hive/tree/spark/spark-client That seems to have at least a few of the characteristics you're looking for; but it's a very young project, and at this moment we're not developing it as a public API, but mostly for internal Hive use. It can give you a few ideas, though. Also, SPARK-3215. On Thu, Dec 11, 2014 at 5:41 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Manoj, I'm not aware of any public projects that do something like that, except for the Ooyala server which you say doesn't cover your needs. -- Marcelo
Spark Server - How to implement
Hi, If Spark-based services are to be exposed as a continuously available server, what are the options? * The API exposed to the client will be proprietary and fine grained (RPC style ..), not a job-level API. * The client API need not be SQL, so the Thrift JDBC server does not seem to be an option .. but I could be wrong here ... * The Ooyala implementation is a REST API for job submission but, as mentioned above, the desired API is a finer-grained API, not job submission. Any existing implementation? Or is it build-your-own-server? Any thoughts on the approach to use? Thanks,
Spark 1.1.1 SQLContext.jsonFile dumps trace if JSON has newlines ...
I am using SQLContext.jsonFile. If a valid JSON contains newlines, spark1.1.1 dumps trace below. If the JSON is read as one line, it works fine. Is this known? 14/12/10 11:44:02 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 28) com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input within/between OBJECT entries at [Source: java.io.StringReader@4c8dd4d9; line: 1, column: 19] at com.fasterxml.jackson.core.JsonParser._constructError( JsonParser.java:1524) at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWS( ReaderBasedJsonParser.java:1682) at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken( ReaderBasedJsonParser.java:619) at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringMap( MapDeserializer.java:412) at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize( MapDeserializer.java:312) at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize( MapDeserializer.java:26) at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose( ObjectMapper.java:2986) at com.fasterxml.jackson.databind.ObjectMapper.readValue( ObjectMapper.java:2091) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply( JsonRDD.scala:275) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply( JsonRDD.scala:274) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.TraversableOnce$class.reduceLeft( TraversableOnce.scala:172) at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:847) at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:845) at org.apache.spark.SparkContext$$anonfun$28.apply(SparkContext.scala:1179) at org.apache.spark.SparkContext$$anonfun$28.apply(SparkContext.scala:1179) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 14/12/10 11:44:02 WARN TaskSetManager: Lost task 0.0 in stage 14.0 (TID 28, localhost): com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input within/between OBJECT entries at [Source: java.io.StringReader@4c8dd4d9; line: 1, column: 19] com.fasterxml.jackson.core.JsonParser._constructError( JsonParser.java:1524) com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWS( ReaderBasedJsonParser.java:1682) com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken( ReaderBasedJsonParser.java:619) com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringMap( MapDeserializer.java:412) com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize( MapDeserializer.java:312) com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize( MapDeserializer.java:26) com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose( ObjectMapper.java:2986) com.fasterxml.jackson.databind.ObjectMapper.readValue( ObjectMapper.java:2091) org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply( JsonRDD.scala:275) 
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply( JsonRDD.scala:274) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) scala.collection.TraversableOnce$class.reduceLeft( TraversableOnce.scala:172) scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157) org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:847) org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:845) org.apache.spark.SparkContext$$anonfun$28.apply( SparkContext.scala:1179) org.apache.spark.SparkContext$$anonfun$28.apply( SparkContext.scala:1179) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178 ) java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) 14/12/10 11:44:02 ERROR TaskSetManager: Task 0 in stage 14.0 failed 1 times; aborting job
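A likely workaround (a sketch; jsonFile expects one complete JSON object per input line, so multi-line objects trip the parser; the path is hypothetical and this assumes one JSON document per file):
// Read each file whole, collapse newlines, then parse with jsonRDD.
val oneLinePerDoc = sc.wholeTextFiles("/hypothetical/json-dir/").map(_._2.replace("\n", " "))
val schemaRdd = sqlContext.jsonRDD(oneLinePerDoc)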
Can HiveContext be used without using Hive?
From the 1.1.1 documentation, it seems one can use HiveContext instead of SQLContext without having a Hive installation. The benefit is a richer SQL dialect. Is my understanding correct? Thanks
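A short sketch of that usage, with one caveat worth knowing: even without Hive installed, HiveContext creates a local metastore_db directory and derby.log in the working directory on first use.
// No external Hive needed; queries get the HiveQL dialect.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._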
Spark SQL - Any timeline to move beyond alpha version?
Is there any timeline for when Spark SQL goes beyond the alpha version? Thanks,
Re: Spark resilience
Thanks Aaron, this is useful! - Manoj On Mon, Apr 14, 2014 at 8:12 PM, Aaron Davidson ilike...@gmail.com wrote: Launching drivers inside the cluster was a feature added in 0.9, for standalone cluster mode: http://spark.apache.org/docs/latest/spark-standalone.html#launching-applications-inside-the-cluster Note the supervise flag, which will cause the driver to be restarted if it fails. This is a rather low-level mechanism which by default will just cause the whole job to rerun from the beginning. Special recovery would have to be implemented by hand, via some sort of state checkpointing into a globally visible storage system (e.g., HDFS), which, for example, Spark Streaming already does. Currently, this feature is not supported in YARN or Mesos fine-grained mode.
Re: Spark resilience
Could you please elaborate how drivers can be restarted automatically? Thanks, On Mon, Apr 14, 2014 at 10:30 AM, Aaron Davidson ilike...@gmail.com wrote: Master and slave are somewhat overloaded terms in the Spark ecosystem (see the glossary: http://spark.apache.org/docs/latest/cluster-overview.html#glossary). Are you actually asking about the Spark driver and executors, or the standalone cluster master and workers? To briefly answer for either possibility: (1) Drivers are not fault tolerant but can be restarted automatically, Executors may be removed at any point without failing the job (though losing an Executor may slow the job significantly), and Executors may be added at any point and will be immediately used. (2) Standalone cluster Masters are fault tolerant and failure will only temporarily stall new jobs from starting or getting new resources, but does not affect currently-running jobs. Workers can fail and will simply cause jobs to lose their current Executors. New Workers can be added at any point. On Mon, Apr 14, 2014 at 11:00 AM, Ian Ferreira ianferre...@hotmail.com wrote: Folks, I was wondering what the failure support modes were for Spark while running jobs: 1. What happens when a master fails? 2. What happens when a slave fails? 3. Can you add and remove slaves mid-job? Regarding the install on Mesos, if I understand correctly the Spark master is behind a Zookeeper quorum, so that isolates the slaves from a master failure, but what about the masters behind the quorum? Cheers - Ian
Re: Error in SparkSQL Example
Hi Michael, Thanks for the clarification. My question was about the error above (error: class $iwC needs to be abstract) and what the RDD[Person] declaration brings, since I can run the DSL without it: people: org.apache.spark.rdd.RDD[Person] Thanks, On Mon, Mar 31, 2014 at 9:13 AM, Michael Armbrust mich...@databricks.com wrote: val people: RDD[Person] // An RDD of case class objects, from the first example. is just a placeholder to avoid cluttering up each example with the same code for creating an RDD. The : RDD[Person] is just there to let you know the expected type of the variable 'people'. Perhaps there is a clearer way to indicate this. As you have realized, using the full line from the first example will allow you to run the rest of them.
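Putting Michael's point in runnable form (a sketch; the file path comes from the docs example quoted in the message below): the placeholder line fails in the REPL both because RDD is not imported and because a val cannot be declared without a definition, so the fix is to bind the variable with the full expression from the first example:
import org.apache.spark.rdd.RDD
case class Person(name: String, age: Int)
// Declare *and* define, so the REPL is happy and the type is still explicit.
val people: RDD[Person] = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))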
Error in SparkSQL Example
Hi, On http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html, I am trying to run the code under Writing Language-Integrated Relational Queries (I have a 1.0.0 snapshot). I am running into an error on val people: RDD[Person] // An RDD of case class objects, from the first example. scala> val people: RDD[Person] <console>:19: error: not found: type RDD val people: RDD[Person] ^ scala> val people: org.apache.spark.rdd.RDD[Person] <console>:18: error: class $iwC needs to be abstract, since value people is not defined class $iwC extends Serializable { ^ Any idea what the issue is? Also, it's not clear what the RDD[Person] brings. I can run the DSL without the case class objects RDD ... val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)) val teenagers = people.where('age >= 13).where('age <= 19) Thanks,
Shouldn't the UNION of SchemaRDDs produce a SchemaRDD?
Hi, I am trying SparkSQL based on the example in the doc ... val people = sc.textFile("/data/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)) val olderThanTeens = people.where('age > 19) val youngerThanTeens = people.where('age < 13) val nonTeens = youngerThanTeens.union(olderThanTeens) I can do an orderBy('age) on the first two (which are SchemaRDDs) but not on the third: nonTeens is a UnionRDD, which does not support orderBy. This seems different from SQL behavior, where the union of two SQL results is itself a relation with the same functionality ... It is not clear why the union of two SchemaRDDs does not produce a SchemaRDD. Thanks,
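One workaround worth trying (an assumption about the SchemaRDD API of that era, not confirmed in this thread): SchemaRDD defines its own unionAll, which returns a SchemaRDD and so keeps the DSL available, unlike the generic union inherited from RDD:
// unionAll is SchemaRDD-to-SchemaRDD, so the result should still support orderBy.
val nonTeens = youngerThanTeens.unionAll(olderThanTeens)
val sorted = nonTeens.orderBy('age)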
SparkSQL where with BigDecimal type gives stacktrace
Hi, If I do a where on BigDecimal, I get a stack trace. Changing BigDecimal to Double works ... scala case class JournalLine(account: String, credit: BigDecimal, debit: BigDecimal, date: String, company: String, currency: String, costcenter: String, region: String) defined class JournalLine ... scala jl.where('credit 0).foreach(println) scala.MatchError: scala.BigDecimal (of class scala.reflect.internal.Types$TypeRef$$anon$3) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:41) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:45) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:45) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:45) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:38) at org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:32) at org.apache.spark.sql.execution.ExistingRdd$.fromProductRdd(basicOperators.scala:128) at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:79) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:39) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:44) at $iwC$$iwC$$iwC$$iwC.init(console:46) at $iwC$$iwC$$iwC.init(console:48) at $iwC$$iwC.init(console:50) at $iwC.init(console:52) at init(console:54) at .init(console:58) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:777) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1045) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:795) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:840) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:752) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:600) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:607) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:610) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:935) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:883) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:883) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:883) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:981) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) Thanks,
Re: SparkSQL where with BigDecimal type gives stacktrace
Hi, Would the same issue be present for other Java type like Date ? Converting the person/teenager example on Patricks page reproduces the problem ... Thanks, scala import scala.math import scala.math scala case class Person(name: String, age: BigDecimal) defined class Person scala val people = sc.textFile(/data/spark/examples/src/main/resources/people.txt).map(_.split(,)).map(p = Person(p(0), BigDecimal(p(1).trim.toInt))) 14/03/31 00:23:40 INFO MemoryStore: ensureFreeSpace(32960) called with curMem=0, maxMem=308713881 14/03/31 00:23:40 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.2 KB, free 294.4 MB) people: org.apache.spark.rdd.RDD[Person] = MappedRDD[3] at map at console:20 scala people take 1 ... scala val t = people.where('age 12 ) scala.MatchError: scala.BigDecimal (of class scala.reflect.internal.Types$TypeRef$$anon$3) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:41) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:45) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:45) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:45) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:38) at org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:32) at org.apache.spark.sql.execution.ExistingRdd$.fromProductRdd(basicOperators.scala:128) at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:79) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:22) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:27) at $iwC$$iwC$$iwC$$iwC.init(console:29) at $iwC$$iwC$$iwC.init(console:31) at $iwC$$iwC.init(console:33) at $iwC.init(console:35) at init(console:37) at .init(console:41) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:777) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1045) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:795) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:840) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:752) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:600) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:607) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:610) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:935) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:883) at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:883) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:883) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:981) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) On Sun, Mar 30, 2014 at 11:04 AM, Aaron Davidson ilike...@gmail.com wrote: Well, the error is coming from this case statement not matching on the BigDecimal type: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L41 This seems to be a bug because there is a corresponding Catalyst DataType for BigDecimal, just no way to produce a schema for it. A patch should be straightforward enough to match against typeOf[BigDecimal] assuming this was not for some reason intentional. On Sun, Mar 30, 2014 at 10:43 AM, smallmonkey...@hotmail.com smallmonkey...@hotmail.com wrote: can I get the whole operation? then i can try to locate the error -- smallmonkey...@hotmail.com *From:* Manoj Samel manojsamelt...@gmail.com *Date:* 2014-03-31 01:16 *To:* user user@spark.apache.org *Subject:* SparkSQL where with BigDecimal type gives stacktrace Hi, If I do a where on BigDecimal, I get a stack trace. Changing
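For completeness, the workaround mentioned at the top of this thread in runnable form (a sketch; exact decimal semantics are lost, so sums may pick up floating-point rounding):
// Double is matched by ScalaReflection's schema derivation, unlike scala.BigDecimal.
case class JournalLine(account: String, credit: Double, debit: Double, date: String, company: String, currency: String, costcenter: String, region: String)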
groupBy RDD does not have the grouping column?
Hi, If I create a groupBy('a)(Sum('b) as 'foo, Sum('c) as 'bar), then the resulting RDD should have 'a, 'foo and 'bar. The resulting RDD, however, shows just 'foo and 'bar and is missing 'a. Thoughts? Thanks, Manoj
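If the DSL here behaves like a SQL projection (an assumption, not confirmed in a reply; rdd stands for the SchemaRDD being grouped), the output columns are exactly the aggregate expression list, so the grouping column has to be listed there explicitly:
// Include 'a in the aggregate list to carry it into the result.
val grouped = rdd.groupBy('a)('a, Sum('b) as 'foo, Sum('c) as 'bar)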