Spark 1.4 - memory bloat in group by/aggregate???

2015-06-26 Thread Manoj Samel
Hi,


   - Spark 1.4 on a single-node machine. Run spark-shell.
   - Reading from a Parquet file with a bunch of text columns and a couple of
   amounts in decimal(14,4). The on-disk size of the file is 376M. It has ~100
   million rows.
   - rdd1 = sqlContext.read.parquet(...)
   - rdd1.cache
   - group_by_df =
   rdd1.groupBy("a").agg(sum(rdd1("amount1")), sum(rdd1("amount2")))
   (see the consolidated sketch after this list)
   - group_by_df.cache
   - group_by_df.count // Trigger action - results in 725 rows
   - Run top on the machine
   - In the Spark UI, the storage tab shows the base Parquet RDD size as 2.3GB
   (a multiple of the 376M on-disk size); the size of group_by_df is 43.2 KB.
   This seems OK.
   - However, top shows the process RES memory jumping from 2g at start to 31g
   after the count. This seems excessive for one group-by operator and will
   lead to trouble for repeated similar operations on the data ...
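A consolidated sketch of the steps above, as they would look in the 1.4
spark-shell (the Parquet path and the column names "a", "amount1", "amount2"
are placeholders, since the originals are not shown):

import org.apache.spark.sql.functions.sum

val rdd1 = sqlContext.read.parquet("/path/to/file.parquet")   // placeholder path
rdd1.cache()
val group_by_df = rdd1.groupBy("a")
  .agg(sum(rdd1("amount1")), sum(rdd1("amount2")))
group_by_df.cache()
group_by_df.count()   // triggers the action; reported to return 725 rows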

Any thoughts ?

Thanks,


Spark 1.3 saveAsTextFile with codec gives error - works with Spark 1.2

2015-04-15 Thread Manoj Samel
Env - Spark 1.3, Hadoop 2.3, Kerberos

xx.saveAsTextFile(path, codec) gives the following trace. The same call works
with Spark 1.2 in the same environment.

val codec = classOf[SomeCodecClass]   // some codec class

val a = sc.textFile("/some_hdfs_file")

a.saveAsTextFile("/some_other_hdfs_file", codec) fails with the following trace
in Spark 1.3; it works in Spark 1.2 in the same env.
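A minimal sketch of the same pattern with a standard Hadoop codec instead of
the (unnamed) custom one; the paths are placeholders:

import org.apache.hadoop.io.compress.GzipCodec

val a = sc.textFile("/some_hdfs_file")
a.saveAsTextFile("/some_other_hdfs_file", classOf[GzipCodec])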

15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage 2.0
(TID 17) on executor XYZ: java.lang.SecurityException (JCE cannot
authenticate the provider BC) [duplicate 7]
15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose
tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage
2.0 (TID 16, nodeXYZ): java.lang.SecurityException: JCE cannot authenticate
the provider BC
at javax.crypto.Cipher.getInstance(Cipher.java:642)
at javax.crypto.Cipher.getInstance(Cipher.java:580)
 some codec calls 
at
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.jar.JarException:
file:/abc/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned
entries - org/apache/spark/SparkHadoopWriter$.class
at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462)
at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322)
at javax.crypto.JarVerifier.verify(JarVerifier.java:250)
at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161)
at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187)
at javax.crypto.Cipher.getInstance(Cipher.java:638)
... 16 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned entries - org/apache/spark/SparkHadoopWriter$.class

2015-04-14 Thread Manoj Samel
With Spark 1.3, xx.saveAsTextFile(path, codec) gives the following trace. The
same works with Spark 1.2.

Config is CDH 5.3.0 (Hadoop 2.3) with Kerberos

15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage 2.0
(TID 17) on executor node1078.svc.devpg.pdx.wd: java.lang.SecurityException
(JCE cannot authenticate the provider BC) [duplicate 7]
15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose
tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage
2.0 (TID 16, node1080.svc.devpg.pdx.wd): java.lang.SecurityException: JCE
cannot authenticate the provider BC
at javax.crypto.Cipher.getInstance(Cipher.java:642)
at javax.crypto.Cipher.getInstance(Cipher.java:580)
at com.workday.mrcodec.CryptoAESHelper.setupCrypto(CryptoAESHelper.java:61)
at com.workday.mrcodec.CryptoAESHelper.init(CryptoAESHelper.java:48)
at
com.workday.mrcodec.CryptoAESCompressor.init(CryptoAESCompressor.java:48)
at
com.workday.mrcodec.CryptoAESCodec.createCompressor(CryptoAESCodec.java:52)
at
com.workday.mrcodec.CryptoAESCodec.createOutputStream(CryptoAESCodec.java:148)
at
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.jar.JarException:
file:/hadoop/disk7/yarn/local/usercache/dev.baseline/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar
has unsigned entries - org/apache/spark/SparkHadoopWriter$.class
at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462)
at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322)
at javax.crypto.JarVerifier.verify(JarVerifier.java:250)
at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161)
at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187)
at javax.crypto.Cipher.getInstance(Cipher.java:638)
... 16 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


Re: How to specify the port for AM Actor ...

2015-04-01 Thread Manoj Samel
Filed https://issues.apache.org/jira/browse/SPARK-6653

On Sun, Mar 29, 2015 at 8:18 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 LGTM. Could you open a JIRA and send a PR? Thanks.

 Best Regards,
 Shixiong Zhu

 2015-03-28 7:14 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

 I looked @ the 1.3.0 code and figured where this can be added

 In org.apache.spark.deploy.yarn ApplicationMaster.scala:282 is

 actorSystem = AkkaUtils.createActorSystem(sparkYarnAM,
 Utils.localHostName, 0,
   conf = sparkConf, securityManager = securityMgr)._1


 If I change it to below, then I can start it on the port I want.

 val port = sparkConf.getInt(spark.am.actor.port, 0) // New property
 ...
 actorSystem = AkkaUtils.createActorSystem(sparkYarnAM,
 Utils.localHostName, port,
   conf = sparkConf, securityManager = securityMgr)._1

 Thoughts? Any other place where any change is needed?



 On Wed, Mar 25, 2015 at 4:44 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 There is no configuration for it now.

 Best Regards,
 Shixiong Zhu

 2015-03-26 7:13 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

 There may be firewall rules limiting the ports between host running
 spark and the hadoop cluster. In that case, not all ports are allowed.

 Can it be a range of ports that can be specified ?

 On Wed, Mar 25, 2015 at 4:06 PM, Shixiong Zhu zsxw...@gmail.com
 wrote:

 It's a random port to avoid port conflicts, since multiple AMs can run
 in the same machine. Why do you need a fixed port?

 Best Regards,
 Shixiong Zhu

 2015-03-26 6:49 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

 Spark 1.3, Hadoop 2.5, Kerbeors

 When running spark-shell in yarn client mode, it shows following
 message with a random port every time (44071 in example below). Is there 
 a
 way to specify that port to a specific port ? It does not seem to be part
 of ports specified in
 http://spark.apache.org/docs/latest/configuration.html
 spark.xxx.port ...

 Thanks,

 15/03/25 22:27:10 INFO Client: Application report for
 application_1427316153428_0014 (state: ACCEPTED)
 15/03/25 22:27:10 INFO YarnClientSchedulerBackend: ApplicationMaster
 registered as Actor[akka.tcp://sparkYarnAM@xyz
 :44071/user/YarnAM#-1989273896]









Spark 1.3 Source - Github and source tar does not seem to match

2015-03-27 Thread Manoj Samel
While looking into a issue, I noticed that the source displayed on Github
site does not matches the downloaded tar for 1.3

Thoughts ?


Re: How to specify the port for AM Actor ...

2015-03-27 Thread Manoj Samel
I looked at the 1.3.0 code and figured out where this could be added.

In org.apache.spark.deploy.yarn, ApplicationMaster.scala:282 is

actorSystem = AkkaUtils.createActorSystem("sparkYarnAM",
Utils.localHostName, 0,
  conf = sparkConf, securityManager = securityMgr)._1


If I change it to the below, then I can start it on the port I want.

val port = sparkConf.getInt("spark.am.actor.port", 0) // New property
...
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM",
Utils.localHostName, port,
  conf = sparkConf, securityManager = securityMgr)._1

Thoughts? Any other place where any change is needed?
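If such a change went in, the new setting would presumably be supplied like any
other configuration property; a sketch, where "spark.am.actor.port" is the
hypothetical key from the snippet above (it does not exist in stock Spark 1.3):

val sparkConf = new org.apache.spark.SparkConf()
  .set("spark.am.actor.port", "44100")   // placeholder port value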



On Wed, Mar 25, 2015 at 4:44 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 There is no configuration for it now.

 Best Regards,
 Shixiong Zhu

 2015-03-26 7:13 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

 There may be firewall rules limiting the ports between host running spark
 and the hadoop cluster. In that case, not all ports are allowed.

 Can it be a range of ports that can be specified ?

 On Wed, Mar 25, 2015 at 4:06 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 It's a random port to avoid port conflicts, since multiple AMs can run
 in the same machine. Why do you need a fixed port?

 Best Regards,
 Shixiong Zhu

 2015-03-26 6:49 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

 Spark 1.3, Hadoop 2.5, Kerbeors

 When running spark-shell in yarn client mode, it shows following
 message with a random port every time (44071 in example below). Is there a
 way to specify that port to a specific port ? It does not seem to be part
 of ports specified in
 http://spark.apache.org/docs/latest/configuration.html spark.xxx.port
 ...

 Thanks,

 15/03/25 22:27:10 INFO Client: Application report for
 application_1427316153428_0014 (state: ACCEPTED)
 15/03/25 22:27:10 INFO YarnClientSchedulerBackend: ApplicationMaster
 registered as Actor[akka.tcp://sparkYarnAM@xyz
 :44071/user/YarnAM#-1989273896]







How to specify the port for AM Actor ...

2015-03-25 Thread Manoj Samel
Spark 1.3, Hadoop 2.5, Kerberos

When running spark-shell in yarn-client mode, it shows the following message
with a random port every time (44071 in the example below). Is there a way to
pin this port to a specific value? It does not seem to be among the ports
listed in http://spark.apache.org/docs/latest/configuration.html

Thanks,

15/03/25 22:27:10 INFO Client: Application report for
application_1427316153428_0014 (state: ACCEPTED)
15/03/25 22:27:10 INFO YarnClientSchedulerBackend: ApplicationMaster
registered as Actor[akka.tcp://sparkYarnAM@xyz
:44071/user/YarnAM#-1989273896]


Re: How to specify the port for AM Actor ...

2015-03-25 Thread Manoj Samel
There may be firewall rules limiting the ports between host running spark
and the hadoop cluster. In that case, not all ports are allowed.

Can it be a range of ports that can be specified ?

On Wed, Mar 25, 2015 at 4:06 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 It's a random port to avoid port conflicts, since multiple AMs can run in
 the same machine. Why do you need a fixed port?

 Best Regards,
 Shixiong Zhu

 2015-03-26 6:49 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

 Spark 1.3, Hadoop 2.5, Kerbeors

 When running spark-shell in yarn client mode, it shows following message
 with a random port every time (44071 in example below). Is there a way to
 specify that port to a specific port ? It does not seem to be part of ports
 specified in http://spark.apache.org/docs/latest/configuration.html
 spark.xxx.port ...

 Thanks,

 15/03/25 22:27:10 INFO Client: Application report for
 application_1427316153428_0014 (state: ACCEPTED)
 15/03/25 22:27:10 INFO YarnClientSchedulerBackend: ApplicationMaster
 registered as Actor[akka.tcp://sparkYarnAM@xyz
 :44071/user/YarnAM#-1989273896]





Re: Invalid ContainerId ... Caused by: java.lang.NumberFormatException: For input string: e04

2015-03-24 Thread Manoj Samel
Thanks Marcelo - I was using the SBT-built Spark per the earlier thread. I have
now switched to the distro (with the conf changes putting the CDH path in
front) and the Guava issue is gone.

Thanks,

On Tue, Mar 24, 2015 at 1:50 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi there,

 On Tue, Mar 24, 2015 at 1:40 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:
  When I run any query, it gives java.lang.NoSuchMethodError:
 
 com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;

 Are you running a custom-compiled Spark by any chance? Specifically,
 one you built with sbt? That would hit this problem, because the path
 I suggested (/usr/lib/hadoop/client/*) contains an older guava
 library, which would override the one shipped with the sbt-built
 Spark.

 If you build Spark with maven, or use the pre-built Spark distro, or
 specifically filter out the guava jar from your classpath when setting
 up the Spark job, things should work.

 --
 Marcelo




Hadoop 2.5 not listed in Spark 1.4 build page

2015-03-24 Thread Manoj Samel
http://spark.apache.org/docs/latest/building-spark.html#packaging-without-hadoop-dependencies-for-yarn
does not list Hadoop 2.5 in the Hadoop version table etc.

I assume it is still OK to compile with -Pyarn -Phadoop-2.5 for use with
Hadoop 2.5 (CDH 5.3.2)?

Thanks,


Re: Invalid ContainerId ... Caused by: java.lang.NumberFormatException: For input string: e04

2015-03-24 Thread Manoj Samel
Thanks all - perhaps I misread the earlier posts as being only about the Hadoop
version dependency, but the key is also CDH 5.3.2 itself (not just Hadoop 2.5
vs. 2.4) etc.

After adding the classpath as Marcelo/Harsh suggested (loading the CDH libs in
front), I am able to get spark-shell started without the invalid-container
error, so that issue is solved.

When I run any query, it gives java.lang.NoSuchMethodError:
com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;

This seems to be the known Guava library version issue ... I will look into it.

Thanks again !

On Tue, Mar 24, 2015 at 12:50 PM, Harsh J ha...@cloudera.com wrote:

 My comment's still the same: Runtime-link-via-classpath Spark to use CDH
 5.3.2 libraries, just like your cluster does, not Apache Hadoop 2.5.0
 (which CDH is merely based on, but carries several backports on top that
 aren't in Apache Hadoop 2.5.0, one of which addresses this parsing trouble).

 You do not require to recompile Spark, just alter its hadoop libraries in
 its classpath to be that of CDH server version (overwrite from parcels,
 etc.).

 On Wed, Mar 25, 2015 at 1:06 AM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 I recompiled Spark 1.3 with Hadoop 2.5; it still gives same stack trace.

 A quick browse into  stacktrace with Hadoop 2.5.0
 org.apache.hadoop.yarn.util.ConverterUtils ...

 1. toContainerId gets parameter containerId which I assume is
 container_e06_1427223073530_0001_01_01
 2. It splits it using public static final Splitter _SPLITTER =
 Splitter.on('_').trimResults();
 3. Line 172 checks container prefix with CONTAINER_PREFIX which is valid
 (container)
 4. It calls toApplicationAttemptId
 5. toApplicationAttemptId tries Long.parseLong(it.next()) on e06 and
 dies

 Seems like it is not expecting a non-numeric character. Is this a Yarn
 issue ?

 Thanks,

 On Tue, Mar 24, 2015 at 8:25 AM, Manoj Samel manoj.sa...@gmail.com
 wrote:

 I'll compile Spark with Hadoop libraries and try again ...

 Thanks,

 Manoj

 On Mar 23, 2015, at 10:34 PM, Harsh J ha...@cloudera.com wrote:

 This may happen if you are using different versions of CDH5 jars between
 Spark and the cluster. Can you ensure your Spark's Hadoop CDH jars match
 the cluster version exactly, since you seem to be using a custom version of
 Spark (out of CDH) here?

 On Tue, Mar 24, 2015 at 7:32 AM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 x-post to CDH list for any insight ...

 Thanks,

 -- Forwarded message --
 From: Manoj Samel manojsamelt...@gmail.com
 Date: Mon, Mar 23, 2015 at 6:32 PM
 Subject: Invalid ContainerId ... Caused by:
 java.lang.NumberFormatException: For input string: e04
 To: user@spark.apache.org user@spark.apache.org


 Spark 1.3, CDH 5.3.2, Kerberos

 Setup works fine with base configuration, spark-shell can be used in
 yarn client mode etc.

 When work recovery feature is enabled via
 http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/admin_ha_yarn_work_preserving_recovery.html,
 the spark-shell fails with following log

 15/03/24 01:20:16 ERROR yarn.ApplicationMaster: Uncaught exception:
 java.lang.IllegalArgumentException: Invalid ContainerId:
 container_e04_1427159778706_0002_01_01
 at
 org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)
 at
 org.apache.spark.deploy.yarn.YarnRMClient.getAttemptId(YarnRMClient.scala:93)
 at
 org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:83)
 at
 org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:576)
 at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
 at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
 at
 org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:574)
 at
 org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:597)
 at
 org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
 Caused by: java.lang.NumberFormatException: For input string: e04
 at
 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
 at java.lang.Long.parseLong(Long.java:589)
 at java.lang.Long.parseLong(Long.java:631)
 at
 org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137)
 at
 org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177)
 ... 12 more
 15/03/24 01:20:16 INFO yarn.ApplicationMaster: Final app status:
 FAILED, exitCode: 10, (reason

Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-23 Thread Manoj Samel
The log shows stack traces that seem to match the assert in the JIRA, so it
seems I am hitting the issue. Thanks for the heads up ...

15/03/23 20:29:50 ERROR actor.OneForOneStrategy: assertion failed:
Allocator killed more executors than are allocated!
java.lang.AssertionError: assertion failed: Allocator killed more executors
than are allocated!
at scala.Predef$.assert(Predef.scala:179)
at
org.apache.spark.deploy.yarn.YarnAllocator.killExecutor(YarnAllocator.scala:152)
at
org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1$$anonfun$applyOrElse$6.apply(ApplicationMaster.scala:547)
at
org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1$$anonfun$applyOrElse$6.apply(ApplicationMaster.scala:547)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1.applyOrElse(ApplicationMaster.scala:547)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.spark.deploy.yarn.ApplicationMaster$AMActor.aroundReceive(ApplicationMaster.scala:506)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

On Mon, Mar 23, 2015 at 2:25 PM, Marcelo Vanzin van...@cloudera.com wrote:

 On Mon, Mar 23, 2015 at 2:15 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:
  Found the issue above error - the setting for spark_shuffle was
 incomplete.
 
  Now it is able to ask and get additional executors. The issue is once
 they
  are released, it is not able to proceed with next query.

 That looks like SPARK-6325, which unfortunately was not fixed in time
 for 1.3.0...

 --
 Marcelo



Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-23 Thread Manoj Samel
)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at scala.collection.mutable.MapLike$class.retain(MapLike.scala:212)
at scala.collection.mutable.AbstractMap.retain(Map.scala:91)
at org.apache.spark.ExecutorAllocationManager.org
$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:234)
at
org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:191)

The next query hangs with the following output

..
15/03/23 20:57:27 INFO MemoryStore: Block broadcast_4_piece0 stored as
bytes in memory (estimated size 4.5 KB, free 264.9 MB)
15/03/23 20:57:27 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory
on xxx:45903 (size: 4.5 KB, free: 265.1 MB)
15/03/23 20:57:27 INFO BlockManagerMaster: Updated info of block
broadcast_4_piece0
15/03/23 20:57:27 INFO SparkContext: Created broadcast 4 from broadcast at
DAGScheduler.scala:839
15/03/23 20:57:27 INFO DAGScheduler: Submitting 10 missing tasks from Stage
2 (MapPartitionsRDD[11] at mapPartitions at Exchange.scala:64)
15/03/23 20:57:27 INFO YarnScheduler: Adding task set 2.0 with 10 tasks
15/03/23 20:57:56 WARN BlockManagerMasterActor: Removing BlockManager
BlockManagerId(2, yyy, 37231) with no recent heart beats: 131006ms exceeds
12ms
15/03/23 20:57:56 WARN BlockManagerMasterActor: Removing BlockManager
BlockManagerId(5, zzz, 34437) with no recent heart beats: 131329ms exceeds
12ms
15/03/23 20:57:56 WARN BlockManagerMasterActor: Removing BlockManager
BlockManagerId(1, bbb, 52747) with no recent heart beats: 128199ms exceeds
12ms
15/03/23 20:57:56 INFO BlockManagerMasterActor: Removing block manager
BlockManagerId(2, yyy, 37231)
15/03/23 20:57:56 INFO BlockManagerMasterActor: Removing block manager
BlockManagerId(1, bbb, 52747)
15/03/23 20:57:56 INFO BlockManagerMasterActor: Removing block manager
BlockManagerId(5, zzz, 34437)



On Sat, Mar 21, 2015 at 6:51 AM, Ted Yu yuzhih...@gmail.com wrote:

 bq. Requesting 1 new executor(s) because tasks are backlogged

 1 executor was requested.

 Which hadoop release are you using ?

 Can you check resource manager log to see if there is some clue ?

 Thanks

 On Fri, Mar 20, 2015 at 4:17 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Forgot to add - the cluster is idle otherwise so there should be no
 resource issues. Also the configuration works when not using Dynamic
 allocation.

 On Fri, Mar 20, 2015 at 4:15 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Hi,

 Running Spark 1.3 with secured Hadoop.

 Spark-shell with Yarn client mode runs without issue when not using
 Dynamic Allocation.

 When Dynamic allocation is turned on, the shell comes up but same SQL
 etc. causes it to loop.

 spark.dynamicAllocation.enabled=true
 spark.dynamicAllocation.initialExecutors=1
 spark.dynamicAllocation.maxExecutors=10
 # Set IdleTime low for testing
 spark.dynamicAllocation.executorIdleTimeout=60
 spark.shuffle.service.enabled=true

 Following is the start of the messages and then it keeps looping with
 Requesting 0 new executors

 15/03/20 22:52:42 INFO storage.BlockManagerMaster: Updated info of block
 broadcast_1_piece0
 15/03/20 22:52:42 INFO spark.SparkContext: Created broadcast 1 from
 broadcast at DAGScheduler.scala:839
 15/03/20 22:52:42 INFO scheduler.DAGScheduler: Submitting 1 missing
 tasks from Stage 0 (MapPartitionsRDD[3] at mapPartitions at
 Exchange.scala:100)
 15/03/20 22:52:42 INFO cluster.YarnScheduler: Adding task set 0.0 with 1
 tasks
 15/03/20 22:52:47 INFO spark.ExecutorAllocationManager: Requesting 1 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:52:52 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:52:57 WARN cluster.YarnScheduler: Initial job has not
 accepted any resources; check your cluster UI to ensure that workers are
 registered and have sufficient resources
 15/03/20 22:52:57 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:02 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:07 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:12 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:12 WARN cluster.YarnScheduler: Initial job has not
 accepted any resources; check your cluster UI to ensure that workers are
 registered and have sufficient resources
 15/03/20 22:53:17 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:22 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1

Invalid ContainerId ... Caused by: java.lang.NumberFormatException: For input string: e04

2015-03-23 Thread Manoj Samel
Spark 1.3, CDH 5.3.2, Kerberos

Setup works fine with base configuration, spark-shell can be used in yarn
client mode etc.

When the work-preserving recovery feature is enabled via
http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/admin_ha_yarn_work_preserving_recovery.html,
spark-shell fails with the following log

15/03/24 01:20:16 ERROR yarn.ApplicationMaster: Uncaught exception:
java.lang.IllegalArgumentException: Invalid ContainerId:
container_e04_1427159778706_0002_01_01
at
org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)
at
org.apache.spark.deploy.yarn.YarnRMClient.getAttemptId(YarnRMClient.scala:93)
at
org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:83)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:576)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
at
org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:574)
at
org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:597)
at
org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
Caused by: java.lang.NumberFormatException: For input string: e04
at
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:589)
at java.lang.Long.parseLong(Long.java:631)
at
org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137)
at
org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177)
... 12 more
15/03/24 01:20:16 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 10, (reason: Uncaught exception: Invalid ContainerId:
container_e04_1427159778706_0002_01_01)
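The failure reduces to the "e04" segment after the container_ prefix (the epoch
that YARN adds under work-preserving restart) not being numeric; a one-line
illustration of the same parse outside YARN:

java.lang.Long.parseLong("e04")   // throws NumberFormatException: For input string: "e04"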


Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-20 Thread Manoj Samel
Hi,

Running Spark 1.3 with secured Hadoop.

Spark-shell with Yarn client mode runs without issue when not using Dynamic
Allocation.

When dynamic allocation is turned on, the shell comes up, but the same SQL etc.
causes it to loop.

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.maxExecutors=10
# Set IdleTime low for testing
spark.dynamicAllocation.executorIdleTimeout=60
spark.shuffle.service.enabled=true
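The same settings expressed programmatically on a SparkConf, as a sketch (in
this report they were supplied as configuration properties, not in code):

val conf = new org.apache.spark.SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.initialExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60")   // low for testing
  .set("spark.shuffle.service.enabled", "true")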

The following is the start of the messages; it then keeps looping with
"Requesting 0 new executors"

15/03/20 22:52:42 INFO storage.BlockManagerMaster: Updated info of block
broadcast_1_piece0
15/03/20 22:52:42 INFO spark.SparkContext: Created broadcast 1 from
broadcast at DAGScheduler.scala:839
15/03/20 22:52:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
from Stage 0 (MapPartitionsRDD[3] at mapPartitions at Exchange.scala:100)
15/03/20 22:52:42 INFO cluster.YarnScheduler: Adding task set 0.0 with 1
tasks
15/03/20 22:52:47 INFO spark.ExecutorAllocationManager: Requesting 1 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:52:52 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:52:57 WARN cluster.YarnScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient resources
15/03/20 22:52:57 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:02 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:07 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:12 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:12 WARN cluster.YarnScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient resources
15/03/20 22:53:17 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:22 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:27 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)


Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-20 Thread Manoj Samel
Forgot to add - the cluster is idle otherwise so there should be no
resource issues. Also the configuration works when not using Dynamic
allocation.

On Fri, Mar 20, 2015 at 4:15 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Hi,

 Running Spark 1.3 with secured Hadoop.

 Spark-shell with Yarn client mode runs without issue when not using
 Dynamic Allocation.

 When Dynamic allocation is turned on, the shell comes up but same SQL etc.
 causes it to loop.

 spark.dynamicAllocation.enabled=true
 spark.dynamicAllocation.initialExecutors=1
 spark.dynamicAllocation.maxExecutors=10
 # Set IdleTime low for testing
 spark.dynamicAllocation.executorIdleTimeout=60
 spark.shuffle.service.enabled=true

 Following is the start of the messages and then it keeps looping with
 Requesting 0 new executors

 15/03/20 22:52:42 INFO storage.BlockManagerMaster: Updated info of block
 broadcast_1_piece0
 15/03/20 22:52:42 INFO spark.SparkContext: Created broadcast 1 from
 broadcast at DAGScheduler.scala:839
 15/03/20 22:52:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
 from Stage 0 (MapPartitionsRDD[3] at mapPartitions at Exchange.scala:100)
 15/03/20 22:52:42 INFO cluster.YarnScheduler: Adding task set 0.0 with 1
 tasks
 15/03/20 22:52:47 INFO spark.ExecutorAllocationManager: Requesting 1 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:52:52 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:52:57 WARN cluster.YarnScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient resources
 15/03/20 22:52:57 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:02 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:07 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:12 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:12 WARN cluster.YarnScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient resources
 15/03/20 22:53:17 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:22 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:27 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)



Dataframe v/s SparkSQL

2015-03-02 Thread Manoj Samel
Is it correct to say that the Spark DataFrame API is implemented using the same
execution engine as Spark SQL? In other words, while the DataFrame API differs
from Spark SQL, the runtime performance of equivalent constructs in DataFrames
and Spark SQL should be the same, so one should be able to choose whichever of
the two (DF vs. SQL) suits the use case and not worry about runtime
performance. (A sketch of an equivalent pair follows.)
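For instance, a pair of constructs that should compile to comparable plans
(table and column names are made up):

import org.apache.spark.sql.functions.sum

val viaDf  = sqlContext.table("t").groupBy("a").agg(sum("b"))
val viaSql = sqlContext.sql("SELECT a, SUM(b) FROM t GROUP BY a")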

Please comment ...

Thanks,


New ColumnType For Decimal Caching

2015-02-13 Thread Manoj Samel
Thanks Michael for the pointer. Sorry for the delayed reply.

Taking a quick inventory of the scope of change - is the column type for
Decimal caching needed only in the caching layer (4 files in
org.apache.spark.sql.columnar - ColumnAccessor.scala, ColumnBuilder.scala,
ColumnStats.scala, ColumnType.scala)?

Or do other SQL components also need to be touched ?

Hoping for quick feedback off the top of your head ...

Thanks,



On Mon, Feb 9, 2015 at 3:16 PM, Michael Armbrust mich...@databricks.com
wrote:

 You could add a new ColumnType
 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
 .

 PRs welcome :)

 On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Hi Michael,

 As a test, I have same data loaded as another parquet - except with the 2
 decimal(14,4) replaced by double. With this, the  on disk size is ~345MB,
 the in-memory size is 2GB (v.s. 12 GB) and the cached query runs in 1/2 the
 time of uncached query.

 Would it be possible for Spark to store in-memory decimal in some form of
 long with decoration ?

 For the immediate future, is there any hook that we can use to provide
 custom caching / processing for the decimal type in RDD so other semantic
 does not changes ?

 Thanks,




 On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Could you share which data types are optimized in the in-memory storage
 and how are they optimized ?

 On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com
  wrote:

 You'll probably only get good compression for strings when dictionary
 encoding works.  We don't optimize decimals in the in-memory columnar
 storage, so you are paying expensive serialization there likely.

 On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Flat data of types String, Int and couple of decimal(14,4)

 On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 Is this nested data or flat data?

 On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com
  wrote:

 Hi Michael,

 The storage tab shows the RDD resides fully in memory (10
 partitions) with zero disk usage. Tasks for subsequent select on this 
 table
 in cache shows minimal overheads (GC, queueing, shuffle write etc. 
 etc.),
 so overhead is not issue. However, it is still twice as slow as reading
 uncached table.

 I have spark.rdd.compress = true, 
 spark.sql.inMemoryColumnarStorage.compressed
 = true, spark.serializer =
 org.apache.spark.serializer.KryoSerializer

 Something that may be of relevance ...

 The underlying table is Parquet, 10 partitions totaling ~350 MB. For
 mapPartition phase of query on uncached table shows input size of 351 
 MB.
 However, after the table is cached, the storage shows the cache size as
 12GB. So the in-memory representation seems much bigger than on-disk, 
 even
 with the compression options turned on. Any thoughts on this ?

 mapPartition phase same query for cache table shows input size of
 12GB (full size of cache table) and takes twice the time as mapPartition
 for uncached query.

 Thanks,






 On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 Check the storage tab.  Does the table actually fit in memory?
 Otherwise you are rebuilding column buffers in addition to reading the 
 data
 off of the disk.

 On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel 
 manojsamelt...@gmail.com wrote:

 Spark 1.2

 Data stored in parquet table (large number of rows)

 Test 1

 select a, sum(b), sum(c) from table

 Test

 sqlContext.cacheTable()
 select a, sum(b), sum(c) from table  - seed cache First time
 slow since loading cache ?
 select a, sum(b), sum(c) from table  - Second time it should be
 faster as it should be reading from cache, not HDFS. But it is slower 
 than
 test1

 Any thoughts? Should a different query be used to seed cache ?

 Thanks,












Is there a separate mailing list for Spark Developers ?

2015-02-12 Thread Manoj Samel
d...@spark.apache.org
http://apache-spark-developers-list.1001551.n3.nabble.com/ mentioned on
http://spark.apache.org/community.html seems to be bouncing. Is there
another one ?


Re: SQL group by on Parquet table slower when table cached

2015-02-09 Thread Manoj Samel
Hi Michael,

The storage tab shows the RDD resides fully in memory (10 partitions) with
zero disk usage. Tasks for subsequent selects on this table in cache show
minimal overheads (GC, queueing, shuffle write etc.), so overhead is not the
issue. However, it is still twice as slow as reading the uncached table.

I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed
= true, spark.serializer = org.apache.spark.serializer.KryoSerializer

Something that may be of relevance ...

The underlying table is Parquet, 10 partitions totaling ~350 MB. The
mapPartitions phase of the query on the uncached table shows an input size of
351 MB. However, after the table is cached, the storage tab shows the cache
size as 12GB. So the in-memory representation seems much bigger than on-disk,
even with the compression options turned on. Any thoughts on this ?

The mapPartitions phase of the same query on the cached table shows an input
size of 12GB (the full size of the cached table) and takes twice as long as
the mapPartitions phase of the uncached query.

Thanks,






On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com
wrote:

 Check the storage tab.  Does the table actually fit in memory? Otherwise
 you are rebuilding column buffers in addition to reading the data off of
 the disk.

 On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Spark 1.2

 Data stored in parquet table (large number of rows)

 Test 1

 select a, sum(b), sum(c) from table

 Test

 sqlContext.cacheTable()
 select a, sum(b), sum(c) from table  - seed cache First time slow since
 loading cache ?
 select a, sum(b), sum(c) from table  - Second time it should be faster as
 it should be reading from cache, not HDFS. But it is slower than test1

 Any thoughts? Should a different query be used to seed cache ?

 Thanks,





Re: SQL group by on Parquet table slower when table cached

2015-02-09 Thread Manoj Samel
Could you share which data types are optimized in the in-memory storage and
how are they optimized ?

On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com
wrote:

 You'll probably only get good compression for strings when dictionary
 encoding works.  We don't optimize decimals in the in-memory columnar
 storage, so you are paying expensive serialization there likely.

 On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Flat data of types String, Int and couple of decimal(14,4)

 On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com
 wrote:

 Is this nested data or flat data?

 On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Hi Michael,

 The storage tab shows the RDD resides fully in memory (10 partitions)
 with zero disk usage. Tasks for subsequent select on this table in cache
 shows minimal overheads (GC, queueing, shuffle write etc. etc.), so
 overhead is not issue. However, it is still twice as slow as reading
 uncached table.

 I have spark.rdd.compress = true, 
 spark.sql.inMemoryColumnarStorage.compressed
 = true, spark.serializer = org.apache.spark.serializer.KryoSerializer

 Something that may be of relevance ...

 The underlying table is Parquet, 10 partitions totaling ~350 MB. For
 mapPartition phase of query on uncached table shows input size of 351 MB.
 However, after the table is cached, the storage shows the cache size as
 12GB. So the in-memory representation seems much bigger than on-disk, even
 with the compression options turned on. Any thoughts on this ?

 mapPartition phase same query for cache table shows input size of 12GB
 (full size of cache table) and takes twice the time as mapPartition for
 uncached query.

 Thanks,






 On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 Check the storage tab.  Does the table actually fit in memory?
 Otherwise you are rebuilding column buffers in addition to reading the 
 data
 off of the disk.

 On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Spark 1.2

 Data stored in parquet table (large number of rows)

 Test 1

 select a, sum(b), sum(c) from table

 Test

 sqlContext.cacheTable()
 select a, sum(b), sum(c) from table  - seed cache First time slow
 since loading cache ?
 select a, sum(b), sum(c) from table  - Second time it should be
 faster as it should be reading from cache, not HDFS. But it is slower 
 than
 test1

 Any thoughts? Should a different query be used to seed cache ?

 Thanks,









Re: SQL group by on Parquet table slower when table cached

2015-02-09 Thread Manoj Samel
Hi Michael,

As a test, I have the same data loaded as another Parquet file - except with
the 2 decimal(14,4) columns replaced by double. With this, the on-disk size is
~345MB, the in-memory size is 2GB (vs. 12 GB), and the cached query runs in
half the time of the uncached query.

Would it be possible for Spark to store in-memory decimal in some form of
long with decoration ?

For the immediate future, is there any hook that we can use to provide custom
caching / processing for the decimal type in the RDD so that other semantics
do not change ?

Thanks,




On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Could you share which data types are optimized in the in-memory storage
 and how are they optimized ?

 On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com
 wrote:

 You'll probably only get good compression for strings when dictionary
 encoding works.  We don't optimize decimals in the in-memory columnar
 storage, so you are paying expensive serialization there likely.

 On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Flat data of types String, Int and couple of decimal(14,4)

 On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com
  wrote:

 Is this nested data or flat data?

 On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Hi Michael,

 The storage tab shows the RDD resides fully in memory (10 partitions)
 with zero disk usage. Tasks for subsequent select on this table in cache
 shows minimal overheads (GC, queueing, shuffle write etc. etc.), so
 overhead is not issue. However, it is still twice as slow as reading
 uncached table.

 I have spark.rdd.compress = true, 
 spark.sql.inMemoryColumnarStorage.compressed
 = true, spark.serializer = org.apache.spark.serializer.KryoSerializer

 Something that may be of relevance ...

 The underlying table is Parquet, 10 partitions totaling ~350 MB. For
 mapPartition phase of query on uncached table shows input size of 351 MB.
 However, after the table is cached, the storage shows the cache size as
 12GB. So the in-memory representation seems much bigger than on-disk, even
 with the compression options turned on. Any thoughts on this ?

 mapPartition phase same query for cache table shows input size of 12GB
 (full size of cache table) and takes twice the time as mapPartition for
 uncached query.

 Thanks,






 On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 Check the storage tab.  Does the table actually fit in memory?
 Otherwise you are rebuilding column buffers in addition to reading the 
 data
 off of the disk.

 On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com
  wrote:

 Spark 1.2

 Data stored in parquet table (large number of rows)

 Test 1

 select a, sum(b), sum(c) from table

 Test

 sqlContext.cacheTable()
 select a, sum(b), sum(c) from table  - seed cache First time slow
 since loading cache ?
 select a, sum(b), sum(c) from table  - Second time it should be
 faster as it should be reading from cache, not HDFS. But it is slower 
 than
 test1

 Any thoughts? Should a different query be used to seed cache ?

 Thanks,










SQL group by on Parquet table slower when table cached

2015-02-06 Thread Manoj Samel
Spark 1.2

Data stored in parquet table (large number of rows)

Test 1

select a, sum(b), sum(c) from table

Test 2

sqlContext.cacheTable("table")
select a, sum(b), sum(c) from table  - seed cache. First time slow since
loading cache ?
select a, sum(b), sum(c) from table  - Second time it should be faster, as it
should be reading from cache, not HDFS. But it is slower than Test 1.
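A sketch of the two tests, assuming the Parquet data is registered as a temp
table named "table" and that the query carries the GROUP BY implied by the
subject (not shown above):

sqlContext.cacheTable("table")
sqlContext.sql("select a, sum(b), sum(c) from table group by a").collect() // seeds the cache
sqlContext.sql("select a, sum(b), sum(c) from table group by a").collect() // should now read from cache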

Any thoughts? Should a different query be used to seed cache ?

Thanks,


Large # of tasks in groupby on single table

2015-02-04 Thread Manoj Samel
Spark 1.2
Data is read from Parquet with 2 partitions and is cached as a table with 2
partitions. Verified in the UI that it shows the RDD with 2 partitions and
that it is fully cached in memory.

Cached data contains columns a, b, c. Column a has ~150 distinct values.

Next, run SQL on this table as select a, sum(b), sum(c) from table x

The query creates 200 tasks. Further, the advanced metric "scheduler delay" is
a significant % for most of these tasks. This seems like very high overhead
for a query on an RDD with 2 partitions.

It seems that if this ran with a smaller number of tasks, the query should run
faster ?

Any thoughts on how to control the # of partitions for the group by (or other
SQLs) ?

Thanks,


Re: Large # of tasks in groupby on single table

2015-02-04 Thread Manoj Samel
Follow-up for closure on the thread ...

1. spark.sql.shuffle.partitions is not on the configuration page but is
mentioned on http://spark.apache.org/docs/1.2.0/sql-programming-guide.html. It
would be better to have it on the configuration page as well for the sake of
completeness. Should I file a doc bug ?
2. Regarding my #2 above (Spark should auto-determine the # of tasks), there
is already a write-up on the SQL programming guide page under "Hive
optimizations not in Spark" - "Automatically determine the number of reducers
for joins and groupbys: Currently in Spark SQL, you need to control the degree
of parallelism post-shuffle using 'SET
spark.sql.shuffle.partitions=[num_tasks];'". Any idea if and when this is
scheduled ? Even a rudimentary implementation (e.g. based on the # of
partitions of the underlying RDD, which is available now) would be an
improvement over the current fixed 200 and would be a critical feature for
Spark SQL feasibility. (A sketch of the current workaround follows.)
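The current workaround, sketched programmatically (table name "x" and the
GROUP BY implied by the original post; 2 matches the number of input
partitions in this report):

sqlContext.setConf("spark.sql.shuffle.partitions", "2")
sqlContext.sql("select a, sum(b), sum(c) from x group by a")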

Thanks

On Wed, Feb 4, 2015 at 4:09 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Awesome ! By setting this, I could minimize the collect overhead, e.g by
 setting it to # of partitions of the RDD.

 Two questions

 1. I had looked for such option in
 http://spark.apache.org/docs/latest/configuration.html but this is not
 documented. Seems this a doc. bug ?
 2. Ideally the shuffle partitions should be derive from underlying
 table(s) and a optimal number should be set for each query. Having one
 number across all queries is not ideal, nor do the consumer wants to set it
 before each query to different #. Any thoughts ?


 Thanks !

 On Wed, Feb 4, 2015 at 3:50 PM, Ankur Srivastava 
 ankur.srivast...@gmail.com wrote:

 Hi Manoj,

 You can set the number of partitions you want your sql query to use. By
 default it is 200 and thus you see that number. You can update it using the
 spark.sql.shuffle.partitions property

 spark.sql.shuffle.partitions200Configures the number of partitions to
 use when shuffling data for joins or aggregations.

 Thanks
 Ankur

 On Wed, Feb 4, 2015 at 3:41 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Hi,

 Any thoughts on this?

 Most of the 200 tasks in collect phase take less than 20 ms and lot of
 time is spent on scheduling these. I suspect the overall time will reduce
 if # of tasks are dropped to a much smaller # (close to # of partitions?)

 Any help is appreciated

 Thanks

 On Wed, Feb 4, 2015 at 12:38 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Spark 1.2
 Data is read from parquet with 2 partitions and is cached as table with
 2 partitions. Verified in UI that it shows RDD with 2 partitions  it is
 fully cached in memory

 Cached data contains column a, b, c. Column a has ~150 distinct values.

 Next run SQL on this table as select a, sum(b), sum(c) from table x

 The query creates 200 tasks. Further, the advanced metric scheduler
 delay is significant % for most of these tasks. This seems very high
 overhead for query on RDD with 2 partitions

 It seems if this is run with less number of task, the query should run
 faster ?

 Any thoughts on how to control # of partitions for the group by (or
 other SQLs) ?

 Thanks,







Re: Large # of tasks in groupby on single table

2015-02-04 Thread Manoj Samel
Hi,

Any thoughts on this?

Most of the 200 tasks in the collect phase take less than 20 ms, and a lot of
time is spent on scheduling them. I suspect the overall time will drop if the
# of tasks is reduced to a much smaller number (close to the # of partitions?)

Any help is appreciated

Thanks

On Wed, Feb 4, 2015 at 12:38 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Spark 1.2
 Data is read from parquet with 2 partitions and is cached as table with 2
 partitions. Verified in UI that it shows RDD with 2 partitions  it is
 fully cached in memory

 Cached data contains column a, b, c. Column a has ~150 distinct values.

 Next run SQL on this table as select a, sum(b), sum(c) from table x

 The query creates 200 tasks. Further, the advanced metric scheduler
 delay is significant % for most of these tasks. This seems very high
 overhead for query on RDD with 2 partitions

 It seems if this is run with less number of task, the query should run
 faster ?

 Any thoughts on how to control # of partitions for the group by (or other
 SQLs) ?

 Thanks,



Re: Error in saving schemaRDD with Decimal as Parquet

2015-02-03 Thread Manoj Samel
Hi,

Any thoughts ?

Thanks,

On Sun, Feb 1, 2015 at 12:26 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Spark 1.2

 SchemaRDD has schema with decimal columns created like

 x1 = new StructField(a, DecimalType(14,4), true)

 x2 = new StructField(b, DecimalType(14,4), true)

 Registering as SQL Temp table and doing SQL queries on these columns ,
 including SUM etc. works fine, so the schema Decimal does not seems to be
 issue

 When doing saveAsParquetFile on the RDD, it gives following error. Not
 sure why the DecimalType in SchemaRDD is not seen by Parquet, which seems
 to see it as scala.math.BigDecimal

 java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to
 org.apache.spark.sql.catalyst.types.decimal.Decimal

 at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(
 ParquetTableSupport.scala:359)

 at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(
 ParquetTableSupport.scala:328)

 at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(
 ParquetTableSupport.scala:314)

 at parquet.hadoop.InternalParquetRecordWriter.write(
 InternalParquetRecordWriter.java:120)

 at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)

 at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)

 at org.apache.spark.sql.parquet.InsertIntoParquetTable.org
 $apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(
 ParquetTableOperations.scala:308)

 at
 org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(
 ParquetTableOperations.scala:325)

 at
 org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(
 ParquetTableOperations.scala:325)

 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)

 at org.apache.spark.scheduler.Task.run(Task.scala:56)

 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)

 at java.util.concurrent.ThreadPoolExecutor.runWorker(
 ThreadPoolExecutor.java:1145)

 at java.util.concurrent.ThreadPoolExecutor$Worker.run(
 ThreadPoolExecutor.java:615)

  at java.lang.Thread.run(Thread.java:744)






Error in saving schemaRDD with Decimal as Parquet

2015-02-01 Thread Manoj Samel
Spark 1.2

SchemaRDD has schema with decimal columns created like

x1 = new StructField("a", DecimalType(14,4), true)

x2 = new StructField("b", DecimalType(14,4), true)

Registering it as a SQL temp table and doing SQL queries on these columns,
including SUM etc., works fine, so the Decimal schema does not seem to be the
issue.

When doing saveAsParquetFile on the RDD, it gives the following error. Not sure
why the DecimalType in the SchemaRDD is not seen by Parquet, which seems to
see it as scala.math.BigDecimal.

java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to
org.apache.spark.sql.catalyst.types.decimal.Decimal

at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(
ParquetTableSupport.scala:359)

at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(
ParquetTableSupport.scala:328)

at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(
ParquetTableSupport.scala:314)

at parquet.hadoop.InternalParquetRecordWriter.write(
InternalParquetRecordWriter.java:120)

at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)

at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)

at org.apache.spark.sql.parquet.InsertIntoParquetTable.org
$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(
ParquetTableOperations.scala:308)

at
org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(
ParquetTableOperations.scala:325)

at
org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(
ParquetTableOperations.scala:325)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)

at org.apache.spark.scheduler.Task.run(Task.scala:56)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)

at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1145)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:744)


Re: Error in saving schemaRDD with Decimal as Parquet

2015-02-01 Thread Manoj Samel
I think I found the issue causing it.

I was calling schemaRDD.coalesce(n).saveAsParquetFile to reduce the number
of partitions in the parquet file - that is when the stack trace happens.

If I coalesce the partitions before creating the schemaRDD, then the
schemaRDD.saveAsParquetFile call works for decimal.

So it seems schemaRDD.coalesce returns an RDD whose schema does not match
the source RDD's, in that the decimal type seems to get changed.

Any thoughts ? Is this a bug ???

Thanks,
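
A small sketch of how one might verify the schema drift described above,
assuming a SchemaRDD with the decimal columns from the original post (Spark
1.2; printSchema is available on SchemaRDD):

// schemaRDD: a SchemaRDD built with DecimalType(14,4) columns, e.g. via
// sqlContext.applySchema(rowRDD, schema) as in the original post below.
schemaRDD.printSchema()                 // decimal columns as declared

val coalesced = schemaRDD.coalesce(4)   // SchemaRDD.coalesce returns a SchemaRDD
coalesced.printSchema()                 // compare with the above; the report
                                        // suggests decimal handling changes here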


On Sun, Feb 1, 2015 at 12:26 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Spark 1.2

 SchemaRDD has schema with decimal columns created like

 x1 = new StructField("a", DecimalType(14,4), true)

 x2 = new StructField("b", DecimalType(14,4), true)

 Registering it as a SQL temp table and doing SQL queries on these columns,
 including SUM etc., works fine, so the Decimal schema does not seem to be
 the issue.

 When doing saveAsParquetFile on the RDD, it gives the following error. Not
 sure why the DecimalType in the SchemaRDD is not seen by Parquet, which seems
 to see it as scala.math.BigDecimal.

 java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to
 org.apache.spark.sql.catalyst.types.decimal.Decimal

 at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(
 ParquetTableSupport.scala:359)

 at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(
 ParquetTableSupport.scala:328)

 at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(
 ParquetTableSupport.scala:314)

 at parquet.hadoop.InternalParquetRecordWriter.write(
 InternalParquetRecordWriter.java:120)

 at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)

 at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)

 at org.apache.spark.sql.parquet.InsertIntoParquetTable.org
 $apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(
 ParquetTableOperations.scala:308)

 at
 org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(
 ParquetTableOperations.scala:325)

 at
 org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(
 ParquetTableOperations.scala:325)

 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)

 at org.apache.spark.scheduler.Task.run(Task.scala:56)

 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)

 at java.util.concurrent.ThreadPoolExecutor.runWorker(
 ThreadPoolExecutor.java:1145)

 at java.util.concurrent.ThreadPoolExecutor$Worker.run(
 ThreadPoolExecutor.java:615)

  at java.lang.Thread.run(Thread.java:744)






Why is DecimalType separate from DataType ?

2015-01-30 Thread Manoj Samel
Spark 1.2

While building schemaRDD using StructType

xxx = new StructField("credit_amount", DecimalType, true) gives error "type
mismatch; found : org.apache.spark.sql.catalyst.types.DecimalType.type
required: org.apache.spark.sql.catalyst.types.DataType"

From
https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.package,
it seems DecimalType = sql.catalyst.types.DecimalType is separate from
DataType = sql.catalyst.types.DataType

Not sure why that is the case. How does one use Decimal and other types in a
StructField?

Thanks,
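
If it helps, a short sketch of what that compiler error usually means here:
DecimalType in this API is a parameterized type, so a StructField needs an
instance such as DecimalType(precision, scale) rather than the bare DecimalType
companion object. The field name is taken from the post; the import follows the
1.2 Scala API docs linked above:

import org.apache.spark.sql._   // the sql package object aliases the catalyst types

// Does not compile: DecimalType alone refers to the companion object, not a DataType.
// val bad = new StructField("credit_amount", DecimalType, true)

// Compiles: pass a concrete DecimalType instance with precision and scale.
val field  = new StructField("credit_amount", DecimalType(14, 4), true)
val schema = StructType(Seq(field))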


schemaRDD.saveAsParquetFile creates large number of small parquet files ...

2015-01-29 Thread Manoj Samel
Spark 1.2 on Hadoop 2.3

Read one big csv file, create a schemaRDD on it and saveAsParquetFile.

It creates a large number of small (~1MB) parquet part-x- files.

Any way to control this so that a smaller number of larger files is created?

Thanks,
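
One possible approach, sketched under the assumption that the part-file count
follows the partition count of the RDD being saved (paths and the case class
are illustrative; spark-shell on Spark 1.2):

import org.apache.spark.sql._
val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD            // implicit RDD[Product] -> SchemaRDD

case class Record(key: String, value: Int)   // illustrative row type for the csv

val records = sc.textFile("/data/big.csv")   // illustrative path
  .map(_.split(","))
  .map(p => Record(p(0), p(1).trim.toInt))
  .coalesce(8)                               // fewer partitions => fewer, larger part files

val schemaRDD: SchemaRDD = records
schemaRDD.saveAsParquetFile("/data/big.parquet")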


SparkSQL Performance Tuning Options

2015-01-27 Thread Manoj Samel
Spark 1.2, no Hive, prefer not to use HiveContext to avoid metastore_db.

The use case is a Spark YARN app that will start and serve as a query server for
multiple users, i.e. always up and running. At startup, there is the option to
cache data and also pre-compute some result sets, hash maps etc. that
would likely be asked for by client APIs. I.e. there is some opportunity to use
startup time to precompute/cache - but the query response time requirement on
a large data set is very stringent.

Hoping to use SparkSQL (but a combination of SQL and RDD APIs is also OK).

* Does SparkSQL execution use the underlying partition information? (Data is
from HDFS)
* Are there any ways to give hints to the SparkSQL execution about any
precomputed/pre-cached RDDs? (See the sketch below.)
* Packages spark.sql.execution, spark.sql.execution.joins and other sql.xxx
packages - is using these for tuning the query plan recommended? Would
like to keep this as-needed if possible.
* Features not in the current release but scheduled for an upcoming release would
also be good to know.

Thanks,

PS: This is not a small topic so if someone prefers to start a offline
thread on details, I can do that and summarize the conclusions back to this
thread.
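
On the pre-computed/pre-cached question above, a minimal sketch of what
SQLContext already offers in 1.2: register the data as a temp table and cache
it, so later SQL against that table reads the in-memory columnar cache. Names
and paths are illustrative:

import org.apache.spark.sql._
val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD

case class Fact(a: String, b: Double)                   // illustrative row type
val facts: SchemaRDD = sc.textFile("/data/facts.csv")   // illustrative path
  .map(_.split(","))
  .map(p => Fact(p(0), p(1).toDouble))

facts.registerTempTable("facts")
sqlContext.cacheTable("facts")          // in-memory columnar cache

// The first query materializes the cache; later queries on "facts" reuse it.
sqlContext.sql("select a, sum(b) from facts group by a").collect()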


Re: spark 1.2 - Writing parque fails for timestamp with Unsupported datatype TimestampType

2015-01-26 Thread Manoj Samel
Awesome ! That would be great !!

On Mon, Jan 26, 2015 at 3:18 PM, Michael Armbrust mich...@databricks.com
wrote:

 I'm aiming for 1.3.

 On Mon, Jan 26, 2015 at 3:05 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Thanks Michael. I am sure there have been many requests for this support.

 Any release targeted for this?

 Thanks,

 On Sat, Jan 24, 2015 at 11:47 AM, Michael Armbrust 
 mich...@databricks.com wrote:

 Those annotations actually don't work because the timestamp in SQL has
 optional nano-second precision.

 However, there is a PR to add support using parquet's INT96 type:
 https://github.com/apache/spark/pull/3820

 On Fri, Jan 23, 2015 at 12:08 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Looking further at the trace and ParquetTypes.scala, it seems there is
 no support for Timestamp and Date in fromPrimitiveDataType(ctype:
 DataType): Option[ParquetTypeInfo]. Since Parquet supports these types
 with some decoration over Int (
 https://github.com/Parquet/parquet-format/blob/master/LogicalTypes.md),
 is there any reason why Date / Timestamp are not supported right now?

 Thanks,

 Manoj


 On Fri, Jan 23, 2015 at 11:40 AM, Manoj Samel manojsamelt...@gmail.com
  wrote:

 Using Spark 1.2

 Read a CSV file, apply schema to convert to SchemaRDD and then
 schemaRdd.saveAsParquetFile

 If the schema includes TimestampType, it gives the following trace when
 doing the save

 Exception in thread "main" java.lang.RuntimeException: Unsupported
 datatype TimestampType

 at scala.sys.package$.error(package.scala:27)

 at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(
 ParquetTypes.scala:343)

 at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(
 ParquetTypes.scala:292)

 at scala.Option.getOrElse(Option.scala:120)

 at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(
 ParquetTypes.scala:291)

 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(
 ParquetTypes.scala:363)

 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(
 ParquetTypes.scala:362)

 at scala.collection.TraversableLike$$anonfun$map$1.apply(
 TraversableLike.scala:244)

 at scala.collection.TraversableLike$$anonfun$map$1.apply(
 TraversableLike.scala:244)

 at scala.collection.mutable.ResizableArray$class.foreach(
 ResizableArray.scala:59)

 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at scala.collection.TraversableLike$class.map(
 TraversableLike.scala:244)

 at scala.collection.AbstractTraversable.map(Traversable.scala:105)

 at
 org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(
 ParquetTypes.scala:361)

 at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(
 ParquetTypes.scala:407)

 at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(
 ParquetRelation.scala:166)

 at org.apache.spark.sql.parquet.ParquetRelation$.create(
 ParquetRelation.scala:145)

 at
 org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(
 SparkStrategies.scala:204)

 at
 org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(
 QueryPlanner.scala:58)

 at
 org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(
 QueryPlanner.scala:58)

 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

 at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(
 QueryPlanner.scala:59)

 at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(
 SQLContext.scala:418)

 at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(
 SQLContext.scala:416)

 at
 org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(
 SQLContext.scala:422)

 at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(
 SQLContext.scala:422)

 at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(
 SQLContext.scala:425)

 at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(
 SQLContext.scala:425)

 at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(
 SchemaRDDLike.scala:76)

 at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(
 SchemaRDD.scala:108)

 at bdrt.MyTest$.createParquetWithDate(MyTest.scala:88)

 at bdrt.MyTest$delayedInit$body.apply(MyTest.scala:54)

 at scala.Function0$class.apply$mcV$sp(Function0.scala:40)

 at scala.runtime.AbstractFunction0.apply$mcV$sp(
 AbstractFunction0.scala:12)

 at scala.App$$anonfun$main$1.apply(App.scala:71)

 at scala.App$$anonfun$main$1.apply(App.scala:71)

 at scala.collection.immutable.List.foreach(List.scala:318)

 at scala.collection.generic.TraversableForwarder$class.foreach(
 TraversableForwarder.scala:32)

 at scala.App$class.main(App.scala:71)

 at bdrt.MyTest$.main(MyTest.scala:10)









Re: spark 1.2 - Writing parque fails for timestamp with Unsupported datatype TimestampType

2015-01-26 Thread Manoj Samel
Thanks Michael. I am sure there have been many requests for this support.

Any release targeted for this?

Thanks,

On Sat, Jan 24, 2015 at 11:47 AM, Michael Armbrust mich...@databricks.com
wrote:

 Those annotations actually don't work because the timestamp in SQL has
 optional nano-second precision.

 However, there is a PR to add support using parquet's INT96 type:
 https://github.com/apache/spark/pull/3820

 On Fri, Jan 23, 2015 at 12:08 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Looking further at the trace and ParquetTypes.scala, it seems there is no
 support for Timestamp and Date in fromPrimitiveDataType(ctype: DataType):
 Option[ParquetTypeInfo]. Since Parquet supports these types with some
 decoration over Int (
 https://github.com/Parquet/parquet-format/blob/master/LogicalTypes.md),
 is there any reason why Date / Timestamp are not supported right now?

 Thanks,

 Manoj


 On Fri, Jan 23, 2015 at 11:40 AM, Manoj Samel manojsamelt...@gmail.com
 wrote:

 Using Spark 1.2

 Read a CSV file, apply schema to convert to SchemaRDD and then
 schemaRdd.saveAsParquetFile

 If the schema includes TimestampType, it gives the following trace when
 doing the save

 Exception in thread "main" java.lang.RuntimeException: Unsupported
 datatype TimestampType

 at scala.sys.package$.error(package.scala:27)

 at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(
 ParquetTypes.scala:343)

 at
 org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(
 ParquetTypes.scala:292)

 at scala.Option.getOrElse(Option.scala:120)

 at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(
 ParquetTypes.scala:291)

 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(
 ParquetTypes.scala:363)

 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(
 ParquetTypes.scala:362)

 at scala.collection.TraversableLike$$anonfun$map$1.apply(
 TraversableLike.scala:244)

 at scala.collection.TraversableLike$$anonfun$map$1.apply(
 TraversableLike.scala:244)

 at scala.collection.mutable.ResizableArray$class.foreach(
 ResizableArray.scala:59)

 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

 at scala.collection.AbstractTraversable.map(Traversable.scala:105)

 at
 org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(
 ParquetTypes.scala:361)

 at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(
 ParquetTypes.scala:407)

 at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(
 ParquetRelation.scala:166)

 at org.apache.spark.sql.parquet.ParquetRelation$.create(
 ParquetRelation.scala:145)

 at
 org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(
 SparkStrategies.scala:204)

 at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(
 QueryPlanner.scala:58)

 at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(
 QueryPlanner.scala:58)

 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

 at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(
 QueryPlanner.scala:59)

 at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(
 SQLContext.scala:418)

 at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(
 SQLContext.scala:416)

 at
 org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(
 SQLContext.scala:422)

 at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(
 SQLContext.scala:422)

 at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(
 SQLContext.scala:425)

 at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(
 SQLContext.scala:425)

 at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(
 SchemaRDDLike.scala:76)

 at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:108)

 at bdrt.MyTest$.createParquetWithDate(MyTest.scala:88)

 at bdrt.MyTest$delayedInit$body.apply(MyTest.scala:54)

 at scala.Function0$class.apply$mcV$sp(Function0.scala:40)

 at scala.runtime.AbstractFunction0.apply$mcV$sp(
 AbstractFunction0.scala:12)

 at scala.App$$anonfun$main$1.apply(App.scala:71)

 at scala.App$$anonfun$main$1.apply(App.scala:71)

 at scala.collection.immutable.List.foreach(List.scala:318)

 at scala.collection.generic.TraversableForwarder$class.foreach(
 TraversableForwarder.scala:32)

 at scala.App$class.main(App.scala:71)

 at bdrt.MyTest$.main(MyTest.scala:10)







spark 1.2 - Writing parque fails for timestamp with Unsupported datatype TimestampType

2015-01-23 Thread Manoj Samel
Using Spark 1.2

Read a CSV file, apply schema to convert to SchemaRDD and then
schemaRdd.saveAsParquetFile

If the schema includes TimestampType, it gives the following trace when doing
the save

Exception in thread "main" java.lang.RuntimeException: Unsupported datatype
TimestampType

at scala.sys.package$.error(package.scala:27)

at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(
ParquetTypes.scala:343)

at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(
ParquetTypes.scala:292)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(
ParquetTypes.scala:291)

at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(
ParquetTypes.scala:363)

at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(
ParquetTypes.scala:362)

at scala.collection.TraversableLike$$anonfun$map$1.apply(
TraversableLike.scala:244)

at scala.collection.TraversableLike$$anonfun$map$1.apply(
TraversableLike.scala:244)

at scala.collection.mutable.ResizableArray$class.foreach(
ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at scala.collection.AbstractTraversable.map(Traversable.scala:105)

at
org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(
ParquetTypes.scala:361)

at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(
ParquetTypes.scala:407)

at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(
ParquetRelation.scala:166)

at org.apache.spark.sql.parquet.ParquetRelation$.create(
ParquetRelation.scala:145)

at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(
SparkStrategies.scala:204)

at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(
QueryPlanner.scala:58)

at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(
QueryPlanner.scala:58)

at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(
QueryPlanner.scala:59)

at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(
SQLContext.scala:418)

at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(
SQLContext.scala:416)

at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(
SQLContext.scala:422)

at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(
SQLContext.scala:422)

at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(
SQLContext.scala:425)

at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425
)

at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(
SchemaRDDLike.scala:76)

at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:108)

at bdrt.MyTest$.createParquetWithDate(MyTest.scala:88)

at bdrt.MyTest$delayedInit$body.apply(MyTest.scala:54)

at scala.Function0$class.apply$mcV$sp(Function0.scala:40)

at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)

at scala.App$$anonfun$main$1.apply(App.scala:71)

at scala.App$$anonfun$main$1.apply(App.scala:71)

at scala.collection.immutable.List.foreach(List.scala:318)

at scala.collection.generic.TraversableForwarder$class.foreach(
TraversableForwarder.scala:32)

at scala.App$class.main(App.scala:71)

at bdrt.MyTest$.main(MyTest.scala:10)
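
Until the INT96 support referenced above lands, one common workaround (a
sketch with illustrative names) is to carry the timestamp as a long (epoch
millis) or a string in the schema that gets written to Parquet, and convert
back after reading:

import java.sql.Timestamp
import org.apache.spark.sql._

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD

// Illustrative: keep the event time as epoch millis instead of TimestampType.
case class Event(id: String, eventTimeMillis: Long)

val events: SchemaRDD = sc.textFile("/data/events.csv")     // illustrative path
  .map(_.split(","))
  .map(p => Event(p(0), Timestamp.valueOf(p(1)).getTime))   // "yyyy-mm-dd hh:mm:ss" text

events.saveAsParquetFile("/data/events.parquet")            // LongType is supported

// Convert millis back to java.sql.Timestamp in application code after reading.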


Error when running SparkPi on Secure HA Hadoop cluster

2015-01-15 Thread Manoj Samel
Hi,

Setup is as follows

Hadoop Cluster 2.3.0 (CDH5.0)
- Namenode HA
- Resource manager HA
- Secured with Kerberos

Spark 1.2

Run SparkPi as follows
- conf/spark-defaults.conf has following entries
spark.yarn.queue myqueue
spark.yarn.access.namenodes hdfs://namespace (remember this is namenode HA)
- Do kinit with some user keytab
- submit SparkPi as follows
spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client
--num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores
1 --queue thequeue $MY_SPARK_DIR/lib/spark-examples*.jar 10

Gives the following trace (not sure why it shows an unknown queue when the
queue name is specified in the spark-defaults.conf above).

15/01/15 19:18:27 INFO impl.YarnClientImpl: Submitted application
application_1415648563285_31469
15/01/15 19:18:28 INFO yarn.Client: Application report for
application_1415648563285_31469 (state: FAILED)
15/01/15 19:18:28 INFO yarn.Client:
 client token: N/A
 diagnostics: Application application_1415648563285_31469 submitted by user
XYZ to unknown queue: thequeue --- WHY UNKNOWN QUEUE ???
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: thequeue   --- WHY UNKNOWN QUEUE ???
 start time: 1421349507652
 final status: FAILED
 tracking URL: N/A
 user: XYZ
Exception in thread main org.apache.spark.SparkException: Yarn
application has already ended! It might have been killed or unable to
launch application master.
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:102)
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:58)


Running spark 1.2 on Hadoop + Kerberos

2015-01-08 Thread Manoj Samel
Hi,

For running spark 1.2 on Hadoop cluster with Kerberos, what spark
configurations are required?

Using existing keytab, can any examples be submitted to the secured cluster
? How?

Thanks,


Re: Running spark 1.2 on Hadoop + Kerberos

2015-01-08 Thread Manoj Samel
Please ignore the keytab question for now; the question wasn't fully described.

Some old communication (Oct 14) says Spark is not certified with Kerberos.
Can someone comment on this aspect?

On Thu, Jan 8, 2015 at 3:53 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Manoj,

 As long as you're logged in (i.e. you've run kinit), everything should
 just work. You can run klist to make sure you're logged in.

 On Thu, Jan 8, 2015 at 3:49 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:
  Hi,
 
  For running spark 1.2 on Hadoop cluster with Kerberos, what spark
  configurations are required?
 
  Using existing keytab, can any examples be submitted to the secured
 cluster
  ? How?
 
  Thanks,



 --
 Marcelo



Cannot see RDDs in Spark UI

2015-01-06 Thread Manoj Samel
Hi,

I create a bunch of RDDs, including schema RDDs. When I run the program and
go to the UI on xxx:4040, the Storage tab does not show any RDDs.


Spark version is 1.1.1 (Hadoop 2.3)

Any thoughts?

Thanks,
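
For what it's worth, a small sketch of the usual reason: the Storage tab only
lists RDDs that have been marked for persistence and then materialized by an
action; merely defining an RDD (or schema RDD) does not register anything
there. Path is illustrative:

// spark-shell; sc is the usual SparkContext
val rdd = sc.textFile("/some/hdfs/file")   // illustrative path

rdd.cache()    // mark it for in-memory storage ...
rdd.count()    // ... and run an action so the blocks are actually computed and stored

// The RDD should now show up under the Storage tab at http://<driver>:4040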


Sharing sqlContext between Akka router and routee actors ...

2014-12-18 Thread Manoj Samel
Hi,

An Akka router creates a sqlContext and creates a bunch of routee actors
with the sqlContext as a parameter. The actors then execute queries on that
sqlContext.

Would this pattern be an issue? Is there any other way sparkContext etc. should
be shared cleanly between Akka routers/routees?

Thanks,


Re: Spark Server - How to implement

2014-12-12 Thread Manoj Samel
Thanks Marcelo.

Spark Gurus/Databricks team - do you have something in roadmap for such a
spark server ?

Thanks,

On Thu, Dec 11, 2014 at 5:43 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Oops, sorry, fat fingers.

 We've been playing with something like that inside Hive:
 https://github.com/apache/hive/tree/spark/spark-client

 That seems to have at least a few of the characteristics you're
 looking for; but it's a very young project, and at this moment we're
 not developing it as a public API, but mostly for internal Hive use.
 It can give you a few ideas, though. Also, SPARK-3215.


 On Thu, Dec 11, 2014 at 5:41 PM, Marcelo Vanzin van...@cloudera.com
 wrote:
  Hi Manoj,
 
  I'm not aware of any public projects that do something like that,
  except for the Ooyala server which you say doesn't cover your needs.
 
  We've been playing with something like that inside Hive, though:
 
  On Thu, Dec 11, 2014 at 5:33 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:
  Hi,
 
  If spark based services are to be exposed as a continuously available
  server, what are the options?
 
  * The API exposed to client will be proprietary and fine grained (RPC
 style
  ..), not a Job level API
  * The client API need not be SQL so the Thrift JDBC server does not
 seem to
  be option .. but I could be wrong here ...
  * Ooyala implementation is a REST API for job submission, but as
 mentioned
  above; the desired API is a finer grain API, not a job submission
 
  Any existing implementation?
 
  Is it build your own server? Any thoughts on approach to use ?
 
  Thanks,
 
 
 
 
 
 
 
  --
  Marcelo



 --
 Marcelo



Spark Server - How to implement

2014-12-11 Thread Manoj Samel
Hi,

If spark based services are to be exposed as a continuously available
server, what are the options?

* The API exposed to the client will be proprietary and fine-grained (RPC style
..), not a job-level API
* The client API need not be SQL, so the Thrift JDBC server does not seem to
be an option .. but I could be wrong here ...
* The Ooyala implementation is a REST API for job submission, but as mentioned
above, the desired API is a finer-grained API, not job submission

Any existing implementation?

Is it a case of building your own server? Any thoughts on the approach to use?

Thanks,


Spark 1.1.1 SQLContext.jsonFile dumps trace if JSON has newlines ...

2014-12-10 Thread Manoj Samel
I am using SQLContext.jsonFile. If a valid JSON record contains newlines,
Spark 1.1.1 dumps the trace below. If the JSON record is all on one line, it
works fine. Is this known?


14/12/10 11:44:02 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
28)

com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input
within/between OBJECT entries

 at [Source: java.io.StringReader@4c8dd4d9; line: 1, column: 19]

at com.fasterxml.jackson.core.JsonParser._constructError(
JsonParser.java:1524)

at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWS(
ReaderBasedJsonParser.java:1682)

at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(
ReaderBasedJsonParser.java:619)

at
com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringMap(
MapDeserializer.java:412)

at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(
MapDeserializer.java:312)

at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(
MapDeserializer.java:26)

at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(
ObjectMapper.java:2986)

at com.fasterxml.jackson.databind.ObjectMapper.readValue(
ObjectMapper.java:2091)

at
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(
JsonRDD.scala:275)

at
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(
JsonRDD.scala:274)

at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

at scala.collection.Iterator$class.foreach(Iterator.scala:727)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

at scala.collection.TraversableOnce$class.reduceLeft(
TraversableOnce.scala:172)

at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)

at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:847)

at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:845)

at org.apache.spark.SparkContext$$anonfun$28.apply(SparkContext.scala:1179)

at org.apache.spark.SparkContext$$anonfun$28.apply(SparkContext.scala:1179)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)

at org.apache.spark.scheduler.Task.run(Task.scala:54)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)

at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1145)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:744)

14/12/10 11:44:02 WARN TaskSetManager: Lost task 0.0 in stage 14.0 (TID 28,
localhost): com.fasterxml.jackson.core.JsonParseException: Unexpected
end-of-input within/between OBJECT entries

 at [Source: java.io.StringReader@4c8dd4d9; line: 1, column: 19]

com.fasterxml.jackson.core.JsonParser._constructError(
JsonParser.java:1524)

com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWS(
ReaderBasedJsonParser.java:1682)

com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(
ReaderBasedJsonParser.java:619)


com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringMap(
MapDeserializer.java:412)


com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(
MapDeserializer.java:312)


com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(
MapDeserializer.java:26)

com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(
ObjectMapper.java:2986)

com.fasterxml.jackson.databind.ObjectMapper.readValue(
ObjectMapper.java:2091)


org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(
JsonRDD.scala:275)


org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(
JsonRDD.scala:274)

scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

scala.collection.Iterator$class.foreach(Iterator.scala:727)

scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

scala.collection.TraversableOnce$class.reduceLeft(
TraversableOnce.scala:172)

scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)

org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:847)

org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:845)

org.apache.spark.SparkContext$$anonfun$28.apply(
SparkContext.scala:1179)

org.apache.spark.SparkContext$$anonfun$28.apply(
SparkContext.scala:1179)

org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)

org.apache.spark.scheduler.Task.run(Task.scala:54)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178
)

java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:615)

java.lang.Thread.run(Thread.java:744)

14/12/10 11:44:02 ERROR TaskSetManager: Task 0 in stage 14.0 failed 1
times; aborting job
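
This matches the documented behavior of the 1.x JSON support: jsonFile and
jsonRDD expect each input line to hold one complete, self-contained JSON
object, so a record that spans lines fails to parse. A small sketch with
illustrative data:

import org.apache.spark.sql._
val sqlContext = new SQLContext(sc)

// One complete JSON object per line -- no record may span multiple lines.
val lines = sc.parallelize(Seq(
  """{"name":"alpha","value":1}""",
  """{"name":"beta","value":2}"""))

val json = sqlContext.jsonRDD(lines)   // schema is inferred from the records
json.registerTempTable("j")
sqlContext.sql("select name, value from j").collect()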


Can HiveContext be used without using Hive?

2014-12-09 Thread Manoj Samel
From the 1.1.1 documentation, it seems one can use HiveContext instead of
SQLContext without having a Hive installation. The benefit is the richer SQL
dialect.

Is my understanding correct ?

Thanks
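
For reference, a minimal sketch (assuming a Spark build that includes the Hive
module): a HiveContext can be created without any existing Hive installation;
it provides the richer HiveQL dialect and, as noted in another thread above,
creates a local metastore_db directory on first use.

// spark-shell; no Hive installation required, only a Spark build with Hive support
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// HiveQL queries; the embedded metastore (./metastore_db) is created on first use
hiveContext.sql("CREATE TABLE IF NOT EXISTS t (key INT, value STRING)")
hiveContext.sql("SELECT count(*) FROM t").collect()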


Spark SQL - Any time line to move beyond Alpha version ?

2014-11-24 Thread Manoj Samel
Is there any timeline where Spark SQL goes beyond alpha version?

Thanks,


Re: Spark resilience

2014-04-15 Thread Manoj Samel
Thanks Aaron, this is useful !

- Manoj


On Mon, Apr 14, 2014 at 8:12 PM, Aaron Davidson ilike...@gmail.com wrote:

 Launching drivers inside the cluster was a feature added in 0.9, for
 standalone cluster mode:
 http://spark.apache.org/docs/latest/spark-standalone.html#launching-applications-inside-the-cluster

 Note the supervise flag, which will cause the driver to be restarted if
 it fails. This is a rather low-level mechanism which by default will just
 cause the whole job to rerun from the beginning. Special recovery would
 have to be implemented by hand, via some sort of state checkpointing into a
 globally visible storage system (e.g., HDFS), which, for example, Spark
 Streaming already does.

 Currently, this feature is not supported in YARN or Mesos fine-grained
 mode.


 On Mon, Apr 14, 2014 at 2:08 PM, Manoj Samel manojsamelt...@gmail.comwrote:

 Could you please elaborate how drivers can be restarted automatically ?

 Thanks,


 On Mon, Apr 14, 2014 at 10:30 AM, Aaron Davidson ilike...@gmail.comwrote:

 Master and slave are somewhat overloaded terms in the Spark ecosystem
 (see the glossary:
 http://spark.apache.org/docs/latest/cluster-overview.html#glossary).
 Are you actually asking about the Spark driver and executors, or the
 standalone cluster master and workers?

 To briefly answer for either possibility:
 (1) Drivers are not fault tolerant but can be restarted automatically,
 Executors may be removed at any point without failing the job (though
 losing an Executor may slow the job significantly), and Executors may be
 added at any point and will be immediately used.
 (2) Standalone cluster Masters are fault tolerant and failure will only
 temporarily stall new jobs from starting or getting new resources, but does
 not affect currently-running jobs. Workers can fail and will simply cause
 jobs to lose their current Executors. New Workers can be added at any point.



 On Mon, Apr 14, 2014 at 11:00 AM, Ian Ferreira 
 ianferre...@hotmail.comwrote:

 Folks,

 I was wondering what the failure support modes were for Spark while
 running jobs


1. What happens when a master fails
2. What happens when a slave fails
3. Can you mid job add and remove slaves


 Regarding the install on Mesos, if I understand correctly the Spark
 master is behind a ZooKeeper quorum, so that isolates the slaves from a
 master failure, but what about the masters behind the quorum?

 Cheers
 - Ian







Re: Spark resilience

2014-04-14 Thread Manoj Samel
Could you please elaborate how drivers can be restarted automatically ?

Thanks,


On Mon, Apr 14, 2014 at 10:30 AM, Aaron Davidson ilike...@gmail.com wrote:

 Master and slave are somewhat overloaded terms in the Spark ecosystem (see
 the glossary:
 http://spark.apache.org/docs/latest/cluster-overview.html#glossary). Are
 you actually asking about the Spark driver and executors, or the
 standalone cluster master and workers?

 To briefly answer for either possibility:
 (1) Drivers are not fault tolerant but can be restarted automatically,
 Executors may be removed at any point without failing the job (though
 losing an Executor may slow the job significantly), and Executors may be
 added at any point and will be immediately used.
 (2) Standalone cluster Masters are fault tolerant and failure will only
 temporarily stall new jobs from starting or getting new resources, but does
 not affect currently-running jobs. Workers can fail and will simply cause
 jobs to lose their current Executors. New Workers can be added at any point.



 On Mon, Apr 14, 2014 at 11:00 AM, Ian Ferreira ianferre...@hotmail.comwrote:

 Folks,

 I was wondering what the failure support modes were for Spark while
 running jobs


1. What happens when a master fails
2. What happens when a slave fails
3. Can you mid job add and remove slaves


 Regarding the install on Mesos, if I understand correctly the Spark master
 is behind a ZooKeeper quorum, so that isolates the slaves from a master
 failure, but what about the masters behind the quorum?

 Cheers
 - Ian





Re: Error in SparkSQL Example

2014-03-31 Thread Manoj Samel
Hi Michael,

Thanks for the clarification. My question is about the error above ("error:
class $iwC needs to be abstract ...") and what the RDD declaration brings, since
I can do the DSL without the "people: org.apache.spark.rdd.RDD[Person]" line.

Thanks,


On Mon, Mar 31, 2014 at 9:13 AM, Michael Armbrust mich...@databricks.comwrote:

 "val people: RDD[Person] // An RDD of case class objects, from the first
 example." is just a placeholder to avoid cluttering up each example with
 the same code for creating an RDD.  The ": RDD[Person]" is just there to
 let you know the expected type of the variable 'people'.  Perhaps there is
 a clearer way to indicate this.

 As you have realized, using the full line from the first example will
 allow you to run the rest of them.



 On Sun, Mar 30, 2014 at 7:31 AM, Manoj Samel manojsamelt...@gmail.comwrote:

 Hi,

 On
 http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html,
 I am trying to run code on Writing Language-Integrated Relational Queries
 ( I have 1.0.0 Snapshot ).

 I am running into an error on

 val people: RDD[Person] // An RDD of case class objects, from the first
 example.

 scala> val people: RDD[Person]
 <console>:19: error: not found: type RDD
        val people: RDD[Person]
                    ^

 scala> val people: org.apache.spark.rdd.RDD[Person]
 <console>:18: error: class $iwC needs to be abstract, since value people
 is not defined
 class $iwC extends Serializable {
       ^

 Any idea what the issue is ?

 Also, it's not clear what the RDD[Person] brings. I can run the DSL
 without the case class objects RDD ...

 val people =
 sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p
 => Person(p(0), p(1).trim.toInt))

 val teenagers = people.where('age >= 13).where('age <= 19)

 Thanks,







Error in SparkSQL Example

2014-03-30 Thread Manoj Samel
Hi,

On
http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html,
I am trying to run code on Writing Language-Integrated Relational Queries
( I have 1.0.0 Snapshot ).

I am running into an error on

val people: RDD[Person] // An RDD of case class objects, from the first
example.

scala> val people: RDD[Person]
<console>:19: error: not found: type RDD
       val people: RDD[Person]
                   ^

scala> val people: org.apache.spark.rdd.RDD[Person]
<console>:18: error: class $iwC needs to be abstract, since value people is
not defined
class $iwC extends Serializable {
      ^

Any idea what the issue is ?

Also, it's not clear what the RDD[Person] brings. I can run the DSL
without the case class objects RDD ...

val people =
sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p
=> Person(p(0), p(1).trim.toInt))

val teenagers = people.where('age >= 13).where('age <= 19)

Thanks,


Shouldn't the UNION of SchemaRDDs produce SchemaRDD ?

2014-03-30 Thread Manoj Samel
Hi,

I am trying SparkSQL based on the example on doc ...



val people =
sc.textFile("/data/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
=> Person(p(0), p(1).trim.toInt))


val olderThanTeans = people.where('age > 19)
val youngerThanTeans = people.where('age < 13)
val nonTeans = youngerThanTeans.union(olderThanTeans)

I can do an orderBy('age) on the first two (which are SchemaRDDs) but not on
the third. The nonTeans is a UnionRDD that does not support orderBy. This
seems different from the SQL behavior, where the union of 2 SQL queries is a
query itself with the same functionality ...

Not clear why the union of 2 SchemaRDDs does not produce a SchemaRDD ...


Thanks,
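
A sketch of one way to keep the schema through the union, assuming the
SchemaRDD API in this snapshot already has unionAll (as later releases do):
use the SchemaRDD-level unionAll instead of the plain RDD union, which drops
the SQL-specific type. Continuing the names from the post:

val olderThanTeans   = people.where('age > 19)
val youngerThanTeans = people.where('age < 13)

// unionAll is defined on SchemaRDD and returns a SchemaRDD, so orderBy still works
val nonTeans = youngerThanTeans.unionAll(olderThanTeans)
nonTeans.orderBy('age).foreach(println)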


SparkSQL where with BigDecimal type gives stacktrace

2014-03-30 Thread Manoj Samel
Hi,

If I do a where on BigDecimal, I get a stack trace. Changing BigDecimal to
Double works ...

scala> case class JournalLine(account: String, credit: BigDecimal, debit:
BigDecimal, date: String, company: String, currency: String, costcenter:
String, region: String)
defined class JournalLine
...
scala> jl.where('credit  0).foreach(println)
scala.MatchError: scala.BigDecimal (of class
scala.reflect.internal.Types$TypeRef$$anon$3)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:41)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:45)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:45)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:45)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:38)
at
org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:32)
at
org.apache.spark.sql.execution.ExistingRdd$.fromProductRdd(basicOperators.scala:128)
at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:79)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:39)
at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:44)
at $iwC$$iwC$$iwC$$iwC.init(console:46)
at $iwC$$iwC$$iwC.init(console:48)
at $iwC$$iwC.init(console:50)
at $iwC.init(console:52)
at init(console:54)
at .init(console:58)
at .clinit(console)
at .init(console:7)
at .clinit(console)
at $print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:777)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1045)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:795)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:840)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:752)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:600)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:607)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:610)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:935)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:883)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:883)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:883)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:981)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)

Thanks,
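
A sketch of the workaround mentioned at the top of this thread - declare the
numeric columns as Double until BigDecimal is matched in ScalaReflection. The
abbreviated case class and the sample filter are illustrative:

// Same shape as JournalLine above, but with Double for the numeric columns
case class JournalLineD(account: String, credit: Double, debit: Double,
                        date: String, company: String, currency: String,
                        costcenter: String, region: String)

// With Double the schema is inferred fine, so the DSL filter works, e.g.
// (jl here would be an RDD of JournalLineD registered the same way as before):
// jl.where('credit > 0).foreach(println)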


Re: SparkSQL where with BigDecimal type gives stacktrace

2014-03-30 Thread Manoj Samel
Hi,

Would the same issue be present for other Java types like Date?

Converting the person/teenager example on Patrick's page reproduces the
problem ...

Thanks,


scala> import scala.math
import scala.math

scala> case class Person(name: String, age: BigDecimal)
defined class Person

scala> val people =
sc.textFile("/data/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
=> Person(p(0), BigDecimal(p(1).trim.toInt)))
14/03/31 00:23:40 INFO MemoryStore: ensureFreeSpace(32960) called with
curMem=0, maxMem=308713881
14/03/31 00:23:40 INFO MemoryStore: Block broadcast_0 stored as values to
memory (estimated size 32.2 KB, free 294.4 MB)
people: org.apache.spark.rdd.RDD[Person] = MappedRDD[3] at map at
console:20

scala> people take 1
...

scala> val t = people.where('age  12 )
scala.MatchError: scala.BigDecimal (of class
scala.reflect.internal.Types$TypeRef$$anon$3)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:41)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:45)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:45)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:45)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:38)
at
org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:32)
at
org.apache.spark.sql.execution.ExistingRdd$.fromProductRdd(basicOperators.scala:128)
at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:79)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:22)
at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:27)
at $iwC$$iwC$$iwC$$iwC.init(console:29)
at $iwC$$iwC$$iwC.init(console:31)
at $iwC$$iwC.init(console:33)
at $iwC.init(console:35)
at init(console:37)
at .init(console:41)
at .clinit(console)
at .init(console:7)
at .clinit(console)
at $print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:777)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1045)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:795)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:840)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:752)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:600)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:607)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:610)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:935)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:883)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:883)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:883)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:981)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)



On Sun, Mar 30, 2014 at 11:04 AM, Aaron Davidson ilike...@gmail.com wrote:

 Well, the error is coming from this case statement not matching on the
 BigDecimal type:
 https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L41

 This seems to be a bug because there is a corresponding Catalyst DataType
 for BigDecimal, just no way to produce a schema for it. A patch should be
 straightforward enough to match against typeOf[BigDecimal] assuming this
 was not for some reason intentional.


 On Sun, Mar 30, 2014 at 10:43 AM, smallmonkey...@hotmail.com 
 smallmonkey...@hotmail.com wrote:

  Can I get the whole operation? Then I can try to locate the error.

 --
  smallmonkey...@hotmail.com

  *From:* Manoj Samel manojsamelt...@gmail.com
 *Date:* 2014-03-31 01:16
 *To:* user user@spark.apache.org
 *Subject:* SparkSQL where with BigDecimal type gives stacktrace
  Hi,

 If I do a where on BigDecimal, I get a stack trace. Changing

groupBy RDD does not have grouping column ?

2014-03-30 Thread Manoj Samel
Hi,

If I create a groupBy('a)(Sum('b) as 'foo, Sum('c) as 'bar), then the
resulting RDD should have 'a, 'foo and 'bar.

The result RDD just shows 'foo and 'bar and is missing 'a

Thoughts?

Thanks,

Manoj
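
One hedged explanation, based on how the Catalyst DSL builds the aggregate: the
output columns of groupBy(groupingExprs)(aggregateExprs) are exactly the
aggregate expressions, so the grouping column shows up only if it is listed
there as well. A sketch, reusing the expressions from the post ("table" is a
placeholder SchemaRDD):

// Include the grouping column in the aggregate list to keep it in the output
val result = table.groupBy('a)('a, Sum('b) as 'foo, Sum('c) as 'bar)
// result now has columns 'a, 'foo and 'bar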