How to add jar with SparkSQL HiveContext?
I have a problem with the ADD JAR command:

    hql("add jar /.../xxx.jar")

Error:

    Exception in thread "main" java.lang.AssertionError: assertion failed: No plan for AddJar ...

How can I do this with HiveContext? I can't find any API for it. Also, does Spark SQL with Hive support UDF/UDAF?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jar-with-SparkSQL-HiveContext-tp7713.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to add jar with SparkSQL HiveContext?
Can you try this on master? You are likely running into SPARK-2128 (https://issues.apache.org/jira/browse/SPARK-2128).

Michael

On Mon, Jun 16, 2014 at 11:41 PM, Earthson <earthson...@gmail.com> wrote:

I have a problem with the ADD JAR command hql("add jar /.../xxx.jar"):

    Exception in thread "main" java.lang.AssertionError: assertion failed: No plan for AddJar ...

How can I do this with HiveContext? I can't find any API for it. Does Spark SQL with Hive support UDF/UDAF?
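For reference, once ADD JAR is planned correctly (i.e. on a build containing the SPARK-2128 fix), the usual Hive pattern for registering a UDF should go through hql(). This is only a sketch: the jar path, function name, and UDF class below are placeholders I made up, not something from this thread, and the snippet assumes a running spark-shell with an existing SparkContext `sc`.

```scala
import org.apache.spark.sql.hive.HiveContext

// Sketch only (placeholder jar path and class names): register a Hive UDF
// jar and a temporary function through the HiveContext, then use it in a query.
val hiveContext = new HiveContext(sc)
hiveContext.hql("ADD JAR /path/to/my_udfs.jar")
hiveContext.hql("CREATE TEMPORARY FUNCTION my_lower AS 'com.example.hive.MyLowerUDF'")
hiveContext.hql("SELECT my_lower(value) FROM src").collect()
```

Each statement is a plain HiveQL string, so this follows whatever Hive itself supports for UDF/UDAF registration.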
Contribution to Spark MLLib
Hello,

I wish to contribute some algorithms to Spark MLlib, but at the same time I want to make sure I don't attempt something redundant. Would it be okay to let me know which algorithms are not on your roadmap for the near future? Also, can I use Java instead of Scala to write machine learning algorithms for Spark MLlib?

Regards,
Jayati
Re: Spark 0.9.1 core dumps on Mesos 0.18.0
Hi Steven, have you resolved this problem? I've encountered the same problem.

2014-04-18 3:48 GMT+08:00 Sean Owen <so...@cloudera.com>:

Oh dear, I read this as a build problem. I can build with the latest Java 7, including those versions of Spark and Mesos, no problem. I did not deploy them. Mesos does have some native libraries, so it might well be some kind of compatibility issue at that level. Is there anything more in the error log that would show whether it came from Mesos or the JDK? (I likely don't have anything useful to add here, though.)

On Thu, Apr 17, 2014 at 8:21 PM, andy petrella <andy.petre...@gmail.com> wrote:

No, of course, but I was guessing some native libs imported (to communicate with Mesos) in the project could miserably crash the JVM. Anyway, you're telling us that with this Oracle version you don't have any issues using Spark on Mesos 0.18.0. That's interesting, because as far as I remember, my last test (done at night, so my memory of it is fuzzy) was using this particular version as well. Just to make things clear, Sean: you're using Spark 0.9.1 on Mesos 0.18.0 with Hadoop 2.x (x >= 2), with no modification other than specifying which version of Hadoop to build against when running make-distribution?
Re: Spark 0.9.1 core dumps on Mesos 0.18.0
I am using Spark 0.9.1, Mesos 0.19.0 and Tachyon 0.4.1. Is Spark 0.9.1 compatible with Mesos 0.19.0?

2014-06-17 15:50 GMT+08:00 qingyang li <liqingyang1...@gmail.com>:

Hi Steven, have you resolved this problem? I've encountered the same problem. ...
Re: Can't get Master Kerberos principal for use as renewer
Update: I've reconfigured the environment to use Spark 1.0.0 and the example finally worked! :) The difference for me was that Spark 1.0.0 only requires specifying the Hadoop conf dir (HADOOP_CONF_DIR=/etc/hadoop/conf/). I guess that with 0.9 there were problems in spotting this dir, but I'm not sure why.

On 16 June 2014 23:03, Finamore A. <alessandro.finam...@polito.it> wrote:

Hi, I'm a new Spark user and I'm trying to integrate it into my cluster: a small set of nodes running CDH 4.7 with Kerberos. The other services are fine with the authentication, but I have some trouble with Spark. First, I used the parcel available in Cloudera Manager (SPARK 0.9.0-1.cdh4.6.0.p0.98). Since the cluster has CDH 4.7 (not 4.6), I'm not sure if this can create problems. I've also tried with the new Spark 1.0.0 with no luck. I've configured the environment as reported in http://www.cloudera.com/content/cloudera-content/cloudera-docs/CM4Ent/4.8.1/Cloudera-Manager-Installation-Guide/cmig_spark_installation_standalone.html and I'm using a standalone deployment.
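For anyone hitting the same error, the fix described above amounts to exporting the Hadoop configuration directory before launching Spark, so the Kerberos-enabled HDFS client settings are picked up. The path is the one from this thread; adjust it for your cluster:

```shell
# Make the Hadoop (and thus Kerberos) client configuration visible to Spark 1.0.0
# before starting spark-shell or spark-submit.
export HADOOP_CONF_DIR=/etc/hadoop/conf/
```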
When launching spark-shell (for testing), everything seems fine (the process gets registered with the master). But when I try to execute the example reported in the installation page, Kerberos blocks access to HDFS:

scala> val file = sc.textFile("hdfs://m1hadoop.polito.it:8020/user/finamore/data")
14/06/16 22:28:36 INFO storage.MemoryStore: ensureFreeSpace(135653) called with curMem=0, maxMem=308713881
14/06/16 22:28:36 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 132.5 KB, free 294.3 MB)
file: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

scala> val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
java.io.IOException: Can't get Master Kerberos principal for use as renewer
    at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:116)
    at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
    at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:187)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:251)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
    at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:354)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:14)
    at $iwC$$iwC$$iwC.<init>(<console>:19)
    at $iwC$$iwC.<init>(<console>:21)
    at $iwC.<init>(<console>:23)
    at <init>(<console>:25)
    at .<init>(<console>:29)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:788)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:833)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:745)
Re: pyspark regression results way off
Thanks, will try normalising it.
Re: spark streaming, kafka, SPARK_CLASSPATH
Admittedly, getting Spark Streaming / Kafka working for the first time can be a bit tricky with the web of dependencies that get pulled in. I've taken the KafkaWordCount example from the Spark project and set up a simple standalone SBT project that shows you how to get it working with spark-submit:

https://github.com/cotdp/spark-example-kafka

The key trick is using sbt-assembly instead of relying on any of the "add jars" functionality. You mark spark-core and spark-streaming as "provided", because they are part of the core spark-assembly already running on your cluster. However, spark-streaming-kafka is not, so you need to package it in your 'fat JAR' while excluding all the mess that causes the build to break.

build.sbt (https://github.com/cotdp/spark-example-kafka/blob/master/build.sbt):

    import AssemblyKeys._

    assemblySettings

    name := "spark-example-kafka"

    version := "1.0"

    scalaVersion := "2.10.4"

    jarName in assembly := "spark-example-kafka_2.10-1.0.jar"

    assemblyOption in assembly ~= { _.copy(includeScala = false) }

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"            % "1.0.0" % "provided",
      "org.apache.spark" %% "spark-streaming"       % "1.0.0" % "provided",
      ("org.apache.spark" %% "spark-streaming-kafka" % "1.0.0").
        exclude("commons-beanutils", "commons-beanutils").
        exclude("commons-collections", "commons-collections").
        exclude("com.esotericsoftware.minlog", "minlog")
    )

    mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => {
      case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
      case x if x.startsWith("META-INF/mailcap")      => MergeStrategy.last
      case x if x.startsWith("plugin.properties")     => MergeStrategy.last
      case x => old(x)
    }}

You can see the exclude() calls have to go around the spark-streaming-kafka dependency, and I've used a MergeStrategy to solve the "deduplicate: different file contents found in the following" errors. Build the JAR with `sbt assembly` and use the scripts in bin/ to run the examples.
I'm using this same approach to run my Spark Streaming jobs with spark-submit and have them managed using Mesos/Marathon (http://mesosphere.io/) to handle failures and restarts of long-running processes.

Good luck!

MC

Michael Cutler
Founder, CTO

On 17 June 2014 02:51, Gino Bustelo <lbust...@gmail.com> wrote:

+1 for this issue. The documentation for spark-submit is misleading. Among many issues, the jar support is bad: HTTP URLs do not work, because Spark is using Hadoop's FileSystem class. You have to specify the jars twice to get things to work: once for the DriverWrapper to load your classes, and a second time in the context to distribute to the workers. I would like to see some contributor response to this issue.

Gino B.

On Jun 16, 2014, at 1:49 PM, Luis Ángel Vicente Sánchez <langel.gro...@gmail.com> wrote:

Did you manage to make it work? I'm facing similar problems, and this is a serious blocker issue. spark-submit seems kind of broken to me if you can't use it for spark-streaming.

Regards,
Luis

2014-06-11 1:48 GMT+01:00 lannyripple <lanny.rip...@gmail.com>:

I am using Spark 1.0.0 compiled with Hadoop 1.2.1. I have a toy spark-streaming-kafka program. It reads from a Kafka queue and does

    stream
      .map { case (k, v) => (v, 1) }
      .reduceByKey(_ + _)
      .print()

using a 1-second interval on the stream. The docs say to make the Spark and Hadoop jars 'provided', but this breaks for spark-streaming.
Including spark-streaming (and spark-streaming-kafka) as 'compile' to sweep them into our assembly gives collisions on javax.* classes. To work around this I modified $SPARK_HOME/bin/compute-classpath.sh to include spark-streaming, spark-streaming-kafka, and zkclient. (Note that kafka is included as 'compile' in my project and picked up in the assembly.) I have set up conf/spark-env.sh as needed. I have copied my assembly to /tmp/myjar.jar on all Spark hosts and to my HDFS /tmp/jars directory. I am running spark-submit from my Spark master, guided by the information here: https://spark.apache.org/docs/latest/submitting-applications.html

Well, at this point I was going to detail all the ways spark-submit fails to follow its own documentation. If I do not invoke sparkContext.setJars(), then it just fails to find the driver class. This is using various
Yarn-client mode and standalone-client mode hang during job start
Hi,

I'm stuck using either yarn-client or standalone-client mode: either one gets stuck when I submit jobs. The last messages printed were:

...
14/06/17 02:37:17 INFO spark.SparkContext: Added JAR file:/x/home/jianshuang/tmp/lib/commons-vfs2.jar at http://10.196.195.25:56377/jars/commons-vfs2.jar with timestamp 1402997837065
14/06/17 02:37:17 INFO spark.SparkContext: Added JAR file:/x/home/jianshuang/tmp/rtgraph.jar at http://10.196.195.25:56377/jars/rtgraph.jar with timestamp 1402997837065
14/06/17 02:37:17 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler
14/06/17 02:37:17 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown hook for context org.apache.spark.SparkContext@6655cf60

I can use yarn-cluster to run my app, but it's not very convenient to monitor the progress. Standalone-cluster mode doesn't work; it reports a file-not-found error:

Driver successfully submitted as driver-20140617023956-0003
... waiting before polling master for driver state
... polling master for driver state
State of driver-20140617023956-0003 is ERROR
Exception from cluster was: java.io.FileNotFoundException: File file:/x/home/jianshuang/tmp/rtgraph.jar does not exist

I'm using Spark 1.0.0 and my submit command looks like this:

~/spark/spark-1.0.0-hadoop2.4.0/bin/spark-submit --name 'rtgraph' --class com.paypal.rtgraph.demo.MapReduceWriter --master spark://lvshdc5en0015.lvs.paypal.com:7077 --jars `find lib -type f | tr '\n' ','` --executor-memory 20G --total-executor-cores 96 --deploy-mode cluster rtgraph.jar

The list of jars I put in the --jars option is: accumulo-core.jar, accumulo-fate.jar, accumulo-minicluster.jar, accumulo-trace.jar, accumulo-tracer.jar, chill_2.10-0.3.6.jar, commons-math.jar, commons-vfs2.jar, config-1.2.1.jar, gson.jar, guava.jar, joda-convert-1.2.jar, joda-time-2.3.jar, kryo-2.21.jar, libthrift.jar, quasiquotes_2.10-2.0.0-M8.jar, scala-async_2.10-0.9.1.jar, scala-library-2.10.4.jar, scala-reflect-2.10.4.jar.

Does anyone have a hint about what went wrong? Really confused.
Cheers,
--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
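One small thing worth checking in the submit command above: building the --jars list with `tr '\n' ','` leaves a trailing comma, since find's output ends with a newline, and a trailing comma produces an empty entry in the jar list. A quick sketch of the difference (the directory and jar names here are made up for illustration; `paste -sd,` is one way to join without the trailing comma):

```shell
# Demonstrate the trailing comma left by tr, and an alternative join with paste.
rm -rf /tmp/jars-demo
mkdir -p /tmp/jars-demo
touch /tmp/jars-demo/a.jar /tmp/jars-demo/b.jar

WITH_TR=$(find /tmp/jars-demo -type f -name '*.jar' | sort | tr '\n' ',')
WITH_PASTE=$(find /tmp/jars-demo -type f -name '*.jar' | sort | paste -sd, -)

echo "tr:    $WITH_TR"      # note the trailing comma
echo "paste: $WITH_PASTE"   # no trailing comma
```

Whether the trailing comma is actually the cause of the FileNotFoundException here I can't say, but it is cheap to rule out.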
Re: Spark sql unable to connect to db2 hive metastore
First, a clarification: Spark SQL does not talk to HiveServer2, as that JDBC interface is for retrieving results from queries that are executed using Hive. Instead, Spark SQL executes queries itself by directly accessing your data using Spark.

Spark SQL's Hive module can use JDBC to connect to an external metastore, in your case DB2. This is only used to retrieve the metadata (i.e., column names and types, HDFS locations of data).

Looking at your exception, I still see "java.sql.SQLException: No suitable driver", so my guess would be that the DB2 JDBC drivers are not being correctly included. How are you trying to add them to the classpath?

Michael

On Tue, Jun 17, 2014 at 1:29 AM, Jenny Zhao <linlin200...@gmail.com> wrote:

Hi,

My Hive configuration uses DB2 as its metastore database. I have built Spark with the extra step sbt/sbt assembly/assembly to include the dependency jars, and copied HIVE_HOME/conf/hive-site.xml under spark/conf. When I ran:

    hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

I got the following exception (a portion of the stack trace is pasted here). Looking at the stack, this made me wonder whether Spark supports a remote metastore configuration; it seems Spark doesn't talk to HiveServer2 directly? The driver jars db2jcc-10.5.jar and db2jcc_license_cisuz-10.5.jar are both included in the classpath; otherwise, it complains it couldn't find the driver. I'd appreciate any help resolving this. Thanks!

Caused by: java.sql.SQLException: Unable to open a test connection to the given database. JDBC url = jdbc:db2://localhost:50001/BIDB, username = catalog. Terminating connection pool.
Original Exception:
------
java.sql.SQLException: No suitable driver
    at java.sql.DriverManager.getConnection(DriverManager.java:422)
    at java.sql.DriverManager.getConnection(DriverManager.java:374)
    at com.jolbox.bonecp.BoneCP.obtainRawInternalConnection(BoneCP.java:254)
    at com.jolbox.bonecp.BoneCP.<init>(BoneCP.java:305)
    at com.jolbox.bonecp.BoneCPDataSource.maybeInit(BoneCPDataSource.java:150)
    at com.jolbox.bonecp.BoneCPDataSource.getConnection(BoneCPDataSource.java:112)
    at org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:479)
    at org.datanucleus.store.rdbms.RDBMSStoreManager.<init>(RDBMSStoreManager.java:304)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:56)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:39)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:527)
    at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
    at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301)
    at org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1069)
    at org.datanucleus.NucleusContext.initialise(NucleusContext.java:359)
    at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:768)
    at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:326)
    at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:195)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:60)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:37)
    at java.lang.reflect.Method.invoke(Method.java:611)
    at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
    at java.security.AccessController.doPrivileged(AccessController.java:277)
    at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
    at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
    at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
    at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
    at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:275)
    at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:304)
    at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:234)
    at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:209)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
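Since the "No suitable driver" error points at the driver JVM that opens the metastore connection, one thing to try is making the DB2 JDBC jars explicitly visible to that JVM rather than only to the application jar. The jar names are the ones mentioned in this thread; the install paths are placeholders, and in Spark 1.0 either SPARK_CLASSPATH or spark-submit's --driver-class-path can carry them:

```shell
# Illustrative only: expose the DB2 JDBC driver jars to the driver JVM
# that the Hive metastore connection is created in (adjust paths).
export SPARK_CLASSPATH=/path/to/db2jcc-10.5.jar:/path/to/db2jcc_license_cisuz-10.5.jar
./bin/spark-shell
```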
Re: Yarn-client mode and standalone-client mode hang during job start
For standalone-cluster mode, there's a scala.MatchError. Also, it looks like the --jars configuration is not passed to the driver/worker node? (Copying from file:/path doesn't seem correct either; shouldn't it copy from http://master/path?)

...
14/06/17 04:15:30 INFO Worker: Asked to launch driver driver-20140617041530-
14/06/17 04:15:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/17 04:15:30 INFO DriverRunner: Copying user jar file:/x/home/jianshuang/tmp/rtgraph.jar to /x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/work/driver-20140617041530-/rtgraph.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
14/06/17 04:15:30 INFO DriverRunner: Launch Command: /usr/java/jdk1.7.0_40/bin/java -cp /x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/work/driver-20140617041530-/rtgraph.jar:::/x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/conf:/x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/lib/spark-assembly-1.0.0-hadoop2.4.0.jar:/x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/lib/datanucleus-api-jdo-3.2.1.jar:/x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/lib/datanucleus-core-3.2.2.jar:/x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/lib/datanucleus-rdbms-3.2.1.jar:/etc/hadoop/conf:/usr/lib/hadoop-yarn/conf -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.deploy.worker.DriverWrapper akka.tcp://sparkwor...@lvshdc5dn0321.lvs.paypal.com:41987/user/Worker com.paypal.rtgraph.demo.MapReduceWriter
14/06/17 04:15:32 ERROR OneForOneStrategy: FAILED (of class scala.Enumeration$Val)
scala.MatchError: FAILED (of class scala.Enumeration$Val)
    at org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:317)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/06/17 04:15:32 INFO Worker: Starting Spark worker lvshdc5dn0321.lvs.paypal.com:41987 with 32 cores, 125.0 GB RAM
14/06/17 04:15:32 INFO Worker: Spark home: /x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0
14/06/17 04:15:32 INFO WorkerWebUI: Started WorkerWebUI at http://lvshdc5dn0321.lvs.paypal.com:8081
14/06/17 04:15:32 INFO Worker: Connecting to master spark://lvshdc5en0015.lvs.paypal.com:7077...
14/06/17 04:15:32 ERROR Worker: Worker registration failed: Attempted to re-register worker at same address: akka.tcp://sparkwor...@lvshdc5dn0321.lvs.paypal.com:41987

Is that a bug?

Jianshi

On Tue, Jun 17, 2014 at 5:41 PM, Jianshi Huang <jianshi.hu...@gmail.com> wrote:

Hi, I'm stuck using either yarn-client or standalone-client mode: either one gets stuck when I submit jobs. ...
news20-binary classification with LogisticRegressionWithSGD
Hello,

I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on Hadoop 0.20.2-cdh3u6, but it does not work for a sparse dataset even though the number of training examples used in the evaluation is just 1,000. It works fine for the dataset news20.binary.1000, which has 178,560 features. However, it does not work for news20.random.1000, where the number of features is large (1,354,731 features), even though we used a sparse vector through MLUtils.loadLibSVMFile(). The execution does not seem to progress, while no error is reported in the spark-shell or in the stdout/stderr of the executors. We used 32 executors, each allocating 7GB (of which 2GB is for RDDs) of working memory.

Any suggestions? Your help is really appreciated.

== Executed code ==

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

//val training = MLUtils.loadLibSVMFile(sc, "hdfs://host:8020/dataset/news20-binary/news20.binary.1000", multiclass = false)
val training = MLUtils.loadLibSVMFile(sc, "hdfs://host:8020/dataset/news20-binary/news20.random.1000", multiclass = false)

val numFeatures = training.take(1)(0).features.size
// numFeatures: Int = 178560 for news20.binary.1000
// numFeatures: Int = 1354731 for news20.random.1000

val model = LogisticRegressionWithSGD.train(training, numIterations = 1)

== The dataset used in the evaluation ==

http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary

$ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.binary.1000
$ sort -R news20.binary > news20.random
$ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.random.1000

You can find the datasets at:
https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000

Thanks,
Makoto
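As a side note on why the declared feature count alone shouldn't be the problem: a sparse vector stores only its non-zero entries, so a 1,354,731-dimensional point with a handful of active features is tiny. A minimal sketch in plain Scala (no Spark; the SparseVec type and the indices below are mine, purely for illustration):

```scala
// A toy sparse vector: only the non-zero (index, value) pairs are stored,
// so memory cost scales with the number of active features, not with the
// declared dimension `size`.
case class SparseVec(size: Int, indices: Array[Int], values: Array[Double]) {
  require(indices.length == values.length)
  def activeCount: Int = indices.length
  def dot(dense: Array[Double]): Double =
    indices.zip(values).map { case (i, v) => dense(i) * v }.sum
}

// A point with dimension ~1.35M but only 3 stored entries.
val point = SparseVec(1354731, Array(17, 90210, 1200000), Array(1.0, 1.0, 1.0))
println(point.activeCount)
```

So if the job stalls only when the dimension grows, the cost is more likely in whatever materializes dense structures of that dimension (e.g. the weight and gradient vectors during SGD aggregation) than in the training points themselves.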
Re: wholeTextFiles not working with HDFS
I didn't fix the issue so much as work around it. I was running my cluster locally, so using HDFS was just a preference. The code worked with the local file system, so that's what I'm using until I can get some help.
join operation is taking too much time
Hi all,

I want to do a recursive leftOuterJoin between an RDD created from a file with 9 million rows (the file is 100MB) and 30 other RDDs (created from 30 different files in each iteration of a loop) varying from 1 to 6 million rows. When I run it for 5 RDDs, it runs successfully in 5 minutes. But when I increase it to 10 or 30 RDDs, it gradually slows down and finally gets stuck without showing any warning or error. I am running in standalone mode with 2 workers of 4GB each and a total of 16 cores. Are any of you facing similar problems with join, or is it a problem with my configuration?

Thanks & Regards,
Meethu M
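For what it's worth, the shape of the computation can be sketched in plain Scala over Maps (all names below are mine, and this deliberately leaves Spark out). With RDDs, each iteration of such a loop also extends the lineage of the accumulated result, which is one common reason iterative joins slow down progressively unless intermediate results are cached or checkpointed:

```scala
// Toy model of repeated leftOuterJoin: keys of the left side are kept,
// and each round attaches an Option of the matching right-side value.
def leftOuterJoin[K, V, W](left: Map[K, V], right: Map[K, W]): Map[K, (V, Option[W])] =
  left.map { case (k, v) => k -> (v, right.get(k)) }

val base   = Map(1 -> "a", 2 -> "b", 3 -> "c")
val others = Seq(Map(1 -> 10), Map(2 -> 20))

// Fold the joins one after another, as the loop over the 30 RDDs would,
// accumulating one Option per joined dataset.
val start: Map[Int, List[Any]] = base.map { case (k, v) => k -> List[Any](v) }
val joined = others.foldLeft(start) { (acc, m) =>
  leftOuterJoin(acc, m).map { case (k, (vs, w)) => k -> (vs :+ w) }
}
```

In the Spark version, calling something like `acc.cache()` (or periodically checkpointing) inside the loop keeps each join from re-evaluating the whole chain; whether that is the cause of the stall here would need checking against the web UI.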
Re: wholeTextFiles not working with HDFS
Hi Sguj and littlebird,

I'll try to fix it tomorrow evening or the day after, because I am busy preparing a talk (slides) for tomorrow. Sorry for the inconvenience. Would you mind filing an issue on the Spark JIRA?

2014-06-17 20:55 GMT+08:00 Sguj <tpcome...@yahoo.com>:

I didn't fix the issue so much as work around it. I was running my cluster locally, so using HDFS was just a preference. The code worked with the local file system, so that's what I'm using until I can get some help.

--
Best Regards
---
Xusen Yin (尹绪森)
Intel Labs China
Homepage: http://yinxusen.github.io/
Execution stalls in LogisticRegressionWithSGD
Hi,

(Apologies for the long mail, but it's necessary to provide sufficient detail given the number of issues faced.)

I'm running into issues testing LogisticRegressionWithSGD on a two-node cluster (each node with 24 cores and 16G available to slaves out of 24G on the system). Here's a description of the application:

The model is being trained based on categorical features x, y, and (x,y). The categorical features are mapped to binary features by converting each distinct value in the category enum into a binary feature by itself (i.e. presence of that value in a record implies the corresponding feature = 1, else feature = 0; so there are as many distinct features as enum values). The training vector is laid out as [x1,x2...xn,y1,y2...yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the training data has only one combination (Xk,Yk) and a label appearing in the record. Thus, the corresponding labeled-point sparse vector would only have 3 values set for a record: Xk, Yk, (Xk,Yk). The total length of the vector (though sparse) is nearly 614,000. The number of records is about 1.33 million. The records have been coalesced into 20 partitions across the two nodes. The input data has not been cached.

(NOTE: I do realize the feature count may seem large for a two-node setup, but given the memory and CPU, and the fact that I'm willing to give up some turnaround time, I don't see why tasks should inexplicably fail.)

Additional parameters include:

spark.executor.memory = 14G
spark.default.parallelism = 1
spark.cores.max = 20
spark.storage.memoryFraction = 0.8  // No cache space required

(Trying to set spark.akka.frameSize to a larger number, say 20, didn't help either.)

The model training was initialized as:

new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05)

However, after 4 iterations of gradient descent, the entire execution appeared to stall inexplicably.
The corresponding executor details and details of the stalled stage (number 14) are as follows:

Metric                             Min      25th     Median   75th     Max
Result serialization time          12 ms    13 ms    14 ms    16 ms    18 ms
Duration                           4 s      4 s      5 s      5 s      5 s
Time spent fetching task results   0 ms     0 ms     0 ms     0 ms     0 ms
Scheduler delay                    6 s      6 s      6 s      6 s      12 s

Stage Id 14: aggregate at GradientDescent.scala:178
(All tasks were launched 2014/06/17 10:32:27 with locality level PROCESS_LOCAL.)

Task Index   Task ID   Status    Executor                      Duration   GC Time   Result Ser Time
0            600       RUNNING   serious.dataone.foo.bar.com   1.1 h
1            601       RUNNING   casual.dataone.foo.bar.com    1.1 h
2            602       RUNNING   serious.dataone.foo.bar.com   1.1 h
3            603       RUNNING   casual.dataone.foo.bar.com    1.1 h
4            604       RUNNING   serious.dataone.foo.bar.com   1.1 h
5            605       SUCCESS   casual.dataone.foo.bar.com    4 s        2 s       12 ms
6            606       SUCCESS   serious.dataone.foo.bar.com   4 s        1 s       14 ms
7            607       SUCCESS   casual.dataone.foo.bar.com    4 s        2 s       12 ms
8            608       SUCCESS   serious.dataone.foo.bar.com   5 s        1 s       15 ms
9            609       SUCCESS   casual.dataone.foo.bar.com    5 s        1 s       14 ms
10           610       SUCCESS   serious.dataone.foo.bar.com   5 s        1 s       15 ms
11           611       SUCCESS   casual.dataone.foo.bar.com    4 s        1 s       13 ms
12           612       SUCCESS   serious.dataone.foo.bar.com   5 s        1 s       18 ms
13           613       SUCCESS   casual.dataone.foo.bar.com    5 s        1 s       13 ms
14           614       SUCCESS   serious.dataone.foo.bar.com   4 s        1 s       14 ms
15           615       SUCCESS   casual.dataone.foo.bar.com    4 s        1 s       12 ms
16           616       SUCCESS   serious.dataone.foo.bar.com   5 s        1 s       15 ms
17           617       SUCCESS   casual.dataone.foo.bar.com    5 s        1 s       18 ms
18           618       SUCCESS   serious.dataone.foo.bar.com   5 s        1 s       16 ms
19           619       SUCCESS   casual.dataone.foo.bar.com    4 s        1 s       18 ms

Executor stats (column headers only; the values were truncated in the original): RDD Blocks, Memory Used, Disk Used, Active Tasks, Failed Tasks, Complete Tasks, Total Tasks, Task Time
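For concreteness, here is a rough sketch of the training setup described above. The record type, index layout, and iteration count are illustrative assumptions, not the poster's actual code; only the vector length, partition count, and SGD parameters come from the mail:

```scala
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Hypothetical input: each record carries the positions of its three
// active binary features (x value, y value, (x,y) combination) plus a label.
case class Rec(label: Double, xIdx: Int, yIdx: Int, xyIdx: Int)

val numFeatures = 614000
val maxIterations = 100 // illustrative

val training = records.map { r =>
  // Indices must be strictly increasing for Vectors.sparse.
  val indices = Array(r.xIdx, r.yIdx, r.xyIdx).sorted
  LabeledPoint(r.label, Vectors.sparse(numFeatures, indices, Array(1.0, 1.0, 1.0)))
}.coalesce(20)

// Equivalent to the poster's constructor call:
// stepSize = 1.0, miniBatchFraction = 0.05 (regParam = 0.0 is the default).
val model = LogisticRegressionWithSGD.train(training, maxIterations, 1.0, 0.05)
```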
Re: spark streaming, kafka, SPARK_CLASSPATH
After playing a bit, I have been able to create a fatjar this way:

lazy val rootDependencies = Seq(
  "org.apache.spark" %% "spark-core"              % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming"         % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
    exclude("org.apache.spark", "spark-core_2.10")
    exclude("org.apache.spark", "spark-streaming_2.10")
)

Excluding those transitive dependencies, we can create a fatjar of ~400Kb instead of 40Mb. My problem is not running the streaming job locally but trying to submit it to a standalone cluster using spark-submit; every time I ran the following command, my workers died:

~/development/tools/spark/1.0.0/bin/spark-submit \
 --class org.apache.spark.examples.streaming.TwitterPopularTags \
 --master spark://int-spark-master:7077 \
 --deploy-mode cluster \
 file:///tmp/spark-test-0.1-SNAPSHOT.jar

I have copied my fatjar to my master's /tmp folder. 2014-06-17 10:30 GMT+01:00 Michael Cutler mich...@tumra.com: Admittedly getting Spark Streaming / Kafka working for the first time can be a bit tricky with the web of dependencies that get pulled in. I've taken the KafkaWordCount example from the Spark project and set up a simple standalone SBT project that shows you how to get it working using spark-submit. https://github.com/cotdp/spark-example-kafka The key trick is in the use of sbt-assembly instead of relying on any of the "add jars" functionality. You mark spark-core and spark-streaming as "provided", because they are part of the core spark-assembly already running your cluster. However spark-streaming-kafka is not, so you need to package it in your 'fat JAR' while excluding all the mess that causes the build to break.
build.sbt (https://github.com/cotdp/spark-example-kafka/blob/master/build.sbt):

import AssemblyKeys._

assemblySettings

name := "spark-example-kafka"

version := "1.0"

scalaVersion := "2.10.4"

jarName in assembly := "spark-example-kafka_2.10-1.0.jar"

assemblyOption in assembly ~= { _.copy(includeScala = false) }

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
  ("org.apache.spark" %% "spark-streaming-kafka" % "1.0.0").
    exclude("commons-beanutils", "commons-beanutils").
    exclude("commons-collections", "commons-collections").
    exclude("com.esotericsoftware.minlog", "minlog")
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => {
    case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
    case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
    case x if x.startsWith("plugin.properties") => MergeStrategy.last
    case x => old(x)
  }
}

You can see the exclude() has to go around the spark-streaming-kafka dependency, and I've used a MergeStrategy to solve the "deduplicate: different file contents found in the following" errors. Build the JAR with sbt assembly and use the scripts in bin/ to run the examples. I'm using this same approach to run my Spark Streaming jobs with spark-submit and have them managed using Mesos/Marathon (http://mesosphere.io/) to handle failures and restarts with long-running processes. Good luck! MC -- Michael Cutler, Founder, CTO. Mobile: +44 789 990 7847. Email: mich...@tumra.com. Web: tumra.com. Visit us at our offices in Chiswick Park. Registered in England & Wales, 07916412. VAT No. 130595328.
On 17 June 2014 02:51, Gino Bustelo lbust...@gmail.com wrote: +1 for this issue. Documentation for spark-submit is misleading. Among many issues, the jar support is bad. HTTP urls do not work. This is because spark is using hadoop's FileSystem class. You have to specify the jars twice to get things to work: once for the DriverWrapper to load your classes, and a 2nd time in the Context to distribute to workers. I would like to see some contrib response to this issue. Gino B. On Jun 16, 2014, at 1:49 PM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: Did you manage to make it work? I'm facing similar problems and this is a serious blocker issue. spark-submit seems kind of broken to me if you can't use it for spark-streaming. Regards, Luis 2014-06-11 1:48 GMT+01:00 lannyripple lanny.rip...@gmail.com: I am using Spark 1.0.0 compiled with Hadoop 1.2.1. I have a toy spark-streaming-kafka program. It reads from a kafka queue and does

stream
  .map { case (k, v) => (v, 1) }
  .reduceByKey(_ + _)
Re: spark with docker: errors with akka, NAT?
Long story [1] short, akka opens up dynamic, random ports for each job [2]. So, simple NAT fails. You might try some trickery with a DNS server and docker's --net=host . [1] http://apache-spark-user-list.1001560.n3.nabble.com/Comprehensive-Port-Configuration-reference-tt5384.html#none [2] http://spark.apache.org/docs/latest/spark-standalone.html#configuring-ports-for-network-security Jacob D. Eisinger IBM Emerging Technologies jeis...@us.ibm.com - (512) 286-6075 From: Mohit Jaggi mohitja...@gmail.com To: user@spark.apache.org Date: 06/16/2014 05:36 PM Subject:spark with docker: errors with akka, NAT? Hi Folks, I am having trouble getting spark driver running in docker. If I run a pyspark example on my mac it works but the same example on a docker image (Via boot2docker) fails with following logs. I am pointing the spark driver (which is running the example) to a spark cluster (driver is not part of the cluster). I guess this has something to do with docker's networking stack (it may be getting NAT'd) but I am not sure why (if at all) the spark-worker or spark-master is trying to create a new TCP connection to the driver, instead of responding on the connection initiated by the driver. I would appreciate any help in figuring this out. Thanks, Mohit. logs Spark Executor Command: java -cp ::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar -Xms2g -Xmx2g -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler 1 cobalt 24 akka.tcp://sparkWorker@:33952/user/Worker app-20140616152201-0021 log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 
14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/06/16 15:22:05 INFO SecurityManager: Changing view acls to: ayasdi,root 14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(xxx, xxx) 14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started 14/06/16 15:22:05 INFO Remoting: Starting remoting 14/06/16 15:22:06 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@:33536] 14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@:33536] 14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler 14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@:33952/user/Worker 14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated for 6 ms, all messages to this address will be delivered to dead letters. 14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@:33536] - [akka.tcp://spark@fc31887475e3:43921] disassociated! Shutting down.
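The log above shows the executor failing to associate back with the driver at its container hostname (fc31887475e3), which matches Jacob's explanation: executors open a new connection back to the driver rather than reusing the driver's connection. One possible workaround sketch, assuming host networking (docker --net=host) is acceptable and that 10.0.0.5 and port 43921 stand in for an address and port the workers can actually reach:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Executors connect *back* to the driver over Akka, so the driver must
// advertise a hostname routable from the workers and a stable port,
// instead of the container-internal hostname it picks up by default.
val conf = new SparkConf()
  .setAppName("DockerDriver")
  .set("spark.driver.host", "10.0.0.5") // address reachable from workers (assumption)
  .set("spark.driver.port", "43921")    // fixed port to publish (assumption)
val sc = new SparkContext(conf)
```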
Spark 1.0.0 java.lang.outOfMemoryError: Java Heap Space
I've been trying to figure out how to increase the heap space for my spark environment in 1.0.0, and all of the things I've found tell me I have to export something in Java opts, which is deprecated in 1.0.0, or to increase spark.executor.memory, which is already at 6G. I'm only trying to process about 400-500 MB of text, but I get this error whenever I try to collect: 14/06/17 11:00:21 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sp...@salinger.ornl.gov:50251 14/06/17 11:00:21 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 165 bytes 14/06/17 11:00:35 INFO BlockManagerInfo: Added taskresult_14 in memory on salinger.ornl.gov:50253 (size: 123.7 MB, free: 465.1 MB) 14/06/17 11:00:35 INFO BlockManagerInfo: Added taskresult_13 in memory on salinger.ornl.gov:50253 (size: 127.7 MB, free: 337.4 MB) 14/06/17 11:00:36 ERROR Utils: Uncaught exception in thread Result resolver thread-2 java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:39) at java.nio.ByteBuffer.allocate(ByteBuffer.java:312) at org.apache.spark.storage.BlockMessage.set(BlockMessage.scala:94) at org.apache.spark.storage.BlockMessage$.fromByteBuffer(BlockMessage.scala:176) at org.apache.spark.storage.BlockMessageArray.set(BlockMessageArray.scala:63) at org.apache.spark.storage.BlockMessageArray$.fromBufferMessage(BlockMessageArray.scala:109) at org.apache.spark.storage.BlockManagerWorker$.syncGetBlock(BlockManagerWorker.scala:128) at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:489) at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:487) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:487) at
org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:481) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:53) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160) at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) at java.lang.Thread.run(Thread.java:695) Any idea how to fix heap space errors in 1.0.0? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-java-lang-outOfMemoryError-Java-Heap-Space-tp7733.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
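One observation on the trace above: the OOM occurs in the driver's result-fetching thread (TaskResultGetter) while pulling ~125 MB task results for collect(), so it is the driver heap, not spark.executor.memory, that is exhausted. A hedged sketch of raising it (flag name per spark-submit; the class and jar names are placeholders):

```shell
# Raise the driver heap -- collect() materializes all results on the driver.
spark-submit --driver-memory 4g --class com.example.MyApp my-app.jar

# Alternatively, avoid collecting everything: rdd.take(n) or
# rdd.saveAsTextFile(...) keep the bulk of the data off the driver.
```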
Re: Spark 1.0.0 java.lang.outOfMemoryError: Java Heap Space
Try repartitioning the RDD using coalesce(int partitions) before performing any transforms. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-java-lang-outOfMemoryError-Java-Heap-Space-tp7735p7736.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
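For completeness, a sketch of that suggestion (file path and partition count are illustrative). Note that coalesce without a shuffle can only merge partitions; to increase the partition count, pass shuffle = true or use repartition. Smaller partitions reduce per-task memory pressure, though they do not shrink what collect() ultimately brings back to the driver:

```scala
// Reshape the input into more, smaller partitions before heavy transforms.
val text = sc.textFile("hdfs:///data/input.txt")

val reshaped = text.coalesce(64, shuffle = true) // same as text.repartition(64)

val counts = reshaped
  .flatMap(_.split("\\s+"))
  .map(w => (w, 1))
  .reduceByKey(_ + _)
```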
Re: wholeTextFiles not working with HDFS
I can write one if you'll point me to where I need to write it. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/wholeTextFiles-not-working-with-HDFS-tp7490p7737.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark streaming questions
Hey, I am new to spark streaming and apologize if these questions have been asked. * In StreamingContext, reduceByKey() seems to only work on the RDDs of the current batch interval, not including RDDs of previous batches. Is my understanding correct? * If the above statement is correct, what functions should one use to do processing over the continuous stream of batches? I see 2 functions, reduceByKeyAndWindow and updateStateByKey, which serve this purpose. My use case is an aggregation and doesn't fit a windowing scenario. * As for updateStateByKey, I have a few questions. ** Over time, will spark stage the original data somewhere to replay in case of failures? Say the Spark job runs for weeks; I am wondering how that sustains. ** Say my reduce key space is partitioned by some date field, and I would like to stop processing old dates after a period of time (this is not a simple windowing scenario, as the date a record belongs to is not the same as the time it arrives). How can I tell spark to discard state for old dates? Thank you, Best Chen
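On the last question: the function passed to updateStateByKey is an ordinary function, and returning None for a key removes it from the state RDD, which is one way to expire old dates. A sketch, where the state type, cutoff logic, and timestamps are illustrative assumptions, not an established pattern from this thread:

```scala
// State per key: (running sum, time of last update, in ms).
// Returning None drops the key from the state RDD entirely.
def updateFunc(cutoffMs: Long, nowMs: Long)(
    newValues: Seq[Long],
    state: Option[(Long, Long)]): Option[(Long, Long)] = {
  val (sum, lastSeen) = state.getOrElse((0L, nowMs))
  if (newValues.isEmpty && nowMs - lastSeen > cutoffMs)
    None                               // expire a key with no recent data
  else if (newValues.isEmpty)
    Some((sum, lastSeen))              // keep state unchanged
  else
    Some((sum + newValues.sum, nowMs)) // fold this batch into the state
}

// Wiring (needs a StreamingContext with a checkpoint directory):
//   pairs.updateStateByKey(updateFunc(cutoffMs, System.currentTimeMillis()))
```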
Re: Memory footprint of Calliope: Spark - Cassandra writes
Gerard, Strings in particular are very inefficient because they're stored in a two-byte format by the JVM. If you use the Kryo serializer and StorageLevel.MEMORY_ONLY_SER, then Kryo stores Strings in UTF-8, which for ASCII-like strings will take half the space. Andrew On Tue, Jun 17, 2014 at 8:54 AM, Gerard Maas gerard.m...@gmail.com wrote: Hi Rohit, Thanks a lot for looking at this. The intention of calculating the data upfront is to benchmark only the time it takes to store it, in records/sec, eliminating the generation factor (which will be different in the real scenario, reading from HDFS). I used a profiler today and indeed it's not the storage part, but the generation that's bloating the memory. Objects in memory take surprisingly more space than one would expect based on the data they hold. In my case it was 2.1x the size of the original data. Now that we are talking about this, do you have some figures on how Calliope compares -performance wise- to a classic Cassandra driver (DataStax / Astyanax)? That would be awesome. Thanks again! -kr, Gerard. On Tue, Jun 17, 2014 at 4:27 PM, tj opensource opensou...@tuplejump.com wrote: Dear Gerard, I just tried the code you posted in the gist (https://gist.github.com/maasg/68de6016bffe5e71b78c) and it does give an OOM. It is caused by the data being generated locally and then parallelized:

--
val entries = for (i <- 1 to total) yield {
  Array(s"devy$i", "aggr", "1000", "sum", (i to i+10).mkString(","))
}

val rdd = sc.parallelize(entries, 8)
--

This will generate all the data on the local system and then try to partition it. Instead, we should parallelize the keys (i <- 1 to total) and generate the data in the map tasks. This is *closer* to what you will get if you distribute out a file on a DFS like HDFS/SnackFS.
I have made the change in the script here (https://gist.github.com/milliondreams/aac52e08953949057e7d):

--
val rdd = sc.parallelize(1 to total, 8).map(i => Array(s"devy$i", "aggr", "1000", "sum", (i to i+10).mkString(",")))
--

I was able to insert 50M records using just over 350M RAM. Attaching the log and screenshot. Let me know if you still face this issue... we can do a screen share and resolve the issue there. And thanks for using Calliope. I hope it serves your needs. Cheers, Rohit On Mon, Jun 16, 2014 at 9:57 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, I've been doing some testing with Calliope as a way to do batch loads from Spark into Cassandra. My initial results are promising in the performance area, but worrisome on the memory footprint side. I'm generating N records of about 50 bytes each and using the UPDATE mutator to insert them into C*. I get OOM if my memory is below 1GB per million records, or about 50MB of raw data (without counting any RDD/structural overhead). (See code [1].) (So, to avoid confusion: e.g. I need 4GB RAM to save 4M 50-byte records to Cassandra.) That's an order of magnitude more than the raw data. I understood that Calliope builds on top of the Hadoop support of Cassandra, which builds on top of SSTables and sstableloader. I would like to know what's the memory usage factor of Calliope and what parameters I could use to control/tune that. Any experience/advice on that? -kr, Gerard. [1] https://gist.github.com/maasg/68de6016bffe5e71b78c
Executors not utilized properly.
I am creating around 10 executors with 12 cores and 7g memory each, but when I launch a job not all executors are being used. For example, if my job has 9 tasks, only 3 executors are used with 3 tasks each, and I believe this is making my app slower than a MapReduce program for the same use case. Can anyone throw some light on executor configuration? How can I use all the executors? I am running Spark on YARN and Hadoop 2.4.0. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
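Two things usually matter for the symptom described above: requesting the executors explicitly at submit time, and having enough tasks (i.e. partitions) to occupy them. A hedged sketch (class and jar names are placeholders; flag names per spark-submit on YARN):

```shell
# Request the full set of executors up front on YARN.
spark-submit \
  --master yarn-cluster \
  --num-executors 10 \
  --executor-cores 12 \
  --executor-memory 7g \
  --class com.example.MyApp \
  my-app.jar

# Separately, with only 9 tasks at most 9 cores can ever be busy;
# inside the job, rdd.repartition(120) would create one task per
# available core (10 executors x 12 cores).
```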
Re: Worker dies while submitting a job
Ok... I was checking the wrong version of that file yesterday. My worker is sending a DriverStateChanged(_, DriverState.FAILED, _) but there is no case branch for that state, and the worker is crashing. I still don't know why I'm getting a FAILED state, but I'm sure that is what kills the actor, via a scala.MatchError. Usually in Scala it is a best practice to use a sealed trait and case classes/objects in a match statement instead of an enumeration (the compiler will complain about missing cases); I think that should be refactored to catch this kind of error at compile time. Now I need to find why that state-changed message is sent... I will continue updating this thread until I find the problem :D 2014-06-16 18:25 GMT+01:00 Luis Ángel Vicente Sánchez langel.gro...@gmail.com: I'm playing with a modified version of the TwitterPopularTags example and when I tried to submit the job to my cluster, workers keep dying with this message: 14/06/16 17:11:16 INFO DriverRunner: Launch Command: java -cp /opt/spark-1.0.0-bin-hadoop1/work/driver-20140616171115-0014/spark-test-0.1-SNAPSHOT.jar:::/opt/spark-1.0.0-bin-hadoop1/conf:/opt/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.deploy.worker.DriverWrapper akka.tcp://sparkWorker@int-spark-worker:51676/user/Worker org.apache.spark.examples.streaming.TwitterPopularTags 14/06/16 17:11:17 ERROR OneForOneStrategy: FAILED (of class scala.Enumeration$Val) scala.MatchError: FAILED (of class scala.Enumeration$Val) at org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:317) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/06/16 17:11:17 INFO Worker: Starting Spark worker int-spark-app-ie005d6a3.mclabs.io:51676 with 2 cores, 6.5 GB RAM 14/06/16 17:11:17 INFO Worker: Spark home: /opt/spark-1.0.0-bin-hadoop1 14/06/16 17:11:17 INFO WorkerWebUI: Started WorkerWebUI at http://int-spark-app-ie005d6a3.mclabs.io:8081 14/06/16 17:11:17 INFO Worker: Connecting to master spark://int-spark-app-ie005d6a3.mclabs.io:7077... 14/06/16 17:11:17 ERROR Worker: Worker registration failed: Attempted to re-register worker at same address: akka.tcp:// sparkwor...@int-spark-app-ie005d6a3.mclabs.io:51676 This happens when the worker receive a DriverStateChanged(driverId, state, exception) message. To deploy the job I copied the jar file to the temporary folder of master node and execute the following command: ./spark-submit \ --class org.apache.spark.examples.streaming.TwitterPopularTags \ --master spark://int-spark-master:7077 \ --deploy-mode cluster \ file:///tmp/spark-test-0.1-SNAPSHOT.jar I don't really know what the problem could be as there is a 'case _' that should avoid that problem :S
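The refactoring suggested above -- a sealed trait instead of an Enumeration, so the compiler flags non-exhaustive matches -- looks roughly like this (a simplified illustration, not Spark's actual DriverState):

```scala
// With an Enumeration, a missing case compiles cleanly and only fails
// at runtime with scala.MatchError. With a sealed trait, the compiler
// warns at compile time about any state left unhandled.
sealed trait DriverState
case object Running  extends DriverState
case object Finished extends DriverState
case object Failed   extends DriverState

def describe(s: DriverState): String = s match {
  case Running  => "driver is running"
  case Finished => "driver finished"
  case Failed   => "driver failed" // omit this line and the compiler
                                   // emits a non-exhaustive-match warning
}
```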
Re: Yarn-client mode and standalone-client mode hang during job start
Standalone-client mode is not officially supported at the moment. Standalone-cluster and yarn-client modes, however, should work. For both modes, are you running spark-submit from within the cluster, or outside of it? If the latter, could you try running it from within the cluster and see if it works? (Does your rtgraph.jar exist on the machine from which you run spark-submit?) 2014-06-17 2:41 GMT-07:00 Jianshi Huang jianshi.hu...@gmail.com: Hi, I'm stuck using either yarn-client or standalone-client mode. Either will get stuck when I submit jobs; the last messages it printed were: ... 14/06/17 02:37:17 INFO spark.SparkContext: Added JAR file:/x/home/jianshuang/tmp/lib/commons-vfs2.jar at http://10.196.195.25:56377/jars/commons-vfs2.jar with timestamp 1402997837065 14/06/17 02:37:17 INFO spark.SparkContext: Added JAR file:/x/home/jianshuang/tmp/rtgraph.jar at http://10.196.195.25:56377/jars/rtgraph.jar with timestamp 1402997837065 14/06/17 02:37:17 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler 14/06/17 02:37:17 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown hook for context org.apache.spark.SparkContext@6655cf60 I can use yarn-cluster to run my app but it's not very convenient to monitor the progress. Standalone-cluster mode doesn't work; it reports a file-not-found error: Driver successfully submitted as driver-20140617023956-0003 ... waiting before polling master for driver state ...
polling master for driver state State of driver-20140617023956-0003 is ERROR Exception from cluster was: java.io.FileNotFoundException: File file:/x/home/jianshuang/tmp/rtgraph.jar does not exist I'm using Spark 1.0.0 and my submit command looks like this: ~/spark/spark-1.0.0-hadoop2.4.0/bin/spark-submit --name 'rtgraph' --class com.paypal.rtgraph.demo.MapReduceWriter --master spark:// lvshdc5en0015.lvs.paypal.com:7077 --jars `find lib -type f | tr '\n' ','` --executor-memory 20G --total-executor-cores 96 --deploy-mode cluster rtgraph.jar List of jars I put in --jars option are: accumulo-core.jar accumulo-fate.jar accumulo-minicluster.jar accumulo-trace.jar accumulo-tracer.jar chill_2.10-0.3.6.jar commons-math.jar commons-vfs2.jar config-1.2.1.jar gson.jar guava.jar joda-convert-1.2.jar joda-time-2.3.jar kryo-2.21.jar libthrift.jar quasiquotes_2.10-2.0.0-M8.jar scala-async_2.10-0.9.1.jar scala-library-2.10.4.jar scala-reflect-2.10.4.jar Anyone has hint what went wrong? Really confused. Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: join operation is taking too much time
How long does it get stuck for? This is a common sign of the OS thrashing due to out-of-memory pressure. If you keep it running longer, does it throw an error? Depending on how large your other RDDs are (and your join operation), memory pressure may or may not be the problem at all. It could be that spilling your shuffles to disk is slowing you down (but that probably shouldn't hang your application). For the 5 RDDs case, what happens if you set spark.shuffle.spill to false? 2014-06-17 5:59 GMT-07:00 MEETHU MATHEW meethu2...@yahoo.co.in: Hi all, I want to do a recursive leftOuterJoin between an RDD (created from a file) with 9 million rows (the size of the file is 100MB) and 30 other RDDs (created from 30 different files in each iteration of a loop) varying from 1 to 6 million rows. When I run it for 5 RDDs, it runs successfully in 5 minutes. But when I increase it to 10 or 30 RDDs it gradually slows down and finally gets stuck without showing any warning or error. I am running in standalone mode with 2 workers of 4GB each and a total of 16 cores. Are any of you facing similar problems with JOIN, or is it a problem with my configuration? Thanks & Regards, Meethu M
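For reference, the setting Andrew mentions is an ordinary Spark property and can be set on the SparkConf before the context is created. A sketch (app name is a placeholder; note that disabling spilling trades disk I/O for OOM risk if the shuffle data does not fit in memory):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Diagnostic sketch: keep shuffle data in memory to test whether
// disk spills are what is slowing down the repeated leftOuterJoin.
val conf = new SparkConf()
  .setAppName("JoinTest")
  .set("spark.shuffle.spill", "false")
val sc = new SparkContext(conf)
```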
Re: Worker dies while submitting a job
I have been able to submit a job successfully, but I had to configure my spark job this way:

val sparkConf: SparkConf = new SparkConf()
  .setAppName("TwitterPopularTags")
  .setMaster("spark://int-spark-master:7077")
  .setSparkHome("/opt/spark")
  .setJars(Seq("/tmp/spark-test-0.1-SNAPSHOT.jar"))

Now I'm getting this error on my worker: 14/06/17 17:03:40 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 2014-06-17 17:36 GMT+01:00 Luis Ángel Vicente Sánchez langel.gro...@gmail.com: Ok... I was checking the wrong version of that file yesterday. My worker is sending a DriverStateChanged(_, DriverState.FAILED, _) but there is no case branch for that state, and the worker is crashing. I still don't know why I'm getting a FAILED state, but I'm sure that is what kills the actor, via a scala.MatchError. Usually in Scala it is a best practice to use a sealed trait and case classes/objects in a match statement instead of an enumeration (the compiler will complain about missing cases); I think that should be refactored to catch this kind of error at compile time. Now I need to find why that state-changed message is sent...
I will continue updating this thread until I found the problem :D 2014-06-16 18:25 GMT+01:00 Luis Ángel Vicente Sánchez langel.gro...@gmail.com: I'm playing with a modified version of the TwitterPopularTags example and when I tried to submit the job to my cluster, workers keep dying with this message: 14/06/16 17:11:16 INFO DriverRunner: Launch Command: java -cp /opt/spark-1.0.0-bin-hadoop1/work/driver-20140616171115-0014/spark-test-0.1-SNAPSHOT.jar:::/opt/spark-1.0.0-bin-hadoop1/conf:/opt/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.deploy.worker.DriverWrapper akka.tcp://sparkWorker@int-spark-worker:51676/user/Worker org.apache.spark.examples.streaming.TwitterPopularTags 14/06/16 17:11:17 ERROR OneForOneStrategy: FAILED (of class scala.Enumeration$Val) scala.MatchError: FAILED (of class scala.Enumeration$Val) at org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:317) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/06/16 17:11:17 INFO Worker: Starting Spark worker int-spark-app-ie005d6a3.mclabs.io:51676 with 2 cores, 6.5 GB RAM 14/06/16 17:11:17 INFO Worker: Spark home: /opt/spark-1.0.0-bin-hadoop1 14/06/16 17:11:17 INFO WorkerWebUI: Started WorkerWebUI at http://int-spark-app-ie005d6a3.mclabs.io:8081 14/06/16 17:11:17 INFO Worker: Connecting to master 
spark://int-spark-app-ie005d6a3.mclabs.io:7077... 14/06/16 17:11:17 ERROR Worker: Worker registration failed: Attempted to re-register worker at same address: akka.tcp:// sparkwor...@int-spark-app-ie005d6a3.mclabs.io:51676 This happens when the worker receives a DriverStateChanged(driverId, state, exception) message. To deploy the job I copied the jar file to the temporary folder of the master node and executed the following command: ./spark-submit \ --class org.apache.spark.examples.streaming.TwitterPopularTags \ --master spark://int-spark-master:7077 \ --deploy-mode cluster \ file:///tmp/spark-test-0.1-SNAPSHOT.jar I don't really know what the problem could be, as there is a 'case _' that should avoid that problem :S
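The sealed-trait best practice Luis mentions can be illustrated with a small, self-contained sketch. The state names below only mirror Spark's driver states for illustration; this is not Spark's actual code:

```scala
// With a sealed trait, the compiler checks match exhaustiveness,
// so a missing state becomes a compile-time warning instead of a
// scala.MatchError at runtime (as happens with an Enumeration).
sealed trait DriverState
case object Running  extends DriverState
case object Finished extends DriverState
case object Failed   extends DriverState

object StateHandler {
  def describe(state: DriverState): String = state match {
    case Running  => "driver is running"
    case Finished => "driver finished cleanly"
    case Failed   => "driver failed"
    // Deleting any case above triggers a "match may not be
    // exhaustive" warning at compile time.
  }
}
```

With `scala.Enumeration`, by contrast, all values share the type `Enumeration#Value`, so the compiler cannot tell which cases are missing.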
Re: spark streaming, kafka, SPARK_CLASSPATH
I have been able to submit a job successfully but I had to config my spark job this way: val sparkConf: SparkConf = new SparkConf() .setAppName("TwitterPopularTags") .setMaster("spark://int-spark-master:7077") .setSparkHome("/opt/spark") .setJars(Seq("/tmp/spark-test-0.1-SNAPSHOT.jar")) Now I'm getting this error on my worker: 14/06/17 17:03:40 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 2014-06-17 15:38 GMT+01:00 Luis Ángel Vicente Sánchez langel.gro...@gmail.com: After playing a bit, I have been able to create a fatjar this way: lazy val rootDependencies = Seq( "org.apache.spark" %% "spark-core" % "1.0.0" % "provided", "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided", "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0" exclude("org.apache.spark", "spark-core_2.10") exclude("org.apache.spark", "spark-streaming_2.10") ) Excluding those transitive dependencies, we can create a fatjar of ~400KB instead of 40MB. My problem is not running the streaming job locally but trying to submit it to a standalone cluster using spark-submit; every time I ran the following command, my workers died: ~/development/tools/spark/1.0.0/bin/spark-submit \ --class org.apache.spark.examples.streaming.TwitterPopularTags \ --master spark://int-spark-master:7077 \ --deploy-mode cluster \ file:///tmp/spark-test-0.1-SNAPSHOT.jar I have copied my fatjar to my master's /tmp folder. 2014-06-17 10:30 GMT+01:00 Michael Cutler mich...@tumra.com: Admittedly, getting Spark Streaming / Kafka working for the first time can be a bit tricky with the web of dependencies that get pulled in. I've taken the KafkaWordCount example from the Spark project and set up a simple standalone SBT project that shows you how to get it working and use spark-submit. 
*https://github.com/cotdp/spark-example-kafka* The key trick is the use of sbt-assembly instead of relying on any of the "add jars" functionality. You mark spark-core and spark-streaming as "provided", because they are part of the core spark-assembly already running your cluster. However spark-streaming-kafka is not, so you need to package it in your 'fat JAR' while excluding all the mess that causes the build to break. build.sbt https://github.com/cotdp/spark-example-kafka/blob/master/build.sbt: import AssemblyKeys._ assemblySettings name := "spark-example-kafka" version := "1.0" scalaVersion := "2.10.4" jarName in assembly := "spark-example-kafka_2.10-1.0.jar" assemblyOption in assembly ~= { _.copy(includeScala = false) } libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.0.0" % "provided", "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided", ("org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"). exclude("commons-beanutils", "commons-beanutils"). exclude("commons-collections", "commons-collections"). exclude("com.esotericsoftware.minlog", "minlog") ) mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => { case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last case x if x.startsWith("plugin.properties") => MergeStrategy.last case x => old(x) } } You can see the exclude() has to go around the spark-streaming-kafka dependency, and I've used a MergeStrategy to solve the "deduplicate: different file contents found in the following" errors. Build the JAR with sbt assembly and use the scripts in bin/ to run the examples. I'm using this same approach to run my Spark Streaming jobs with spark-submit and have them managed using Mesos/Marathon http://mesosphere.io/ to handle failures and restarts with long-running processes. Good luck! 
MC *Michael Cutler* Founder, CTO * Mobile: +44 789 990 7847 Email: mich...@tumra.com Web: tumra.com http://tumra.com * *Visit us at our offices in Chiswick Park http://goo.gl/maps/abBxq* *Registered in England & Wales, 07916412. VAT No. 130595328* This email and any files transmitted with it are confidential and may also be privileged. It is intended only for the person to whom it is addressed. If you have received this email in error, please inform the sender immediately. If you are not the intended recipient you must not use, disclose, copy, print, distribute or rely on this email. On 17 June 2014 02:51, Gino Bustelo lbust...@gmail.com wrote: +1 for this issue. Documentation for spark-submit is misleading. Among many issues, the jar support is bad. HTTP URLs do not work. This is because Spark is using Hadoop's FileSystem class. You have to specify the jars twice to get things to work: once for the DriverWrapper to load your classes and a 2nd time in the Context to distribute to
Re: Spark sql unable to connect to db2 hive metastore
Thanks Michael! As I run it using spark-shell, I added both jars through the bin/spark-shell --jars option. I noticed that if I don't pass these jars, it complains it couldn't find the driver; if I pass them through the --jars option, it complains there is no suitable driver. Regards. On Tue, Jun 17, 2014 at 2:43 AM, Michael Armbrust mich...@databricks.com wrote: First a clarification: Spark SQL does not talk to HiveServer2, as that JDBC interface is for retrieving results from queries that are executed using Hive. Instead, Spark SQL executes queries itself by directly accessing your data using Spark. Spark SQL's Hive module can use JDBC to connect to an external metastore, in your case DB2. This is only used to retrieve the metadata (i.e., column names and types, HDFS locations for data). Looking at your exception I still see java.sql.SQLException: No suitable driver, so my guess would be that the DB2 JDBC drivers are not being correctly included. How are you trying to add them to the classpath? Michael On Tue, Jun 17, 2014 at 1:29 AM, Jenny Zhao linlin200...@gmail.com wrote: Hi, my Hive configuration uses DB2 as its metastore database. I have built Spark with the extra step sbt/sbt assembly/assembly to include the dependency jars, and copied HIVE_HOME/conf/hive-site.xml under spark/conf. When I ran: hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") I got the following exception (a portion of the stack trace is pasted here). Looking at the stack, this made me wonder whether Spark supports a remote metastore configuration; it seems Spark doesn't talk to HiveServer2 directly? The driver jars db2jcc-10.5.jar and db2jcc_license_cisuz-10.5.jar are both included in the classpath; otherwise, it complains it couldn't find the driver. Appreciate any help to resolve it. Thanks! Caused by: java.sql.SQLException: Unable to open a test connection to the given database. JDBC url = jdbc:db2://localhost:50001/BIDB, username = catalog. Terminating connection pool. 
Original Exception: -- java.sql.SQLException: No suitable driver at java.sql.DriverManager.getConnection(DriverManager.java:422) at java.sql.DriverManager.getConnection(DriverManager.java:374) at com.jolbox.bonecp.BoneCP.obtainRawInternalConnection(BoneCP.java:254) at com.jolbox.bonecp.BoneCP.init(BoneCP.java:305) at com.jolbox.bonecp.BoneCPDataSource.maybeInit(BoneCPDataSource.java:150) at com.jolbox.bonecp.BoneCPDataSource.getConnection(BoneCPDataSource.java:112) at org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:479) at org.datanucleus.store.rdbms.RDBMSStoreManager.init(RDBMSStoreManager.java:304) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:56) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:39) at java.lang.reflect.Constructor.newInstance(Constructor.java:527) at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631) at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301) at org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1069) at org.datanucleus.NucleusContext.initialise(NucleusContext.java:359) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:768) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:326) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:195) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:60) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:37) at 
java.lang.reflect.Method.invoke(Method.java:611) at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965) at java.security.AccessController.doPrivileged(AccessController.java:277) at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960) at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701) at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:275) at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:304) at
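One way this "no suitable driver" symptom is often worked around (an assumption about this setup, not a confirmed fix for DB2): the metastore JDBC connection is opened by the driver process itself, so the driver jars may need to be on the driver's classpath rather than only passed via --jars, which primarily ships jars to executors. A hypothetical invocation (paths are placeholders):

```shell
# Put the DB2 JDBC driver jars on the driver JVM's classpath, where the
# Hive metastore connection pool (BoneCP/DataNucleus) is created.
./bin/spark-shell \
  --driver-class-path /path/to/db2jcc-10.5.jar:/path/to/db2jcc_license_cisuz-10.5.jar
```

If that works, it suggests the jars were reaching the executors but not the driver-side classloader that DriverManager consults.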
Re: What is the best way to handle transformations or actions that takes forever?
I've tried enabling speculative jobs. This seems to have partially solved the problem; however, I'm not sure if it can handle large-scale situations, as it only starts when 75% of the job is finished. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-best-way-to-handle-transformations-or-actions-that-takes-forever-tp7664p7752.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Executors not utilized properly.
Can someone help me with this? Any help is appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7753.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: What is the best way to handle transformations or actions that takes forever?
I think you need to implement a timeout in your code. As far as I know, Spark will not interrupt the execution of your code as long as the driver is connected. Might be an idea though. On Tue, Jun 17, 2014 at 7:54 PM, Peng Cheng pc...@uow.edu.au wrote: I've tried enabling speculative jobs. This seems to have partially solved the problem; however, I'm not sure if it can handle large-scale situations, as it only starts when 75% of the job is finished. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-best-way-to-handle-transformations-or-actions-that-takes-forever-tp7664p7752.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
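The timeout idea above can be sketched with a Future wrapped around the long-running work inside each task. The helper name and the 30-second figure are illustrative, not a Spark API:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

// Run a block with a hard deadline; returns None if it takes too long.
// Note: Await gives up waiting, but the worker thread is not killed,
// so the wrapped work should be safe to abandon (e.g. an I/O call).
def withTimeout[T](timeout: FiniteDuration)(work: => T): Option[T] = {
  val f = Future(work)
  Try(Await.result(f, timeout)) match {
    case Success(v) => Some(v)
    case Failure(_) => None
  }
}

// Inside a Spark transformation one could then write, e.g.:
// rdd.map(x => withTimeout(30.seconds)(slowOperation(x)))
//    .filter(_.isDefined)
```

This keeps a pathological record from stalling a task forever, at the cost of dropping results that exceed the deadline.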
Re: spark streaming, kafka, SPARK_CLASSPATH
Luis' experience validates what I'm seeing. You still have to set the properties in the SparkConf for the context to work. For example, the master URL and jars are specified again in the app. Gino B. On Jun 17, 2014, at 12:05 PM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I have been able to submit a job successfully but I had to config my spark job this way: val sparkConf: SparkConf = new SparkConf() .setAppName("TwitterPopularTags") .setMaster("spark://int-spark-master:7077") .setSparkHome("/opt/spark") .setJars(Seq("/tmp/spark-test-0.1-SNAPSHOT.jar")) Now I'm getting this error on my worker: 14/06/17 17:03:40 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 2014-06-17 15:38 GMT+01:00 Luis Ángel Vicente Sánchez langel.gro...@gmail.com: After playing a bit, I have been able to create a fatjar this way: lazy val rootDependencies = Seq( "org.apache.spark" %% "spark-core" % "1.0.0" % "provided", "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided", "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0" exclude("org.apache.spark", "spark-core_2.10") exclude("org.apache.spark", "spark-streaming_2.10") ) Excluding those transitive dependencies, we can create a fatjar of ~400KB instead of 40MB. My problem is not running the streaming job locally but trying to submit it to a standalone cluster using spark-submit; every time I ran the following command, my workers died: ~/development/tools/spark/1.0.0/bin/spark-submit \ --class org.apache.spark.examples.streaming.TwitterPopularTags \ --master spark://int-spark-master:7077 \ --deploy-mode cluster \ file:///tmp/spark-test-0.1-SNAPSHOT.jar I have copied my fatjar to my master's /tmp folder. 2014-06-17 10:30 GMT+01:00 Michael Cutler mich...@tumra.com: Admittedly, getting Spark Streaming / Kafka working for the first time can be a bit tricky with the web of dependencies that get pulled in. 
I've taken the KafkaWordCount example from the Spark project and set up a simple standalone SBT project that shows you how to get it working and use spark-submit. https://github.com/cotdp/spark-example-kafka The key trick is the use of sbt-assembly instead of relying on any of the "add jars" functionality. You mark spark-core and spark-streaming as "provided", because they are part of the core spark-assembly already running your cluster. However spark-streaming-kafka is not, so you need to package it in your 'fat JAR' while excluding all the mess that causes the build to break. build.sbt: import AssemblyKeys._ assemblySettings name := "spark-example-kafka" version := "1.0" scalaVersion := "2.10.4" jarName in assembly := "spark-example-kafka_2.10-1.0.jar" assemblyOption in assembly ~= { _.copy(includeScala = false) } libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.0.0" % "provided", "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided", ("org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"). exclude("commons-beanutils", "commons-beanutils"). exclude("commons-collections", "commons-collections"). exclude("com.esotericsoftware.minlog", "minlog") ) mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => { case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last case x if x.startsWith("plugin.properties") => MergeStrategy.last case x => old(x) } } You can see the exclude() has to go around the spark-streaming-kafka dependency, and I've used a MergeStrategy to solve the "deduplicate: different file contents found in the following" errors. Build the JAR with sbt assembly and use the scripts in bin/ to run the examples. I'm using this same approach to run my Spark Streaming jobs with spark-submit and have them managed using Mesos/Marathon to handle failures and restarts with long-running processes. Good luck! 
MC Michael Cutler Founder, CTO Mobile: +44 789 990 7847 Email: mich...@tumra.com Web: tumra.com Visit us at our offices in Chiswick Park Registered in England & Wales, 07916412. VAT No. 130595328 On 17 June 2014 02:51, Gino Bustelo lbust...@gmail.com wrote: +1 for this issue. Documentation for spark-submit is misleading. Among many issues, the jar support is bad. HTTP URLs do not
Re: Executors not utilized properly.
It sounds like your job has 9 tasks and all are executing simultaneously in parallel. This is as good as it gets, right? Are you asking how to break the work into more tasks, like 120 to match your 10*12 cores? Make your RDD have more partitions. For example, the textFile method can override the default number of partitions determined by HDFS splits. On Jun 17, 2014 5:37 PM, abhiguruvayya sharath.abhis...@gmail.com wrote: I am creating around 10 executors with 12 cores and 7g memory, but when I launch a task not all executors are being used. For example, if my job has 9 tasks, only 3 executors are being used with 3 tasks each, and I believe this is making my app slower than a map reduce program for the same use case. Can anyone throw some light on executor configuration, if any? How can I use all the executors? I am running Spark on YARN and Hadoop 2.4.0. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
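The advice above can be sketched as follows; the path and the partition count of 120 (10 executors * 12 cores) are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("MorePartitions"))

// textFile accepts a minimum-partitions hint; without it the partition
// count defaults to the number of HDFS splits (here, apparently 9).
// Asking for at least as many partitions as total cores gives every
// executor work to do.
val lines = sc.textFile("hdfs:///data/input", 120)

// An existing RDD can also be reshuffled into more partitions:
val repartitioned = lines.repartition(120)
```

Note that repartition triggers a shuffle, so it only pays off when the downstream work per record is substantial.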
Re: join operation is taking too much time
I've been wondering about this. Is there a difference in performance between these two? val rdd1 = sc.textFile(files.mkString(",")) val rdd2 = sc.union(files.map(sc.textFile(_))) I don't know about your use-case, Meethu, but it may be worth trying to see if reading all the files into one RDD (like rdd1) would perform better in the join. (If this is possible in your situation.) On Tue, Jun 17, 2014 at 6:45 PM, Andrew Or and...@databricks.com wrote: How long does it get stuck for? This is a common sign of the OS thrashing due to out-of-memory exceptions. If you keep it running longer, does it throw an error? Depending on how large your other RDD is (and your join operation), memory pressure may or may not be the problem at all. It could be that spilling your shuffles to disk is slowing you down (but that probably shouldn't hang your application). For the 5-RDDs case, what happens if you set spark.shuffle.spill to false? 2014-06-17 5:59 GMT-07:00 MEETHU MATHEW meethu2...@yahoo.co.in: Hi all, I want to do a recursive leftOuterJoin between an RDD (created from a file) with 9 million rows (the size of the file is 100MB) and 30 other RDDs (created from 30 different files in each iteration of a loop) varying from 1 to 6 million rows. When I run it for 5 RDDs, it runs successfully in 5 minutes. But when I increase it to 10 or 30 RDDs, it gradually slows down and finally gets stuck without showing any warning or error. I am running in standalone mode with 2 workers of 4GB each and a total of 16 cores. Are any of you facing similar problems with JOIN, or is it a problem with my configuration? Thanks Regards, Meethu M
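Andrew's spark.shuffle.spill experiment can be set up like this. Treat it as a debugging aid only, since disabling spilling trades disk pressure for memory pressure and can turn a slowdown into an OutOfMemoryError:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Disable shuffle spilling to test whether spill-to-disk is the cause
// of the slowdown. With spilling off, a shuffle that exceeds memory
// fails loudly instead of silently slowing down, which makes the
// failure mode visible.
val conf = new SparkConf()
  .setAppName("JoinDebug")
  .set("spark.shuffle.spill", "false")
val sc = new SparkContext(conf)
```

If the job then OOMs around the same point it previously hung, that points to memory pressure in the join rather than a scheduling problem.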
Problems running Spark job on mesos in fine-grained mode
Hi, I'm having trouble running spark on mesos in fine-grained mode. I'm running spark 1.0.0 and mesos 0.18.0. The tasks are failing randomly, which most of the time, but not always, cause the job to fail. The same code is running fine in coarse-grained mode. I see the following exceptions in the logs of the spark driver: W0617 10:57:36.774382 8735 sched.cpp:901] Attempting to launch task 21 with an unknown offer 20140416-011500-1369465866-5050-26096-52332715 W0617 10:57:36.774433 8735 sched.cpp:901] Attempting to launch task 22 with an unknown offer 20140416-011500-1369465866-5050-26096-52332715 14/06/17 10:57:36 INFO TaskSetManager: Re-queueing tasks for 201311011608-1369465866-5050-9189-46 from TaskSet 0.0 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 22 (task 0.0:2) 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 19 (task 0.0:0) 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 21 (task 0.0:1) 14/06/17 10:57:36 INFO DAGScheduler: Executor lost: 201311011608-1369465866-5050-9189-46 (epoch 0) 14/06/17 10:57:36 INFO BlockManagerMasterActor: Trying to remove executor 201311011608-1369465866-5050-9189-46 from BlockManagerMaster. 14/06/17 10:57:36 INFO BlockManagerMaster: Removed 201311011608-1369465866-5050-9189-46 successfully in removeExecutor 14/06/17 10:57:36 DEBUG MapOutputTrackerMaster: Increasing epoch to 1 14/06/17 10:57:36 INFO DAGScheduler: Host added was in lost list earlier: ca1-dcc1-0065.lab.mtl I don't see any exceptions in the spark executor logs. 
The only error message I found in mesos itself is warnings in the mesos master: W0617 10:57:36.816748 26100 master.cpp:1615] Failed to validate task 21 : Task 21 attempted to use cpus(*):1 combined with already used cpus(*):1; mem(*):2048 is greater than offered mem(*):3216; disk(*):98304; ports(*):[11900-11919, 1192 1-11995, 11997-11999]; cpus(*):1 W0617 10:57:36.819807 26100 master.cpp:1615] Failed to validate task 22 : Task 22 attempted to use cpus(*):1 combined with already used cpus(*):1; mem(*):2048 is greater than offered mem(*):3216; disk(*):98304; ports(*):[11900-11919, 1192 1-11995, 11997-11999]; cpus(*):1 W0617 10:57:36.932287 26102 master.cpp:1615] Failed to validate task 28 : Task 28 attempted to use cpus(*):1 combined with already used cpus(*):1; mem(*):2048 is greater than offered cpus(*):1; mem(*):3216; disk(*):98304; ports(*):[11900- 11960, 11962-11978, 11980-11999] W0617 11:05:52.783133 26098 master.cpp:2106] Ignoring unknown exited executor 201311011608-1369465866-5050-9189-46 on slave 201311011608-1369465866-5050-9189-46 (ca1-dcc1-0065.lab.mtl) W0617 11:05:52.787739 26103 master.cpp:2106] Ignoring unknown exited executor 201311011608-1369465866-5050-9189-34 on slave 201311011608-1369465866-5050-9189-34 (ca1-dcc1-0053.lab.mtl) W0617 11:05:52.790292 26102 master.cpp:2106] Ignoring unknown exited executor 201311011608-1369465866-5050-9189-59 on slave 201311011608-1369465866-5050-9189-59 (ca1-dcc1-0079.lab.mtl) W0617 11:05:52.800649 26099 master.cpp:2106] Ignoring unknown exited executor 201311011608-1369465866-5050-9189-18 on slave 201311011608-1369465866-5050-9189-18 (ca1-dcc1-0027.lab.mtl) ... 
(more of those "Ignoring unknown exited executor" messages) I analyzed the difference between the execution of the same job in coarse-grained mode and fine-grained mode, and I noticed that in fine-grained mode the tasks get executed on executors different from the ones reported in Spark, as if Spark and Mesos get out of sync as to which executor is responsible for which task. See the following:

Coarse-grained mode:

  Spark                                     Mesos
  Task Index | Task ID | Executor | Status  Task ID (UI) | Task Name | Task ID (logs) | Executor | State
  0          | 0       | 66       | SUCCESS 4            | Task 4    | 0              | 66       | RUNNING
  1          | 1       | 59       | SUCCESS 0            | Task 0    | 1              | 59       | RUNNING
  2          | 2       | 54       | SUCCESS 10           | Task 10   | 2              | 54       | RUNNING
  3          | 3       | 128      | SUCCESS 6            | Task 6    | 3              | 128      | RUNNING
  ...

Fine-grained mode:

  Spark                                     Mesos
  Task Index | Task ID | Executor | Status  Task ID (UI) | Task Name  | Task ID (logs) | Executor | State
  0          | 23      | 108      | SUCCESS 23           | task 0.0:0 | 23             | 27       | FINISHED
  0          | 19      | 65       | FAILED  19           | task 0.0:0 | 19             | 86       | FINISHED
  1          | 21      | 65       | FAILED  (Mesos executor was never created)
  1          | 24      | 92       | SUCCESS 24           | task 0.0:1 | 24             | 129      | FINISHED
  2          | 22      | 65       | FAILED  (Mesos executor was never created)
  2          | 25      | 100      | SUCCESS 25           | task 0.0:2 | 25             | 84       | FINISHED
  3          | 26      | 80       | SUCCESS 26           | task 0.0:3 | 26             | 124      | FINISHED
  4          | 27      | 65       | FAILED  27           | task 0.0:4 | 27             | 108      | FINISHED
  4          | 29      | 92       | SUCCESS 29           | task 0.0:4 | 29             | 65       | FINISHED
  5          | 28      | 65       | FAILED  (Mesos executor was never created)
  5          | 30      | 77       | SUCCESS 30           | task 0.0:5 | 30             | 62       | FINISHED
  6          | 0       | 53       | SUCCESS 0            | task 0.0:6 | 0              | 41       | FINISHED
  7          | 1       | 77       | SUCCESS 1            | task 0.0:7 | 1              | 114      | FINISHED
  ...

Is it normal that the executor reported in Spark and Mesos is different when running in fine-grained mode? Please note that in this particular example the job actually succeeded, but most of the time it fails after 4 failed attempts of a given task. This job never fails in coarse-grained mode. Every job works in coarse-grained mode and fails the same way in fine-grained mode. Does anybody have an idea what the problem could be? Thanks, - Sebastien
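Until the fine-grained issue is tracked down, the coarse-grained mode that works can be selected explicitly. A minimal sketch, assuming the property name as of Spark 1.0 and a placeholder Mesos master URL:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Run on Mesos in coarse-grained mode: Spark launches one long-running
// Mesos task per slave and schedules its own tasks inside it, instead of
// mapping each Spark task to a separate Mesos task (fine-grained, the
// default in Spark 1.0).
val conf = new SparkConf()
  .setAppName("MesosCoarse")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.coarse", "true")
val sc = new SparkContext(conf)
```

Coarse-grained mode trades away fine-grained resource sharing between frameworks for more predictable task placement, which matches the behavior observed above.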
Re: Executors not utilized properly.
I did try creating more partitions by overriding the default number of partitions determined by HDFS splits. The problem is that in this case the program runs forever. I have the same set of inputs for map reduce and Spark. Where map reduce takes 2 mins, Spark takes 5 mins to complete the job. I thought my Spark program was running slower than map reduce because not all of the executors are being utilized properly. I can provide my code skeleton for your reference. Please help me with this. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7759.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: news20-binary classification with LogisticRegressionWithSGD
Here is a follow-up to the previous evaluation. aggregate at GradientDescent.scala:178 never finishes at https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178 We confirmed, by -verbose:gc, that GC is not happening during the aggregate and the cumulative CPU time for the task is increasing little by little. LBFGS also does not work for a large # of features (news20.random.1000) though it works fine for a small # of features (news20.binary.1000). aggregate at LBFGS.scala:201 also never finishes at https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201 --- [Evaluated code for LBFGS] import org.apache.spark.SparkContext import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.MLUtils import org.apache.spark.mllib.classification.LogisticRegressionModel import org.apache.spark.mllib.optimization._ val data = MLUtils.loadLibSVMFile(sc, "hdfs://dm01:8020/dataset/news20-binary/news20.random.1000", multiclass=false) val numFeatures = data.take(1)(0).features.size val training = data.map(x => (x.label, MLUtils.appendBias(x.features))).cache() // Run training algorithm to build the model val numCorrections = 10 val convergenceTol = 1e-4 val maxNumIterations = 20 val regParam = 0.1 val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1)) val (weightsWithIntercept, loss) = LBFGS.runLBFGS( training, new LogisticGradient(), new SquaredL2Updater(), numCorrections, convergenceTol, maxNumIterations, regParam, initialWeightsWithIntercept) --- Thanks, Makoto 2014-06-17 21:32 GMT+09:00 Makoto Yui yuin...@gmail.com: Hello, I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on Hadoop 0.20.2-cdh3u6 but it does not work for a sparse dataset though the number of training examples used in the evaluation is just 1,000. 
It works fine for the dataset *news20.binary.1000* that has 178,560 features. However, it does not work for *news20.random.1000* where the # of features is large (1,354,731 features), though we used a sparse vector through MLUtils.loadLibSVMFile(). The execution seems not to progress, while no error is reported in the spark-shell or in the stdout/stderr of the executors. We used 32 executors, each allocating 7GB (2GB of which is for RDD) of working memory. Any suggestions? Your help is really appreciated. == Executed code == import org.apache.spark.mllib.util.MLUtils import org.apache.spark.mllib.classification.LogisticRegressionWithSGD //val training = MLUtils.loadLibSVMFile(sc, "hdfs://host:8020/dataset/news20-binary/news20.binary.1000", multiclass=false) val training = MLUtils.loadLibSVMFile(sc, "hdfs://host:8020/dataset/news20-binary/news20.random.1000", multiclass=false) val numFeatures = training.take(1)(0).features.size //numFeatures: Int = 178560 for news20.binary.1000 //numFeatures: Int = 1354731 for news20.random.1000 val model = LogisticRegressionWithSGD.train(training, numIterations=1) == The dataset used in the evaluation == http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary $ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.binary.1000 $ sort -R news20.binary > news20.random $ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.random.1000 You can find the dataset in https://dl.dropboxusercontent.com/u/13123103/news20.random.1000 https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000 Thanks, Makoto
Spark streaming RDDs to Parquet records
Hello, Is there an easy way to convert RDDs within a DStream into Parquet records? Here is some incomplete pseudo code: // Create streaming context val ssc = new StreamingContext(...) // Obtain a DStream of events val ds = KafkaUtils.createStream(...) // Get Spark context to get to the SQL context val sc = ds.context.sparkContext val sqlContext = new org.apache.spark.sql.SQLContext(sc) // For each RDD ds.foreachRDD((rdd: RDD[Array[Byte]]) => { // What do I do next? }) Thanks, Mahesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-RDDs-to-Parquet-records-tp7762.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
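One possible way to fill in the "What do I do next?" gap, sketched against Spark 1.0's SQL API. The Event case class and decode function are hypothetical stand-ins for whatever the Kafka payload actually contains:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// Hypothetical record type for the decoded Kafka payload.
case class Event(id: String, value: Long)

// Assumed to turn raw bytes into an Event; the implementation depends
// on your serialization format and is not shown here.
def decode(bytes: Array[Byte]): Event = ???

def saveBatch(sqlContext: SQLContext, rdd: RDD[Array[Byte]], path: String): Unit = {
  // Implicit conversion from RDD of case classes to SchemaRDD.
  import sqlContext.createSchemaRDD
  val events = rdd.map(decode)
  // Write the batch as Parquet. Each batch needs its own output
  // directory (e.g. keyed by batch time), since saveAsParquetFile
  // does not append to an existing one.
  events.saveAsParquetFile(path)
}
```

Inside foreachRDD one could then call something like `saveBatch(sqlContext, rdd, s"hdfs:///events/batch-$time")`, with the path scheme being your own choice.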
Re: news20-binary classification with LogisticRegressionWithSGD
Hi Makoto, How many partitions did you set? If there are too many partitions, please do a coalesce before calling ML algorithms. Btw, could you try the tree branch in my repo? https://github.com/mengxr/spark/tree/tree I used tree aggregate in this branch. It should help with the scalability. Best, Xiangrui On Tue, Jun 17, 2014 at 12:22 PM, Makoto Yui yuin...@gmail.com wrote: Here is a follow-up to the previous evaluation. aggregate at GradientDescent.scala:178 never finishes at https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178 We confirmed, by -verbose:gc, that GC is not happening during the aggregate and the cumulative CPU time for the task is increasing little by little. LBFGS also does not work for a large # of features (news20.random.1000) though it works fine for a small # of features (news20.binary.1000). aggregate at LBFGS.scala:201 also never finishes at https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201 --- [Evaluated code for LBFGS] import org.apache.spark.SparkContext import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.MLUtils import org.apache.spark.mllib.classification.LogisticRegressionModel import org.apache.spark.mllib.optimization._ val data = MLUtils.loadLibSVMFile(sc, "hdfs://dm01:8020/dataset/news20-binary/news20.random.1000", multiclass=false) val numFeatures = data.take(1)(0).features.size val training = data.map(x => (x.label, MLUtils.appendBias(x.features))).cache() // Run training algorithm to build the model val numCorrections = 10 val convergenceTol = 1e-4 val maxNumIterations = 20 val regParam = 0.1 val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1)) val (weightsWithIntercept, loss) = LBFGS.runLBFGS( training, new LogisticGradient(), new SquaredL2Updater(), numCorrections, 
convergenceTol, maxNumIterations, regParam, initialWeightsWithIntercept) --- Thanks, Makoto 2014-06-17 21:32 GMT+09:00 Makoto Yui yuin...@gmail.com: Hello, I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on Hadoop 0.20.2-cdh3u6 but it does not work for a sparse dataset though the number of training examples used in the evaluation is just 1,000. It works fine for the dataset *news20.binary.1000* that has 178,560 features. However, it does not work for *news20.random.1000* where the # of features is large (1,354,731 features), though we used a sparse vector through MLUtils.loadLibSVMFile(). The execution seems not to progress, while no error is reported in the spark-shell or in the stdout/stderr of the executors. We used 32 executors, each allocating 7GB (2GB of which is for RDD) of working memory. Any suggestions? Your help is really appreciated. == Executed code == import org.apache.spark.mllib.util.MLUtils import org.apache.spark.mllib.classification.LogisticRegressionWithSGD //val training = MLUtils.loadLibSVMFile(sc, "hdfs://host:8020/dataset/news20-binary/news20.binary.1000", multiclass=false) val training = MLUtils.loadLibSVMFile(sc, "hdfs://host:8020/dataset/news20-binary/news20.random.1000", multiclass=false) val numFeatures = training.take(1)(0).features.size //numFeatures: Int = 178560 for news20.binary.1000 //numFeatures: Int = 1354731 for news20.random.1000 val model = LogisticRegressionWithSGD.train(training, numIterations=1) == The dataset used in the evaluation == http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary $ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.binary.1000 $ sort -R news20.binary > news20.random $ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.random.1000 You can find the dataset in https://dl.dropboxusercontent.com/u/13123103/news20.random.1000 https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000 Thanks, Makoto
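Xiangrui's coalesce tip can be sketched like this in the spark-shell (where `sc` is predefined); the target of 32 partitions is illustrative, chosen to roughly match the 32 executors:

```scala
import org.apache.spark.mllib.util.MLUtils

// loadLibSVMFile can create many small partitions from HDFS splits; with
// tiny partitions, each gradient-descent iteration's aggregate is
// dominated by per-task overhead. Coalescing down (which needs no
// shuffle when only reducing the partition count) before caching keeps
// the partition count close to the number of executors.
val data = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass = false)
val training = data.coalesce(32).cache()
println(training.partitions.size)
```

Whether this resolves the never-finishing aggregate depends on whether the bottleneck really is the partition count rather than the dense gradient vectors themselves.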
Spark SQL: No function to evaluate expression
Dear all, I am trying to run the following query on Spark SQL using some custom TPC-H tables with a standalone Spark cluster configuration:

SELECT * FROM history a JOIN history b ON a.o_custkey = b.o_custkey WHERE a.c_address <> b.c_address;

Unfortunately I get the following error during execution:

java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:2 failed 4 times, most recent failure: Exception failure in TID 12 on host kw2260.kaust.edu.sa: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function to evaluate expression. type: UnresolvedAttribute, tree: 'a.c_address
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.eval(unresolved.scala:59)
org.apache.spark.sql.catalyst.expressions.Equals.eval(predicates.scala:147)
org.apache.spark.sql.catalyst.expressions.Not.eval(predicates.scala:74)
org.apache.spark.sql.catalyst.expressions.And.eval(predicates.scala:100)

Is this a bug or am I doing something wrong? Regards, Zuhair Khayyat
Re: Contribution to Spark MLLib
Hi Jayati, Thanks for asking! MLlib algorithms are all implemented in Scala. It makes it easier for us to maintain if we have the implementations in one place. For the roadmap, please visit http://www.slideshare.net/xrmeng/m-llib-hadoopsummit to see the features planned for v1.1. Before contributing new algorithms, it would be great if you could start by working on an existing JIRA. Best, Xiangrui On Tue, Jun 17, 2014 at 12:22 AM, Jayati tiwarijay...@gmail.com wrote: Hello, I wish to contribute some algorithms to the MLLib of Spark but at the same time wanted to make sure that I don't try something redundant. Will it be okay with you to let me know the set of algorithms which aren't there in your road map in the near future ? Also, can I use Java to write machine learning algorithms for Spark MLLib instead of Scala ? Regards, Jayati -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Contribution-to-Spark-MLLib-tp7716.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Execution stalls in LogisticRegressionWithSGD
Hi Bharath, Thanks for posting the details! Which Spark version are you using? Best, Xiangrui On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, (Apologies for the long mail, but it's necessary to provide sufficient details considering the number of issues faced.) I'm running into issues testing LogisticRegressionWithSGD on a two node cluster (each node with 24 cores and 16G available to slaves out of 24G on the system). Here's a description of the application: The model is being trained based on categorical features x, y, and (x,y). The categorical features are mapped to binary features by converting each distinct value in the category enum into a binary feature by itself (i.e. presence of that value in a record implies the corresponding feature = 1, else feature = 0; so there'd be as many distinct features as enum values). The training vector is laid out as [x1,x2...xn,y1,y2...yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the training data has only one combination (Xk,Yk) and a label appearing in the record. Thus, the corresponding labeled-point sparse vector would only have 3 values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector (though sparse) would be nearly 614000. The number of records is about 1.33 million. The records have been coalesced into 20 partitions across two nodes. The input data has not been cached.
(NOTE: I do realize the record's features may seem large for a two node setup, but given the memory and CPU, and the fact that I'm willing to give up some turnaround time, I don't see why tasks should inexplicably fail.) Additional parameters include:

spark.executor.memory = 14G
spark.default.parallelism = 1
spark.cores.max = 20
spark.storage.memoryFraction = 0.8  // No cache space required

(Trying to set spark.akka.frameSize to a larger number, say, 20, didn't help either.) The model training was initialized as:

new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05)

However, after 4 iterations of gradient descent, the entire execution appeared to stall inexplicably. The corresponding executor details and details of the stalled stage (number 14) are as follows:

Metric                            Min     25th    Median  75th    Max
Result serialization time         12 ms   13 ms   14 ms   16 ms   18 ms
Duration                          4 s     4 s     5 s     5 s     5 s
Time spent fetching task results  0 ms    0 ms    0 ms    0 ms    0 ms
Scheduler delay                   6 s     6 s     6 s     6 s     12 s

Stage Id 14: aggregate at GradientDescent.scala:178

Task Index  Task ID  Status   Locality Level  Executor                     Launch Time          Duration  GC Time  Result Ser Time  Errors
0           600      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
1           601      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
2           602      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
3           603      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
4           604      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
5           605      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
6           606      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
7           607      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
8           608      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
9           609      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      14 ms
10          610      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
11          611      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      13 ms
12          612      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      18 ms
13          613      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      13 ms
14          614      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
15          615      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      12 ms
16          616      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
17          617      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      18 ms
18          618      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      16 ms
19          619      SUCCESS  PROCESS_LOCAL
Re: news20-binary classification with LogisticRegressionWithSGD
Hi Xiangrui, What's the difference between treeAggregate and aggregate, and why does treeAggregate scale better? If we just use mapPartitions and then reduce, will it be as fast as treeAggregate? Thanks. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Tue, Jun 17, 2014 at 12:58 PM, Xiangrui Meng men...@gmail.com wrote: Hi Makoto, How many partitions did you set? If there are too many partitions, please do a coalesce before calling ML algorithms. Btw, could you try the tree branch in my repo? https://github.com/mengxr/spark/tree/tree I used tree aggregate in this branch. It should help with the scalability. Best, Xiangrui On Tue, Jun 17, 2014 at 12:22 PM, Makoto Yui yuin...@gmail.com wrote: Here is follow-up to the previous evaluation. aggregate at GradientDescent.scala:178 never finishes at https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178 We confirmed, by -verbose:gc, that GC is not happening during the aggregate and the cumulative CPU time for the task is increasing little by little. LBFGS also does not work for large # of features (news20.random.1000) though it works fine for small # of features (news20.binary.1000).
aggregate at LBFGS.scala:201 also never finishes at https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201 [... rest of the quoted message trimmed; the full evaluated code and dataset preparation steps appear in the earlier post in this thread ...] Thanks, Makoto
Re: news20-binary classification with LogisticRegressionWithSGD
Hi Xiangrui, (2014/06/18 4:58), Xiangrui Meng wrote: How many partitions did you set? If there are too many partitions, please do a coalesce before calling ML algorithms. The training data news20.random.1000 is small and thus only 2 partitions are used by default. val training = MLUtils.loadLibSVMFile(sc, "hdfs://host:8020/dataset/news20-binary/news20.random.1000", multiclass=false). We also tried 32 partitions as follows, but the aggregate never finishes:

val training = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass = false, numFeatures = 1354731, minPartitions = 32)

Btw, could you try the tree branch in my repo? https://github.com/mengxr/spark/tree/tree I used tree aggregate in this branch. It should help with the scalability. Is treeAggregate itself available on Spark 1.0? I wonder. Could I test your modification just by running the following code on the REPL?

---
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
  .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
    seqOp = (c, v) => (c, v) match {
      case ((grad, loss), (label, features)) =>
        val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad))
        (grad, loss + l)
    },
    combOp = (c1, c2) => (c1, c2) match {
      case ((grad1, loss1), (grad2, loss2)) =>
        (grad1 += grad2, loss1 + loss2)
    }, 2)
---

Rebuilding Spark is quite a lot of work just to do an evaluation. Thanks, Makoto
Re: Spark streaming RDDs to Parquet records
Mahesh, - One direction could be: create a parquet schema, convert & save the records to hdfs. - This might help https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SparkParquetExample.scala Cheers k/ On Tue, Jun 17, 2014 at 12:52 PM, maheshtwc mahesh.padmanab...@twc-contractor.com wrote: Hello, Is there an easy way to convert RDDs within a DStream into Parquet records? Here is some incomplete pseudo code:

// Create streaming context
val ssc = new StreamingContext(...)
// Obtain a DStream of events
val ds = KafkaUtils.createStream(...)
// Get Spark context to get to the SQL context
val sc = ds.context.sparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// For each RDD
ds.foreachRDD((rdd: RDD[Array[Byte]]) => {
  // What do I do next?
})

Thanks, Mahesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-RDDs-to-Parquet-records-tp7762.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
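In Spark 1.0, Spark SQL can also write Parquet directly from an RDD of case classes, without going through Avro. The following is a hedged sketch, not a definitive implementation: the `Event` case class, the `deserialize` helper, and the HDFS output path are made-up placeholders, and it assumes the 1.0-era `SchemaRDD.saveAsParquetFile` API and the `createSchemaRDD` implicit conversion.

```
// Hypothetical record type; replace with your real schema.
case class Event(id: Long, payload: String)

ds.foreachRDD((rdd: RDD[Array[Byte]]) => {
  val sqlContext = new org.apache.spark.sql.SQLContext(rdd.sparkContext)
  import sqlContext.createSchemaRDD // implicit: RDD of case classes -> SchemaRDD

  // deserialize(...) stands in for your own bytes -> Event decoding logic.
  val events = rdd.map(bytes => deserialize(bytes))

  // Each micro-batch is written under its own directory; pick a naming scheme
  // that fits your downstream readers.
  events.saveAsParquetFile(s"hdfs://host:8020/events/batch-${System.currentTimeMillis}")
})
```

The schema is inferred from the case class fields, so no separate Parquet schema definition is needed in this path.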
Re: Executors not utilized properly.
Hi Abhishek, Where mapreduce is taking 2 mins, spark is taking 5 min to complete the job. Interesting. Could you tell us more about your program? A code skeleton would certainly be helpful. Thanks! -Jey On Tue, Jun 17, 2014 at 3:21 PM, abhiguruvayya sharath.abhis...@gmail.com wrote: I did try creating more partitions by overriding the default number of partitions determined by HDFS splits. Problem is, in this case program will run for ever. I have same set of inputs for map reduce and spark. Where map reduce is taking 2 mins, spark is taking 5 min to complete the job. I thought because all of the executors are not being utilized properly my spark program is running slower than map reduce. I can provide you my code skeleton for your reference. Please help me with this. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7759.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: news20-binary classification with LogisticRegressionWithSGD
Hi DB, treeReduce (treeAggregate) is a feature I'm testing now. It is a compromise between current reduce and butterfly allReduce. The former runs in linear time on the number of partitions, the latter introduces too many dependencies. treeAggregate with depth = 2 should run in O(sqrt(n)) time, where n is the number of partitions. It would be great if someone can help test its scalability. Best, Xiangrui On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui yuin...@gmail.com wrote: [... quoted messages trimmed; see the earlier posts in this thread ...]
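The difference Xiangrui describes can be illustrated without Spark: with a flat reduce, the driver combines the result of every partition itself (linear in the number of partitions), while a depth-2 tree first combines partitions in groups of roughly sqrt(n) and only ships the few group results to the driver. A minimal pure-Scala sketch, where a Vector of partial sums stands in for real Spark partition results:

```scala
// Pretend these are per-partition partial sums from 16 partitions.
val partials: Vector[Double] = Vector.tabulate(16)(_.toDouble)

// Flat combine: the driver folds all 16 partials itself -> O(n) at the driver.
val flat = partials.reduce(_ + _)

// Depth-2 "tree" combine: combine in groups of ~sqrt(n) first (as if on
// intermediate executors), then combine the few group results at the driver.
val groupSize = math.sqrt(partials.size).toInt // 4 groups of 4
val tree = partials.grouped(groupSize).map(_.reduce(_ + _)).reduce(_ + _)

// Both topologies compute the same value; only the combine tree differs.
println(flat == tree) // true
```

The same idea applies to gradient sums in MLlib: the value is identical either way; the tree only changes where the combining work happens.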
Unit test failure: Address already in use
Hi, I have 3 unit tests (independent of each other) in the /src/test/scala folder. When I run each of them individually using: sbt test-only test, all the 3 pass the test. But when I run them all using sbt test, they fail with the warning below. I am wondering if the binding exception results in failure to run the job, thereby causing the failure. If so, what can I do to address this binding exception? I am running these tests locally on a standalone machine (i.e. SparkContext("local", "test")).

14/06/17 13:42:48 WARN component.AbstractLifeCycle: FAILED org.eclipse.jetty.server.Server@3487b78d: java.net.BindException: Address already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:174)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77)

thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unit-test-failure-Address-already-in-use-tp7771.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
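A common cause of this symptom is sbt running the suites in parallel, so several SparkContexts try to bind the Jetty web-UI port at once. A hedged workaround sketch for build.sbt (standard sbt 0.13-style setting; whether it resolves this particular failure depends on the test setup):

```
// Run test suites one at a time so only one SparkContext
// (and therefore one web UI server) exists at any moment.
parallelExecution in Test := false
```

Independently of this setting, stopping the context in each suite's teardown (`sc.stop()`) keeps ports and contexts from leaking between tests.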
Re: news20-binary classification with LogisticRegressionWithSGD
Hi Makoto, Are you using Spark 1.0 or 0.9? Could you go to the executor tab of the web UI and check the driver's memory? treeAggregate is not part of 1.0. Best, Xiangrui On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng men...@gmail.com wrote: [... quoted messages trimmed; see the earlier posts in this thread ...]
Re: news20-binary classification with LogisticRegressionWithSGD
Hi Xiangrui, Does that mean mapPartitions followed by reduce shares the same behavior as the aggregate operation, i.e., O(n)? Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng men...@gmail.com wrote: [... quoted messages trimmed; see the earlier posts in this thread ...]
Re: news20-binary classification with LogisticRegressionWithSGD
Hi Xiangrui, (2014/06/18 6:03), Xiangrui Meng wrote: Are you using Spark 1.0 or 0.9? Could you go to the executor tab of the web UI and check the driver's memory? I am using Spark 1.0. 588.8 MB is allocated for driver RDDs. I am setting SPARK_DRIVER_MEMORY=2g in the conf/spark-env.sh. The value allocated for driver RDDs in the web UI was not changed by doing as follows: $ SPARK_DRIVER_MEMORY=6g bin/spark-shell I set -verbose:gc but full GC (or continuous GCs) does not happen during the aggregate at the driver. Thanks, Makoto
Re: news20-binary classification with LogisticRegressionWithSGD
Xiangrui, Could you point to the JIRA related to tree aggregate? ...sounds like the allreduce idea... I would definitely like to try it on our dataset... Makoto, I did run a pretty big sparse dataset (20M rows, 3M sparse features) and I got 100 iterations of SGD running in 200 seconds... 10 executors, each with 16 GB memory... Although the best result on the same dataset came out of liblinear and BFGS-L1 out of the box... so I did not tune the SGD further on learning rate and other heuristics... it was around 5% off... Thanks. Deb On Tue, Jun 17, 2014 at 2:09 PM, DB Tsai dbt...@stanford.edu wrote: [... quoted messages trimmed; see the earlier posts in this thread ...]
Re: Spark streaming RDDs to Parquet records
Thanks Krishna. Seems like you have to use Avro and then convert that to Parquet. I was hoping to directly convert RDDs to Parquet files. I'll look into this some more. Thanks, Mahesh

From: Krishna Sankar ksanka...@gmail.com
Reply-To: user@spark.apache.org
Date: Tuesday, June 17, 2014 at 2:41 PM
To: user@spark.apache.org
Subject: Re: Spark streaming RDDs to Parquet records

[... quoted message trimmed; see the earlier post in this thread ...]
Re: Spark sql unable to connect to db2 hive metastore
Finally got it to work: I mimicked how Spark adds the datanucleus jars in compute-classpath.sh and added the db2jcc*.jar to the classpath; it works now. Thanks! On Tue, Jun 17, 2014 at 10:50 AM, Jenny Zhao linlin200...@gmail.com wrote: Thanks Michael! As I run it using spark-shell, I added both jars through the bin/spark-shell --jars option. I noticed that if I don't pass these jars, it complains it couldn't find the driver; if I pass them through the --jars option, it complains there is no suitable driver. Regards. On Tue, Jun 17, 2014 at 2:43 AM, Michael Armbrust mich...@databricks.com wrote: First a clarification: Spark SQL does not talk to HiveServer2, as that JDBC interface is for retrieving results from queries that are executed using Hive. Instead Spark SQL will execute queries itself by directly accessing your data using Spark. Spark SQL's Hive module can use JDBC to connect to an external metastore, in your case DB2. This is only used to retrieve the metadata (i.e., column names and types, HDFS locations for data). Looking at your exception I still see java.sql.SQLException: No suitable driver, so my guess would be that the DB2 JDBC drivers are not being correctly included. How are you trying to add them to the classpath? Michael On Tue, Jun 17, 2014 at 1:29 AM, Jenny Zhao linlin200...@gmail.com wrote: Hi, my hive configuration uses db2 as its metastore database. I have built spark with the extra step sbt/sbt assembly/assembly to include the dependency jars, and copied HIVE_HOME/conf/hive-site.xml under spark/conf. When I ran: hql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING)) I got the following exception; I pasted a portion of the stack trace here. Looking at the stack, this made me wonder if Spark supports a remote metastore configuration; it seems spark doesn't talk to hiveserver2 directly? The driver jars db2jcc-10.5.jar and db2jcc_license_cisuz-10.5.jar are both included in the classpath; otherwise, it will complain it couldn't find the driver.
Appreciate any help to resolve it. Thanks! Caused by: java.sql.SQLException: Unable to open a test connection to the given database. JDBC url = jdbc:db2://localhost:50001/BIDB, username = catalog. Terminating connection pool. Original Exception: -- java.sql.SQLException: No suitable driver at java.sql.DriverManager.getConnection(DriverManager.java:422) at java.sql.DriverManager.getConnection(DriverManager.java:374) at com.jolbox.bonecp.BoneCP.obtainRawInternalConnection(BoneCP.java:254) at com.jolbox.bonecp.BoneCP.init(BoneCP.java:305) at com.jolbox.bonecp.BoneCPDataSource.maybeInit(BoneCPDataSource.java:150) at com.jolbox.bonecp.BoneCPDataSource.getConnection(BoneCPDataSource.java:112) at org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:479) at org.datanucleus.store.rdbms.RDBMSStoreManager.init(RDBMSStoreManager.java:304) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:56) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:39) at java.lang.reflect.Constructor.newInstance(Constructor.java:527) at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631) at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301) at org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1069) at org.datanucleus.NucleusContext.initialise(NucleusContext.java:359) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:768) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:326) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:195) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:60) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:37) at java.lang.reflect.Method.invoke(Method.java:611) at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965) at java.security.AccessController.doPrivileged(AccessController.java:277) at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960) at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808) at
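For anyone hitting the same "No suitable driver" error: the fix the thread converged on (editing compute-classpath.sh) can also be expressed as launch options, since the metastore JDBC connection is opened by the driver JVM, and --jars only ships jars to executors. This is a sketch; the jar locations below are assumptions:

```shell
# Hypothetical install locations for the DB2 JDBC driver jars
DB2_JARS_DIR=/opt/ibm/db2/java
DRIVER_CP="$DB2_JARS_DIR/db2jcc-10.5.jar:$DB2_JARS_DIR/db2jcc_license_cisuz-10.5.jar"

# --driver-class-path puts the driver classes on the driver JVM's classpath
# (needed for the metastore connection); --jars distributes them to executors.
./bin/spark-shell \
  --driver-class-path "$DRIVER_CP" \
  --jars "$DB2_JARS_DIR/db2jcc-10.5.jar,$DB2_JARS_DIR/db2jcc_license_cisuz-10.5.jar"
```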
Best practices for removing lineage of a RDD or Graph object?
If an RDD object has non-empty .dependencies, does that mean it has lineage? How can I remove it? I'm doing iterative computing, and each iteration depends on the result computed in the previous iteration. After several iterations, it throws a StackOverflowError. At first I tried to use cache: I read the code in pregel.scala, which is part of GraphX, where they use a count action to materialize the object after cache; but I attached a debugger and it seems this approach does not empty .dependencies, and it also does not work in my code. An alternative approach is checkpointing: I tried checkpointing the vertices and edges of my Graph object and then materializing them by counting vertices and edges. Then I used .isCheckpointed to check whether it was correctly checkpointed, but it always returns false. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-for-removing-lineage-of-a-RDD-or-Graph-object-tp7779.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
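For reference, the usual lineage-truncation pattern looks something like the sketch below (the RDD contents are hypothetical). Two details often trip people up: the checkpoint directory must be set before calling checkpoint(), and .isCheckpointed only becomes true after an action materializes the RDD *after* the checkpoint() call:

```scala
// Sketch: truncating lineage in an iterative job by periodic checkpointing.
// Assumes a running SparkContext `sc` and a directory reachable by all nodes.
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

var current = sc.parallelize(1 to 1000000).map(_ * 2)
for (i <- 1 to 100) {
  current = current.map(_ + 1)   // each iteration extends the lineage chain
  if (i % 10 == 0) {
    current.cache()              // keep the data around so it isn't recomputed
    current.checkpoint()         // schedule lineage truncation
    current.count()              // an action is required to materialize it;
                                 // only after this does isCheckpointed flip
                                 // to true and .dependencies get rebased onto
                                 // the checkpoint file
  }
}
```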
Why MLLib classes are so badly organized?
Can anybody explain WHY: 1) LabeledPoint is in regression/LabeledPoint.scala? This causes classification modules to import from regression modules. 2) Vector and SparseVector are in linalg? OK. But GeneralizedLinearModel is in regression/GeneralizedLinearAlgorithm.scala? Really? 3) LinearModel is in regression.py (the Python MLlib module), but is also imported from classification.py? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-MLLib-classes-are-so-badly-organized-tp7780.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark with docker: errors with akka, NAT?
I am using cutting-edge code from git but doing my own sbt assembly. On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher schum...@icsi.berkeley.edu wrote: Hi, are you using the amplab/spark-1.0.0 images from the global registry? Andre On 06/17/2014 01:36 AM, Mohit Jaggi wrote: Hi Folks, I am having trouble getting the spark driver running in docker. If I run a pyspark example on my mac it works, but the same example on a docker image (via boot2docker) fails with the following logs. I am pointing the spark driver (which is running the example) to a spark cluster (the driver is not part of the cluster). I guess this has something to do with docker's networking stack (it may be getting NAT'd), but I am not sure why (if at all) the spark-worker or spark-master is trying to create a new TCP connection to the driver, instead of responding on the connection initiated by the driver. I would appreciate any help in figuring this out. Thanks, Mohit. logs Spark Executor Command: java -cp ::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar -Xms2g -Xmx2g -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler 1 cobalt 24 akka.tcp://sparkWorker@:33952/user/Worker app-20140616152201-0021 log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/06/16 15:22:05 INFO SecurityManager: Changing view acls to: ayasdi,root 14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(xxx, xxx) 14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started 14/06/16 15:22:05 INFO Remoting: Starting remoting 14/06/16 15:22:06 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@:33536] 14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@:33536] 14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler 14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@:33952/user/Worker 14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated for 6 ms, all messages to this address will be delivered to dead letters. 14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@:33536] - [akka.tcp://spark@fc31887475e3 :43921] disassociated! Shutting down.
Enormous EC2 price jump makes r3.large patch more important
Some people (me included) might have wondered why all our m1.large spot instances (in us-west-1) shut down a few hours ago... Simple reason: the EC2 spot price for Spark's default m1.large instances just jumped from $0.016 per hour to about $0.750. Yes, fifty times. Probably something to do with the World Cup. So far this is just us-west-1, but prices have a tendency to equalize across centers as the days pass. Time to make backups and plans. m3 spot prices are still down at $0.02 (and, being new, m3 instances will be bypassed by older systems), so it would be REAAALLYY nice if there had been some progress on that issue. Let me know if I can help with testing and whatnot. -- Jeremy Lee BCompSci (Hons) The Unorthodox Engineers
Re: Spark streaming RDDs to Parquet records
If you convert the data to a SchemaRDD you can save it as Parquet: http://spark.apache.org/docs/latest/sql-programming-guide.html#using-parquet On Tue, Jun 17, 2014 at 11:47 PM, Padmanabhan, Mahesh (contractor) mahesh.padmanab...@twc-contractor.com wrote: Thanks Krishna. It seems like you have to use Avro and then convert that to Parquet. I was hoping to directly convert RDDs to Parquet files. I'll look into this some more. Thanks, Mahesh From: Krishna Sankar ksanka...@gmail.com Reply-To: user@spark.apache.org Date: Tuesday, June 17, 2014 at 2:41 PM To: user@spark.apache.org Subject: Re: Spark streaming RDDs to Parquet records Mahesh, - One direction could be: create a Parquet schema, then convert and save the records to HDFS. - This might help: https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SparkParquetExample.scala Cheers k/ On Tue, Jun 17, 2014 at 12:52 PM, maheshtwc mahesh.padmanab...@twc-contractor.com wrote: Hello, Is there an easy way to convert RDDs within a DStream into Parquet records? Here is some incomplete pseudo code: // Create streaming context val ssc = new StreamingContext(...) // Obtain a DStream of events val ds = KafkaUtils.createStream(...) // Get the Spark context to get to the SQL context val sc = ds.context.sparkContext val sqlContext = new org.apache.spark.sql.SQLContext(sc) // For each RDD ds.foreachRDD((rdd: RDD[Array[Byte]]) => { // What do I do next? }) Thanks, Mahesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-RDDs-to-Parquet-records-tp7762.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
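Filling in the "What do I do next?" step via the SchemaRDD route suggested above: the sketch below assumes the Kafka payload can be decoded into a case class; Event, its fields, and the decode format are hypothetical, and the output path is illustrative:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// Hypothetical record type for the Kafka payload
case class Event(id: String, value: Long)

// App-specific decoding; here we assume a simple "id,value" CSV payload
def deserialize(bytes: Array[Byte]): Event = {
  val Array(id, value) = new String(bytes, "UTF-8").split(",")
  Event(id, value.toLong)
}

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD  // implicitly turns RDD[Event] into a SchemaRDD

ds.foreachRDD { (rdd: RDD[Array[Byte]]) =>
  val events = rdd.map(deserialize)
  // Write each micro-batch to its own Parquet directory
  events.saveAsParquetFile(s"hdfs:///events/batch-${System.currentTimeMillis}")
}
```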
Re: news20-binary classification with LogisticRegressionWithSGD
DB, Yes, reduce and aggregate are linear. Makoto, dense vectors are used in aggregation. If you have 32 partitions and each one sends a dense vector of size 1,354,731 to the master, then the driver needs 300M+. That may be the problem. Which deploy mode are you using, standalone or local? Debasish, there is an old PR for butterfly allreduce; however, it doesn't seem to be the right way to go for Spark. I just sent out the PR: https://github.com/apache/spark/pull/1110 . This is a WIP and it needs more testing before we are confident enough to merge it. It would be great if you can help test it. Best, Xiangrui On Tue, Jun 17, 2014 at 2:33 PM, Debasish Das debasish.da...@gmail.com wrote: Xiangrui, Could you point to the JIRA related to tree aggregate? ...sounds like the allreduce idea... I would definitely like to try it on our dataset... Makoto, I did run a pretty big sparse dataset (20M rows, 3M sparse features) and I got 100 iterations of SGD running in 200 seconds... 10 executors, each with 16 GB memory... Although the best result on the same dataset came out of liblinear and BFGS-L1 out of the box... so I did not tune the SGD further on learning rate and other heuristics... it was around 5% off... Thanks. Deb On Tue, Jun 17, 2014 at 2:09 PM, DB Tsai dbt...@stanford.edu wrote: Hi Xiangrui, Does it mean that mapPartition and then reduce shares the same behavior as the aggregate operation, which is O(n)? Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng men...@gmail.com wrote: Hi DB, treeReduce (treeAggregate) is a feature I'm testing now. It is a compromise between the current reduce and a butterfly allReduce. The former runs in linear time in the number of partitions; the latter introduces too many dependencies. treeAggregate with depth = 2 should run in O(sqrt(n)) time, where n is the number of partitions. It would be great if someone can help test its scalability.
Best, Xiangrui On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui yuin...@gmail.com wrote: Hi Xiangrui, (2014/06/18 4:58), Xiangrui Meng wrote: How many partitions did you set? If there are too many partitions, please do a coalesce before calling ML algorithms. The training data news20.random.1000 is small and thus only 2 partitions are used by default. val training = MLUtils.loadLibSVMFile(sc, "hdfs://host:8020/dataset/news20-binary/news20.random.1000", multiclass = false). We also tried 32 partitions as follows, but the aggregate never finishes. val training = MLUtils.loadLibSVMFile(sc, "hdfs://host:8020/dataset/news20-binary/news20.random.1000", multiclass = false, numFeatures = 1354731, minPartitions = 32) Btw, could you try the tree branch in my repo? https://github.com/mengxr/spark/tree/tree I used tree aggregate in this branch. It should help with the scalability. Is treeAggregate itself available on Spark 1.0? I wonder... Could I test your modification just by running the following code on the REPL? --- val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i) .treeAggregate((BDV.zeros[Double](weights.size), 0.0))( seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) => val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad)) (grad, loss + l) }, combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => (grad1 += grad2, loss1 + loss2) }, 2) --- Rebuilding Spark is quite something to do for evaluation. Thanks, Makoto
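As an aside, the intuition behind the tree aggregation Xiangrui describes can be sketched in plain Scala. This is a toy illustration of the idea, not the Spark API: instead of all n partitions sending their (dense) partial sums straight to the driver, partial sums are first merged in roughly sqrt(n)-sized groups, so the final merge touches only sqrt(n) vectors:

```scala
// Toy illustration of two-level ("depth = 2") aggregation.
// 32 "partitions", each producing a partial-sum vector of size 4.
val partitionSums: Seq[Array[Double]] =
  (1 to 32).map(_ => Array.fill(4)(1.0))

// Element-wise vector addition, standing in for the gradient combOp.
def merge(a: Array[Double], b: Array[Double]): Array[Double] =
  a.zip(b).map { case (x, y) => x + y }

// Level 1: merge within groups of ~sqrt(n) partitions.
val groupSize = math.sqrt(partitionSums.size).ceil.toInt
val level1 = partitionSums.grouped(groupSize).map(_.reduce(merge)).toSeq

// Level 2: the "driver" only merges level1.size partial sums, not all 32.
val total = level1.reduce(merge)  // each element ends up as 32.0
```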
Re: Executors not utilized properly.
I found the main reason to be that I was using coalesce instead of repartition. coalesce was shrinking the partitioning, so there were too few tasks to be executed by all of the executors. Can you help me in understanding when to use coalesce and when to use repartition? In my application coalesce is being processed faster than repartition, which is unusual. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7787.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark with docker: errors with akka, NAT?
I remember having to do a similar thing in the spark docker scripts for testing purposes. Were you able to modify /etc/hosts directly? I remember issues with that, as docker apparently mounts it as part of its read-only filesystem. On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi mohitja...@gmail.com wrote: It was a DNS issue. Akka apparently uses the hostname of the endpoints, and hence they need to be resolvable. In my case the hostname of the docker container was a randomly generated string and was not resolvable. I added a workaround (an entry in the /etc/hosts file of the spark master) for now. If anyone can point to a more elegant solution, that would be awesome!
Re: news20-binary classification with LogisticRegressionWithSGD
Makoto, please use --driver-memory 8G when you launch spark-shell. -Xiangrui On Tue, Jun 17, 2014 at 4:49 PM, Xiangrui Meng men...@gmail.com wrote: Makoto, dense vectors are used in aggregation. If you have 32 partitions and each one sends a dense vector of size 1,354,731 to the master, then the driver needs 300M+. That may be the problem. Which deploy mode are you using, standalone or local?
Re: Executors not utilized properly.
repartition() is actually just an alias of coalesce(), but with the shuffle flag set to true. This shuffle is probably what you're seeing as taking longer, but it is required when you go from a smaller number of partitions to a larger one. When actually decreasing the number of partitions, coalesce(shuffle = false) will be fully pipelined, but it is limited in how it can redistribute data, as it can only combine whole partitions into larger partitions. For example, if you have an RDD with 101 partitions and you do rdd.coalesce(100, shuffle = false), then the resulting RDD will have 99 of the original partitions, and 1 partition will just be 2 original partitions combined. This can lead to increased data skew, but requires no effort to create. On the other hand, if you do rdd.coalesce(100, shuffle = true), then all of the data will actually be reshuffled into 100 new evenly-sized partitions, eliminating any data skew at the cost of actually moving all the data around. On Tue, Jun 17, 2014 at 4:52 PM, abhiguruvayya sharath.abhis...@gmail.com wrote: I found the main reason to be that I was using coalesce instead of repartition. coalesce was shrinking the partitioning, so there were too few tasks to be executed by all of the executors. Can you help me in understanding when to use coalesce and when to use repartition? In my application coalesce is being processed faster than repartition, which is unusual. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7787.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
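The distinction above in code (a sketch; `rdd` stands for any existing RDD):

```scala
// Shrinking the partition count: coalesce without a shuffle is cheap and
// fully pipelined, but can only glue existing partitions together, which
// may leave the result skewed.
val fewer = rdd.coalesce(100)          // shuffle defaults to false

// Shrinking with rebalancing, or growing the partition count: a shuffle
// is required, so use repartition (equivalent to coalesce(n, shuffle = true)).
val balanced = rdd.repartition(100)
val more     = rdd.repartition(400)    // growing always needs a shuffle
```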
Issue while trying to aggregate with a sliding window
Trying to aggregate over a sliding window, playing with the slide duration. Playing around with the slide interval I can see the aggregation works, but it mostly fails with the below error. The stream has records coming in at 100ms. JavaPairDStream<String, AggregateObject> aggregatedDStream = pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(6), new Duration(60)); 14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is invalid as zeroTime is 1403050485800 ms and slideDuration is 6 ms and difference is 1100 ms 14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found: 1403050486900 ms java.util.NoSuchElementException: key not found: 1403050486900 ms at scala.collection.MapLike$class.default(MapLike.scala:228) Any hints on what's going on here? Thanks! Hatch
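One thing worth checking with reduceByKeyAndWindow (independent of the bug referenced in the reply): both the window duration and the slide duration must be exact multiples of the batch interval, or batches get rejected as "invalid" as in the log above. Sketched here in Scala rather than the Java API, with illustrative durations:

```scala
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

// Batch interval of 100 ms to match the incoming record rate;
// `conf` and `pairsDStream` are assumed from the surrounding application.
val ssc = new StreamingContext(conf, Milliseconds(100))

// A 60 s window sliding every 6 s: both are exact multiples of 100 ms,
// so every generated batch time is valid.
val aggregated = pairsDStream.reduceByKeyAndWindow(
  aggFunc,      // (AggregateObject, AggregateObject) => AggregateObject
  Seconds(60),  // window duration
  Seconds(6))   // slide duration
```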
Re: Issue while trying to aggregate with a sliding window
There is a bug: https://github.com/apache/spark/pull/961#issuecomment-45125185 On Tue, Jun 17, 2014 at 8:19 PM, Hatch M hatchman1...@gmail.com wrote: Trying to aggregate over a sliding window, playing with the slide duration. Playing around with the slide interval I can see the aggregation works but mostly fails with the below error. The stream has records coming in at 100ms.
Re: Executors not utilized properly.
Perfect!! That makes so much sense to me now. Thanks a ton -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7793.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Execution stalls in LogisticRegressionWithSGD
Hi Xiangrui, I'm using 1.0.0. Thanks, Bharath On 18-Jun-2014 1:43 am, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, Thanks for posting the details! Which Spark version are you using? Best, Xiangrui On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, (Apologies for the long mail, but it's necessary to provide sufficient detail considering the number of issues faced.) I'm running into issues testing LogisticRegressionWithSGD on a two-node cluster (each node with 24 cores and 16G available to slaves out of 24G on the system). Here's a description of the application: The model is being trained based on categorical features x, y, and (x,y). The categorical features are mapped to binary features by converting each distinct value in the category enum into a binary feature by itself (i.e., presence of that value in a record implies the corresponding feature = 1, else feature = 0; so there'd be as many distinct features as enum values). The training vector is laid out as [x1,x2...xn,y1,y2...yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the training data has only one combination (Xk,Yk) and a label appearing in the record. Thus, the corresponding labeled point sparse vector would only have 3 values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector (though sparse) would be nearly 614000. The number of records is about 1.33 million. The records have been coalesced into 20 partitions across two nodes. The input data has not been cached.
(NOTE: I do realize the record's features may seem large for a two-node setup, but given the memory and CPU, and the fact that I'm willing to give up some turnaround time, I don't see why tasks should inexplicably fail.) Additional parameters include: spark.executor.memory = 14G, spark.default.parallelism = 1, spark.cores.max = 20, spark.storage.memoryFraction = 0.8 // No cache space required (Trying to set spark.akka.frameSize to a larger number, say, 20, didn't help either.) The model training was initialized as: new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05) However, after 4 iterations of gradient descent, the entire execution appeared to stall inexplicably. The corresponding executor details and details of the stalled stage (number 14) are as follows:

Metric                           | Min   | 25th  | Median | 75th  | Max
Result serialization time        | 12 ms | 13 ms | 14 ms  | 16 ms | 18 ms
Duration                         | 4 s   | 4 s   | 5 s    | 5 s   | 5 s
Time spent fetching task results | 0 ms  | 0 ms  | 0 ms   | 0 ms  | 0 ms
Scheduler delay                  | 6 s   | 6 s   | 6 s    | 6 s   | 12 s

Stage Id 14: aggregate at GradientDescent.scala:178
Task Index | Task ID | Status  | Locality Level | Executor                    | Launch Time         | Duration | GC Time | Result Ser Time
0          | 600     | RUNNING | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 1.1 h    |         |
1          | 601     | RUNNING | PROCESS_LOCAL  | casual.dataone.foo.bar.com  | 2014/06/17 10:32:27 | 1.1 h    |         |
2          | 602     | RUNNING | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 1.1 h    |         |
3          | 603     | RUNNING | PROCESS_LOCAL  | casual.dataone.foo.bar.com  | 2014/06/17 10:32:27 | 1.1 h    |         |
4          | 604     | RUNNING | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 1.1 h    |         |
5          | 605     | SUCCESS | PROCESS_LOCAL  | casual.dataone.foo.bar.com  | 2014/06/17 10:32:27 | 4 s      | 2 s     | 12 ms
6          | 606     | SUCCESS | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 4 s      | 1 s     | 14 ms
7          | 607     | SUCCESS | PROCESS_LOCAL  | casual.dataone.foo.bar.com  | 2014/06/17 10:32:27 | 4 s      | 2 s     | 12 ms
8          | 608     | SUCCESS | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 5 s      | 1 s     | 15 ms
9          | 609     | SUCCESS | PROCESS_LOCAL  | casual.dataone.foo.bar.com  | 2014/06/17 10:32:27 | 5 s      | 1 s     | 14 ms
10         | 610     | SUCCESS | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 5 s      | 1 s     | 15 ms
11         | 611     | SUCCESS | PROCESS_LOCAL  | casual.dataone.foo.bar.com  | 2014/06/17 10:32:27 | 4 s      | 1 s     | 13 ms
12         | 612     | SUCCESS | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 5 s      | 1 s     | 18 ms
13         | 613     | SUCCESS | PROCESS_LOCAL  | casual.dataone.foo.bar.com  | 2014/06/17 10:32:27 | 5 s      | 1 s     | 13 ms
14         | 614     | SUCCESS | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 4 s      | 1 s     | 14 ms
15         | 615     | SUCCESS | PROCESS_LOCAL  | casual.dataone.foo.bar.com  | 2014/06/17 10:32:27 | 4 s      | 1 s     | 12 ms
16         | 616     | SUCCESS | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 5 s      | 1 s     | 15 ms
17         | 617     | SUCCESS | PROCESS_LOCAL
Re: Spark SQL: No function to evaluate expression
The error message *means* that there is no column called c_address. However, maybe it's a bug with Spark SQL not understanding the a.c_address syntax. Can you double-check that the column name is correct? Thanks, Tobias On Wed, Jun 18, 2014 at 5:02 AM, Zuhair Khayyat zuhair.khay...@gmail.com wrote: Dear all, I am trying to run the following query on Spark SQL using some custom TPC-H tables with a standalone Spark cluster configuration: SELECT * FROM history a JOIN history b ON a.o_custkey = b.o_custkey WHERE a.c_address <> b.c_address; Unfortunately I get the following error during execution: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40) at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:2 failed 4 times, most recent failure: Exception failure in TID 12 on host kw2260.kaust.edu.sa: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function to evaluate expression. type: UnresolvedAttribute, tree: 'a.c_address org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.eval(unresolved.scala:59) org.apache.spark.sql.catalyst.expressions.Equals.eval(predicates.scala:147) org.apache.spark.sql.catalyst.expressions.Not.eval(predicates.scala:74) org.apache.spark.sql.catalyst.expressions.And.eval(predicates.scala:100) Is this a bug or am I doing something wrong? Regards, Zuhair Khayyat
Spark Streaming Example with CDH5
Hi Spark gurus, I am trying to compile a Spark Streaming example with CDH5 and am having problems compiling it. Has anyone created an example of Spark Streaming using CDH5 (preferably Spark 0.9.1) who would be kind enough to share their build.sbt(.scala) file (or point to their example on GitHub)? I know there is a streaming example here https://github.com/apache/spark/tree/master/examples but I am looking for something that runs with CDH5. My build.scala file looks like the one given below. object Dependency { // Versions object V { val Akka = "2.3.0" val scala = "2.10.4" val cloudera = "0.9.0-cdh5.0.0" } val sparkCore = "org.apache.spark" %% "spark-core" % V.cloudera val sparkStreaming = "org.apache.spark" %% "spark-streaming" % V.cloudera } resolvers ++= Seq( "cloudera repo" at "https://repository.cloudera.com/artifactory/cloudera-repos/", "hadoop repo" at "https://repository.cloudera.com/content/repositories/releases/") I have also attached the complete build.scala file for the sake of completeness. sbt dist gives the following error: object SecurityManager is not a member of package org.apache.spark [error] import org.apache.spark.{SparkConf, SecurityManager} build.scala http://apache-spark-user-list.1001560.n3.nabble.com/file/n7796/build.scala Appreciate the great work the Spark community is doing. It is by far the best thing I have worked on. ..Manas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Example-with-CDH5-tp7796.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: NullPointerExceptions when using val or broadcast on a standalone cluster.
Hi, I think this is a bug in Spark, because changing my program to using a main method instead of using the App trait fixes this problem. I've filed this as SPARK-2175, apologies if this turns out to be a duplicate. https://issues.apache.org/jira/browse/SPARK-2175 Regards, Brandon. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerExceptions-when-using-val-or-broadcast-on-a-standalone-cluster-tp7524p7797.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Yarn-client mode and standalone-client mode hang during job start
Hi Andrew, I submitted that from within the cluster. It looks like standalone-cluster mode didn't put the jars onto its HTTP server and passed the file:/... URI to the driver node; that's why the driver node couldn't find the jars. However, I copied my files to all slaves and it still didn't work; see my second email. I have no idea why yarn-client didn't work. I'm suspecting the following code is problematic, possible? I have multiple files that need the SparkContext, so I put it in an object (instead of the main function), and SContext is imported in multiple places. object SContext { var conf = new SparkConf().setAppName(Conf.getString("spark.conf.app_name")).setMaster(Conf.getString("spark.conf.master")) var sc = new SparkContext(conf) } spark.conf.master is yarn-cluster in my application.conf, but I think spark-submit will override the master mode, right? Jianshi On Wed, Jun 18, 2014 at 12:37 AM, Andrew Or and...@databricks.com wrote: Standalone-client mode is not officially supported at the moment. The standalone-cluster and yarn-client modes, however, should work. For both modes, are you running spark-submit from within the cluster, or outside of it? If the latter, could you try running it from within the cluster and see if it works? (Does your rtgraph.jar exist on the machine from which you run spark-submit?) 2014-06-17 2:41 GMT-07:00 Jianshi Huang jianshi.hu...@gmail.com: Hi, I've been stuck using either yarn-client or standalone-client mode. Either will get stuck when I submit jobs; the last messages it printed were: ...
14/06/17 02:37:17 INFO spark.SparkContext: Added JAR file:/x/home/jianshuang/tmp/lib/commons-vfs2.jar at http://10.196.195.25:56377/jars/commons-vfs2.jar with timestamp 1402997837065
14/06/17 02:37:17 INFO spark.SparkContext: Added JAR file:/x/home/jianshuang/tmp/rtgraph.jar at http://10.196.195.25:56377/jars/rtgraph.jar with timestamp 1402997837065
14/06/17 02:37:17 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler
14/06/17 02:37:17 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown hook for context org.apache.spark.SparkContext@6655cf60

I can use yarn-cluster to run my app but it's not very convenient to monitor the progress. Standalone-cluster mode doesn't work; it reports a file-not-found error:

Driver successfully submitted as driver-20140617023956-0003
... waiting before polling master for driver state
... polling master for driver state
State of driver-20140617023956-0003 is ERROR
Exception from cluster was: java.io.FileNotFoundException: File file:/x/home/jianshuang/tmp/rtgraph.jar does not exist

I'm using Spark 1.0.0 and my submit command looks like this:

~/spark/spark-1.0.0-hadoop2.4.0/bin/spark-submit --name 'rtgraph' --class com.paypal.rtgraph.demo.MapReduceWriter --master spark://lvshdc5en0015.lvs.paypal.com:7077 --jars `find lib -type f | tr '\n' ','` --executor-memory 20G --total-executor-cores 96 --deploy-mode cluster rtgraph.jar

The jars I put in the --jars option are: accumulo-core.jar accumulo-fate.jar accumulo-minicluster.jar accumulo-trace.jar accumulo-tracer.jar chill_2.10-0.3.6.jar commons-math.jar commons-vfs2.jar config-1.2.1.jar gson.jar guava.jar joda-convert-1.2.jar joda-time-2.3.jar kryo-2.21.jar libthrift.jar quasiquotes_2.10-2.0.0-M8.jar scala-async_2.10-0.9.1.jar scala-library-2.10.4.jar scala-reflect-2.10.4.jar

Anyone have a hint about what went wrong? Really confused. 
Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
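One pattern sometimes suggested for a shared-context object like the SContext above is to make the fields lazy and leave the master unset so spark-submit can inject it. This is a sketch, not a confirmed fix for the hang; the app name literal stands in for the poster's own Conf lookup:

```scala
// Sketch: defer SparkContext creation until first use, and do not call
// setMaster, so the --master flag passed to spark-submit wins.
import org.apache.spark.{SparkConf, SparkContext}

object SContext {
  // lazy: nothing is constructed when the object is merely imported
  lazy val conf = new SparkConf().setAppName("rtgraph") // placeholder name
  lazy val sc = new SparkContext(conf)
}
```

With eager vals, the context is built as a side effect of the first import, possibly before spark-submit has finished setting up the environment; lazy vals at least make construction happen at a predictable point.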
Re: Execution stalls in LogisticRegressionWithSGD
Couple more points: 1) The inexplicable stalling of execution with large feature sets appears similar to that reported with the news-20 dataset: http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E 2) The NPE when trying to call mapToPair to convert an RDD of (Long, Long, Integer, Integer) records into a JavaPairRDD<Tuple2<Long, Long>, Tuple2<Integer, Integer>> is unrelated to mllib. Thanks, Bharath On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Xiangrui, I'm using 1.0.0. Thanks, Bharath On 18-Jun-2014 1:43 am, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, Thanks for posting the details! Which Spark version are you using? Best, Xiangrui On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, (Apologies for the long mail, but it's necessary to provide sufficient details considering the number of issues faced.) I'm running into issues testing LogisticRegressionWithSGD on a two node cluster (each node with 24 cores and 16G available to slaves out of 24G on the system). Here's a description of the application: The model is being trained based on categorical features x, y, and (x,y). The categorical features are mapped to binary features by converting each distinct value in the category enum into a binary feature by itself (i.e. presence of that value in a record implies the corresponding feature = 1, else feature = 0; so there'd be as many distinct features as enum values). The training vector is laid out as [x1,x2...xn,y1,y2...yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the training data has only one combination (Xk,Yk) and a label appearing in the record. Thus, the corresponding labeledpoint sparse vector would only have 3 values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector (though sparse) would be nearly 614000. The number of records is about 1.33 million. The records have been coalesced into 20 partitions across two nodes. The input data has not been cached. 
(NOTE: I do realize the records' features may seem large for a two node setup, but given the memory and CPU, and the fact that I'm willing to give up some turnaround time, I don't see why tasks should inexplicably fail.) Additional parameters include:

spark.executor.memory = 14G
spark.default.parallelism = 1
spark.cores.max = 20
spark.storage.memoryFraction = 0.8  //No cache space required

(Trying to set spark.akka.frameSize to a larger number, say, 20, didn't help either.) The model training was initialized as:

new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05)

However, after 4 iterations of gradient descent, the entire execution appeared to stall inexplicably. The corresponding executor details and details of the stalled stage (number 14) are as follows:

Metric                            Min    25th   Median  75th   Max
Result serialization time         12 ms  13 ms  14 ms   16 ms  18 ms
Duration                          4 s    4 s    5 s     5 s    5 s
Time spent fetching task results  0 ms   0 ms   0 ms    0 ms   0 ms
Scheduler delay                   6 s    6 s    6 s     6 s    12 s

Stage Id 14: aggregate at GradientDescent.scala:178

Task Index  Task ID  Status   Locality Level  Executor                     Launch Time          Duration  GC Time  Result Ser Time  Errors
0           600      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
1           601      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
2           602      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
3           603      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
4           604      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
5           605      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
6           606      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
7           607      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
8           608      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
9           609      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      14 ms
10          610      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
11          611      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      13 ms
12          612      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      18 ms
13          613      SUCCESS  PROCESS_LOCAL
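The sparse layout Bharath describes, [x1..xn, y1..yn, (x1,y1)..(xn,yn)] with exactly three active features per record, can be sketched as a pure index computation. This is an illustrative reconstruction, not his actual code; it assumes (for simplicity) equal cardinality n for both categories and that a record's combination (Xk, Yk) shares the index k:

```scala
// Sketch: compute the three active positions for a record whose
// category combination is (Xk, Yk). The cardinality n is a
// simplifying assumption; real categories need not be equal-sized.
def activeIndices(k: Int, n: Int): Array[Int] =
  Array(k,          // Xk in the x block [0, n)
        n + k,      // Yk in the y block [n, 2n)
        2 * n + k)  // cross feature (Xk, Yk) in the last block

// In MLlib this would feed something like
//   LabeledPoint(label, Vectors.sparse(3 * n, activeIndices(k, n),
//                                      Array(1.0, 1.0, 1.0)))
```

The point of making the indices explicit is that only three of the ~614,000 positions are non-zero per record, so the per-record cost is tiny; the heavy object is the dense gradient vector aggregated each iteration.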
Re: spark with docker: errors with akka, NAT?
I used --privileged to start the container and then unmounted /etc/hosts. Then I created a new /etc/hosts file On Tue, Jun 17, 2014 at 4:58 PM, Aaron Davidson ilike...@gmail.com wrote: I remember having to do a similar thing in the spark docker scripts for testing purposes. Were you able to modify the /etc/hosts directly? I remember issues with that as docker apparently mounts it as part of its read-only filesystem. On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi mohitja...@gmail.com wrote: It was a DNS issue. AKKA apparently uses the hostname of the endpoints and hence they need to be resolvable. In my case the hostname of the docker container was a randomly generated string and was not resolvable. I added a workaround (entry in etc/hosts file of spark master) for now. If anyone can point to a more elegant solution, that would be awesome! On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi mohitja...@gmail.com wrote: I am using cutting edge code from git but doing my own sbt assembly. On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher schum...@icsi.berkeley.edu wrote: Hi, are you using the amplab/spark-1.0.0 images from the global registry? Andre On 06/17/2014 01:36 AM, Mohit Jaggi wrote: Hi Folks, I am having trouble getting spark driver running in docker. If I run a pyspark example on my mac it works but the same example on a docker image (Via boot2docker) fails with following logs. I am pointing the spark driver (which is running the example) to a spark cluster (driver is not part of the cluster). I guess this has something to do with docker's networking stack (it may be getting NAT'd) but I am not sure why (if at all) the spark-worker or spark-master is trying to create a new TCP connection to the driver, instead of responding on the connection initiated by the driver. I would appreciate any help in figuring this out. Thanks, Mohit. 
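Mohit's workaround can be sketched as a short shell session. The image name and IP are placeholders, and the exact steps may vary by docker version; the key is that --privileged allows unmounting docker's read-only /etc/hosts so the container's random hostname becomes resolvable:

```shell
# Sketch of the workaround described above (names/IPs are placeholders).
docker run --privileged -it my-spark-image /bin/bash

# Inside the container: replace the read-only /etc/hosts with one that
# resolves the container's randomly generated hostname.
umount /etc/hosts
printf '127.0.0.1 localhost\n172.17.0.5 %s\n' "$(hostname)" > /etc/hosts
```

The spark master needs a matching entry for the container's hostname too, since Akka initiates connections back to the driver by name.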
logs

Spark Executor Command: java -cp ::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar -Xms2g -Xmx2g -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler 1 cobalt 24 akka.tcp://sparkWorker@:33952/user/Worker app-20140616152201-0021

log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/06/16 15:22:05 INFO SecurityManager: Changing view acls to: ayasdi,root
14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)
14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
14/06/16 15:22:05 INFO Remoting: Starting remoting
14/06/16 15:22:06 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@:33536]
14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@:33536]
14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@:33952/user/Worker
14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated for 6 ms, all messages to this address will be delivered to dead letters.
14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@:33536] - [akka.tcp://spark@fc31887475e3:43921] disassociated! Shutting down.
Re: news20-binary classification with LogisticRegressionWithSGD
Hi Xiangrui, (2014/06/18 8:49), Xiangrui Meng wrote: Makoto, dense vectors are used in aggregation. If you have 32 partitions and each one sends a dense vector of size 1,354,731 to the master, then the driver needs 300M+. That may be the problem.

It seems that it could cause certain problems for convex optimization on large training data; a merging tree, like allreduce, would help to reduce memory requirements (though time for aggregation might increase).

Which deploy mode are you using, standalone or local?

Standalone. Setting --driver-memory 8G did not solve the aggregation problem. Aggregation never finishes. `ps aux | grep spark` on the master is as follows:

myui 7049 79.3 1.1 8768868 592348 pts/2 Sl+ 11:10 0:14 /usr/java/jdk1.7/bin/java -cp ::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf -XX:MaxPermSize=128m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Djava.library.path= -Xms2g -Xmx2g org.apache.spark.deploy.SparkSubmit spark-shell --driver-memory 8G --class org.apache.spark.repl.Main
myui 5694 2.5 0.5 6868296 292572 pts/2 Sl 10:59 0:17 /usr/java/jdk1.7/bin/java -cp ::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip 10.0.0.1 --port 7077 --webui-port 8081

Exporting SPARK_DAEMON_MEMORY=4g in spark-env.sh did not take effect for the evaluation. `ps aux | grep spark`:

/usr/java/jdk1.7/bin/java -cp ::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms4g -Xmx4g org.apache.spark.deploy.master.Master --ip 10.0.0.1 --port 7077 --webui-port 8081 ...

Thanks, Makoto
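Xiangrui's driver-side estimate can be checked with quick arithmetic: 32 partitions, each sending a dense vector of 1,354,731 doubles to the driver.

```scala
// Back-of-envelope check of the aggregation memory estimate above.
val partitions = 32L
val dimension = 1354731L
val bytesPerDouble = 8L
val totalBytes = partitions * dimension * bytesPerDouble
val totalMB = totalBytes / (1024 * 1024)
// roughly 330 MB of raw doubles arriving at the driver, before any
// serialization buffers or GC overhead are counted
```

So the 300M+ figure is just the payload; transient copies during deserialization and reduction can multiply it, which is consistent with aggregation stalling even at 8G of driver memory if other overheads pile up.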
Re: spark with docker: errors with akka, NAT?
Yup, alright, same solution then :) On Tue, Jun 17, 2014 at 7:39 PM, Mohit Jaggi mohitja...@gmail.com wrote: I used --privileged to start the container and then unmounted /etc/hosts. Then I created a new /etc/hosts file On Tue, Jun 17, 2014 at 4:58 PM, Aaron Davidson ilike...@gmail.com wrote: I remember having to do a similar thing in the spark docker scripts for testing purposes. Were you able to modify the /etc/hosts directly? I remember issues with that as docker apparently mounts it as part of its read-only filesystem. On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi mohitja...@gmail.com wrote: It was a DNS issue. AKKA apparently uses the hostname of the endpoints and hence they need to be resolvable. In my case the hostname of the docker container was a randomly generated string and was not resolvable. I added a workaround (entry in etc/hosts file of spark master) for now. If anyone can point to a more elegant solution, that would be awesome! On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi mohitja...@gmail.com wrote: I am using cutting edge code from git but doing my own sbt assembly. On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher schum...@icsi.berkeley.edu wrote: Hi, are you using the amplab/spark-1.0.0 images from the global registry? Andre On 06/17/2014 01:36 AM, Mohit Jaggi wrote: Hi Folks, I am having trouble getting spark driver running in docker. If I run a pyspark example on my mac it works but the same example on a docker image (Via boot2docker) fails with following logs. I am pointing the spark driver (which is running the example) to a spark cluster (driver is not part of the cluster). I guess this has something to do with docker's networking stack (it may be getting NAT'd) but I am not sure why (if at all) the spark-worker or spark-master is trying to create a new TCP connection to the driver, instead of responding on the connection initiated by the driver. I would appreciate any help in figuring this out. Thanks, Mohit. 
Re: Enormous EC2 price jump makes r3.large patch more important
Hey Jeremy, This is patched in the 1.0 and 0.9 branches of Spark. We're likely to make a 1.0.1 release soon (this patch being one of the main reasons), but if you are itching for this sooner, you can just check out the head of branch-1.0 and you will be able to use r3.XXX instances. - Patrick On Tue, Jun 17, 2014 at 4:17 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Some people (me included) might have wondered why all our m1.large spot instances (in us-west-1) shut down a few hours ago... Simple reason: The EC2 spot price for Spark's default m1.large instances just jumped from $0.016 per hour to about $0.750. Yes, fifty times. Probably something to do with the World Cup. So far this is just us-west-1, but prices have a tendency to equalize across centers as the days pass. Time to make backups and plans. m3 spot prices are still down at $0.02 (and being new, will be bypassed by older systems), so it would be REAAALLYY nice if there had been some progress on that issue. Let me know if I can help with testing and whatnot. -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
rdd.cache() is not faster?
Hi, I have a 40G file which is a concatenation of multiple documents. I want to extract two features (title and tables) from each doc, so the program is like this:

-----
val file = sc.textFile("/path/to/40G/file")
//file.cache() // to enable or disable cache
val titles = file.map(line => {
  (doc_key, getTitle()) // here I use text utility functions written in Java
}).reduceByKey(_ + _, 1) // reduce 1
val tables = file.flatMap(line => {
  for (table <- all_tables) yield (doc_key, getTableTitle()) // reduce 2
}).reduceByKey(_ + _, 1)
titles.saveAsTextFile("titles.out") // save_1, will trigger reduce_1
tables.saveAsTextFile("tables.out") // save_2, will trigger reduce_2
-----

I expect that with file.cache(), the (later) reduce_2 should be faster since it will read from cached data. However, results repeatedly show that reduce_2 takes 3 min with cache and 1.4 min without cache. Why does reading from cache not help in this case? The stage GUI shows that, with cache, reduce_2 always has a wave of outlier tasks, where the median latency is 2 s but the max is 1.7 min.

Metric                     Min    25th percentile  Median  75th percentile  Max
Result serialization time  0 ms   0 ms             0 ms    0 ms             1 ms
Duration                   0.6 s  2 s              2 s     2 s              1.7 min

But these tasks do not have a long GC pause (26 ms as shown):

173 1210 SUCCESS PROCESS_LOCAL localhost 2014/06/17 17:49:43 1.7 min 26 ms 9.4 KB

BTW: it is a single machine with 32 cores, 192 GB RAM, SSD, with these lines in spark-env.sh:

SPARK_WORKER_MEMORY=180g
SPARK_MEM=180g
SPARK_JAVA_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:MaxPermSize=256m"

Thanks, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan
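One thing worth ruling out before comparing the two reduces: the first action on a cached RDD both computes the result and populates the cache, so its timing is not a pure cached read. A sketch of the experiment, under the assumption that materializing up front isolates the cache effect (paths are the poster's placeholders):

```scala
// Hypothetical experiment: populate the block store with a cheap action
// first, so that both reduce_1 and reduce_2 measure pure cached reads.
val file = sc.textFile("/path/to/40G/file")
file.cache()
file.count() // this pass pays the one-time caching cost

// ... now run the titles/tables pipelines and compare stage times.
// Also check the web UI's Storage tab: if the 40 GB of deserialized
// Strings exceeds the storage fraction, partitions are evicted and
// silently recomputed, which would explain the outlier tasks.
```

A 40G text file typically occupies considerably more than 40G as deserialized Java Strings, so partial caching plus eviction is a plausible cause of the 1.7 min stragglers.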
Re: Enormous EC2 price jump makes r3.large patch more important
By the way, in case it's not clear, I mean our maintenance branches: https://github.com/apache/spark/tree/branch-1.0 On Tue, Jun 17, 2014 at 8:35 PM, Patrick Wendell pwend...@gmail.com wrote: Hey Jeremy, This is patched in the 1.0 and 0.9 branches of Spark. We're likely to make a 1.0.1 release soon (this patch being one of the main reasons), but if you are itching for this sooner, you can just checkout the head of branch-1.0 and you will be able to use r3.XXX instances. - Patrick On Tue, Jun 17, 2014 at 4:17 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Some people (me included) might have wondered why all our m1.large spot instances (in us-west-1) shut down a few hours ago... Simple reason: The EC2 spot price for Spark's default m1.large instances just jumped from 0.016 per hour, to about 0.750. Yes, Fifty times. Probably something to do with world cup. So far this is just us-west-1, but prices have a tendency to equalize across centers as the days pass. Time to make backups and plans. m3 spot prices are still down at $0.02 (and being new, will be bypassed by older systems), so it would be REAAALLYY nice if there had been some progress on that issue. Let me know if I can help with testing and whatnot. -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
Wildcard support in input path
It would be convenient if Spark's textFile, parquetFile, etc. could support paths with wildcards, such as: hdfs://domain/user/jianshuang/data/parquet/table/month=2014* Or is there already a way to do it now? Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Wildcard support in input path
Hi Jianshi, I have used wild card characters (*) in my program and it worked. My code was like this: b = sc.textFile("hdfs:///path to file/data_file_2013SEP01*") Thanks Regards, Meethu M On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: It would be convenient if Spark's textFile, parquetFile, etc. could support paths with wildcards, such as: hdfs://domain/user/jianshuang/data/parquet/table/month=2014* Or is there already a way to do it now? Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
question about setting SPARK_CLASSPATH IN spark_env.sh
Hi, This is about Spark 0.9. I have a 3 node spark cluster. I want to add a locally available jarfile (present on all nodes) to the SPARK_CLASSPATH variable in /etc/spark/conf/spark-env.sh so that all nodes can access it. The question is: should I edit 'spark-env.sh' on all nodes to add the jar? Or is it enough to add it only on the master node from where I am submitting jobs? thanks Santhosh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/question-about-setting-SPARK-CLASSPATH-IN-spark-env-sh-tp7809.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
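Since executors run on the workers, the jar generally needs to be on every node's classpath, not just the master's. A config sketch (the jar path is a placeholder):

```shell
# /etc/spark/conf/spark-env.sh, on each node of the cluster (Spark 0.9)
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/libs/extra.jar
```

The restart of the workers after editing spark-env.sh is what actually picks the change up.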
Re: Enormous EC2 price jump makes r3.large patch more important
I am about to spin up some new clusters, so I may give that a go... any special instructions for making them work? I assume I use the --spark-git-repo= option on the spark-ec2 command. Is it as easy as concatenating your string as the value? On cluster management GUIs... I've been looking around at Ambari, Datastax, Cloudera, OpsCenter etc. Not totally convinced by any of them yet. Anyone using a good one I should know about? I'm really beginning to lean in the direction of Cassandra as the distributed data store... On Wed, Jun 18, 2014 at 1:46 PM, Patrick Wendell pwend...@gmail.com wrote: By the way, in case it's not clear, I mean our maintenance branches: https://github.com/apache/spark/tree/branch-1.0 On Tue, Jun 17, 2014 at 8:35 PM, Patrick Wendell pwend...@gmail.com wrote: Hey Jeremy, This is patched in the 1.0 and 0.9 branches of Spark. We're likely to make a 1.0.1 release soon (this patch being one of the main reasons), but if you are itching for this sooner, you can just checkout the head of branch-1.0 and you will be able to use r3.XXX instances. - Patrick On Tue, Jun 17, 2014 at 4:17 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Some people (me included) might have wondered why all our m1.large spot instances (in us-west-1) shut down a few hours ago... Simple reason: The EC2 spot price for Spark's default m1.large instances just jumped from 0.016 per hour, to about 0.750. Yes, Fifty times. Probably something to do with world cup. So far this is just us-west-1, but prices have a tendency to equalize across centers as the days pass. Time to make backups and plans. m3 spot prices are still down at $0.02 (and being new, will be bypassed by older systems), so it would be REAAALLYY nice if there had been some progress on that issue. Let me know if I can help with testing and whatnot. -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
Re: Enormous EC2 price jump makes r3.large patch more important
Actually you'll just want to clone the 1.0 branch then use the spark-ec2 script in there to launch your cluster. The --spark-git-repo flag is if you want to launch with a different version of Spark on the cluster. In your case you just need a different version of the launch script itself, which will be present in the 1.0 branch of Spark. - Patrick On Tue, Jun 17, 2014 at 9:29 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: I am about to spin up some new clusters, so I may give that a go... any special instructions for making them work? I assume I use the --spark-git-repo= option on the spark-ec2 command. Is it as easy as concatenating your string as the value? On cluster management GUIs... I've been looking around at Amabari, Datastax, Cloudera, OpsCenter etc. Not totally convinced by any of them yet. Anyone using a good one I should know about? I'm really beginning to lean in the direction of Cassandra as the distributed data store... On Wed, Jun 18, 2014 at 1:46 PM, Patrick Wendell pwend...@gmail.com wrote: By the way, in case it's not clear, I mean our maintenance branches: https://github.com/apache/spark/tree/branch-1.0 On Tue, Jun 17, 2014 at 8:35 PM, Patrick Wendell pwend...@gmail.com wrote: Hey Jeremy, This is patched in the 1.0 and 0.9 branches of Spark. We're likely to make a 1.0.1 release soon (this patch being one of the main reasons), but if you are itching for this sooner, you can just checkout the head of branch-1.0 and you will be able to use r3.XXX instances. - Patrick On Tue, Jun 17, 2014 at 4:17 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Some people (me included) might have wondered why all our m1.large spot instances (in us-west-1) shut down a few hours ago... Simple reason: The EC2 spot price for Spark's default m1.large instances just jumped from 0.016 per hour, to about 0.750. Yes, Fifty times. Probably something to do with world cup. 
So far this is just us-west-1, but prices have a tendency to equalize across centers as the days pass. Time to make backups and plans. m3 spot prices are still down at $0.02 (and being new, will be bypassed by older systems), so it would be REAAALLYY nice if there had been some progress on that issue. Let me know if I can help with testing and whatnot. -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
Re: Wildcard support in input path
These paths get passed directly to the Hadoop FileSystem API, and I think they support globbing out of the box. So AFAIK it should just work. On Tue, Jun 17, 2014 at 9:09 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Jianshi, I have used wild card characters (*) in my program and it worked. My code was like this: b = sc.textFile("hdfs:///path to file/data_file_2013SEP01*") Thanks Regards, Meethu M On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: It would be convenient if Spark's textFile, parquetFile, etc. could support paths with wildcards, such as: hdfs://domain/user/jianshuang/data/parquet/table/month=2014* Or is there already a way to do it now? Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Wildcard support in input path
In Spark you can use the normal globs supported by Hadoop's FileSystem, which are documented here: http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path) On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Jianshi, I have used wild card characters (*) in my program and it worked.. My code was like this b = sc.textFile(hdfs:///path to file/data_file_2013SEP01*) Thanks Regards, Meethu M On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: It would be convenient if Spark's textFile, parquetFile, etc. can support path with wildcard, such as: hdfs://domain/user/jianshuang/data/parquet/table/month=2014* Or is there already a way to do it now? Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
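As a sketch of the globbing Patrick and Matei describe (assuming a SparkContext `sc` is in scope for the Hadoop configuration), the same patterns can also be expanded manually through the FileSystem API when you want to inspect the matches first:

```scala
// Sketch: expand a glob through Hadoop's FileSystem API directly.
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val matched = fs.globStatus(
  new Path("hdfs://domain/user/jianshuang/data/parquet/table/month=2014*"))
matched.foreach(status => println(status.getPath))

// Or just pass the glob straight to the input method:
// sc.textFile("hdfs://domain/user/jianshuang/data/parquet/table/month=2014*")
```

This is handy for confirming what a pattern will pick up before running the job, since a glob that matches nothing fails only at action time.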
Re: Spark SQL: No function to evaluate expression
Yeah, sorry that error message is not very intuitive. There is already a JIRA open to make it better: SPARK-2059 https://issues.apache.org/jira/browse/SPARK-2059 Also, a bug has been fixed in master regarding attributes that contain _. So if you are running 1.0 you might try upgrading. On Wed, Jun 18, 2014 at 4:05 AM, Tobias Pfeiffer t...@preferred.jp wrote: The error message *means* that there is no column called c_address. However, maybe it's a bug with Spark SQL not understanding the a.c_address syntax. Can you double-check the column name is correct? Thanks Tobias On Wed, Jun 18, 2014 at 5:02 AM, Zuhair Khayyat zuhair.khay...@gmail.com wrote: Dear all, I am trying to run the following query on Spark SQL using some custom TPC-H tables with standalone Spark cluster configuration: SELECT * FROM history a JOIN history b ON a.o_custkey = b.o_custkey WHERE a.c_address <> b.c_address; Unfortunately I get the following error during execution: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40) at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:2 failed 4 times, most recent failure: Exception failure in TID 12 on host kw2260.kaust.edu.sa: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function to evaluate expression. 
type: UnresolvedAttribute, tree: 'a.c_address
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.eval(unresolved.scala:59)
org.apache.spark.sql.catalyst.expressions.Equals.eval(predicates.scala:147)
org.apache.spark.sql.catalyst.expressions.Not.eval(predicates.scala:74)
org.apache.spark.sql.catalyst.expressions.And.eval(predicates.scala:100)

Is this a bug or am I doing something wrong? Regards, Zuhair Khayyat
Re: Un-serializable 3rd-party classes (Spark, Java)
There are a few options: - Kryo might be able to serialize these objects out of the box, depending what’s inside them. Try turning it on as described at http://spark.apache.org/docs/latest/tuning.html. - If that doesn’t work, you can create your own “wrapper” objects that implement Serializable, or even a subclass of FlexCompRowMatrix. No need to change the original library. - If the library has its own serialization functions, you could also use those inside a wrapper object. Take a look at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SerializableWritable.scala for an example where we make Hadoop’s Writables serializable. Matei On Jun 17, 2014, at 10:11 PM, Daedalus tushar.nagara...@gmail.com wrote: I'm trying to use matrix-toolkit-java https://github.com/fommil/matrix-toolkits-java/ for an application of mine, particularly the FlexCompRowMatrix class (used to store sparse matrices). I have a class Dataframe -- which contains an int array, two double values, and one FlexCompRowMatrix. When I try and run a simple Spark .foreach() on a JavaRDD created using a list of the above mentioned Dataframes, I get the following errors: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: no.uib.cipr.matrix.sparse.FlexCompRowMatrix at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) The FlexCompRowMatrix doesn't seem to implement Serializable. This class suits my purpose very well, and I would prefer not to switch over from it. Other than writing code to make the class serializable, and then recompiling the matrix-toolkit-java source, what options do I have? Is there any workaround for this issue? 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Un-serializable-3rd-party-classes-Spark-Java-tp7815.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
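The wrapper approach Matei suggests can be sketched as follows. `DenseGrid` below is a hypothetical stand-in for a third-party class like FlexCompRowMatrix (I'm not reproducing the real MTJ API); the wrapper marks the inner object transient and rebuilds it from its raw contents during deserialization, the same idea as Spark's SerializableWritable.

```java
import java.io.*;

public class WrapperDemo {
    // Hypothetical stand-in for a third-party class that is NOT Serializable.
    static class DenseGrid {
        final double[][] data;
        DenseGrid(double[][] data) { this.data = data; }
        double get(int i, int j) { return data[i][j]; }
    }

    // Serializable wrapper: the wrapped object is transient, and we
    // write/read its raw contents (a serializable double[][]) by hand.
    static class SerializableGrid implements Serializable {
        transient DenseGrid grid;
        SerializableGrid(DenseGrid grid) { this.grid = grid; }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            out.writeObject(grid.data);
        }

        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            grid = new DenseGrid((double[][]) in.readObject());
        }
    }

    // Round-trip through Java serialization, as Spark would when
    // shipping the object to an executor.
    static SerializableGrid roundTrip(SerializableGrid g) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(g);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SerializableGrid) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        double[][] d = {{1.5, 2.0}, {0.0, 4.0}};
        SerializableGrid copy = roundTrip(new SerializableGrid(new DenseGrid(d)));
        System.out.println(copy.grid.get(0, 1)); // prints 2.0
    }
}
```

In the real case you would store whatever raw state the matrix exposes (dimensions plus the nonzero entries, for a sparse matrix) rather than a dense array.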
Re: Java IO Stream Corrupted - Invalid Type AC?
Out of curiosity - are you guys using speculation, shuffle consolidation, or any other non-default option? If so that would help narrow down what's causing this corruption.

On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: Matt/Ryan, Did you make any headway on this? My team is running into this also. Doesn't happen on smaller datasets. Our input set is about 10 GB but we generate 100s of GBs in the flow itself. -Suren

On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton compton.r...@gmail.com wrote: Just ran into this today myself. I'm on branch-1.0 using a CDH3 cluster (no modifications to Spark or its dependencies). The error appeared trying to run GraphX's .connectedComponents() on a ~200GB edge list (GraphX worked beautifully on smaller data). Here's the stacktrace (it's quite similar to yours https://imgur.com/7iBA4nJ ).

14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed 4 times; aborting job
14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at VertexRDD.scala:100
Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.599:39 failed 4 times, most recent failure: Exception failure in TID 29735 on host node18: java.io.StreamCorruptedException: invalid type code: AC
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
java.lang.Thread.run(Thread.java:662)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
Re: Issue while trying to aggregate with a sliding window
Thanks! Will try to get the fix and retest.

On Tue, Jun 17, 2014 at 5:30 PM, onpoq l onpo...@gmail.com wrote: There is a bug: https://github.com/apache/spark/pull/961#issuecomment-45125185

On Tue, Jun 17, 2014 at 8:19 PM, Hatch M hatchman1...@gmail.com wrote: Trying to aggregate over a sliding window, playing with the slide duration. Playing around with the slide interval, I can see the aggregation works but it mostly fails with the below error. The stream has records coming in at 100ms.

JavaPairDStream<String, AggregateObject> aggregatedDStream = pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(6), new Duration(60));

14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is invalid as zeroTime is 1403050485800 ms and slideDuration is 6 ms and difference is 1100 ms
14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found: 1403050486900 ms
java.util.NoSuchElementException: key not found: 1403050486900 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)

Any hints on what's going on here? Thanks! Hatch
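The "Time ... is invalid" message comes from Spark checking that the offset from the stream's zero time lands exactly on a slide boundary. This is a minimal sketch of that validity check, not Spark's actual implementation (the method name `isTimeValid` mirrors the internal DStream logic, but the real code works on Time/Duration objects):

```java
public class SlideCheck {
    // A batch time is only valid for a windowed DStream when the offset
    // from zeroTime is an exact multiple of the slide duration.
    static boolean isTimeValid(long timeMs, long zeroTimeMs, long slideMs) {
        return timeMs > zeroTimeMs && (timeMs - zeroTimeMs) % slideMs == 0;
    }

    public static void main(String[] args) {
        // Numbers from the log above: the difference is 1100 ms, the slide
        // duration is 6 ms, and 1100 is not a multiple of 6, so the batch
        // time is rejected.
        System.out.println(isTimeValid(1403050486900L, 1403050485800L, 6L)); // prints false
    }
}
```

This is why the Spark Streaming docs require the window length and sliding interval to both be multiples of the batch interval: otherwise generated batch times never align with slide boundaries and the windowed stream rejects them.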