Re: Is it safe to use Scala 2.11 for Spark build?
Ok, I'll wait until -Pscala-2.11 is more stable and used by more people. Thanks for the help! Jianshi On Tue, Nov 18, 2014 at 3:49 PM, Ye Xianjin advance...@gmail.com wrote: Hi Prashant Sharma, It's not even ok to build with scala-2.11 profile on my machine. Just check out the master(c6e0c2ab1c29c184a9302d23ad75e4ccd8060242) run sbt/sbt -Pscala-2.11 clean assembly: .. skip the normal part info] Resolving org.scalamacros#quasiquotes_2.11;2.0.1 ... [warn] module not found: org.scalamacros#quasiquotes_2.11;2.0.1 [warn] local: tried [warn] /Users/yexianjin/.ivy2/local/org.scalamacros/quasiquotes_2.11/2.0.1/ivys/ivy.xml [warn] public: tried [warn] https://repo1.maven.org/maven2/org/scalamacros/quasiquotes_2.11/2.0.1/quasiquotes_2.11-2.0.1.pom [warn] central: tried [warn] https://repo1.maven.org/maven2/org/scalamacros/quasiquotes_2.11/2.0.1/quasiquotes_2.11-2.0.1.pom [warn] apache-repo: tried [warn] https://repository.apache.org/content/repositories/releases/org/scalamacros/quasiquotes_2.11/2.0.1/quasiquotes_2.11-2.0.1.pom [warn] jboss-repo: tried [warn] https://repository.jboss.org/nexus/content/repositories/releases/org/scalamacros/quasiquotes_2.11/2.0.1/quasiquotes_2.11-2.0.1.pom [warn] mqtt-repo: tried [warn] https://repo.eclipse.org/content/repositories/paho-releases/org/scalamacros/quasiquotes_2.11/2.0.1/quasiquotes_2.11-2.0.1.pom [warn] cloudera-repo: tried [warn] https://repository.cloudera.com/artifactory/cloudera-repos/org/scalamacros/quasiquotes_2.11/2.0.1/quasiquotes_2.11-2.0.1.pom [warn] mapr-repo: tried [warn] http://repository.mapr.com/maven/org/scalamacros/quasiquotes_2.11/2.0.1/quasiquotes_2.11-2.0.1.pom [warn] spring-releases: tried [warn] https://repo.spring.io/libs-release/org/scalamacros/quasiquotes_2.11/2.0.1/quasiquotes_2.11-2.0.1.pom [warn] spark-staging: tried [warn] https://oss.sonatype.org/content/repositories/orgspark-project-1085/org/scalamacros/quasiquotes_2.11/2.0.1/quasiquotes_2.11-2.0.1.pom [warn] spark-staging-hive13: tried [warn] https://oss.sonatype.org/content/repositories/orgspark-project-1089/org/scalamacros/quasiquotes_2.11/2.0.1/quasiquotes_2.11-2.0.1.pom [warn] apache.snapshots: tried [warn] http://repository.apache.org/snapshots/org/scalamacros/quasiquotes_2.11/2.0.1/quasiquotes_2.11-2.0.1.pom [warn] Maven2 Local: tried [warn] file:/Users/yexianjin/.m2/repository/org/scalamacros/quasiquotes_2.11/2.0.1/quasiquotes_2.11-2.0.1.pom [info] Resolving jline#jline;2.12 ... [warn] :: [warn] :: UNRESOLVED DEPENDENCIES :: [warn] :: [warn] :: org.scalamacros#quasiquotes_2.11;2.0.1: not found [warn] :: [info] Resolving org.scala-lang#scala-library;2.11.2 ... [warn] [warn] Note: Unresolved dependencies path: [warn] org.scalamacros:quasiquotes_2.11:2.0.1 ((com.typesafe.sbt.pom.MavenHelper) MavenHelper.scala#L76) [warn] +- org.apache.spark:spark-catalyst_2.11:1.2.0-SNAPSHOT [info] Resolving jline#jline;2.12 ... [info] Done updating. [info] Updating {file:/Users/yexianjin/spark/}streaming-twitter... [info] Updating {file:/Users/yexianjin/spark/}streaming-zeromq... [info] Updating {file:/Users/yexianjin/spark/}streaming-flume... [info] Updating {file:/Users/yexianjin/spark/}streaming-mqtt... [info] Resolving jline#jline;2.12 ... [info] Done updating. [info] Resolving com.esotericsoftware.minlog#minlog;1.2 ... [info] Updating {file:/Users/yexianjin/spark/}streaming-kafka... [info] Resolving jline#jline;2.12 ... [info] Done updating. [info] Resolving jline#jline;2.12 ... [info] Done updating. [info] Resolving jline#jline;2.12 ... [info] Done updating. 
[info] Resolving org.apache.kafka#kafka_2.11;0.8.0 ... [warn] module not found: org.apache.kafka#kafka_2.11;0.8.0 [warn] local: tried [warn] /Users/yexianjin/.ivy2/local/org.apache.kafka/kafka_2.11/0.8.0/ivys/ivy.xml [warn] public: tried [warn] https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.11/0.8.0/kafka_2.11-0.8.0.pom [warn] central: tried [warn] https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.11/0.8.0/kafka_2.11-0.8.0.pom [warn] apache-repo: tried [warn] https://repository.apache.org/content/repositories/releases/org/apache/kafka/kafka_2.11/0.8.0/kafka_2.11-0.8.0.pom [warn] jboss-repo: tried [warn] https://repository.jboss.org/nexus/content/repositories/releases/org/apache/kafka/kafka_2.11/0.8.0/kafka_2.11-0.8.0.pom [warn] mqtt-repo: tried [warn] https://repo.eclipse.org/content/repositories/paho-releases/org/apache/kafka/kafka_2.11/0.8.0/kafka_2.11-0.8.0.pom [warn] cloudera-repo: tried [warn] https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/kafka/kafka_2.11/0.8.0/kafka_2.11-0.8.0.pom [warn] mapr-repo: tried [warn]
Logging problem in Spark when using Flume Log4jAppender
Hi, I want to do log aggregation in a Spark standalone mode cluster, using Apache Flume. But something weird happened. Here are my operations: (1) Start a flume agent, listening on port 3. ( flume-conf.properties http://apache-spark-user-list.1001560.n3.nabble.com/file/n19140/flume-conf.properties ) (2) In log4j.properties, I added a Flume Log4jAppender to make Spark able to send its logs to a flume agent. ( log4j.properties http://apache-spark-user-list.1001560.n3.nabble.com/file/n19140/log4j.properties ) (3) sbin/start-all.sh. In this stage, all logs were aggregated into the flume agent and no exception happened. (flume-ng-log4jappender version: 1.5.0.1) (4) bin/spark-submit --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.1.0-hadoop2.4.0.jar 1 At the beginning, logs were aggregated normally. However, when the driver needed to stop the SparkContext sc, it threw some exceptions and the driver exited abnormally: sparkPi.log http://apache-spark-user-list.1001560.n3.nabble.com/file/n19140/sparkPi.log / ... 14/11/18 23:06:24 INFO SparkDeploySchedulerBackend:org.apache.spark.Logging$class.logInfo(Logging.scala:59) - Asking each executor to shut down 14/11/18 23:06:25 INFO MapOutputTrackerMasterActor:org.apache.spark.Logging$class.logInfo(Logging.scala:59) - MapOutputTrackerActor stopped! 14/11/18 23:06:25 INFO ConnectionManager:org.apache.spark.Logging$class.logInfo(Logging.scala:59) - Selector thread was interrupted! log4j:ERROR Flume append() failed. 14/11/18 23:06:25 ERROR ConnectionManager:org.apache.spark.Logging$class.logError(Logging.scala:96) - Error in select loop org.apache.flume.FlumeException: Flume append() failed. Exception follows. ... / I also tried setting the log4j.appender.flume.unsafeMode property to true. This time, the driver finished the job but didn't exit: sparkPi_snd.log http://apache-spark-user-list.1001560.n3.nabble.com/file/n19140/sparkPi_snd.log / 14/11/18 23:36:49 WARN NettyAvroRpcClient:org.apache.spark.Logging$class.logInfo(Logging.scala:59) - Using default maxIOWorkers log4j:ERROR Cannot Append to Appender! Appender either closed or not setup correctly! 14/11/18 23:36:49 INFO ConnectionManager:org.apache.spark.Logging$class.logInfo(Logging.scala:59) - Removing ReceivingConnection to ConnectionManagerId(pc215,59272) 14/11/18 23:36:49 ERROR ConnectionManager:org.apache.spark.Logging$class.logError(Logging.scala:75) - Corresponding SendingConnection to ConnectionManagerId(pc215,59272) not found 14/11/18 23:36:49 WARN ConnectionManager:org.apache.spark.Logging$class.logWarning(Logging.scala:71) - All connections not cleaned up 14/11/18 23:36:49 INFO ConnectionManager:org.apache.spark.Logging$class.logInfo(Logging.scala:59) - ConnectionManager stopped 14/11/18 23:36:49 INFO MemoryStore:org.apache.spark.Logging$class.logInfo(Logging.scala:59) - MemoryStore cleared 14/11/18 23:36:49 INFO BlockManager:org.apache.spark.Logging$class.logInfo(Logging.scala:59) - BlockManager stopped 14/11/18 23:36:49 INFO BlockManagerMaster:org.apache.spark.Logging$class.logInfo(Logging.scala:59) - BlockManagerMaster stopped 14/11/18 23:36:49 INFO RemoteActorRefProvider$RemotingTerminator:akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3.apply$mcV$sp(Slf4jLogger.scala:74) - Shutting down remote daemon. 14/11/18 23:36:49 INFO RemoteActorRefProvider$RemotingTerminator:akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3.apply$mcV$sp(Slf4jLogger.scala:74) - Remote daemon shut down; proceeding with flushing remote transports.
14/11/18 23:36:49 INFO Remoting:akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3.apply$mcV$sp(Slf4jLogger.scala:74) - Remoting shut down 14/11/18 23:36:49 INFO SparkContext:org.apache.spark.Logging$class.logInfo(Logging.scala:59) - Successfully stopped SparkContext / I have searched Google with keywords like Spark log aggregation, Spark Flume Log4jAppender, ERROR Flume append() failed, but I can't figure out how to solve this problem. Does anybody have an idea? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Logging-problem-in-Spark-when-using-Flume-Log4jAppender-tp19140.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
how to know the Spark worker Mechanism
I'm a newbie in Spark. I know that what the work should do is written in the RDD. But I want to make the worker load a native lib so that I can do something to change the content of the lib in memory. How can I do this? I can do it on the driver, but not on the worker. I always get a fatal error. The JVM reports a fatal error: C [libstdc++.so.6+0x64d24] std::_Rb_tree_rotate_left(std::_Rb_tree_node_base*, std::_Rb_tree_node_base*)+0x4 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-know-the-Spark-worker-Mechanism-tp19141.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark On Yarn Issue: Initial job has not accepted any resources
Not sure how to solve this, but spotted these lines in the logs: 14/11/18 14:28:23 INFO YarnAllocationHandler: Container marked as *failed*: container_1415961020140_0325_01_02 14/11/18 14:28:38 INFO YarnAllocationHandler: Container marked as *failed*: container_1415961020140_0325_01_03 And the lines following them say it is trying to allocate containers of 1408 MB of memory each but failing to do so. You might want to look into that. On Tue, Nov 18, 2014 at 1:23 PM, LinCharlie lin_q...@outlook.com wrote: Hi All: I was submitting a spark_program.jar to `spark on yarn cluster` on a driver machine with yarn-client mode. Here is the spark-submit command I used: ./spark-submit --master yarn-client --class com.charlie.spark.grax.OldFollowersExample --queue dt_spark ~/script/spark-flume-test-0.1-SNAPSHOT-hadoop2.0.0-mr1-cdh4.2.1.jar The queue `dt_spark` was free, and the program was submitted successfully and was running on the cluster. But on the console, it showed repeatedly that: 14/11/18 15:11:48 WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Checking the cluster UI logs, I find no errors: SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/disk5/yarn/usercache/linqili/filecache/6957209742046754908/spark-assembly-1.0.2-hadoop2.0.0-cdh4.2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/hadoop/hadoop-2.0.0-cdh4.2.1/share/hadoop/common/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 14/11/18 14:28:16 INFO SecurityManager: Changing view acls to: hadoop,linqili 14/11/18 14:28:16 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop, linqili) 14/11/18 14:28:17 INFO Slf4jLogger: Slf4jLogger started 14/11/18 14:28:17 INFO Remoting: Starting remoting 14/11/18 14:28:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkyar...@longzhou-hdp3.lz.dscc:37187] 14/11/18 14:28:17 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkyar...@longzhou-hdp3.lz.dscc:37187] 14/11/18 14:28:17 INFO ExecutorLauncher: ApplicationAttemptId: appattempt_1415961020140_0325_01 14/11/18 14:28:17 INFO ExecutorLauncher: Connecting to ResourceManager at longzhou-hdpnn.lz.dscc/192.168.19.107:12032 14/11/18 14:28:17 INFO ExecutorLauncher: Registering the ApplicationMaster 14/11/18 14:28:18 INFO ExecutorLauncher: Waiting for spark driver to be reachable. 14/11/18 14:28:18 INFO ExecutorLauncher: Master now available: 192.168.59.90:36691 14/11/18 14:28:18 INFO ExecutorLauncher: Listen to driver: akka.tcp://spark@192.168.59.90:36691/user/CoarseGrainedScheduler 14/11/18 14:28:18 INFO ExecutorLauncher: Allocating 1 executors. 14/11/18 14:28:18 INFO YarnAllocationHandler: Allocating 1 executor containers with 1408 of memory each. 14/11/18 14:28:18 INFO YarnAllocationHandler: ResourceRequest (host : *, num containers: 1, priority = 1 , capability : memory: 1408) 14/11/18 14:28:18 INFO YarnAllocationHandler: Allocating 1 executor containers with 1408 of memory each.
14/11/18 14:28:18 INFO YarnAllocationHandler: ResourceRequest (host : *, num containers: 1, priority = 1 , capability : memory: 1408) 14/11/18 14:28:18 INFO RackResolver: Resolved longzhou-hdp3.lz.dscc to /rack1 14/11/18 14:28:18 INFO YarnAllocationHandler: launching container on container_1415961020140_0325_01_02 host longzhou-hdp3.lz.dscc 14/11/18 14:28:18 INFO ExecutorRunnable: Starting Executor Container 14/11/18 14:28:18 INFO ExecutorRunnable: Connecting to ContainerManager at longzhou-hdp3.lz.dscc:12040 14/11/18 14:28:18 INFO ExecutorRunnable: Setting up ContainerLaunchContext 14/11/18 14:28:18 INFO ExecutorRunnable: Preparing Local resources 14/11/18 14:28:18 INFO ExecutorLauncher: All executors have launched. 14/11/18 14:28:18 INFO ExecutorLauncher: Started progress reporter thread - sleep time : 5000 14/11/18 14:28:18 INFO YarnAllocationHandler: ResourceRequest (host : *, num containers: 0, priority = 1 , capability : memory: 1408) 14/11/18 14:28:18 INFO ExecutorRunnable: Prepared Local resources Map(__spark__.jar - resource {, scheme: hdfs, host: longzhou-hdpnn.lz.dscc, port: 11000, file: /user/linqili/.sparkStaging/application_1415961020140_0325/spark-assembly-1.0.2-hadoop2.0.0-cdh4.2.1.jar, }, size: 134859131, timestamp: 1416292093988, type: FILE, visibility: PRIVATE, ) 14/11/18 14:28:18 INFO ExecutorRunnable: Setting up executor with commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p', -Xms1024m -Xmx1024m ,
Slave Node Management in Standalone Cluster
Hi, I'm operating Spark in standalone cluster configuration (3 slaves) and have some questions. 1. How can I stop a slave on a specific node? Under the `sbin/` directory, there are `start-{all,master,slave,slaves}` and `stop-{all,master,slaves}`, but no `stop-slave`. Is there any way to stop a specific (e.g., the 2nd) slave via the command line? 2. How can I check cluster status from the command line? Is there any way to confirm that all Masters / Workers are up and working without using the Web UI? Thanks in advance! -- Kenichi Maehashi - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: how to know the Spark worker Mechanism
Did you set spark.executor.extraLibraryPath to the directory in which your native library exists? 2014-11-18 16:13 GMT+08:00 tangweihan tangwei...@360.cn: I'm a newbie in Spark. I know that what the work should do is written in the RDD. But I want to make the worker load a native lib so that I can do something to change the content of the lib in memory. How can I do this? I can do it on the driver, but not on the worker. I always get a fatal error. The JVM reports a fatal error: C [libstdc++.so.6+0x64d24] std::_Rb_tree_rotate_left(std::_Rb_tree_node_base*, std::_Rb_tree_node_base*)+0x4 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-know-the-Spark-worker-Mechanism-tp19141.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
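For reference, a minimal sketch of setting that property when constructing the context (the library directory path here is just a placeholder):

    val conf = new org.apache.spark.SparkConf()
      .setAppName("native-lib-test")
      .set("spark.executor.extraLibraryPath", "/opt/native-libs") // hypothetical directory holding the .so
    val sc = new org.apache.spark.SparkContext(conf)

On recent releases the same property can also be passed with spark-submit's --conf flag.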
[Spark/ Spark Streaming] Spark 1.1.0 fails working with akka 2.3.6
Hi, I have created a spark streaming application based on spark-1.1.0. While running, it failed with an Akka JAR version mismatch. Some projects are using akka 2.3.6, so I cannot change the akka version as it would affect others. What should I do? *Caused by: akka.ConfigurationException: Akka JAR version [2.3.6] does not match the provided config version [2.2.3]* * at akka.actor.ActorSystem$Settings.init(ActorSystem.scala:209) ~[analytics-engine.jar:1.0.0]* * at akka.actor.ActorSystemImpl.init(ActorSystem.scala:504) ~[analytics-engine.jar:1.0.0]* * at akka.actor.ActorSystem$.apply(ActorSystem.scala:141) ~[analytics-engine.jar:1.0.0]* * at akka.actor.ActorSystem$.apply(ActorSystem.scala:118) ~[analytics-engine.jar:1.0.0]* * at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121) ~[analytics-engine.jar:1.0.0]* * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54) ~[analytics-engine.jar:1.0.0]* * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53) ~[analytics-engine.jar:1.0.0]* * at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446) ~[analytics-engine.jar:1.0.0]* * at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) ~[analytics-engine.jar:1.0.0]* * at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442) ~[analytics-engine.jar:1.0.0]* * at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56) ~[analytics-engine.jar:1.0.0]* * at org.apache.spark.SparkEnv$.create(SparkEnv.scala:153) ~[analytics-engine.jar:1.0.0]* * at org.apache.spark.SparkContext.init(SparkContext.scala:203) ~[analytics-engine.jar:1.0.0]* * at com.livestream.analytics.engine.context.ApplicationContextFactory$.createSimpleContext(ApplicationContext.scala:63) ~[analytics-engine.jar:1.0.0]* * ... 13 common frames omitted* Thanks, -- Sourav Chandra Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
Re: how to know the Spark worker Mechanism
Ok. I don't put it in the path, because this is not a lib I want to use permanently. Here is my code in the RDD:

    val fileaddr = SparkFiles.get("segment.so")
    System.load(fileaddr)
    val config = SparkFiles.get("qsegconf.ini")
    val segment = new Segment // this is the native class
    segment.init(config) // this fails here if the driver doesn't load the lib

I just use System.load to load this lib. But now I also call some functions in the lib that change some objects in the lib, and that produces the fatal error. After I first load it in the driver, it works again in standalone mode. I want to know how the job travels from the driver to the worker, or how the native lib is loaded into the worker's memory. I also tried another sample lib: if the functions do not change the objects in the lib, the lib needs to be loaded only in the workers. And another thing I want to know is whether there is something like the cache archive in Hadoop streaming. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-know-the-Spark-worker-Mechanism-tp19141p19146.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
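For what it is worth, the closest Spark analogue to Hadoop streaming's cache archive is SparkContext.addFile on the driver plus SparkFiles.get inside the tasks. A rough sketch of that pattern, reusing the names from the snippet above (sc, rdd, Segment and process() are assumed to exist; paths are hypothetical):

    import org.apache.spark.SparkFiles

    sc.addFile("/path/on/driver/segment.so")        // driver side: ship the file to every node
    sc.addFile("/path/on/driver/qsegconf.ini")

    rdd.mapPartitions { iter =>
      System.load(SparkFiles.get("segment.so"))     // worker side: resolves to the node-local copy
      val segment = new Segment                     // native wrapper class from the snippet above
      segment.init(SparkFiles.get("qsegconf.ini"))
      iter.map(row => segment.process(row))         // process() is a hypothetical per-row call
    }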
Re: inconsistent edge counts in GraphX
At 2014-11-11 01:51:43 +, Buttler, David buttl...@llnl.gov wrote: I am building a graph from a large CSV file. Each record contains a couple of nodes and about 10 edges. When I try to load a large portion of the graph, using multiple partitions, I get inconsistent results in the number of edges between different runs. However, if I use a single partition, or a small portion of the CSV file (say 1000 rows), then I get a consistent number of edges. Is there anything I should be aware of as to why this could be happening in GraphX? Is it possible there's some nondeterminism in the way you're reading the file? It would be helpful if you could post the code you're using to load the graph. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Slave Node Management in Standalone Cluster
1. You can comment the rest of the workers from the conf/slaves file and do a stop-slaves.sh from that machine to stop the specific worker. 2. There is no direct command for it, but you can do something like the following: $ curl localhost:8080 | grep Applications -C 10 | head -n20 Where localhost is your master machine and 8080 is the web ui port. You can also look at the metrics http://spark.apache.org/docs/latest/monitoring.html#metrics options for more sophisticated version. Thanks Best Regards On Tue, Nov 18, 2014 at 1:57 PM, Kenichi Maehashi webmas...@kenichimaehashi.com wrote: Hi, I'm operating Spark in standalone cluster configuration (3 slaves) and have some question. 1. How can I stop a slave on the specific node? Under `sbin/` directory, there are `start-{all,master,slave,slaves}` and `stop-{all,master,slaves}`, but no `stop-slave`. Are there any way to stop the specific (e.g., the 2nd) slave via command line? 2. How can I check cluster status from command line? Are there any way to confirm that all Master / Workers are up and working without using Web UI? Thanks in advance! -- Kenichi Maehashi - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: GraphX / PageRank with edge weights
At 2014-11-13 21:28:52 +, Ommen, Jurgen omme0...@stthomas.edu wrote: I'm using GraphX and playing around with its PageRank algorithm. However, I can't see from the documentation how to use edge weight when running PageRank. Is this possible to consider edge weights and how would I do it? There's no built-in way to prefer certain edges over others; edge weights are just set to the inverse of the outdegree of the source vertex. But it's simple to modify the PageRank code [1] to use custom edge weights instead: (1) copy the PageRank.run method body to your own project, (2) change the type signature of the input graph from Graph[VD, ED] to Graph[VD, Double], and (3) remove the calls to outerJoinVertices and mapTriplets on line 86 and 88. Ankur [1] https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala#L79 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
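To make that recipe concrete, here is a rough sketch of a statically iterated, edge-weighted PageRank along the lines described above. It is not the actual PageRank.run body, and it assumes the Graph[VD, Double] edge attributes are already normalized per source vertex:

    import scala.reflect.ClassTag
    import org.apache.spark.graphx._

    def weightedPageRank[VD: ClassTag](graph: Graph[VD, Double], numIter: Int,
                                       resetProb: Double = 0.15): Graph[Double, Double] = {
      // keep the caller-supplied edge weights; only the vertex attribute changes
      var rankGraph = graph.mapVertices((_, _) => 1.0)
      for (_ <- 1 to numIter) {
        // each vertex pushes rank * weight along its out-edges
        // (mapReduceTriplets is the 1.x API; newer releases use aggregateMessages)
        val msgs = rankGraph.mapReduceTriplets[Double](
          t => Iterator((t.dstId, t.srcAttr * t.attr)), _ + _)
        rankGraph = rankGraph.outerJoinVertices(msgs) { (_, _, msgSum) =>
          resetProb + (1.0 - resetProb) * msgSum.getOrElse(0.0)
        }
      }
      rankGraph
    }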
RE: Null pointer exception with larger datasets
Thanks Akhil. -Naveen. From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Tuesday, November 18, 2014 1:19 PM To: Naveen Kumar Pokala Cc: user@spark.apache.org Subject: Re: Null pointer exception with larger datasets Make sure your list is not null; if it is null then it's more like doing: JavaRDD<Student> distData = sc.parallelize(null) distData.foreach(println) Thanks Best Regards On Tue, Nov 18, 2014 at 12:07 PM, Naveen Kumar Pokala npok...@spcapitaliq.com wrote: Hi, I have a list of Students whose size is one lakh (100,000), and I am trying to save it as a file. It is throwing a null pointer exception. JavaRDD<Student> distData = sc.parallelize(list); distData.saveAsTextFile("hdfs://master/data/spark/instruments.txt"); 14/11/18 01:33:21 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 0.0 (TID 5, master): java.lang.NullPointerException: org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158) org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) How to handle this? -Naveen
Re: Pagerank implementation
At 2014-11-15 18:01:22 -0700, tom85 tom.manha...@gmail.com wrote: This line: val newPR = oldPR + (1.0 - resetProb) * msgSum makes no sense to me. Should it not be: val newPR = resetProb/graph.vertices.count() + (1.0 - resetProb) * msgSum ? This is an unusual version of PageRank where the messages being passed around are deltas rather than full ranks. This occurs at line 156 in vertexProgram, which returns (newPR, newPR - oldPR). The second element of the tuple is the delta, which is subsequently used in sendMessage. The benefit of this is that sendMessage can avoid sending when the delta drops below the convergence threshold `tol`, indicating that the source vertex has converged. But it means that to update the rank of each vertex, we have to add the incoming delta to its existing rank. That's why the oldPR term appears in the line you're looking at. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
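For readers following along in the source, here is a simplified paraphrase of the two Pregel functions being discussed; resetProb and tol stand for the usual PageRank parameters and the code is not the verbatim library source:

    import org.apache.spark.graphx._

    object DeltaPageRankSketch {
      val resetProb = 0.15
      val tol = 0.0001

      // vertex state is (rank, delta); messages carry accumulated deltas
      def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
        val (oldPR, _) = attr
        val newPR = oldPR + (1.0 - resetProb) * msgSum  // add the incoming delta to the old rank
        (newPR, newPR - oldPR)                          // the second element is the new delta
      }

      // only send while the source's delta is still above the convergence threshold
      def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) =
        if (edge.srcAttr._2 > tol) Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
        else Iterator.empty
    }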
New Codes in GraphX
Hi, I am using Spark-1.0.0. There are two GraphX directories that I can see here: 1. spark-1.0.0/examples/src/main/scala/org/apache/spark/examples/graphx which contains LiveJournalPageRank.scala 2. spark-1.0.0/graphx/src/main/scala/org/apache/spark/graphx/lib which contains Analytics.scala, ConnectedComponents.scala, etc. Now, if I want to add my own code to GraphX, i.e., if I want to write a small application on GraphX, in which directory should I add my code, in 1 or 2? And what is the difference? Can anyone tell me something on this? Thank You
Re: Landmarks in GraphX section of Spark API
At 2014-11-17 14:47:50 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: I was going through the graphx section in the Spark API in https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.lib.ShortestPaths$ Here, I find the word landmark. Can anyone explain to me what is landmark means. Is it a simple English word or does it mean something else in graphx. The landmarks in the context of the shortest-paths algorithm are just the vertices of interest. For each vertex in the graph, the algorithm will return the distance to each of the landmark vertices. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
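A tiny usage sketch, with a made-up toy graph and landmark (sc is an existing SparkContext):

    import org.apache.spark.graphx._
    import org.apache.spark.graphx.lib.ShortestPaths

    // toy graph: edges 1 -> 2, 2 -> 3 and 1 -> 3, with landmark vertex 3
    val graph = Graph.fromEdgeTuples(sc.parallelize(Seq((1L, 2L), (2L, 3L), (1L, 3L))), defaultValue = 1)
    val landmarks = Seq(3L)                          // the vertices of interest
    val result = ShortestPaths.run(graph, landmarks)
    // each vertex now holds a Map(landmarkId -> hop distance to that landmark)
    result.vertices.collect.foreach(println)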
Re: Landmarks in GraphX section of Spark API
So landmark can contain just one vertex right? Which algorithm has been used to compute the shortest path? Thank You On Tue, Nov 18, 2014 at 2:53 PM, Ankur Dave ankurd...@gmail.com wrote: At 2014-11-17 14:47:50 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: I was going through the graphx section in the Spark API in https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.lib.ShortestPaths$ Here, I find the word landmark. Can anyone explain to me what is landmark means. Is it a simple English word or does it mean something else in graphx. The landmarks in the context of the shortest-paths algorithm are just the vertices of interest. For each vertex in the graph, the algorithm will return the distance to each of the landmark vertices. Ankur
Re: Running PageRank in GraphX
At 2014-11-18 12:02:52 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: I just ran the PageRank code in GraphX with some sample data. What I am seeing is that the total rank changes drastically if I change the number of iterations from 10 to 100. Why is that so? As far as I understand, the total rank should asymptotically approach the number of vertices in the graph, assuming there are no vertices of zero outdegree. Does that seem to be the case for your graph? Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Running PageRank in GraphX
There are no vertices of zero outdegree. The total rank for the graph with numIter = 10 is 4.99 and for the graph with numIter = 100 is 5.99. I do not know why there is so much variation. On Tue, Nov 18, 2014 at 3:22 PM, Ankur Dave ankurd...@gmail.com wrote: At 2014-11-18 12:02:52 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: I just ran the PageRank code in GraphX with some sample data. What I am seeing is that the total rank changes drastically if I change the number of iterations from 10 to 100. Why is that so? As far as I understand, the total rank should asymptotically approach the number of vertices in the graph, assuming there are no vertices of zero outdegree. Does that seem to be the case for your graph? Ankur
Re: Landmarks in GraphX section of Spark API
At 2014-11-18 14:59:20 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: So landmark can contain just one vertex right? Right. Which algorithm has been used to compute the shortest path? It's distributed Bellman-Ford. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Landmarks in GraphX section of Spark API
Does Bellman-Ford give the best solution? On Tue, Nov 18, 2014 at 3:27 PM, Ankur Dave ankurd...@gmail.com wrote: At 2014-11-18 14:59:20 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: So landmark can contain just one vertex right? Right. Which algorithm has been used to compute the shortest path? It's distributed Bellman-Ford. Ankur
Re: New Codes in GraphX
The code that is present in 2 can be run with the command: $SPARK_HOME/bin/spark-submit --master local[*] --class org.apache.spark.graphx.lib.Analytics $SPARK_HOME/assembly/target/scala-2.10/spark-assembly-*.jar pagerank /edge-list-file.txt --numEPart=8 --numIter=10 --partStrategy=EdgePartition2D Now, how do I run the LiveJournalPageRank.scala that is there in 1? On Tue, Nov 18, 2014 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I am using Spark-1.0.0. There are two GraphX directories that I can see here: 1. spark-1.0.0/examples/src/main/scala/org/apache/spark/examples/graphx which contains LiveJournalPageRank.scala 2. spark-1.0.0/graphx/src/main/scala/org/apache/spark/graphx/lib which contains Analytics.scala, ConnectedComponents.scala, etc. Now, if I want to add my own code to GraphX, i.e., if I want to write a small application on GraphX, in which directory should I add my code, in 1 or 2? And what is the difference? Can anyone tell me something on this? Thank You
Re: New Codes in GraphX
At 2014-11-18 14:51:54 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: I am using Spark-1.0.0. There are two GraphX directories that I can see here 1. spark-1.0.0/examples/src/main/scala/org/apache/sprak/examples/graphx which contains LiveJournalPageRank,scala 2. spark-1.0.0/graphx/src/main/scala/org/apache/sprak/graphx/lib which contains Analytics.scala, ConnectedComponenets.scala etc etc Now, if I want to add my own code to GraphX i.e., if I want to write a small application on GraphX, in which directory should I add my code, in 1 or 2 ? And what is the difference? If you want to add an algorithm which you can call from the Spark shell and submit as a pull request, you should add it to org.apache.spark.graphx.lib (#2). To run it from the command line, you'll also have to modify Analytics.scala. If you want to write a separate application, the ideal way is to do it in a separate project that links in Spark as a dependency [1]. It will also work to put it in either #1 or #2, but this will be worse in the long term because each build cycle will require you to rebuild and restart all of Spark rather than just building your application and calling spark-submit on the new JAR. Ankur [1] http://spark.apache.org/docs/1.0.2/quick-start.html#standalone-applications - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
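As an illustration of the separate-project route, a minimal sbt build definition might look like the following; the artifact names are the standard ones and the version numbers are just an example matching the 1.0.x docs linked above:

    // build.sbt (sketch)
    name := "my-graphx-app"
    version := "0.1"
    scalaVersion := "2.10.4"
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"   % "1.0.2" % "provided",
      "org.apache.spark" %% "spark-graphx" % "1.0.2" % "provided"
    )

The application jar built from such a project is then handed to spark-submit, so only the application needs rebuilding between code changes.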
Re: Slave Node Management in Standalone Cluster
Hi Kenichi 1. How can I stop a slave on a specific node? Under the `sbin/` directory, there are `start-{all,master,slave,slaves}` and `stop-{all,master,slaves}`, but no `stop-slave`. Is there any way to stop a specific (e.g., the 2nd) slave via the command line? You can use sbin/spark-daemon.sh on the machine where the worker you'd like to stop runs. First, find the PID of the worker you'd like to stop; second, find the PID file of that worker. The PID file is in /tmp/ by default and the file name looks as follows. xxx.org.apache.spark.deploy.worker.Worker-WorkerID.pid After you find the PID file, run the following command. sbin/spark-daemon.sh stop org.apache.spark.deploy.worker.Worker WorkerID 2. How can I check cluster status from the command line? Is there any way to confirm that all Masters / Workers are up and working without using the Web UI? AFAIK, there are no command-line tools for checking the status of a standalone cluster. Instead, you can use a special URL like the following. http://master or worker's hostname:webui-port/json You can get Master and Worker status as JSON-formatted data. - Kousuke (2014/11/18 0:27), Kenichi Maehashi wrote: Hi, I'm operating Spark in standalone cluster configuration (3 slaves) and have some questions. 1. How can I stop a slave on a specific node? Under the `sbin/` directory, there are `start-{all,master,slave,slaves}` and `stop-{all,master,slaves}`, but no `stop-slave`. Is there any way to stop a specific (e.g., the 2nd) slave via the command line? 2. How can I check cluster status from the command line? Is there any way to confirm that all Masters / Workers are up and working without using the Web UI? Thanks in advance! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Landmarks in GraphX section of Spark API
At 2014-11-18 15:29:08 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: Does Bellman-Ford give the best solution? It gives the same solution as any other algorithm, since there's only one correct solution for shortest paths and it's guaranteed to find it eventually. There are probably faster distributed algorithms for single-source shortest paths, but I'm not familiar with them. As far as I can tell, distributed Bellman-Ford is the most widely-used one. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Streaming with Kafka is failing with Error
Hi, While running my spark streaming application built on spark 1.1.0 I am getting below error. *14/11/18 15:35:30 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.AbstractMethodError* * at org.apache.spark.Logging$class.log(Logging.scala:52)* * at org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:66)* * at org.apache.spark.Logging$class.logInfo(Logging.scala:59)* * at org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:66)* * at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:86)* * at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)* * at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)* * at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)* * at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)* * at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)* * at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)* * at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)* * at org.apache.spark.scheduler.Task.run(Task.scala:54)* * at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)* * at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)* * at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)* * at java.lang.Thread.run(Thread.java:722)* Can you guys please help me out here? -- Sourav Chandra Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
Re: New Codes in GraphX
What command should I use to run the LiveJournalPageRank.scala? If you want to write a separate application, the ideal way is to do it in a separate project that links in Spark as a dependency [1]. But even for this, I have to do the build every time I change the code, right? Thank You On Tue, Nov 18, 2014 at 3:35 PM, Ankur Dave ankurd...@gmail.com wrote: At 2014-11-18 14:51:54 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: I am using Spark-1.0.0. There are two GraphX directories that I can see here 1. spark-1.0.0/examples/src/main/scala/org/apache/sprak/examples/graphx which contains LiveJournalPageRank,scala 2. spark-1.0.0/graphx/src/main/scala/org/apache/sprak/graphx/lib which contains Analytics.scala, ConnectedComponenets.scala etc etc Now, if I want to add my own code to GraphX i.e., if I want to write a small application on GraphX, in which directory should I add my code, in 1 or 2 ? And what is the difference? If you want to add an algorithm which you can call from the Spark shell and submit as a pull request, you should add it to org.apache.spark.graphx.lib (#2). To run it from the command line, you'll also have to modify Analytics.scala. If you want to write a separate application, the ideal way is to do it in a separate project that links in Spark as a dependency [1]. It will also work to put it in either #1 or #2, but this will be worse in the long term because each build cycle will require you to rebuild and restart all of Spark rather than just building your application and calling spark-submit on the new JAR. Ankur [1] http://spark.apache.org/docs/1.0.2/quick-start.html#standalone-applications
Re: New Codes in GraphX
At 2014-11-18 15:35:13 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: Now, how do I run the LiveJournalPageRank.scala that is there in 1? I think it should work to use MASTER=local[*] $SPARK_HOME/bin/run-example graphx.LiveJournalPageRank /edge-list-file.txt --numEPart=8 --numIter=10 --partStrategy=EdgePartition2D Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: New Codes in GraphX
Yes the above command works, but there is this problem. Most of the times, the total rank is Nan (Not a Number). Why is it so? Thank You On Tue, Nov 18, 2014 at 3:48 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: What command should I use to run the LiveJournalPageRank.scala? If you want to write a separate application, the ideal way is to do it in a separate project that links in Spark as a dependency [1]. But even for this, I have to do the build every time I change the code, right? Thank You On Tue, Nov 18, 2014 at 3:35 PM, Ankur Dave ankurd...@gmail.com wrote: At 2014-11-18 14:51:54 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: I am using Spark-1.0.0. There are two GraphX directories that I can see here 1. spark-1.0.0/examples/src/main/scala/org/apache/sprak/examples/graphx which contains LiveJournalPageRank,scala 2. spark-1.0.0/graphx/src/main/scala/org/apache/sprak/graphx/lib which contains Analytics.scala, ConnectedComponenets.scala etc etc Now, if I want to add my own code to GraphX i.e., if I want to write a small application on GraphX, in which directory should I add my code, in 1 or 2 ? And what is the difference? If you want to add an algorithm which you can call from the Spark shell and submit as a pull request, you should add it to org.apache.spark.graphx.lib (#2). To run it from the command line, you'll also have to modify Analytics.scala. If you want to write a separate application, the ideal way is to do it in a separate project that links in Spark as a dependency [1]. It will also work to put it in either #1 or #2, but this will be worse in the long term because each build cycle will require you to rebuild and restart all of Spark rather than just building your application and calling spark-submit on the new JAR. Ankur [1] http://spark.apache.org/docs/1.0.2/quick-start.html#standalone-applications
Re: Landmarks in GraphX section of Spark API
At 2014-11-18 15:44:31 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: I meant to ask whether it gives the solution faster than other algorithms. No, it's just that it's much simpler and easier to implement than the others. Section 5.2 of the Pregel paper [1] justifies using it for a graph (a binary tree) with 1 billion vertices on 300 machines: More advanced parallel algorithms exist, e.g., Thorup [44] or the ∆-stepping method [37], and have been used as the basis for special-purpose parallel shortest paths implementations [12, 32]. Such advanced algorithms can also be expressed in the Pregel framework. The simplicity of the implementation in Figure 5, however, together with the already acceptable performance (see Section 6), may appeal to users who can't do extensive tuning or customization. What do you mean by distributed algorithms? Can we not use any algorithm on a distributed environment? Any algorithm can be split up and run in a distributed environment, but because inter-node coordination is expensive, that can be very inefficient. Distributed algorithms in this context are ones that reduce coordination. Ankur [1] http://db.cs.berkeley.edu/cs286/papers/pregel-sigmod2010.pdf - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: New Codes in GraphX
At 2014-11-18 15:51:52 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes the above command works, but there is this problem. Most of the times, the total rank is Nan (Not a Number). Why is it so? I've also seen this, but I'm not sure why it happens. If you could find out which vertices are getting the NaN rank, it might be helpful in tracking down the problem. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
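One quick way to spot them, as a sketch against an already-loaded graph (the iteration count is arbitrary):

    val ranks = graph.staticPageRank(numIter = 10).vertices
    ranks.filter { case (_, r) => r.isNaN }.take(20).foreach(println)  // vertices whose rank is NaN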
Kestrel and Spark Stream
Hi guys, Has anyone already tried doing this work? Thanks -- Informativa sulla Privacy: http://www.unibs.it/node/8155
Re: Kestrel and Spark Stream
You can implement a custom receiver http://spark.apache.org/docs/latest/streaming-custom-receivers.html to connect to Kestrel and use it. I think someone has already tried it; not sure if it is working though. Here's the link https://github.com/prabeesh/Spark-Kestrel/blob/master/streaming/src/main/scala/spark/streaming/dstream/KestrelInputDStream.scala . Thanks Best Regards On Tue, Nov 18, 2014 at 4:23 PM, Eduardo Alfaia e.costaalf...@unibs.it wrote: Hi guys, Has anyone already tried doing this work? Thanks Informativa sulla Privacy: http://www.unibs.it/node/8155
Re: Kestrel and Spark Stream
You can refer the following link https://github.com/prabeesh/Spark-Kestrel On Tue, Nov 18, 2014 at 3:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can implement a custom receiver http://spark.apache.org/docs/latest/streaming-custom-receivers.html to connect to Kestrel and use it. I think someone have already tried it, not sure if it is working though. Here's the link https://github.com/prabeesh/Spark-Kestrel/blob/master/streaming/src/main/scala/spark/streaming/dstream/KestrelInputDStream.scala . Thanks Best Regards On Tue, Nov 18, 2014 at 4:23 PM, Eduardo Alfaia e.costaalf...@unibs.it wrote: Hi guys, Has anyone already tried doing this work? Thanks Informativa sulla Privacy: http://www.unibs.it/node/8155
How to assign consecutive numeric id to each row based on its content?
Hi, In my spark application, I am loading some rows from a database into Spark RDDs. Each row has several fields, and a string key. Due to my requirements I need to work with consecutive numeric ids (starting from 1 to N, where N is the number of unique keys) instead of string keys. Also, several rows can have the same string key. In the Spark context, how can I map each row into (Numeric_Key, OriginalRow) using map/reduce tasks such that rows with the same original string key get the same consecutive numeric key? Any hints? best, /Shahab
Re: Building Spark with hive does not work
Ah... Thanks Ted! And Hao, sorry for being the original trouble maker :) On 11/18/14 1:50 AM, Ted Yu wrote: Looks like this was where you got that commandline: http://search-hadoop.com/m/JW1q5RlPrl Cheers On Mon, Nov 17, 2014 at 9:44 AM, Hao Ren inv...@gmail.com mailto:inv...@gmail.com wrote: Sry for spamming, Just after my previous post, I noticed that the command used is: ./sbt/sbt -Phive -Phive-thirftserver clean assembly/assembly thriftserver* the typo error is the evil. Stupid, me. I believe I just copy-pasted from somewhere else, but no even checked it, meanwhile no error msg, such as no such option, is displayed, which makes me consider the flags are correct. Sry for the carelessness. Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-with-hive-does-not-work-tp19072p19087.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org mailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org mailto:user-h...@spark.apache.org
ReduceByKey but with different functions depending on key
Hello everyone, I'm new to Spark and I have the following problem: I have this large JavaRDD<MyClass> collection, which I group by creating a hashcode from some fields in MyClass: JavaRDD<MyClass> collection = ...; JavaPairRDD<Integer, Iterable<MyClass>> grouped = collection.groupBy(...); // the group-function is just creating a hashcode from some fields in MyClass. Now I want to reduce the variable grouped. However, I want to reduce it with different functions depending on the key in the JavaPairRDD. So basically a reduceByKey but with multiple functions. The only solution I've come up with is filtering grouped for each reduce function and applying it on the filtered subsets. This feels kinda hackish though. Is there a better way? Best regards, Johannes -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ReduceByKey-but-with-different-functions-depending-on-key-tp19177.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to assign consecutive numeric id to each row based on its content?
A not so efficient way can be this:

    val r0: RDD[OriginalRow] = ...
    val r1 = r0.keyBy(row => extractKeyFromOriginalRow(row))
    val r2 = r1.keys.distinct().zipWithIndex()
    val r3 = r2.join(r1).values

On 11/18/14 8:54 PM, shahab wrote: Hi, In my spark application, I am loading some rows from a database into Spark RDDs. Each row has several fields, and a string key. Due to my requirements I need to work with consecutive numeric ids (starting from 1 to N, where N is the number of unique keys) instead of string keys. Also, several rows can have the same string key. In the Spark context, how can I map each row into (Numeric_Key, OriginalRow) using map/reduce tasks such that rows with the same original string key get the same consecutive numeric key? Any hints? best, /Shahab
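One small detail relative to the original question: zipWithIndex numbers from 0, so if the ids really must start at 1 the index can be shifted when building r2 (same hypothetical names as above, with the usual pair-RDD imports in scope):

    val r2 = r1.keys.distinct().zipWithIndex().mapValues(_ + 1)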
Getting spark job progress programmatically
I am writing yet another Spark job server and have been able to submit jobs and return/save results. I let multiple jobs use the same spark context, but I set the job group while firing each job so that I can cancel jobs in the future. Further, what I desire to do is provide some kind of status update/progress on running jobs (a % completion would be awesome), but I am unable to figure out the appropriate Spark API to use. I do however see status reporting in the Spark UI, so there must be a way to get the status of various stages per job group. Any hints on what APIs I should look at?
Is sorting persisted after pair rdd transformations?
I am trying to figure out if sorting is persisted after applying pair RDD transformations, and I am not able to tell decisively after reading the documentation. For example:

    val numbers = ... // RDD of numbers
    val pairedNumbers = numbers.map(number => (number % 100, number))
    val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber => pairedNumber._2) // sort by the values in the pair
    val aggregates = sortedPairedNumbers.combineByKey(...)

In this example, will the combine functions see values in sorted order? What if I had done groupByKey and then combineByKey? What transformations can unsort already sorted data?
Re: Getting spark job progress programmatically
I started some quick hack for that in the notebook, you can head to: https://github.com/andypetrella/spark-notebook/blob/master/common/src/main/scala/notebook/front/widgets/SparkInfo.scala On Tue Nov 18 2014 at 2:44:48 PM Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am writing yet another Spark job server and have been able to submit jobs and return/save results. I let multiple jobs use the same spark context but I set job group while firing each job so that I can in future cancel jobs. Further, what I deserve to do is provide some kind of status update/progress on running jobs (a % completion but be awesome) but I am unable to figure out appropriate spark API to use. I do however see status reporting in spark UI so there must be a way to get status of various stages per job group. Any hints on what APIs should I look at?
Re: Building Spark with hive does not work
nvm, it would be better if correctness of flags could be checked by sbt during building. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-with-hive-does-not-work-tp19072p19183.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Getting spark job progress programmatically
Thanks Andy. This is very useful. This gives me all active stages and their percentage completion, but I am unable to tie stages to a job group (or a specific job). I looked at Spark's code and to me, it seems org.apache.spark.scheduler.ActiveJob's group ID should get propagated to StageInfo (possibly in the StageInfo.fromStage method). For now, I will have to write my own version of JobProgressListener that stores the stageId to group ID mapping. I will submit a JIRA ticket and seek the spark devs' opinion on this. Many thanks for your prompt help Andy. Thanks, Aniket On Tue Nov 18 2014 at 19:40:06 andy petrella andy.petre...@gmail.com wrote: I started some quick hack for that in the notebook, you can head to: https://github.com/andypetrella/spark-notebook/blob/master/common/src/main/scala/notebook/front/widgets/SparkInfo.scala On Tue Nov 18 2014 at 2:44:48 PM Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am writing yet another Spark job server and have been able to submit jobs and return/save results. I let multiple jobs use the same spark context, but I set the job group while firing each job so that I can cancel jobs in the future. Further, what I desire to do is provide some kind of status update/progress on running jobs (a % completion would be awesome), but I am unable to figure out the appropriate Spark API to use. I do however see status reporting in the Spark UI, so there must be a way to get the status of various stages per job group. Any hints on what APIs I should look at?
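In case it helps others, a rough sketch of that kind of listener follows. The event fields shown match the 1.1-era SparkListener API as I understand it, and spark.jobGroup.id is the local property that SparkContext.setJobGroup sets; treat the details as assumptions rather than a tested implementation:

    import scala.collection.mutable
    import org.apache.spark.scheduler._

    class JobGroupStageListener extends SparkListener {
      private val stageToGroup = mutable.Map[Int, String]()

      override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
        val group = Option(jobStart.properties)
          .flatMap(p => Option(p.getProperty("spark.jobGroup.id")))
          .getOrElse("(none)")
        jobStart.stageIds.foreach(id => stageToGroup(id) = group)  // remember stage -> job group
      }

      def groupForStage(stageId: Int): Option[String] = synchronized { stageToGroup.get(stageId) }
    }

    // registered once per context, e.g. sc.addSparkListener(new JobGroupStageListener)

Task-level counters (onTaskEnd) could be added to the same listener to estimate a percentage per stage.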
Re: Getting spark job progress programmatically
yep, we should also propose to add this stuffs in the public API. Any other ideas? On Tue Nov 18 2014 at 4:03:35 PM Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Thanks Andy. This is very useful. This gives me all active stages their percentage completion but I am unable to tie stages to job group (or specific job). I looked at Spark's code and to me, it seems org.apache.spark.scheduler.ActiveJob's group ID should get propagated to StageInfo (possibly in the StageInfo.fromStage method). For now, I will have to write my own version JobProgressListener that stores stageId to group Id mapping. I will submit a JIRA ticket and seek spark dev's opinion on this. Many thanks for your prompt help Andy. Thanks, Aniket On Tue Nov 18 2014 at 19:40:06 andy petrella andy.petre...@gmail.com wrote: I started some quick hack for that in the notebook, you can head to: https://github.com/andypetrella/spark-notebook/ blob/master/common/src/main/scala/notebook/front/widgets/SparkInfo.scala On Tue Nov 18 2014 at 2:44:48 PM Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am writing yet another Spark job server and have been able to submit jobs and return/save results. I let multiple jobs use the same spark context but I set job group while firing each job so that I can in future cancel jobs. Further, what I deserve to do is provide some kind of status update/progress on running jobs (a % completion but be awesome) but I am unable to figure out appropriate spark API to use. I do however see status reporting in spark UI so there must be a way to get status of various stages per job group. Any hints on what APIs should I look at?
sum/avg group by specified ranges
Hello Experts, I need to get the total of an amount field for specified date ranges. Now that group by on a calculated field does not work (https://issues.apache.org/jira/browse/SPARK-4296), what is the best way to get this done? I thought of doing it using plain Spark, but I suspect I will miss the performance of Spark SQL on top of the Parquet file. Any suggestions? Thanks Regards Tridib -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sum-avg-group-by-specified-ranges-tp19187.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
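One possible plain-RDD fallback until SPARK-4296 is fixed, sketched with a hypothetical (date, amount) row layout and month-level ranges; the bucketing function is the part to adapt to the real date ranges:

    import org.apache.spark.SparkContext._   // pair RDD functions (needed on Spark 1.x)
    import org.apache.spark.rdd.RDD

    def totalsByRange(rows: RDD[(java.sql.Date, Double)]): RDD[(String, (Double, Double))] = {
      def bucket(d: java.sql.Date): String = d.toString.substring(0, 7)  // "yyyy-MM" range key
      rows
        .map { case (d, amount) => (bucket(d), (amount, 1L)) }
        .reduceByKey { case ((s1, n1), (s2, n2)) => (s1 + s2, n1 + n2) }
        .mapValues { case (sum, n) => (sum, sum / n) }                   // (total, average) per range
    }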
Re: Status of MLLib exporting models to PMML
Yes, the case is convincing for PMML with Oryx. I will also investigate the parameter server. Cheers, Charles On Tuesday, November 18, 2014, Sean Owen so...@cloudera.com wrote: I'm just using PMML. I haven't hit any limitation of its expressiveness, for the model types it supports. I don't think there is a point in defining a new format for models, excepting that PMML can get very big. Still, just compressing the XML gets it down to a manageable size for just about any realistic model.* I can imagine some kind of translation from PMML-in-XML to PMML-in-something-else that is more compact. I've not seen anyone do this. * there still aren't formats for factored matrices and probably won't ever quite be, since they're just too large for a file format. On Tue, Nov 18, 2014 at 5:34 AM, Manish Amde manish...@gmail.com wrote: Hi Charles, I am not aware of other storage formats. Perhaps Sean or Sandy can elaborate more given their experience with Oryx. There is work by Smola et al at Google that talks about large scale model update and deployment. https://www.usenix.org/conference/osdi14/technical-sessions/presentation/li_mu -Manish -- - Charles
Nightly releases
Are nightly releases posted anywhere? There are quite a few vital bugfixes and performance improvements being committed to Spark, and using the latest commits is useful (or even necessary for some jobs). Is there a place to post them? It doesn't seem like it would be difficult to run make-dist nightly and place it somewhere. Is it possible to extract this from Jenkins builds? Thanks, Arun
Re: Nightly releases
Of course we can run this as well to get the latest, but the build is fairly long and this seems like a resource many would need. On Tue, Nov 18, 2014 at 10:21 AM, Arun Ahuja aahuj...@gmail.com wrote: Are nightly releases posted anywhere? There are quite a few vital bugfixes and performance improvements being committed to Spark, and using the latest commits is useful (or even necessary for some jobs). Is there a place to post them? It doesn't seem like it would be difficult to run make-dist nightly and place it somewhere. Is it possible to extract this from Jenkins builds? Thanks, Arun
Re: ReduceByKey but with different functions depending on key
First use groupByKey(); you get a pair RDD of (key: K, value: ArrayBuffer[V]). Then use map() on this RDD with a function that performs different operations depending on the key, which acts as a parameter of the function. On Nov 18, 2014, at 8:59 PM, jelgh johannes.e...@gmail.com wrote: Hello everyone, I'm new to Spark and I have the following problem: I have this large JavaRDD<MyClass> collection, which I group by creating a hashcode from some fields in MyClass: JavaRDD<MyClass> collection = ...; JavaPairRDD<Integer, Iterable<MyClass>> grouped = collection.groupBy(...); // the group-function is just creating a hashcode from some fields in MyClass. Now I want to reduce the variable grouped. However, I want to reduce it with different functions depending on the key in the JavaPairRDD. So basically a reduceByKey but with multiple functions. The only solution I've come up with is filtering grouped for each reduce function and applying it on the filtered subsets. This feels kinda hackish though. Is there a better way? Best regards, Johannes -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ReduceByKey-but-with-different-functions-depending-on-key-tp19177.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
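To make the suggestion concrete, here is a small, self-contained Scala sketch of the group-then-dispatch idea; the data and the two per-key aggregations (sum for even keys, max for odd keys) are made up purely for illustration.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object PerKeyReduce {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("per-key-reduce").setMaster("local[2]"))
    // Stand-in for the RDD produced by collection.groupBy(...)
    val grouped = sc.parallelize(Seq(1 -> 10, 1 -> 20, 2 -> 3, 2 -> 7)).groupByKey()
    // Key-dependent reduction: sum the values for even keys, take the max for odd keys.
    val reduced = grouped.map { case (key, values) =>
      val v = if (key % 2 == 0) values.sum else values.max
      (key, v)
    }
    reduced.collect().foreach(println)
    sc.stop()
  }
}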
Re: ReduceByKey but with different functions depending on key
groupByKey does not run a combiner, so be careful about the performance...groupByKey does shuffle even for local groups... reduceByKey and aggregateByKey do run a combiner, but if you want a separate function for each key, you can have a key-to-closure map that you broadcast and use in reduceByKey, provided you have access to the key in reduceByKey/aggregateByKey... I have not needed to access the key in reduceByKey/aggregateByKey yet, but there should be a way... On Tue, Nov 18, 2014 at 7:24 AM, Yanbo yanboha...@gmail.com wrote: First use groupByKey(); you get a pair RDD of (key: K, value: ArrayBuffer[V]). Then use map() on this RDD with a function that performs different operations depending on the key, which acts as a parameter of the function. On Nov 18, 2014, at 8:59 PM, jelgh johannes.e...@gmail.com wrote: Hello everyone, I'm new to Spark and I have the following problem: I have this large JavaRDD<MyClass> collection, which I group by creating a hashcode from some fields in MyClass: JavaRDD<MyClass> collection = ...; JavaPairRDD<Integer, Iterable<MyClass>> grouped = collection.groupBy(...); // the group-function is just creating a hashcode from some fields in MyClass. Now I want to reduce the variable grouped. However, I want to reduce it with different functions depending on the key in the JavaPairRDD. So basically a reduceByKey but with multiple functions. The only solution I've come up with is filtering grouped for each reduce function and applying it on the filtered subsets. This feels kinda hackish though. Is there a better way? Best regards, Johannes -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ReduceByKey-but-with-different-functions-depending-on-key-tp19177.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
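One way to keep the combiner and still pick a function per key is to carry the key inside the value and broadcast a key-to-function map. This is only a hedged sketch of that idea (reduceByKey never exposes the key to the merge function, so the key is smuggled in alongside the value); the functions and data are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object PerKeyReduceWithCombiner {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("per-key-combiner").setMaster("local[2]"))
    // Hypothetical key-to-closure map, broadcast to the executors.
    val fns = sc.broadcast(Map[Int, (Int, Int) => Int](
      1 -> ((a: Int, b: Int) => a + b),              // sum values under key 1
      2 -> ((a: Int, b: Int) => math.max(a, b))))    // max of values under key 2
    val data = sc.parallelize(Seq(1 -> 10, 1 -> 20, 2 -> 3, 2 -> 7))
    val reduced = data
      .map { case (k, v) => (k, (k, v)) }                              // carry the key with the value
      .reduceByKey { case ((k, a), (_, b)) => (k, fns.value(k)(a, b)) }
      .mapValues(_._2)
    reduced.collect().foreach(println)
    sc.stop()
  }
}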
Re: How to assign consecutive numeric id to each row based on its content?
I think zipWithIndex is zero-based, so if you want 1 to N, you'll need to increment them like so: val r2 = r1.keys.distinct().zipWithIndex().mapValues(_ + 1) If the number of distinct keys is relatively small, you might consider collecting them into a map and broadcasting them rather than using a join, like so: val keyIndices = sc.broadcast(r2.collect.toMap) val r3 = r1.map { case (k, v) => (keyIndices.value(k), v) } On Tue, Nov 18, 2014 at 8:16 AM, Cheng Lian lian.cs@gmail.com wrote: A not so efficient way can be this:
val r0: RDD[OriginalRow] = ...
val r1 = r0.keyBy(row => extractKeyFromOriginalRow(row))
val r2 = r1.keys.distinct().zipWithIndex()
val r3 = r2.join(r1).values
On 11/18/14 8:54 PM, shahab wrote: Hi, In my spark application, I am loading some rows from a database into Spark RDDs. Each row has several fields, and a string key. Due to my requirements I need to work with consecutive numeric ids (starting from 1 to N, where N is the number of unique keys) instead of string keys. Also, several rows can have the same string key. In the Spark context, how can I map each row into (Numeric_Key, OriginalRow) as map/reduce tasks such that rows with the same original string key get the same consecutive numeric key? Any hints? best, /Shahab -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
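Putting the two suggestions together, a self-contained sketch (with made-up sample rows) might look like this; it is illustrative only and assumes the key space is small enough to collect and broadcast.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object ConsecutiveIds {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("consecutive-ids").setMaster("local[2]"))
    // (stringKey, originalRow) pairs; several rows may share a key.
    val r1 = sc.parallelize(Seq("a" -> "row1", "b" -> "row2", "a" -> "row3"))
    // 1-based consecutive id per distinct key.
    val r2 = r1.keys.distinct().zipWithIndex().mapValues(_ + 1)
    // Small-key-space variant: broadcast the dictionary instead of joining.
    val keyIndices = sc.broadcast(r2.collect().toMap)
    val r3 = r1.map { case (k, v) => (keyIndices.value(k), v) }
    r3.collect().foreach(println)   // ids depend on the ordering produced by distinct()
    sc.stop()
  }
}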
Re: Exception in spark sql when running a group by query
ah makes sense - Thanks Michael! On Mon, Nov 17, 2014 at 6:08 PM, Michael Armbrust mich...@databricks.com wrote: You are perhaps hitting an issue that was fixed by #3248 https://github.com/apache/spark/pull/3248? On Mon, Nov 17, 2014 at 9:58 AM, Sadhan Sood sadhan.s...@gmail.com wrote: While testing sparkSQL, we were running this group by with expression query and got an exception. The same query worked fine on hive. SELECT from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200), '/MM/dd') as pst_date, count(*) as num_xyzs FROM all_matched_abc GROUP BY from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200), '/MM/dd') 14/11/17 17:41:46 ERROR thriftserver.SparkSQLDriver: Failed in [SELECT from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200), '/MM/dd') as pst_date, count(*) as num_xyzs FROM all_matched_abc GROUP BY from_unixtime(floor(xyz.whenrequestreceived/1000.0 - 25200), '/MM/dd') ] org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Expression not in GROUP BY: HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200, DoubleType))),/MM/dd) AS pst_date#179, tree: Aggregate [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived, DoubleType) / 1000.0) - CAST(25200, DoubleType))),/MM/dd)], [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor(((CAST(xyz#183.whenrequestreceived AS whenrequestreceived#187L, DoubleType) / 1000.0) - CAST(25200, DoubleType))),/MM/dd) AS pst_date#179,COUNT(1) AS num_xyzs#180L] MetastoreRelation default, all_matched_abc, None at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:127) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3$$anonfun$applyOrElse$6.apply(Analyzer.scala:125) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:125) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$$anonfun$apply$3.applyOrElse(Analyzer.scala:115) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:115) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckAggregation$.apply(Analyzer.scala:113) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51) at 
scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422) at
Re: Nightly releases
I can see this being valuable for users wanting to live on the cutting edge without building CI infrastructure themselves, myself included. I think Patrick's recent work on the build scripts for 1.2.0 will make delivering nightly builds to a public maven repo easier. On Tue, Nov 18, 2014 at 10:22 AM, Arun Ahuja aahuj...@gmail.com wrote: Of course we can run this as well to get the latest, but the build is fairly long and this seems like a resource many would need. On Tue, Nov 18, 2014 at 10:21 AM, Arun Ahuja aahuj...@gmail.com wrote: Are nightly releases posted anywhere? There are quite a few vital bugfixes and performance improvements being committed to Spark, and using the latest commits is useful (or even necessary for some jobs). Is there a place to post them? It doesn't seem like it would be difficult to run make-dist nightly and place it somewhere. Is it possible to extract this from the Jenkins builds? Thanks, Arun
Is there a way to create key based on counts in Spark
As it is difficult to explain this, I will show what I want. Let's say I have an RDD A with the following value: A = [word1, word2, word3] I want to have an RDD with the following value: B = [(1, word1), (2, word2), (3, word3)] That is, it gives a unique number to each entry as its key. Can we do such a thing with Python or Scala?
Re: ReduceByKey but with different functions depending on key
Map the key/value pair into a (key, Tuple2<key, value>) and process that - Also ask the Spark maintainers for a version of keyed operations where the key is passed in as an argument - I run into these cases all the time
/**
 * Map a tuple into a (key, tuple) pair to ensure subsequent processing has access to both key and value.
 * @param inp input pair RDD
 * @param <K> key type
 * @param <V> value type
 * @return output where the value carries both key and value
 */
@Nonnull
public static <K extends Serializable, V extends Serializable> JavaPairRDD<K, Tuple2<K, V>> toKeyedTuples(@Nonnull JavaPairRDD<K, V> inp) {
    return inp.flatMapToPair(new PairFlatMapFunction<Tuple2<K, V>, K, Tuple2<K, V>>() {
        @Override
        public Iterable<Tuple2<K, Tuple2<K, V>>> call(final Tuple2<K, V> t) throws Exception {
            // Wrap the single result with java.util.Collections.singletonList, since flatMapToPair expects an Iterable.
            return Collections.singletonList(
                new Tuple2<K, Tuple2<K, V>>(t._1(), new Tuple2<K, V>(t._1(), t._2())));
        }
    });
}
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ReduceByKey-but-with-different-functions-depending-on-key-tp19177p19198.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
JavaKafkaWordCount
Hi Guys, I am doing some tests with JavaKafkaWordCount. My cluster is composed of 8 workers and 1 driver with spark-1.1.0, and I am using Kafka too; I have some questions about it. 1 - When I launch the command: bin/spark-submit --class org.apache.spark.examples.streaming.JavaKafkaWordCount --master spark://computer8:7077 --driver-memory 1g --executor-memory 2g --executor-cores 2 examples/target/scala-2.10/spark-examples-1.1.0-hadoop1.0.4.jar computer49:2181 test-consumer-group test 2 I see in the Spark WebAdmin that only 1 worker works. Why? 2 - In Kafka I can see the same thing:
Group Topic Pid Offset logSize Lag Owner
test-consumer-group test 0 147092 147092 0 test-consumer-group_computer1-1416319543858-817b566f-0
test-consumer-group test 1 232183 232183 0 test-consumer-group_computer1-1416319543858-817b566f-0
test-consumer-group test 2 186805 186805 0 test-consumer-group_computer1-1416319543858-817b566f-0
test-consumer-group test 3 0 0 0 test-consumer-group_computer1-1416319543858-817b566f-1
test-consumer-group test 4 0 0 0 test-consumer-group_computer1-1416319543858-817b566f-1
test-consumer-group test 5 0 0 0 test-consumer-group_computer1-1416319543858-817b566f-1
I would like to understand this behavior. Is it normal? Am I doing something wrong? Thanks Guys -- Privacy notice: http://www.unibs.it/node/8155
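For context, a pattern that is often used to spread Kafka consumption across more than one worker is to create several input streams (one receiver each) and union them. The sketch below is only a hedged illustration of that API usage, reusing the ZooKeeper host, group and topic from the command above; whether it explains the single-worker behaviour seen here would need to be verified.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaParallelReceivers {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-parallel-receivers")
    val ssc = new StreamingContext(conf, Seconds(2))
    // One receiver per stream, so three receivers can be placed on three different workers.
    val streams = (1 to 3).map { _ =>
      KafkaUtils.createStream(ssc, "computer49:2181", "test-consumer-group", Map("test" -> 2))
    }
    val lines = ssc.union(streams).map(_._2)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}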
Re: Is there a way to create key based on counts in Spark
Use zipWithIndex, but cache the data before you run zipWithIndex...that way your ordering will be consistent (unless the bug has been fixed so that you don't have to cache the data)... Normally these operations are used for dictionary building, so I am hoping you can cache the dictionary RDD[String] before you run zipWithIndex... Indices run from 0 to maxIndex-1...if you want them to start at 1 you have to later map each index to index + 1 On Tue, Nov 18, 2014 at 8:56 AM, Blind Faith person.of.b...@gmail.com wrote: As it is difficult to explain this, I will show what I want. Let's say I have an RDD A with the following value: A = [word1, word2, word3] I want to have an RDD with the following value: B = [(1, word1), (2, word2), (3, word3)] That is, it gives a unique number to each entry as its key. Can we do such a thing with Python or Scala?
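Concretely, a minimal Scala sketch of the zipWithIndex approach for the example in the question; the cache() call follows the advice above about keeping the ordering consistent.

import org.apache.spark.{SparkConf, SparkContext}

object NumberedWords {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("numbered-words").setMaster("local[2]"))
    val words = sc.parallelize(Seq("word1", "word2", "word3")).cache()
    // zipWithIndex is zero-based, so shift by one to get 1..N keys.
    val numbered = words.zipWithIndex().map { case (w, i) => (i + 1, w) }
    numbered.collect().foreach(println)   // (1,word1), (2,word2), (3,word3)
    sc.stop()
  }
}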
Spark on YARN
Hi Folks! I'm running Spark on a YARN cluster installed with Cloudera Manager Express. The cluster has 1 master and 3 slaves, each machine with 32 cores and 64G RAM. My Spark job is working fine; however, it seems that just 2 of 3 slaves are working (htop shows 2 slaves working at 100% on 32 cores, and 1 slave without any processing). I'm using this command: ./spark-submit --master yarn --num-executors 3 --executor-cores 32 --executor-memory 32g feature_extractor.py -r 390 Additionally, Spark's log shows communication with only 2 slaves: 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-180.ec2.internal:33177/user/Executor#-113177469] with ID 1 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-180.ec2.internal to /default 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-179.ec2.internal:51859/user/Executor#-323896724] with ID 2 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-179.ec2.internal to /default 14/11/18 17:19:38 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-180.ec2.internal:50959 with 16.6 GB RAM 14/11/18 17:19:39 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-179.ec2.internal:53557 with 16.6 GB RAM 14/11/18 17:19:51 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 3(ms) Is there a configuration to run Spark jobs on the YARN cluster with all slaves? Thanks in advance! =] --- Regards Alan Vidotti Prando.
Re: Spark on YARN
I run my Spark on YARN jobs as: HADOOP_CONF_DIR=/etc/hadoop/conf/ /app/data/v606014/dist/bin/spark-submit --master yarn --jars test-job.jar --executor-cores 4 --num-executors 10 --executor-memory 16g --driver-memory 4g --class TestClass test.jar It uses HADOOP_CONF_DIR to schedule executors and I get the number I ask for (assuming other MapReduce jobs are not taking the cluster)... Large memory intensive jobs like ALS still get issues on YARN but simple jobs run fine... Mine is also internal CDH cluster... On Tue, Nov 18, 2014 at 10:03 AM, Alan Prando a...@scanboo.com.br wrote: Hi Folks! I'm running Spark on YARN cluster installed with Cloudera Manager Express. The cluster has 1 master and 3 slaves, each machine with 32 cores and 64G RAM. My spark's job is working fine, however it seems that just 2 of 3 slaves are working (htop shows 2 slaves working 100% on 32 cores, and 1 slaves without any processing). I'm using this command: ./spark-submit --master yarn --num-executors 3 --executor-cores 32 --executor-memory 32g feature_extractor.py -r 390 Additionaly, spark's log testify communications with 2 slaves only: 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-180.ec2.internal:33177/user/Executor#-113177469] with ID 1 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-180.ec2.internal to /default 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-179.ec2.internal:51859/user/Executor#-323896724] with ID 2 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-179.ec2.internal to /default 14/11/18 17:19:38 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-180.ec2.internal:50959 with 16.6 GB RAM 14/11/18 17:19:39 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-179.ec2.internal:53557 with 16.6 GB RAM 14/11/18 17:19:51 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 3(ms) Is there a configuration to call spark's job on YARN cluster with all slaves? Thanks in advance! =] --- Regards Alan Vidotti Prando.
Re: Spark on YARN
Can you check in your RM's web UI how much of each resource does Yarn think you have available? You can also check that in the Yarn configuration directly. Perhaps it's not configured to use all of the available resources. (If it was set up with Cloudera Manager, CM will reserve some room for daemons that need to run on each machine, so it won't tell Yarn to make all 32 cores / 64 GB available for applications.) Also remember that Spark needs to start num executors + 1 containers when adding up all the needed resources. The extra container generally requires less resources than the executors, but it still needs to allocate resources from the RM. On Tue, Nov 18, 2014 at 10:03 AM, Alan Prando a...@scanboo.com.br wrote: Hi Folks! I'm running Spark on YARN cluster installed with Cloudera Manager Express. The cluster has 1 master and 3 slaves, each machine with 32 cores and 64G RAM. My spark's job is working fine, however it seems that just 2 of 3 slaves are working (htop shows 2 slaves working 100% on 32 cores, and 1 slaves without any processing). I'm using this command: ./spark-submit --master yarn --num-executors 3 --executor-cores 32 --executor-memory 32g feature_extractor.py -r 390 Additionaly, spark's log testify communications with 2 slaves only: 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-180.ec2.internal:33177/user/Executor#-113177469] with ID 1 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-180.ec2.internal to /default 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-179.ec2.internal:51859/user/Executor#-323896724] with ID 2 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-179.ec2.internal to /default 14/11/18 17:19:38 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-180.ec2.internal:50959 with 16.6 GB RAM 14/11/18 17:19:39 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-179.ec2.internal:53557 with 16.6 GB RAM 14/11/18 17:19:51 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 3(ms) Is there a configuration to call spark's job on YARN cluster with all slaves? Thanks in advance! =] --- Regards Alan Vidotti Prando. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark on YARN
Hey Alan, Spark's application master will take up 1 core on one of the nodes on the cluster. This means that that node will only have 31 cores remaining, not enough to fit your third executor. -Sandy On Tue, Nov 18, 2014 at 10:03 AM, Alan Prando a...@scanboo.com.br wrote: Hi Folks! I'm running Spark on YARN cluster installed with Cloudera Manager Express. The cluster has 1 master and 3 slaves, each machine with 32 cores and 64G RAM. My spark's job is working fine, however it seems that just 2 of 3 slaves are working (htop shows 2 slaves working 100% on 32 cores, and 1 slaves without any processing). I'm using this command: ./spark-submit --master yarn --num-executors 3 --executor-cores 32 --executor-memory 32g feature_extractor.py -r 390 Additionaly, spark's log testify communications with 2 slaves only: 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-180.ec2.internal:33177/user/Executor#-113177469] with ID 1 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-180.ec2.internal to /default 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-179.ec2.internal:51859/user/Executor#-323896724] with ID 2 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-179.ec2.internal to /default 14/11/18 17:19:38 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-180.ec2.internal:50959 with 16.6 GB RAM 14/11/18 17:19:39 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-179.ec2.internal:53557 with 16.6 GB RAM 14/11/18 17:19:51 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 3(ms) Is there a configuration to call spark's job on YARN cluster with all slaves? Thanks in advance! =] --- Regards Alan Vidotti Prando.
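To make the arithmetic behind this explicit (assuming YARN actually advertises all 32 vcores and the full memory of each node): the request is 3 executors x 32 cores = 96 cores, plus 1 core for the application master, i.e. 97 cores against the 3 x 32 = 96 that the three slaves can offer, so the third executor never finds a node it fits on. A hedged tweak that follows directly from this arithmetic, leaving one core free per node, would be: ./spark-submit --master yarn --num-executors 3 --executor-cores 31 --executor-memory 32g feature_extractor.py -r 390 (Whether this is enough in practice also depends on how many vcores and how much memory Cloudera Manager has told YARN to hand out, as noted in the other replies.)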
Re: Spark on YARN
My guess is you're asking for all cores of all machines but the driver needs at least one core, so one executor is unable to find a machine to fit on. On Nov 18, 2014 7:04 PM, Alan Prando a...@scanboo.com.br wrote: Hi Folks! I'm running Spark on YARN cluster installed with Cloudera Manager Express. The cluster has 1 master and 3 slaves, each machine with 32 cores and 64G RAM. My spark's job is working fine, however it seems that just 2 of 3 slaves are working (htop shows 2 slaves working 100% on 32 cores, and 1 slaves without any processing). I'm using this command: ./spark-submit --master yarn --num-executors 3 --executor-cores 32 --executor-memory 32g feature_extractor.py -r 390 Additionaly, spark's log testify communications with 2 slaves only: 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-180.ec2.internal:33177/user/Executor#-113177469] with ID 1 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-180.ec2.internal to /default 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-179.ec2.internal:51859/user/Executor#-323896724] with ID 2 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-179.ec2.internal to /default 14/11/18 17:19:38 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-180.ec2.internal:50959 with 16.6 GB RAM 14/11/18 17:19:39 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-179.ec2.internal:53557 with 16.6 GB RAM 14/11/18 17:19:51 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 3(ms) Is there a configuration to call spark's job on YARN cluster with all slaves? Thanks in advance! =] --- Regards Alan Vidotti Prando.
Re: Spark streaming cannot receive any message from Kafka
Hi Jerry, I looked at the KafkaUtils.createStream API and found that spark.default.parallelism is actually specified in SparkConf instead. I do not remember the exact stack trace of the exception, but the exception occurred when createStream was called if we did not specify spark.default.parallelism. The error message basically shows an empty string being parsed into an Int if spark.default.parallelism is not specified. Bill On Mon, Nov 17, 2014 at 4:45 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Bill, Would you mind describing what you found a little more specifically? I'm not sure there's a parameter in KafkaUtils.createStream where you can specify the Spark parallelism; also, what is the exception stack? Thanks Jerry *From:* Bill Jay [mailto:bill.jaypeter...@gmail.com] *Sent:* Tuesday, November 18, 2014 2:47 AM *To:* Helena Edelson *Cc:* Jay Vyas; u...@spark.incubator.apache.org; Tobias Pfeiffer; Shao, Saisai *Subject:* Re: Spark streaming cannot receive any message from Kafka Hi all, I found the reason for this issue. It seems in the new version, if I do not specify spark.default.parallelism in KafkaUtils.createStream, there will be an exception at the Kafka stream creation stage. In the previous versions, it seems Spark would use the default value. Thanks! Bill On Thu, Nov 13, 2014 at 5:00 AM, Helena Edelson helena.edel...@datastax.com wrote: I encounter no issues with streaming from kafka to spark in 1.1.0. Do you perhaps have a version conflict? Helena On Nov 13, 2014 12:55 AM, Jay Vyas jayunit100.apa...@gmail.com wrote: Yup, very important that n > 1 for spark streaming jobs. If local, use local[2]. The thing to remember is that your spark receiver will take a thread to itself and produce data, so you need another thread to consume it. In a cluster manager like yarn or mesos, the word thread is not used anymore, I guess it has a different meaning - you need 2 or more free compute slots, and that should be guaranteed by looking to see how many free node managers are running etc. On Nov 12, 2014, at 7:53 PM, Shao, Saisai saisai.s...@intel.com wrote: Did you configure the Spark master as local? It should be local[n], n > 1, for local mode. Besides, there's a Kafka wordcount example in the Spark Streaming examples, you can try that. I've tested with latest master, it's OK. Thanks Jerry *From:* Tobias Pfeiffer [mailto:t...@preferred.jp t...@preferred.jp] *Sent:* Thursday, November 13, 2014 8:45 AM *To:* Bill Jay *Cc:* u...@spark.incubator.apache.org *Subject:* Re: Spark streaming cannot receive any message from Kafka Bill, However, when I am currently using Spark 1.1.0, the Spark streaming job cannot receive any messages from Kafka. I have not made any change to the code. Do you see any suspicious messages in the log output? Tobias
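For reference, a minimal sketch of setting spark.default.parallelism explicitly in the SparkConf before the Kafka stream is created, which is where this thread says the value is picked up from; the ZooKeeper quorum, group id and topic map below are placeholders, not the original job's settings.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWithExplicitParallelism {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kafka-wordcount")
      .set("spark.default.parallelism", "8")   // explicit value; the thread reports a parse error when this is unset
    val ssc = new StreamingContext(conf, Seconds(2))
    // Placeholder ZK quorum / consumer group / topic map, purely illustrative.
    val stream = KafkaUtils.createStream(ssc, "zk-host:2181", "my-group", Map("my-topic" -> 2))
    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}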
Re: Pyspark Error
My best guess would be a networking issue--it looks like the Python socket library isn't able to connect to whatever hostname you're providing Spark in the configuration. On 11/18/14 9:10 AM, amin mohebbi wrote: Hi there, *I have already downloaded Pre-built spark-1.1.0, I want to run pyspark by try typing ./bin/pyspark but I got the following error:* * * *scala shell is up and working fine* hduser@master:~/Downloads/spark-1.1.0$ ./bin/spark-shell Java HotSpot(TM) Client VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties . . 14/11/18 04:33:13 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@master:34937/user/HeartbeatReceiver 14/11/18 04:33:13 INFO SparkILoop: Created spark context.. Spark context available as sc. scala hduser@master:~/Downloads/spark-1.1.0$ * * *But python shell does not work:* hduser@master:~/Downloads/spark-1.1.0$ hduser@master:~/Downloads/spark-1.1.0$ hduser@master:~/Downloads/spark-1.1.0$ ./bin/pyspark Python 2.7.3 (default, Feb 27 2014, 20:00:17) [GCC 4.6.3] on linux2 Type help, copyright, credits or license for more information. Java HotSpot(TM) Client VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/11/18 04:36:06 INFO SecurityManager: Changing view acls to: hduser, 14/11/18 04:36:06 INFO SecurityManager: Changing modify acls to: hduser, 14/11/18 04:36:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser, ); users with modify permissions: Set(hduser, ) 14/11/18 04:36:06 INFO Slf4jLogger: Slf4jLogger started 14/11/18 04:36:06 INFO Remoting: Starting remoting 14/11/18 04:36:06 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@master:52317] 14/11/18 04:36:06 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@master:52317] 14/11/18 04:36:06 INFO Utils: Successfully started service 'sparkDriver' on port 52317. 14/11/18 04:36:06 INFO SparkEnv: Registering MapOutputTracker 14/11/18 04:36:06 INFO SparkEnv: Registering BlockManagerMaster 14/11/18 04:36:06 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141118043606-c346 14/11/18 04:36:07 INFO Utils: Successfully started service 'Connection manager for block manager' on port 47507. 14/11/18 04:36:07 INFO ConnectionManager: Bound socket to port 47507 with id = ConnectionManagerId(master,47507) 14/11/18 04:36:07 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 14/11/18 04:36:07 INFO BlockManagerMaster: Trying to register BlockManager 14/11/18 04:36:07 INFO BlockManagerMasterActor: Registering block manager master:47507 with 267.3 MB RAM 14/11/18 04:36:07 INFO BlockManagerMaster: Registered BlockManager 14/11/18 04:36:07 INFO HttpFileServer: HTTP File server directory is /tmp/spark-8b29544a-c74b-4a3e-88e0-13801c8dcc65 14/11/18 04:36:07 INFO HttpServer: Starting HTTP Server 14/11/18 04:36:07 INFO Utils: Successfully started service 'HTTP file server' on port 40029. 14/11/18 04:36:12 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
14/11/18 04:36:12 INFO SparkUI: Started SparkUI at http://master:4040 http://master:4040/ 14/11/18 04:36:12 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@master:52317/user/HeartbeatReceiver 14/11/18 04:36:12 INFO SparkUI: Stopped Spark web UI at http://master:4040 http://master:4040/ 14/11/18 04:36:12 INFO DAGScheduler: Stopping DAGScheduler 14/11/18 04:36:13 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped! 14/11/18 04:36:13 INFO ConnectionManager: Selector thread was interrupted! 14/11/18 04:36:13 INFO ConnectionManager: ConnectionManager stopped 14/11/18 04:36:13 INFO MemoryStore: MemoryStore cleared 14/11/18 04:36:13 INFO BlockManager: BlockManager stopped 14/11/18 04:36:13 INFO BlockManagerMaster: BlockManagerMaster stopped 14/11/18 04:36:13 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 14/11/18 04:36:13 INFO SparkContext: Successfully stopped SparkContext 14/11/18 04:36:13 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 14/11/18 04:36:13 INFO Remoting: Remoting shut down 14/11/18 04:36:13 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down. Traceback (most recent call last): File /home/hduser/Downloads/spark-1.1.0/python/pyspark/shell.py, line 44, in module sc = SparkContext(appName=PySparkShell, pyFiles=add_files) File /home/hduser/Downloads/spark-1.1.0/python/pyspark/context.py, line 107, in __init__ conf) File /home/hduser/Downloads/spark-1.1.0/python/pyspark/context.py, line 159, in _do_init self._accumulatorServer = accumulators._start_update_server() File
Iterative transformations over RDD crashes in phantom reduce
Hi all, This is somewhat related to my previous question ( http://apache-spark-user-list.1001560.n3.nabble.com/Iterative-changes-to-RDD-and-broadcast-variables-tt19042.html , for additional context) but for all practical purposes this is its own issue. As in my previous question, I'm making iterative changes to an RDD, where each iteration depends on the results of the previous one. I've stripped down what was previously a loop to just be two sequential edits to try and nail down where the problem is. It looks like this: index = 0 INDEX = sc.broadcast(index) M = M.flatMap(func1).reduceByKey(func2) M.foreach(debug_output) index = 1 INDEX = sc.broadcast(index) M = M.flatMap(func1) M.foreach(debug_output) M is basically a row-indexed matrix, where each index points to a dictionary (sparse matrix more or less, with some domain-specific modifications). This program crashes on the second-to-last (7th) line; the creepy part is that it says the crash happens in func2 with the broadcast variable INDEX == 1 (it attempts to access an entry that doesn't exist in a dictionary of one of the rows). How is that even possible? Am I missing something fundamental about how Spark works under the hood? Thanks for your help! Shannon - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Iterative transformations over RDD crashes in phantom reduce
To clarify about what, precisely, is impossible: the crash happens with INDEX == 1 in func2, but func2 is only called in the reduceByKey transformation when INDEX == 0. And according to the output of the foreach() in line 4, that reduceByKey(func2) works just fine. How is it then invoked again with INDEX == 1 when there clearly isn't another reduce call at line 7? On 11/18/14 1:58 PM, Shannon Quinn wrote: Hi all, This is somewhat related to my previous question ( http://apache-spark-user-list.1001560.n3.nabble.com/Iterative-changes-to-RDD-and-broadcast-variables-tt19042.html , for additional context) but for all practical purposes this is its own issue. As in my previous question, I'm making iterative changes to an RDD, where each iteration depends on the results of the previous one. I've stripped down what was previously a loop to just be two sequential edits to try and nail down where the problem is. It looks like this: index = 0 INDEX = sc.broadcast(index) M = M.flatMap(func1).reduceByKey(func2) M.foreach(debug_output) index = 1 INDEX = sc.broadcast(index) M = M.flatMap(func1) M.foreach(debug_output) M is basically a row-indexed matrix, where each index points to a dictionary (sparse matrix more or less, with some domain-specific modifications). This program crashes on the second-to-last (7th) line; the creepy part is that it says the crash happens in func2 with the broadcast variable INDEX == 1 (it attempts to access an entry that doesn't exist in a dictionary of one of the rows). How is that even possible? Am I missing something fundamental about how Spark works under the hood? Thanks for your help! Shannon - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
unsubscribe
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Pyspark Error
It seems that `localhost` can not be resolved in your machines, I had filed https://issues.apache.org/jira/browse/SPARK-4475 to track it. On Tue, Nov 18, 2014 at 6:10 AM, amin mohebbi aminn_...@yahoo.com.invalid wrote: Hi there, I have already downloaded Pre-built spark-1.1.0, I want to run pyspark by try typing ./bin/pyspark but I got the following error: scala shell is up and working fine hduser@master:~/Downloads/spark-1.1.0$ ./bin/spark-shell Java HotSpot(TM) Client VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties . . 14/11/18 04:33:13 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@master:34937/user/HeartbeatReceiver 14/11/18 04:33:13 INFO SparkILoop: Created spark context.. Spark context available as sc. scala hduser@master:~/Downloads/spark-1.1.0$ But python shell does not work: hduser@master:~/Downloads/spark-1.1.0$ hduser@master:~/Downloads/spark-1.1.0$ hduser@master:~/Downloads/spark-1.1.0$ ./bin/pyspark Python 2.7.3 (default, Feb 27 2014, 20:00:17) [GCC 4.6.3] on linux2 Type help, copyright, credits or license for more information. Java HotSpot(TM) Client VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/11/18 04:36:06 INFO SecurityManager: Changing view acls to: hduser, 14/11/18 04:36:06 INFO SecurityManager: Changing modify acls to: hduser, 14/11/18 04:36:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser, ); users with modify permissions: Set(hduser, ) 14/11/18 04:36:06 INFO Slf4jLogger: Slf4jLogger started 14/11/18 04:36:06 INFO Remoting: Starting remoting 14/11/18 04:36:06 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@master:52317] 14/11/18 04:36:06 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@master:52317] 14/11/18 04:36:06 INFO Utils: Successfully started service 'sparkDriver' on port 52317. 14/11/18 04:36:06 INFO SparkEnv: Registering MapOutputTracker 14/11/18 04:36:06 INFO SparkEnv: Registering BlockManagerMaster 14/11/18 04:36:06 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141118043606-c346 14/11/18 04:36:07 INFO Utils: Successfully started service 'Connection manager for block manager' on port 47507. 14/11/18 04:36:07 INFO ConnectionManager: Bound socket to port 47507 with id = ConnectionManagerId(master,47507) 14/11/18 04:36:07 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 14/11/18 04:36:07 INFO BlockManagerMaster: Trying to register BlockManager 14/11/18 04:36:07 INFO BlockManagerMasterActor: Registering block manager master:47507 with 267.3 MB RAM 14/11/18 04:36:07 INFO BlockManagerMaster: Registered BlockManager 14/11/18 04:36:07 INFO HttpFileServer: HTTP File server directory is /tmp/spark-8b29544a-c74b-4a3e-88e0-13801c8dcc65 14/11/18 04:36:07 INFO HttpServer: Starting HTTP Server 14/11/18 04:36:07 INFO Utils: Successfully started service 'HTTP file server' on port 40029. 14/11/18 04:36:12 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
14/11/18 04:36:12 INFO SparkUI: Started SparkUI at http://master:4040 14/11/18 04:36:12 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@master:52317/user/HeartbeatReceiver 14/11/18 04:36:12 INFO SparkUI: Stopped Spark web UI at http://master:4040 14/11/18 04:36:12 INFO DAGScheduler: Stopping DAGScheduler 14/11/18 04:36:13 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped! 14/11/18 04:36:13 INFO ConnectionManager: Selector thread was interrupted! 14/11/18 04:36:13 INFO ConnectionManager: ConnectionManager stopped 14/11/18 04:36:13 INFO MemoryStore: MemoryStore cleared 14/11/18 04:36:13 INFO BlockManager: BlockManager stopped 14/11/18 04:36:13 INFO BlockManagerMaster: BlockManagerMaster stopped 14/11/18 04:36:13 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 14/11/18 04:36:13 INFO SparkContext: Successfully stopped SparkContext 14/11/18 04:36:13 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 14/11/18 04:36:13 INFO Remoting: Remoting shut down 14/11/18 04:36:13 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down. Traceback (most recent call last): File /home/hduser/Downloads/spark-1.1.0/python/pyspark/shell.py, line 44, in module sc = SparkContext(appName=PySparkShell, pyFiles=add_files) File /home/hduser/Downloads/spark-1.1.0/python/pyspark/context.py, line 107, in __init__ conf) File /home/hduser/Downloads/spark-1.1.0/python/pyspark/context.py, line 159, in _do_init self._accumulatorServer = accumulators._start_update_server() File
Re: Is there a way to create key based on counts in Spark
On Tue, Nov 18, 2014 at 9:06 AM, Debasish Das debasish.da...@gmail.com wrote: Use zipWithIndex but cache the data before you run zipWithIndex...that way your ordering will be consistent (unless the bug has been fixed where you don't have to cache the data)... Could you point some link about the bug? Normally these operations are used for dictionary building and so I am hoping you can cache the dictionary of RDD[String] before you can run zipWithIndex... indices are within 0 till maxIndex-1...if you want 1 you have to later map the index to index + 1 On Tue, Nov 18, 2014 at 8:56 AM, Blind Faith person.of.b...@gmail.com wrote: As it is difficult to explain this, I would show what I want. Lets us say, I have an RDD A with the following value A = [word1, word2, word3] I want to have an RDD with the following value B = [(1, word1), (2, word2), (3, word3)] That is, it gives a unique number to each entry as a key value. Can we do such thing with Python or Scala? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Iterative transformations over RDD crashes in phantom reduce
Sorry everyone--turns out an oft-forgotten single line of code was required to make this work: index = 0 INDEX = sc.broadcast(index) M = M.flatMap(func1).reduceByKey(func2) M.foreach(debug_output) *M.cache()* index = 1 INDEX = sc.broadcast(index) M = M.flatMap(func1) M.foreach(debug_output) Works as expected now, and I understand why it was failing before: Spark was trying to recompute the RDD but consequently it was invoked with index == 1. On 11/18/14 2:02 PM, Shannon Quinn wrote: To clarify about what, precisely, is impossible: the crash happens with INDEX == 1 in func2, but func2 is only called in the reduceByKey transformation when INDEX == 0. And according to the output of the foreach() in line 4, that reduceByKey(func2) works just fine. How is it then invoked again with INDEX == 1 when there clearly isn't another reduce call at line 7? On 11/18/14 1:58 PM, Shannon Quinn wrote: Hi all, This is somewhat related to my previous question ( http://apache-spark-user-list.1001560.n3.nabble.com/Iterative-changes-to-RDD-and-broadcast-variables-tt19042.html , for additional context) but for all practical purposes this is its own issue. As in my previous question, I'm making iterative changes to an RDD, where each iteration depends on the results of the previous one. I've stripped down what was previously a loop to just be two sequential edits to try and nail down where the problem is. It looks like this: index = 0 INDEX = sc.broadcast(index) M = M.flatMap(func1).reduceByKey(func2) M.foreach(debug_output) index = 1 INDEX = sc.broadcast(index) M = M.flatMap(func1) M.foreach(debug_output) M is basically a row-indexed matrix, where each index points to a dictionary (sparse matrix more or less, with some domain-specific modifications). This program crashes on the second-to-last (7th) line; the creepy part is that it says the crash happens in func2 with the broadcast variable INDEX == 1 (it attempts to access an entry that doesn't exist in a dictionary of one of the rows). How is that even possible? Am I missing something fundamental about how Spark works under the hood? Thanks for your help! Shannon
Re: SparkSQL exception on spark.sql.codegen
Those are probably related. It looks like we are somehow not being thread safe when initializing various parts of the scala compiler. Since code gen is pretty experimental we probably won't have the resources to investigate backporting a fix. However, if you can reproduce the problem in Spark 1.2 then please file a JIRA. On Mon, Nov 17, 2014 at 9:37 PM, Eric Zhen zhpeng...@gmail.com wrote: Yes, it's always appears on a part of the whole tasks in a stage(i.e. 100/100 (65 failed)), and sometimes cause the stage to fail. And there is another error that I'm not sure if there is a correlation. java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$ at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:114) at org.apache.spark.sql.execution.Filter.conditionEvaluator$lzycompute(basicOperators.scala:55) at org.apache.spark.sql.execution.Filter.conditionEvaluator(basicOperators.scala:55) at org.apache.spark.sql.execution.Filter$$anonfun$2.apply(basicOperators.scala:58) at org.apache.spark.sql.execution.Filter$$anonfun$2.apply(basicOperators.scala:57) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) On Tue, Nov 18, 2014 at 11:41 AM, Michael Armbrust mich...@databricks.com wrote: Interesting, I believe we have run that query with version 1.1.0 with codegen turned on and not much has changed there. Is the error deterministic? On Mon, Nov 17, 2014 at 7:04 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi Michael, We use Spark v1.1.1-rc1 with jdk 1.7.0_51 and scala 2.10.4. On Tue, Nov 18, 2014 at 7:09 AM, Michael Armbrust mich...@databricks.com wrote: What version of Spark SQL? On Sat, Nov 15, 2014 at 10:25 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi all, We run SparkSQL on TPCDS benchmark Q19 with spark.sql.codegen=true, we got exceptions as below, has anyone else saw these before? 
java.lang.ExceptionInInitializerError at org.apache.spark.sql.execution.SparkPlan.newProjection(SparkPlan.scala:92) at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1$$anonfun$1.apply(Exchange.scala:51) at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1$$anonfun$1.apply(Exchange.scala:48) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.NullPointerException at scala.reflect.internal.Types$TypeRef.computeHashCode(Types.scala:2358) at scala.reflect.internal.Types$UniqueType.init(Types.scala:1304) at scala.reflect.internal.Types$TypeRef.init(Types.scala:2341) at scala.reflect.internal.Types$NoArgsTypeRef.init(Types.scala:2137) at scala.reflect.internal.Types$TypeRef$$anon$6.init(Types.scala:2544)
Re: Is there a way to create key based on counts in Spark
On Tue, Nov 18, 2014 at 8:26 PM, Davies Liu dav...@databricks.com wrote: On Tue, Nov 18, 2014 at 9:06 AM, Debasish Das debasish.da...@gmail.com wrote: Use zipWithIndex but cache the data before you run zipWithIndex...that way your ordering will be consistent (unless the bug has been fixed where you don't have to cache the data)... Could you point some link about the bug? I think it's this: https://issues.apache.org/jira/browse/SPARK-3098 ... but it's resolved as not a bug. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Problems launching 1.2.0-SNAPSHOT cluster with Hive support on EC2
I've developed a Spark application using the 1.2.0-SNAPSHOT branch that leverages Spark Streaming and Hive and can run it locally with no problem (I need some fixes in the 1.2.0 branch). I successfully launched my EC2 cluster by specifying a git commit hash from the 1.2.0-SNAPSHOT branch as the version (as documented in the scripts) and was able to run non-Hive Spark jobs against it. However, when I try to deploy my Hive application to it, I get an exception (java.lang.ArrayStoreException: org.apache.spark.sql.execution.SparkStrategies$CommandStrategy?) creating my HiveContext. I suspect that after the spark-ec2 launch script clones the source at that git repo/commit point, the assembly isn't being built with Hive support (e.g. a -Phive profile on the mvn build command as described in Building Spark with Maven in the documentation). Is anyone aware of a way I can get Hive support added into my compiled assembly on the master/slaves using the spark-ec2 scripts (or where it would make sense to hack the existing scripts to get it working)? Thanks, Curt -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problems-launching-1-2-0-SNAPSHOT-cluster-with-Hive-support-on-EC2-tp19221.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
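One way this is sometimes handled, sketched here purely as an assumption about the standard spark-ec2 layout (where the checkout lives in /root/spark and ~/spark-ec2/copy-dir rsyncs a directory to the slaves), is to rebuild the assembly on the master with the Hive profile and push it out:

cd /root/spark
# Rebuild the assembly with Hive support, as described in "Building Spark with Maven".
mvn -Phive -DskipTests clean package
# Push the rebuilt assembly to the slaves (path and script assumed from the spark-ec2 AMI layout).
~/spark-ec2/copy-dir /root/spark

Hacking the spark-ec2 scripts to pass -Phive during the initial build would achieve the same thing without the manual step.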
GraphX twitter
I'm having problems running the twitter graph on a cluster with 4 nodes, each having over 100GB of RAM and 32 virtual cores per node. I do have a pre-installed spark version (built against hadoop 2.3, because it didn't compile on my system), but I'm loading my graph file from disk without hdfs. The twitter graph is around 25GB big and I'm loading the graph with GraphLoader.edgeListFile(..,.., minEdgePartitions = 128 ). I assume that 128 partitions is optimal because that's the total number of cores that I have. Now I've started one executor on each node, which has 100GB of RAM and spark.executor.memory=32 to enjoy full parallelism. 4 workers, each having one executor, each executor using 32 cores -- 128 cores, 128 partitions. Is there any configuration that can be used for replicating results as given in this http://arxiv.org/pdf/1402.2394v1.pdf paper? The paper states a running time of under 500s. I can't get any results after more than 1 hour. I'm running the built-in algorithm with 15 iterations. Am I using too many partitions? Is there a bottleneck I can't see? Turned on GC logging, it looks like that: 3.785: [GC [PSYoungGen: 62914560K-7720K(73400320K)] 62914560K-7792K(241172480K), 0.0151130 secs] [Times: user=0.27 sys=0.02, real=0.02 secs] 9.209: [GC [PSYoungGen: 62922280K-1943393K(73400320K)] 62922352K-1943473K(241172480K), 0.6108790 secs] [Times: user=5.95 sys=8.04, real=0.62 secs] 13.316: [GC [PSYoungGen: 64857953K-4283906K(73400320K)] 64858033K-4283994K(241172480K), 1.1567380 secs] [Times: user=10.84 sys=15.73, real=1.16 secs] 17.931: [GC [PSYoungGen: 67198466K-6808418K(73400320K)] 67198554K-6808514K(241172480K), 1.9807690 secs] [Times: user=16.21 sys=29.29, real=1.99 secs] 26.112: [GC [PSYoungGen: 69722978K-7211955K(73400320K)] 69723074K-7212059K(241172480K), 2.1325980 secs] [Times: user=15.66 sys=33.33, real=2.14 secs] 64.833: [GC [PSYoungGen: 70126515K-5378991K(74105216K)] 70126619K-5379103K(241877376K), 0.3315500 secs] [Times: user=7.53 sys=0.00, real=0.33 secs] In the stderr log, I wonder about these logs: INFO HadoopRDD: Input split: file:/.../twitter.edge:25065160704+33554432 INFO HadoopRDD: Input split: file:/.../twitter.edge:19696451584+33554432 Why does it even split the file as a HadoopRDD? I also wonder about this error: ERROR BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s) from ConnectionManagerId($SPARK_MASTER,59331) java.io.IOException: sendMessageReliably failed without being ACK'd Any help would be highly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-twitter-tp19222.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
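For reference, a minimal sketch of how the built-in PageRank is usually driven in GraphX 1.1; the file path, default master URL and the choice of EdgePartition2D here are illustrative assumptions, not a reproduction of the paper's setup.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

object TwitterPageRank {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("twitter-pagerank"))
    // Placeholder path to the edge list; 128 partitions as in the message above.
    val graph = GraphLoader.edgeListFile(sc, "/data/twitter.edge", minEdgePartitions = 128)
      .partitionBy(PartitionStrategy.EdgePartition2D)   // 2D partitioning often helps very skewed graphs
    val ranks = graph.staticPageRank(numIter = 15).vertices
    println(ranks.count())
    sc.stop()
  }
}

(The HadoopRDD lines in the log are expected: GraphLoader reads the edge list through sc.textFile, which goes through the Hadoop input format and produces input splits even for local files.)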
Re: unsubscribe
Abdul, Please send an email to user-unsubscr...@spark.apache.org On Tue, Nov 18, 2014 at 2:05 PM, Abdul Hakeem alhak...@gmail.com wrote: - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Lost executors
Hi, I am using Spark 1.0.1 on Yarn 2.5, and doing everything through spark shell. I am running a job that essentially reads a bunch of HBase keys, looks up HBase data, and performs some filtering and aggregation. The job works fine in smaller datasets, but when i try to execute on the full dataset, the job never completes. The few symptoms i notice are: a. The job shows progress for a while and then starts throwing lots of the following errors: 2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] INFO org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend - *Executor 906 disconnected, so removing it* 2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] ERROR org.apache.spark.scheduler.cluster.YarnClientClusterScheduler - *Lost executor 906 on machine name: remote Akka client disassociated* 2014-11-18 16:52:02,283 [spark-akka.actor.default-dispatcher-22] WARN org.apache.spark.storage.BlockManagerMasterActor - *Removing BlockManager BlockManagerId(9186, machine name, 54600, 0) with no recent heart beats: 82313ms exceeds 45000ms* Looking at the logs, the job never recovers from these errors, and continues to show errors about lost executors and launching new executors, and this just continues for a long time. Could this be because the executors are running out of memory? In terms of memory usage, the intermediate data could be large (after the HBase lookup), but partial and fully aggregated data set size should be quite small - essentially a bunch of ids and counts ( 1 mil in total). b. In the Spark UI, i am seeing the following errors (redacted for brevity), not sure if they are transient or real issue: java.net.SocketTimeoutException (java.net.SocketTimeoutException: Read timed out} ... org.apache.spark.util.Utils$.fetchFile(Utils.scala:349) org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330) org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ... java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:724) I was trying to get more data to investigate but haven't been able to figure out how to enable logging on the executors. The Spark UI appears stuck and i only see driver side logs in the jobhistory directory specified in the job. Thanks, pala
Re: Lost executors
Hi Pala, Do you have access to your YARN NodeManager logs? Are you able to check whether they report killing any containers for exceeding memory limits? -Sandy On Tue, Nov 18, 2014 at 1:54 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, I am using Spark 1.0.1 on Yarn 2.5, and doing everything through spark shell. I am running a job that essentially reads a bunch of HBase keys, looks up HBase data, and performs some filtering and aggregation. The job works fine in smaller datasets, but when i try to execute on the full dataset, the job never completes. The few symptoms i notice are: a. The job shows progress for a while and then starts throwing lots of the following errors: 2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] INFO org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend - *Executor 906 disconnected, so removing it* 2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] ERROR org.apache.spark.scheduler.cluster.YarnClientClusterScheduler - *Lost executor 906 on machine name: remote Akka client disassociated* 2014-11-18 16:52:02,283 [spark-akka.actor.default-dispatcher-22] WARN org.apache.spark.storage.BlockManagerMasterActor - *Removing BlockManager BlockManagerId(9186, machine name, 54600, 0) with no recent heart beats: 82313ms exceeds 45000ms* Looking at the logs, the job never recovers from these errors, and continues to show errors about lost executors and launching new executors, and this just continues for a long time. Could this be because the executors are running out of memory? In terms of memory usage, the intermediate data could be large (after the HBase lookup), but partial and fully aggregated data set size should be quite small - essentially a bunch of ids and counts ( 1 mil in total). b. In the Spark UI, i am seeing the following errors (redacted for brevity), not sure if they are transient or real issue: java.net.SocketTimeoutException (java.net.SocketTimeoutException: Read timed out} ... org.apache.spark.util.Utils$.fetchFile(Utils.scala:349) org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330) org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ... java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:724) I was trying to get more data to investigate but haven't been able to figure out how to enable logging on the executors. The Spark UI appears stuck and i only see driver side logs in the jobhistory directory specified in the job. Thanks, pala
spark-shell giving me error of unread block data
I'm essentially loading a file and saving output to another location: val source = sc.textFile(/tmp/testfile.txt) source.saveAsTextFile(/tmp/testsparkoutput) when i do so, i'm hitting this error: 14/11/18 21:15:08 INFO DAGScheduler: Failed to run saveAsTextFile at console:15 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, cloudera-1.testdomain.net): java.lang.IllegalStateException: unread block data java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Cant figure out what the issue is. I'm running in CDH5.2 w/ version of spark being 1.1. The file i'm loading is literally just 7 MB. I thought it was jar files mismatch, but i did a compare and see they're all identical. But seeing as how they were all installed through CDH parcels, not sure how there would be version mismatch on the nodes and master. 
Oh yeah, 1 master node w/ 2 worker nodes, running in standalone mode, not through YARN. Just in case, I copied the jars from the master to the 2 worker nodes, and still the same issue. Weird thing is, the first time I installed and tested it out, it worked, but now it doesn't. Any help here would be greatly appreciated.
Re: Is there a way to create key based on counts in Spark
I see, thanks! On Tue, Nov 18, 2014 at 12:12 PM, Sean Owen so...@cloudera.com wrote: On Tue, Nov 18, 2014 at 8:26 PM, Davies Liu dav...@databricks.com wrote: On Tue, Nov 18, 2014 at 9:06 AM, Debasish Das debasish.da...@gmail.com wrote: Use zipWithIndex but cache the data before you run zipWithIndex...that way your ordering will be consistent (unless the bug has been fixed where you don't have to cache the data)... Could you point some link about the bug? I think it's this: https://issues.apache.org/jira/browse/SPARK-3098 ... but it's resolved as not a bug. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
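For reference, a minimal sketch of the cache-before-zipWithIndex work-around discussed above (the input path is a placeholder):

val data = sc.textFile("hdfs:///data/input.txt").cache()        // caching pins the partition layout
val withIds = data.zipWithIndex()                               // RDD[(String, Long)]
val keyed = withIds.map { case (value, idx) => (idx, value) }   // use the generated index as the key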
Re: spark-shell giving me error of unread block data
It can be a serialization issue. Happens when there are different versions installed on the same system. What do you mean by the first time you installed and tested it out? On Wed, Nov 19, 2014 at 3:29 AM, Anson Abraham anson.abra...@gmail.com wrote: I'm essentially loading a file and saving output to another location: val source = sc.textFile(/tmp/testfile.txt) source.saveAsTextFile(/tmp/testsparkoutput) when i do so, i'm hitting this error: 14/11/18 21:15:08 INFO DAGScheduler: Failed to run saveAsTextFile at console:15 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, cloudera-1.testdomain.net): java.lang.IllegalStateException: unread block data java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Cant figure out what the issue is. I'm running in CDH5.2 w/ version of spark being 1.1. The file i'm loading is literally just 7 MB. 
I thought it was jar files mismatch, but i did a compare and see they're all identical. But seeing as how they were all installed through CDH parcels, not sure how there would be version mismatch on the nodes and master. Oh yeah 1 master node w/ 2 worker nodes and running in standalone not through yarn. So as a just in case, i copied the jars from the master to the 2 worker nodes as just in case, and still same issue. Weird thing is, first time i installed and tested it out, it worked, but now it doesn't. Any help here would be greatly appreciated.
Cores on Master
I see the default and max cores settings but these seem to control total cores per cluster. My cobbled together home cluster needs the Master to not use all its cores or it may lock up (it does other things). Is there a way to control max cores used for a particular cluster machine in standalone mode? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Cores on Master
Looks like I can do this by not using start-all.sh but starting each worker separately passing in a '--cores n' to the master? No config/env way? On Nov 18, 2014, at 3:14 PM, Pat Ferrel p...@occamsmachete.com wrote: I see the default and max cores settings but these seem to control total cores per cluster. My cobbled together home cluster needs the Master to not use all its cores or it may lock up (it does other things). Is there a way to control max cores used for a particular cluster machine in standalone mode? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
JdbcRDD
Hi, Are there any examples of using JdbcRDD in Java available? It's not clear what the last argument is in this example ( https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala ):

sc = new SparkContext("local", "test")
val rdd = new JdbcRDD(
  sc,
  () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
  "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
  1, 100, 3,
  (r: ResultSet) => { r.getInt(1) }).cache()

Thanks
Re: Cores on Master
This seems to work only on a ‘worker’ not the master? So I’m back to having no way to control cores on the master? On Nov 18, 2014, at 3:24 PM, Pat Ferrel p...@occamsmachete.com wrote: Looks like I can do this by not using start-all.sh but starting each worker separately passing in a '--cores n' to the master? No config/env way? On Nov 18, 2014, at 3:14 PM, Pat Ferrel p...@occamsmachete.com wrote: I see the default and max cores settings but these seem to control total cores per cluster. My cobbled together home cluster needs the Master to not use all its cores or it may lock up (it does other things). Is there a way to control max cores used for a particular cluster machine in standalone mode? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: JdbcRDD
I also had the same problem using JdbcRDD from Java. For me, I have written a class in Scala to build the JdbcRDD, and I call this instance from Java. For instance, JdbcRDDWrapper.scala looks like this:

...
import java.sql._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
import com.gsshop.polaris.domain.event._

class JdbcRDDWrapper(sc: SparkContext, rowCount: Long, from: Long, to: Long) {

  def getItemViewEventJdbcRdd(): JdbcRDD[ItemViewEvent] = {
    val sql = "" +
      " SELECT " +
      " i.ID as id, " +
      " i.ITEM_ID as \"itemViewEvent.itemId\", " +
      " i.BRAND_ID as \"itemViewEvent.brandId\", " +
      " i.ITEM_TYPE as \"itemViewEvent.itemType\", " +
      " i.PROMOTION_ID as \"itemViewEvent.promotionId\", " +
      " i.PRICE as \"itemViewEvent.price\", " +
      " i.ITEM_TITLE as \"itemViewEvent.itemTitle\", " +
      " i.ITEM_DESCRIPTION as \"itemViewEvent.itemDescription\", " +
      " i.THUMB_NAIL_URL as \"itemViewEvent.thumbnailUrl\", " +
      " i.LOAD_DATE as loadDate, " +
      " b.EVENT_TYPE as \"itemViewEvent.baseProperties.eventType\", " +
      " b.TIMESTAMP as \"itemViewEvent.baseProperties.timestamp\", " +
      " b.URL as \"itemViewEvent.baseProperties.url\", " +
      " b.REFERER as \"itemViewEvent.baseProperties.referer\", " +
      " b.UID as \"itemViewEvent.baseProperties.uid\", " +
      " b.PCID as \"itemViewEvent.baseProperties.pcid\", " +
      " b.SERVICE_ID as \"itemViewEvent.baseProperties.serviceId\", " +
      " b.VERSION as \"itemViewEvent.baseProperties.version\", " +
      " b.DEVICE_TYPE as \"itemViewEvent.baseProperties.deviceType\", " +
      " b.DOMAIN as \"itemViewEvent.baseProperties.domain\", " +
      " b.SITE as \"itemViewEvent.baseProperties.site\" " +
      " FROM ITEM_VIEW_EVENT AS i " +
      " INNER JOIN BASE_PROPERTIES AS b " +
      " ON i.ID = b.EVENT_ID " +
      " WHERE b.TIMESTAMP != ? AND " + from + " <= b.TIMESTAMP AND b.TIMESTAMP < " + to + " LIMIT ?"

    val rdd = new JdbcRDD(
      sc,
      () => {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
        DriverManager.getConnection("jdbc:phoenix:xx:/hbase-unsecure")
      },
      sql,
      0,
      rowCount,
      5,
      (rs: ResultSet) => {
        val baseProperties = new BaseProperties()
        baseProperties.setEventType(rs.getString("itemViewEvent.baseProperties.eventType"))
        baseProperties.setTimestamp(rs.getLong("itemViewEvent.baseProperties.timestamp"))
        baseProperties.setUrl(rs.getString("itemViewEvent.baseProperties.url"))
        baseProperties.setReferer(rs.getString("itemViewEvent.baseProperties.referer"))
        baseProperties.setUid(rs.getString("itemViewEvent.baseProperties.uid"))
        baseProperties.setPcid(rs.getString("itemViewEvent.baseProperties.pcid"))
        baseProperties.setServiceId(rs.getString("itemViewEvent.baseProperties.serviceId"))
        baseProperties.setVersion(rs.getString("itemViewEvent.baseProperties.version"))
        baseProperties.setDeviceType(rs.getString("itemViewEvent.baseProperties.deviceType"))
        baseProperties.setDomain(rs.getString("itemViewEvent.baseProperties.domain"))
        baseProperties.setSite(rs.getString("itemViewEvent.baseProperties.site"))

        val itemViewEvent = new ItemViewEvent()
        itemViewEvent.setItemId(rs.getString("itemViewEvent.itemId"))
        itemViewEvent.setBrandId(rs.getString("itemViewEvent.brandId"))
        itemViewEvent.setItemType(rs.getString("itemViewEvent.itemType"))
        itemViewEvent.setPromotionId(rs.getString("itemViewEvent.promotionId"))
        itemViewEvent.setPrice(rs.getLong("itemViewEvent.price"))
        itemViewEvent.setItemTitle(rs.getString("itemViewEvent.itemTitle"))
        itemViewEvent.setItemDescription(rs.getString("itemViewEvent.itemDescription"))
        itemViewEvent.setThumbnailUrl(rs.getString("itemViewEvent.thumbnailUrl"))
        itemViewEvent.setBaseProperties(baseProperties)
        itemViewEvent
      })

    rdd
  }
}

and from Java, the JdbcRDD can be received:

import scala.reflect.ClassManifestFactory$;
...
JdbcRDD<ItemViewEvent> jdbcRddItemViewEvent =
    new JdbcRDDWrapper(JavaSparkContext.toSparkContext(ctx), rowCountItemViewEvent, fromTime, toTime)
        .getItemViewEventJdbcRdd();
JavaRDD<ItemViewEvent> javaRddItemViewEvent =
    JavaRDD.fromRDD(jdbcRddItemViewEvent, ClassManifestFactory$.MODULE$.fromClass(ItemViewEvent.class));

- Kidong.

2014-11-19 8:58 GMT+09:00 sparkling [via Apache Spark User List] ml-node+s1001560n19233...@n3.nabble.com: Hi, Are there any examples of using JdbcRDD in java available? Its not clear what is the last argument in this example ( https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala ): sc = new SparkContext(local, test) val rdd = new JdbcRDD( sc, () = { DriverManager.getConnection(jdbc:derby:target/JdbcRDDSuiteDb) }, SELECT DATA FROM FOO WHERE ? = ID AND ID = ?, 1, 100, 3, (r: ResultSet) = { r.getInt(1) } ).cache() Thanks
Re: JdbcRDD
Thanks Kidong. I'll try your approach. On Tue, Nov 18, 2014 at 4:22 PM, mykidong mykid...@gmail.com wrote: I had also same problem to use JdbcRDD in java. For me, I have written a class in scala to get JdbcRDD, and I call this instance from java. for instance, JdbcRDDWrapper.scala like this: ... import java.sql._ import org.apache.spark.SparkContext import org.apache.spark.rdd.JdbcRDD import com.gsshop.polaris.domain.event._ class JdbcRDDWrapper(sc: SparkContext, rowCount: Long, from: Long, to: Long) { def getItemViewEventJdbcRdd(): JdbcRDD[ItemViewEvent] = { val sql = + SELECT + i.ID as id, + i.ITEM_ID as \itemViewEvent.itemId\, + i.BRAND_ID as \itemViewEvent.brandId\, + i.ITEM_TYPE as \itemViewEvent.itemType\, + i.PROMOTION_ID as \itemViewEvent.promotionId\, + i.PRICE as \itemViewEvent.price\, + i.ITEM_TITLE as \itemViewEvent.itemTitle\, + i.ITEM_DESCRIPTION as \itemViewEvent.itemDescription\, + i.THUMB_NAIL_URL as \itemViewEvent.thumbnailUrl\, + i.LOAD_DATE as loadDate, + b.EVENT_TYPE as \itemViewEvent.baseProperties.eventType\, + b.TIMESTAMP as \itemViewEvent.baseProperties.timestamp\, + b.URL as \itemViewEvent.baseProperties.url\, + b.REFERER as \itemViewEvent.baseProperties.referer\, + b.UID as \itemViewEvent.baseProperties.uid\, + b.PCID as \itemViewEvent.baseProperties.pcid\, + b.SERVICE_ID as \itemViewEvent.baseProperties.serviceId\, + b.VERSION as \itemViewEvent.baseProperties.version\, + b.DEVICE_TYPE as \itemViewEvent.baseProperties.deviceType\, + b.DOMAIN as \itemViewEvent.baseProperties.domain\, + b.SITE as \itemViewEvent.baseProperties.site\ + FROM ITEM_VIEW_EVENT AS i + INNER JOIN BASE_PROPERTIES AS b + ON i.ID = b.EVENT_ID + WHERE b.TIMESTAMP != ? AND + from + = b.TIMESTAMP AND b.TIMESTAMP + to + LIMIT ? val rdd = new JdbcRDD( sc, () = { Class.forName(org.apache.phoenix.jdbc.PhoenixDriver) DriverManager.getConnection(jdbc:phoenix:xx:/hbase-unsecure) }, sql, 0, rowCount, 5, (rs: ResultSet) = { val baseProperties = new BaseProperties() baseProperties.setEventType(rs.getString( itemViewEvent.baseProperties.eventType)) baseProperties.setTimestamp(rs.getLong( itemViewEvent.baseProperties.timestamp)) baseProperties.setUrl(rs.getString( itemViewEvent.baseProperties.url)) baseProperties.setReferer(rs.getString( itemViewEvent.baseProperties.referer)) baseProperties.setUid(rs.getString( itemViewEvent.baseProperties.uid)) baseProperties.setPcid(rs.getString( itemViewEvent.baseProperties.pcid)) baseProperties.setServiceId(rs.getString( itemViewEvent.baseProperties.serviceId)) baseProperties.setVersion(rs.getString( itemViewEvent.baseProperties.version)) baseProperties.setDeviceType(rs.getString( itemViewEvent.baseProperties.deviceType)) baseProperties.setDomain(rs.getString( itemViewEvent.baseProperties.domain)) baseProperties.setSite(rs.getString( itemViewEvent.baseProperties.site)) val itemViewEvent = new ItemViewEvent() itemViewEvent.setItemId(rs.getString(itemViewEvent.itemId)) itemViewEvent.setBrandId(rs.getString(itemViewEvent.brandId)) itemViewEvent.setItemType(rs.getString(itemViewEvent.itemType)) itemViewEvent.setPromotionId(rs.getString( itemViewEvent.promotionId)) itemViewEvent.setPrice(rs.getLong(itemViewEvent.price)) itemViewEvent.setItemTitle(rs.getString(itemViewEvent.itemTitle )) itemViewEvent.setItemDescription(rs.getString( itemViewEvent.itemDescription)) itemViewEvent.setThumbnailUrl(rs.getString( itemViewEvent.thumbnailUrl)) itemViewEvent.setBaseProperties(baseProperties) itemViewEvent }) rdd } } and from java, JdbcRdd can be received: import 
scala.reflect.ClassManifestFactory$; ... JdbcRDDItemViewEvent jdbcRddItemViewEvent = new JdbcRDDWrapper(JavaSparkContext.toSparkContext(ctx), rowCountItemViewEvent, fromTime, toTime).getItemViewEventJdbcRdd(); JavaRDDItemViewEvent javaRddItemViewEvent = JavaRDD.fromRDD(jdbcRddItemViewEvent, ClassManifestFactory$.MODULE$.fromClass(ItemViewEvent.class)); - Kidong. 2014-11-19 8:58 GMT+09:00 sparkling [via Apache Spark User List] [hidden email] http://user/SendEmail.jtp?type=nodenode=19235i=0: Hi, Are there any examples of using JdbcRDD in java available? Its not clear what is the last argument in this example ( https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala ): sc = new SparkContext(local, test) val rdd = new JdbcRDD( sc, () = { DriverManager.getConnection(jdbc:derby:target/JdbcRDDSuiteDb) },
Spark streaming: java.io.IOException: Version Mismatch (Expected: 28, Received: 18245 )
Hi all, I am running a Spark Streaming job. It was able to produce the correct results for some time. Later on, the job was still running but producing no results. I checked the Spark Streaming UI and found that 4 tasks of a stage failed. The error messages showed: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 400048, ip-172-31-13-130.ec2.internal): ExecutorLostFailure (executor lost) Driver stacktrace: I further clicked into the stage and found that the 4 executors running it had the error message: ExecutorLostFailure (executor lost) The stage that failed was actually runJob at ReceiverTracker.scala:275 http://ec2-54-172-118-237.compute-1.amazonaws.com:9046/proxy/application_1415902783817_0019/stages/stage?id=2attempt=0, which is the stage that keeps receiving messages from Kafka. I guess that is why the job does not produce results any more. To investigate, I logged into one of the executor machines and checked the Hadoop log. The log file contains a lot of exception messages: *java.io.IOException: Version Mismatch (Expected: 28, Received: 18245 )* This streaming job reads from Kafka and produces aggregation results. After this stage failure, the job is still running but there is no data shuffle as seen in the Spark UI. The amount of time this job can run correctly varies from job to job. Does anyone have an idea why this Spark Streaming job hit this exception? And why it cannot recover from the stage failure? Thanks! Bill
Re: Cores on Master
OK hacking the start-slave.sh did it On Nov 18, 2014, at 4:12 PM, Pat Ferrel p...@occamsmachete.com wrote: This seems to work only on a ‘worker’ not the master? So I’m back to having no way to control cores on the master? On Nov 18, 2014, at 3:24 PM, Pat Ferrel p...@occamsmachete.com wrote: Looks like I can do this by not using start-all.sh but starting each worker separately passing in a '--cores n' to the master? No config/env way? On Nov 18, 2014, at 3:14 PM, Pat Ferrel p...@occamsmachete.com wrote: I see the default and max cores settings but these seem to control total cores per cluster. My cobbled together home cluster needs the Master to not use all its cores or it may lock up (it does other things). Is there a way to control max cores used for a particular cluster machine in standalone mode? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
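For what it's worth, a config-based alternative to patching start-slave.sh (assuming the machine that runs the master also runs a standalone worker, since the master process itself does not execute tasks): set SPARK_WORKER_CORES=2, or whatever limit you want, in conf/spark-env.sh on that machine before starting the worker, so the worker on that box only offers that many cores to the cluster.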
Parsing a large XML file using Spark
If there is one big XML file (e.g., the Wikipedia dump, 44GB, or the larger dump that also includes all revision information) stored in HDFS, is it possible to parse it in parallel/faster using Spark? Or do we have to use something like a PullParser or Iteratee? My current solution is to read the single XML file in a first pass, write it out as smaller files to HDFS, and then read those small files in parallel on the Spark workers. Thanks -Soumya
Re: Lost executors
Sandy, Good point - i forgot about NM logs. When i looked up the NM logs, i only see the following statements that align with the driver side log about lost executor. Many executors show the same log statement at the same time, so it seems like the decision to kill many if not all executors happened centrally, and all executors got notified somehow: 14/11/18 00:18:25 INFO Executor: Executor is trying to kill task 2013 14/11/18 00:18:25 INFO Executor: Executor killed task 2013 In general, i also see quite a few instances of the following exception across many executors/nodes. : 14/11/17 23:58:00 INFO HadoopRDD: Input split: hdfs dir path/sorted_keys-1020_3-r-00255.deflate:0+415841 14/11/17 23:58:00 WARN BlockReaderLocal: error creating DomainSocket java.net.ConnectException: connect(2) error: Connection refused when trying to connect to '/srv/var/hadoop/runs/hdfs/dn_socket' at org.apache.hadoop.net.unix.DomainSocket.connect0(Native Method) at org.apache.hadoop.net.unix.DomainSocket.connect(DomainSocket.java:250) at org.apache.hadoop.hdfs.DomainSocketFactory.createSocket(DomainSocketFactory.java:158) at org.apache.hadoop.hdfs.BlockReaderFactory.nextDomainPeer(BlockReaderFactory.java:721) at org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:441) at org.apache.hadoop.hdfs.client.ShortCircuitCache.create(ShortCircuitCache.java:780) at org.apache.hadoop.hdfs.client.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:714) at org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:395) at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:303) at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:567) at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:790) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837) at java.io.DataInputStream.read(DataInputStream.java:149) at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159) at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143) at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:201) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:184) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.isEmpty(Iterator.scala:256) at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1157) at $line57.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:51) at $line57.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:50) at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at
Re: SparkSQL exception on spark.sql.codegen
Okay, thank you Micheal. On Wed, Nov 19, 2014 at 3:45 AM, Michael Armbrust mich...@databricks.com wrote: Those are probably related. It looks like we are somehow not being thread safe when initializing various parts of the scala compiler. Since code gen is pretty experimental we probably won't have the resources to investigate backporting a fix. However, if you can reproduce the problem in Spark 1.2 then please file a JIRA. On Mon, Nov 17, 2014 at 9:37 PM, Eric Zhen zhpeng...@gmail.com wrote: Yes, it's always appears on a part of the whole tasks in a stage(i.e. 100/100 (65 failed)), and sometimes cause the stage to fail. And there is another error that I'm not sure if there is a correlation. java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$ at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:114) at org.apache.spark.sql.execution.Filter.conditionEvaluator$lzycompute(basicOperators.scala:55) at org.apache.spark.sql.execution.Filter.conditionEvaluator(basicOperators.scala:55) at org.apache.spark.sql.execution.Filter$$anonfun$2.apply(basicOperators.scala:58) at org.apache.spark.sql.execution.Filter$$anonfun$2.apply(basicOperators.scala:57) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) On Tue, Nov 18, 2014 at 11:41 AM, Michael Armbrust mich...@databricks.com wrote: Interesting, I believe we have run that query with version 1.1.0 with codegen turned on and not much has changed there. Is the error deterministic? On Mon, Nov 17, 2014 at 7:04 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi Michael, We use Spark v1.1.1-rc1 with jdk 1.7.0_51 and scala 2.10.4. On Tue, Nov 18, 2014 at 7:09 AM, Michael Armbrust mich...@databricks.com wrote: What version of Spark SQL? On Sat, Nov 15, 2014 at 10:25 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi all, We run SparkSQL on TPCDS benchmark Q19 with spark.sql.codegen=true, we got exceptions as below, has anyone else saw these before? 
java.lang.ExceptionInInitializerError at org.apache.spark.sql.execution.SparkPlan.newProjection(SparkPlan.scala:92) at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1$$anonfun$1.apply(Exchange.scala:51) at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1$$anonfun$1.apply(Exchange.scala:48) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.NullPointerException at scala.reflect.internal.Types$TypeRef.computeHashCode(Types.scala:2358) at scala.reflect.internal.Types$UniqueType.init(Types.scala:1304) at scala.reflect.internal.Types$TypeRef.init(Types.scala:2341) at
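Until the underlying issue is fixed, a pragmatic thing to try (a sketch, not a confirmed fix) is simply to turn the experimental code generation off for the affected job:

// hiveContext or sqlContext, depending on which one the job uses
sqlContext.setConf("spark.sql.codegen", "false")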
Re: Spark Streaming with Kafka is failing with Error
Hi, do you have some logging backend (log4j, logback) on your classpath? This seems a bit like there is no particular implementation of the abstract `log()` method available. Tobias
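If a missing logging backend is indeed the cause, adding one to the build usually resolves it; a guess at the sbt lines (the versions are illustrative only):

libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.5"
libraryDependencies += "log4j" % "log4j" % "1.2.17"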
Re: Parsing a large XML file using Spark
Hi, see https://www.mail-archive.com/dev@spark.apache.org/msg03520.html for one solution. One issue with those XML files is that they cannot be processed line by line in parallel; plus you inherently need shared/global state to parse XML or check for well-formedness, I think. (Same issue with multi-line JSON, by the way.) Tobias
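For reference, a sketch of the splittable-record approach from the linked thread, assuming Mahout's XmlInputFormat (org.apache.mahout.text.wikipedia.XmlInputFormat) is on the classpath; the tag names and the HDFS path are placeholders:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}

val hadoopConf = new Configuration()
hadoopConf.set("xmlinput.start", "<page>")   // record start tag
hadoopConf.set("xmlinput.end", "</page>")    // record end tag
val pages = sc.newAPIHadoopFile(
    "hdfs:///data/enwiki-dump.xml",
    classOf[org.apache.mahout.text.wikipedia.XmlInputFormat],
    classOf[LongWritable],
    classOf[Text],
    hadoopConf)
  .map { case (_, xml) => xml.toString }     // each element is one <page>...</page> block

Each extracted block can then be parsed independently on the workers (e.g. with scala.xml.XML.loadString), so the well-formedness caveat applies per record rather than to the file as a whole.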
Re: Sourcing data from RedShift
Hi guys, We ultimately needed to add 8 ec2 xl's to get better performance. As was suspected, we could not fit all the data into ram. This worked great with files sized around 100-350MB in size as our initial export task produced. Unfortunately, for the partition settings that we were able to get to work with GraphX (unable to change parallelism due to bug), we are unable to keep writing files at this size - our output ends up being closer to 1GB per file. As a result, our job seems to struggle working with a 100GB worth of these files. We are in a rough spot because upgrading Spark right now is not reasonable for us but this bug prevents solving the issue. On Fri, Nov 14, 2014 at 9:29 PM, Gary Malouf malouf.g...@gmail.com wrote: I'll try this out and follow up with what I find. On Fri, Nov 14, 2014 at 8:54 PM, Xiangrui Meng m...@databricks.com wrote: For each node, if the CSV reader is implemented efficiently, you should be able to hit at least half of the theoretical network bandwidth, which is about 60MB/second/node. So if you just do counting, the expect time should be within 3 minutes. Note that your cluster have 15GB * 12 = 180GB RAM in total. If you use the default spark.storage.memoryFraction, it can barely cache 100GB of data, not considering the overhead. So if your operation need to cache the data to be efficient, you may need a larger cluster or change the storage level to MEMORY_AND_DISK. -Xiangrui On Nov 14, 2014, at 5:32 PM, Gary Malouf malouf.g...@gmail.com wrote: Hmm, we actually read the CSV data in S3 now and were looking to avoid that. Unfortunately, we've experienced dreadful performance reading 100GB of text data for a job directly from S3 - our hope had been connecting directly to Redshift would provide some boost. We had been using 12 m3.xlarges, but increasing default parallelism (to 2x # of cpus across cluster) and increasing partitions during reading did not seem to help. On Fri, Nov 14, 2014 at 6:51 PM, Xiangrui Meng m...@databricks.com wrote: Michael is correct. Using direct connection to dump data would be slow because there is only a single connection. Please use UNLOAD with ESCAPE option to dump the table to S3. See instructions at http://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html And then load them back using the redshift input format we wrote: https://github.com/databricks/spark-redshift (we moved the implementation to github/databricks). Right now all columns are loaded as string columns, and you need to do type casting manually. We plan to add a parser that can translate Redshift table schema directly to Spark SQL schema, but no ETA yet. -Xiangrui On Nov 14, 2014, at 3:46 PM, Michael Armbrust mich...@databricks.com wrote: I'd guess that its an s3n://key:secret_key@bucket/path from the UNLOAD command used to produce the data. Xiangrui can correct me if I'm wrong though. On Fri, Nov 14, 2014 at 2:19 PM, Gary Malouf malouf.g...@gmail.com wrote: We have a bunch of data in RedShift tables that we'd like to pull in during job runs to Spark. What is the path/url format one uses to pull data from there? (This is in reference to using the https://github.com/mengxr/redshift-input-format)
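On the memory point raised earlier in the thread, a small sketch of switching the storage level so the ~100GB does not all have to fit in RAM (the S3 path is a placeholder):

import org.apache.spark.storage.StorageLevel

val records = sc.textFile("s3n://my-bucket/unloaded/*")
records.persist(StorageLevel.MEMORY_AND_DISK)   // spill partitions to disk instead of recomputing them
println(records.count())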
Re: Slave Node Management in Standalone Cluster
Hi Akhil and Kousuke, Thank you for your quick response. Monitoring through JSON API seems straightforward and cool. Thanks again! 2014-11-18 19:06 GMT+09:00 Kousuke Saruta saru...@oss.nttdata.co.jp: Hi Kenichi 1. How can I stop a slave on the specific node? Under `sbin/` directory, there are `start-{all,master,slave,slaves}` and `stop-{all,master,slaves}`, but no `stop-slave`. Are there any way to stop the specific (e.g., the 2nd) slave via command line? You can use sbin/spark-daemon.sh on the machine where the worker you'd like to stop runs. First, you find PID of the worker you'd like to stop and second, you find PID file of the worker. The PID file is on /tmp/ by default and the file name is like as follows. xxx.org.apache.spark.deploy.worker.Worker-WorkerID.pid After you find the PID file, you run the following command. sbin/spark-daemon.sh stop org.apache.spark.worker.Worker WorkerID 2. How can I check cluster status from command line? Are there any way to confirm that all Master / Workers are up and working without using Web UI? AFAIK, there are no command line tools for checking statuses of standalone cluster. Instead of that, you can use special URL like as follows. http://master or worker's hostname:webui-port/json You can get Master and Worker status as JSON format data. - Kousuke (2014/11/18 0:27), Kenichi Maehashi wrote: Hi, I'm operating Spark in standalone cluster configuration (3 slaves) and have some question. 1. How can I stop a slave on the specific node? Under `sbin/` directory, there are `start-{all,master,slave,slaves}` and `stop-{all,master,slaves}`, but no `stop-slave`. Are there any way to stop the specific (e.g., the 2nd) slave via command line? 2. How can I check cluster status from command line? Are there any way to confirm that all Master / Workers are up and working without using Web UI? Thanks in advance! -- Kenichi Maehashi - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
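For scripted monitoring, the JSON endpoint mentioned above can also be polled from code; a tiny sketch (host and port are placeholders, 8080 being the default master web UI port):

val masterStatus = scala.io.Source.fromURL("http://master-host:8080/json").mkString
println(masterStatus)   // JSON describing workers, cores, memory and their states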
Converting a json struct to map
Hi, I'm loading a JSON file into an RDD and then saving that RDD as Parquet. One of the fields is a map of keys and values, but it is being translated and stored as a struct. How can I convert the field into a map? Thanks, Daniel
k-means clustering
Hi there, I would like to do text clustering using k-means and Spark on a massive dataset. As you know, before running k-means I have to apply pre-processing steps such as TF-IDF and NLTK-based cleanup to my big dataset. The following is my code in Python:

if __name__ == '__main__':
    # Cluster a bunch of text documents.
    import re
    import sys
    import csv
    import string
    import nltk
    from collections import defaultdict
    from nltk import PorterStemmer

    k = 6
    vocab = {}
    xs = []
    ns = []
    cat = []
    filename = '2013-01.csv'
    with open(filename, newline='') as f:
        try:
            newsreader = csv.reader(f)
            for row in newsreader:
                ns.append(row[3])
                cat.append(row[4])
        except csv.Error as e:
            sys.exit('file %s, line %d: %s' % (filename, newsreader.line_num, e))

    remove_spl_char_regex = re.compile('[%s]' % re.escape(string.punctuation))  # regex to remove special characters
    remove_num = re.compile('[\d]+')
    # nltk.download()
    stop_words = nltk.corpus.stopwords.words('english')

    for a in ns:
        x = defaultdict(float)

        a1 = a.strip().lower()
        a2 = remove_spl_char_regex.sub(" ", a1)  # Remove special characters
        a3 = remove_num.sub("", a2)              # Remove numbers
        # Remove stop words
        words = a3.split()
        filter_stop_words = [w for w in words if not w in stop_words]
        stemed = [PorterStemmer().stem_word(w) for w in filter_stop_words]
        ws = sorted(stemed)

        # ws = re.findall(r"\w+", a1)
        for w in ws:
            vocab.setdefault(w, len(vocab))
            x[vocab[w]] += 1
        xs.append(x.items())

Can anyone explain to me how I can do the pre-processing step before running k-means using Spark? Best Regards ... Amin Mohebbi PhD candidate in Software Engineering at University of Malaysia Tel : +60 18 2040 017 E-Mail : tp025...@ex.apiit.edu.my amin_...@me.com
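As a rough outline of what the same pipeline can look like when done distributed on Spark (Scala, MLlib 1.1+), here is a sketch; the file path, k and the iteration count are placeholders, and the NLTK stop-word/stemming steps would need a JVM-side equivalent inside the map:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.feature.{HashingTF, IDF}

val docs = sc.textFile("hdfs:///data/2013-01.csv")
val terms = docs.map(_.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq)  // crude tokenizer
val tf = new HashingTF().transform(terms)    // term-frequency vectors
tf.cache()
val tfidf = new IDF().fit(tf).transform(tf)  // TF-IDF weighting
val model = KMeans.train(tfidf, 6, 20)       // k = 6 clusters, 20 iterations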
SparkSQL and Hive/Hive metastore testing - LocalHiveContext
Hi, Just to give some context: we are using the Hive metastore with csv/Parquet files as part of our ETL pipeline. We query these with SparkSQL to do some downstream work. I'm curious what's the best way to go about testing Hive SparkSQL? I'm using 1.1.0. I see that LocalHiveContext has been deprecated. https://issues.apache.org/jira/browse/SPARK-2397 My testing strategy is, as part of my Before block, to create the HiveContext, then create the databases/tables and map them to some test sample data files in my test resources directory. LocalHiveContext was useful because I could inject it as part of the test setup and it would take care of creating the metastore and warehouse directories for Hive for me (local to my project). If I just create a HiveContext, it does create the metastore_db folder locally, but the warehouse directory is not created! Thus running a command like hc.sql("CREATE DATABASE myDb") results in a Hive error. I also can't supply a test hive-site.xml because it won't allow relative paths, which means that there is some shared directory that everyone needs to have. The only other option is to call the setConf method like LocalHiveContext does. Since LocalHiveContext is on the way out, I'm wondering if I'm doing something stupid. Is there a better way to mock this out and test Hive/metastore with SparkSQL? Cheers, ~N
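In case it helps, a sketch of what LocalHiveContext used to set up, done manually in a test fixture (assuming Spark 1.1.x; everything lives under a fresh temp directory per run, so no shared path is needed):

import java.nio.file.Files
import org.apache.spark.sql.hive.HiveContext

val base = Files.createTempDirectory("spark-hive-test")   // per-run scratch area
val hc = new HiveContext(sc)
hc.setConf("hive.metastore.warehouse.dir", base.resolve("warehouse").toString)
hc.setConf("javax.jdo.option.ConnectionURL",
  s"jdbc:derby:;databaseName=${base.resolve("metastore_db")};create=true")
hc.sql("CREATE DATABASE myDb")   // should now succeed against the local warehouse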
A partitionBy problem
Hi all, I tested the *partitionBy* feature in a wordcount application, and I'm puzzled by a phenomenon. In this application, I created an RDD from some text files in HDFS (about 100GB in size), each of which has lines composed of words separated by the character #. I wanted to count the occurrence of each distinct word. *All lines have the same contents, so the final result should be very small in bytes*. The code is as follows:

val text = sc.textFile(inputDir)
val tuples = text.flatMap(line => line.split("#"))
  .map((_, 1))
  .reduceByKey(_ + _)
tuples.collect.foreach { case (word, count) => println(word + " - " + count) }

I submitted the application to a Spark cluster of 5 nodes and ran it in standalone mode. From the application UI http://imgbin.org/index.php?page=imageid=20976, we can see that the shuffle process for *collect* and *reduceByKey* occupied little bandwidth (766.4KB for *collect*'s shuffle read and 961KB for *reduceByKey*'s shuffle write). *However, the shuffle process occupied quite a lot of bandwidth when I added partitionBy like this:*

val text = sc.textFile(inputDir)
val tuples = text.flatMap(line => line.split("#"))
  .map((_, 1))
  .partitionBy(new HashPartitioner(100))
  .reduceByKey(_ + _)
tuples.collect.foreach { case (word, count) => println(word + " - " + count) }

From the application UI http://imgbin.org/index.php?page=imageid=20977, we can see that the shuffle read for *collect* is 2.8GB and the shuffle write for *map* is 3.5GB. The *map* transformations are applied on the 5 nodes of the cluster because the HDFS blocks are distributed among these 5 nodes. The *map* transformations are applied to each element of the RDD on the different nodes and shouldn't need to shuffle the new RDD. *So my first question is: why did the map transformation occupy so much bandwidth (3.5GB) when I added partitionBy to the code?* When *collect* is applied, it needs to collect the results, namely the (*word*, *totalCount*) tuples, from the 5 nodes to the driver. That process should occupy very little bandwidth because all lines have the same contents, like AAA#BBB#CCC#DDD, which means the final results that *collect* retrieves should be very small in bytes (for example hundreds of KB). *So my second question is: why did the collect action occupy so much bandwidth (2.8GB) when I added partitionBy to the code?* *And the third question: when I add partitionBy to an RDD, it returns a new RDD. Does that mean the RDD will be immediately shuffled across nodes to meet the requirement specified by the supplied partitioner, or will the supplied partitioner merely be a hint indicating how to partition the RDD later?* Thanks.
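For comparison, note that reduceByKey can take a partitioner directly. A sketch of that variant, which keeps map-side combining and only shuffles the already-combined (word, count) pairs into 100 partitions instead of shuffling every raw (word, 1) pair through partitionBy first:

import org.apache.spark.HashPartitioner

val tuples = sc.textFile(inputDir)
  .flatMap(_.split("#"))
  .map((_, 1))
  .reduceByKey(new HashPartitioner(100), _ + _)   // combine locally, then shuffle into 100 partitions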
Re: Converting a json struct to map
Something like this?

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

val map_rdd = json_rdd.map(json => {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  val myMap = mapper.readValue[Map[String, String]](json)
  myMap
})

Thanks Best Regards On Wed, Nov 19, 2014 at 11:01 AM, Daniel Haviv danielru...@gmail.com wrote: Hi, I'm loading a json file into a RDD and then save that RDD as parquet. One of the fields is a map of keys and values but it is being translated and stored as a struct. How can I convert the field into a map? Thanks, Daniel
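If you go this route, the Jackson Scala module has to be on the classpath as well; a guess at the sbt line (version illustrative, for Scala 2.10):

libraryDependencies += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.4.4"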