Re: Solving Systems of Linear Equations Using Spark?
You can try LinearRegression with sparse input. It converges to the least squares solution if the linear system is over-determined, though the convergence rate depends on the condition number. Applying standard scaling is a popular heuristic to reduce the condition number. If you are interested in sparse direct methods as in SuiteSparse, I'm not aware of any effort to bring those to Spark. -Xiangrui - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
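[Editor's sketch] A minimal illustration of the approach Xiangrui describes, assuming a Spark 1.x MLlib shell with an existing SparkContext `sc`; the tiny 3-unknown system below is invented purely for illustration:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

// Each equation a_i . x = b_i of the over-determined system becomes a
// LabeledPoint(b_i, a_i), with the coefficients stored as a sparse vector.
val equations = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.sparse(3, Array(0, 2), Array(2.0, 1.0))),
  LabeledPoint(4.0, Vectors.sparse(3, Array(1), Array(2.0))),
  LabeledPoint(3.0, Vectors.sparse(3, Array(0, 1, 2), Array(1.0, 1.0, 1.0))),
  LabeledPoint(2.0, Vectors.sparse(3, Array(2), Array(2.0)))
))

// Gradient descent on the squared loss converges toward the least-squares solution;
// the step size and iteration count are placeholders that depend on the conditioning.
val model = LinearRegressionWithSGD.train(equations, 200, 0.1)
println(model.weights) // approximate solution vector x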
Re: prepending jars to the driver class path for spark-submit on YARN
There is an undocumented configuration to put users jars in front of spark jar. But I'm not very certain that it works as expected (and this is why it is undocumented). Please try turning on spark.yarn.user.classpath.first . -Xiangrui On Sat, Sep 6, 2014 at 5:13 PM, Victor Tso-Guillen v...@paxata.com wrote: I ran into the same issue. What I did was use maven shade plugin to shade my version of httpcomponents libraries into another package. On Fri, Sep 5, 2014 at 4:33 PM, Penny Espinoza pesp...@societyconsulting.com wrote: Hey - I’m struggling with some dependency issues with org.apache.httpcomponents httpcore and httpclient when using spark-submit with YARN running Spark 1.0.2 on a Hadoop 2.2 cluster. I’ve seen several posts about this issue, but no resolution. The error message is this: Caused by: java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.init(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:114) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:99) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:85) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:93) at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26) at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96) at com.amazonaws.http.AmazonHttpClient.init(AmazonHttpClient.java:155) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:118) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:102) at com.amazonaws.services.s3.AmazonS3Client.init(AmazonS3Client.java:332) at com.oncue.rna.realtime.streaming.config.package$.transferManager(package.scala:76) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry.init(SchemaRegistry.scala:27) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry$lzycompute(SchemaRegistry.scala:46) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry(SchemaRegistry.scala:44) at com.oncue.rna.realtime.streaming.coders.KafkaAvroDecoder.init(KafkaAvroDecoder.scala:20) ... 17 more The apache httpcomponents libraries include the method above as of version 4.2. The Spark 1.0.2 binaries seem to include version 4.1. I can get this to work in my driver program by adding exclusions to force use of 4.1, but then I get the error in tasks even when using the —jars option of the spark-submit command. How can I get both the driver program and the individual tasks in my spark-streaming job to use the same version of this library so my job will run all the way through? thanks p - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
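[Editor's sketch, not from the thread] One way to turn on the setting Xiangrui mentions is when building the application's SparkConf; the app name below is a placeholder:

import org.apache.spark.SparkConf

// spark.yarn.user.classpath.first is the undocumented switch referenced above;
// it asks YARN executors to put the user's jars ahead of the Spark assembly.
val conf = new SparkConf()
  .setAppName("my-streaming-job") // placeholder
  .set("spark.yarn.user.classpath.first", "true")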
Re: error: type mismatch while Union
Thank you Aaron for pointing out the problem. This only happens when I run this code in spark-shell, not when I submit the job. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/error-type-mismatch-while-Union-tp13547p13677.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to list all registered tables in a sql context?
Thanks Tobias, I also found this: https://issues.apache.org/jira/browse/SPARK-3299 Looks like it's being worked on. Jianshi On Mon, Sep 8, 2014 at 9:28 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Sat, Sep 6, 2014 at 1:40 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Err... there's no such feature? The problem is that the SQLContext's `catalog` member is protected, so you can't access it from outside. If you subclass SQLContext, and make sure that `catalog` is always a `SimpleCatalog`, you can check `catalog.tables` (which is a HashMap). Tobias -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
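[Editor's sketch] A rough illustration of the subclassing approach Tobias describes, assuming a Spark 1.0/1.1-era SQLContext whose default catalog is a SimpleCatalog; treat it as illustrative rather than a stable API:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.SimpleCatalog

// Subclass SQLContext so the protected `catalog` member becomes reachable,
// then read the table names out of SimpleCatalog's internal HashMap.
class InspectableSQLContext(sc: SparkContext) extends SQLContext(sc) {
  def tableNames: Seq[String] = catalog match {
    case simple: SimpleCatalog => simple.tables.keys.toSeq
    case _                     => Seq.empty // some other Catalog implementation
  }
}

// Usage: val sqlc = new InspectableSQLContext(sc); sqlc.tableNames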
Re: distcp on ec2 standalone spark cluster
~/ephemeral-hdfs/sbin/start-mapred.sh does not exist on spark-1.0.2; I restarted hdfs using ~/ephemeral-hdfs/sbin/stop-dfs.sh and ~/ephemeral-hdfs/sbin/start-dfs.sh, but still getting the same error when trying to run distcp: ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) Any idea? Thanks! Tomer On Sun, Sep 7, 2014 at 9:27 PM, Josh Rosen rosenvi...@gmail.com wrote: If I recall, you should be able to start Hadoop MapReduce using ~/ephemeral-hdfs/sbin/start-mapred.sh. On Sun, Sep 7, 2014 at 6:42 AM, Tomer Benyamini tomer@gmail.com wrote: Hi, I would like to copy log files from s3 to the cluster's ephemeral-hdfs. I tried to use distcp, but I guess mapred is not running on the cluster - I'm getting the exception below. Is there a way to activate it, or is there a spark alternative to distcp? Thanks, Tomer mapreduce.Cluster (Cluster.java:initialize(114)) - Failed to use org.apache.hadoop.mapred.LocalClientProtocolProvider due to error: Invalid mapreduce.jobtracker.address configuration value for LocalJobRunner : XXX:9001 ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: distcp on ec2 standalone spark cluster
Tomer, To use distcp, you need to have a Hadoop compute cluster up. start-dfs just restarts HDFS. I don’t have a Spark 1.0.2 cluster up right now, but there should be a start-mapred*.sh or start-all.sh script that will launch the Hadoop MapReduce cluster that you will need for distcp. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Sep 8, 2014, at 12:28 AM, Tomer Benyamini tomer@gmail.com wrote: ~/ephemeral-hdfs/sbin/start-mapred.sh does not exist on spark-1.0.2; I restarted hdfs using ~/ephemeral-hdfs/sbin/stop-dfs.sh and ~/ephemeral-hdfs/sbin/start-dfs.sh, but still getting the same error when trying to run distcp: ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) Any idea? Thanks! Tomer On Sun, Sep 7, 2014 at 9:27 PM, Josh Rosen rosenvi...@gmail.com wrote: If I recall, you should be able to start Hadoop MapReduce using ~/ephemeral-hdfs/sbin/start-mapred.sh. On Sun, Sep 7, 2014 at 6:42 AM, Tomer Benyamini tomer@gmail.com wrote: Hi, I would like to copy log files from s3 to the cluster's ephemeral-hdfs. I tried to use distcp, but I guess mapred is not running on the cluster - I'm getting the exception below. Is there a way to activate it, or is there a spark alternative to distcp? Thanks, Tomer mapreduce.Cluster (Cluster.java:initialize(114)) - Failed to use org.apache.hadoop.mapred.LocalClientProtocolProvider due to error: Invalid mapreduce.jobtracker.address configuration value for LocalJobRunner : XXX:9001 ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming and database access (e.g. MySQL)
That should be OK, since the iterator is definitely consumed, and therefore the connection actually done with, at the end of a 'foreach' method. You might put the close in a finally block. On Mon, Sep 8, 2014 at 12:29 AM, Soumitra Kumar kumar.soumi...@gmail.com wrote: I have the following code: stream foreachRDD { rdd = if (rdd.take (1).size == 1) { rdd foreachPartition { iterator = initDbConnection () iterator foreach { write to db } closeDbConnection () } } } On Sun, Sep 7, 2014 at 1:26 PM, Sean Owen so...@cloudera.com wrote: ... I'd call out that last bit as actually tricky: close off the driver See this message for the right-est way to do that, along with the right way to open DB connections remotely instead of trying to serialize them: http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dr...@mail.gmail.com%3E On Sun, Sep 7, 2014 at 4:19 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: Standard pattern is to initialize the mysql jdbc driver in your mappartition call , update database then close off the driver. Couple of gotchas 1. New driver initiated for all your partitions 2. If the effect(inserts updates) is not idempotent, so if your server crashes, Spark will replay updates to mysql may cause data corruption. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi On Sun, Sep 7, 2014 at 11:54 AM, jchen jc...@pivotal.io wrote: Hi, Has someone tried using Spark Streaming with MySQL (or any other database/data store)? I can write to MySQL at the beginning of the driver application. However, when I am trying to write the result of every streaming processing window to MySQL, it fails with the following error: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.mysql.jdbc.JDBC4PreparedStatement I think it is because the statement object should be serializable, in order to be executed on the worker node. Has someone tried the similar cases? Example code will be very helpful. My intension is to execute INSERT/UPDATE/DELETE/SELECT statements for each sliding window. Thanks, JC -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-database-access-e-g-MySQL-tp13644.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
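[Editor's sketch] Pulling the pseudo-code above together, a sketch of the pattern Sean and Mayur describe: one JDBC connection per partition, opened on the worker and closed in a finally block. The table, record type and JDBC URL are made-up placeholders:

import java.sql.DriverManager
import org.apache.spark.streaming.dstream.DStream

def saveToMySQL(stream: DStream[(String, Int)], jdbcUrl: String): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { iterator =>
      // Created on the worker, never serialized from the driver.
      val conn = DriverManager.getConnection(jdbcUrl)
      try {
        val stmt = conn.prepareStatement("INSERT INTO counts (word, n) VALUES (?, ?)")
        iterator.foreach { case (word, n) =>
          stmt.setString(1, word)
          stmt.setInt(2, n)
          stmt.executeUpdate() // note Mayur's caveat: replayed batches make this non-idempotent
        }
        stmt.close()
      } finally {
        conn.close() // close in a finally block, as suggested above
      }
    }
  }
}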
Re: Spark Streaming and database access (e.g. MySQL)
Hi, On Mon, Sep 8, 2014 at 4:39 PM, Sean Owen so...@cloudera.com wrote: if (rdd.take (1).size == 1) { rdd foreachPartition { iterator = I was wondering: Since take() is an output operation, isn't it computed twice (once for the take(1), once during the iteration)? Or will only one single element be computed for take(1)? Thanks Tobias
sharing off_heap rdds
I see that the tachyon url constructed for an rdd partition has executor id in it. So if the same partition is being processed by a different executor on a reexecution of the same computation, it cannot really use the earlier result. Is this a correct assessment? Will removing the executor id from the tachyon path fix the issue without introducing any problems? -- Thanks
How to profile a spark application
Hi, Can someone tell me how to profile a Spark application? -Karthik
Re: Spark SQL check if query is completed (pyspark)
Thank you for the replies. I am running an insert on a join (INSERT OVERWRITE TABLE new_table SELECT * FROM table1 AS a JOIN table2 AS b ON (a.key = b.key)). The process does not have the right permission to write to that folder, so I get the following errors printed: chgrp: `/user/x/y': No such file or directory chmod: `/user/x/y': No such file or directory and it returns an empty RDD without raising an exception. thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-check-if-query-is-completed-pyspark-tp13630p13685.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to profile a spark application
See https://cwiki.apache.org/confluence/display/SPARK/Profiling+Spark+Applications+Using+YourKit On Sep 8, 2014, at 2:48 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, Can someone tell me how to profile a spark application. -Karthik
Re: Running spark-shell (or queries) over the network (not from master)
Solved. The problem is the following: the underlying Akka driver uses the INTERNAL interface address on the Amazon instance (the ones that start with 10.x.y.z) to present itself to the master, it does not use the external (public) IP! Ognen On 9/7/2014 3:21 PM, Sean Owen wrote: Also keep in mind there is a non-trivial amount of traffic between the driver and cluster. It's not something I would do by default, running the driver so remotely. With enough ports open it should work though. On Sun, Sep 7, 2014 at 7:05 PM, Ognen Duzlevski ognen.duzlev...@gmail.com wrote: Horacio, Thanks, I have not tried that, however, I am not after security right now - I am just wondering why something so obvious won't work ;) Ognen On 9/7/2014 12:38 PM, Horacio G. de Oro wrote: Have you tryied with ssh? It will be much secure (only 1 port open), and you'll be able to run spark-shell over the networ. I'm using that way in my project (https://github.com/data-tsunami/smoke) with good results. I can't make a try now, but something like this should work: ssh -tt ec2-user@YOUR-EC2-IP /path/to/spark-shell SPARK-SHELL-OPTIONS With this approach you are way more secure (without installing a VPN), you don't need spark/hadoop installed on your PC. You won't have acces to local files, but you haven't mentioned that as a requirement :-) Hope this help you. Horacio -- Web: http://www.data-tsunami.com Email: hgde...@gmail.com Cel: +54 9 3572 525359 LinkedIn: https://www.linkedin.com/in/hgdeoro - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
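[Editor's sketch, not proposed in the thread] One way to act on Ognen's finding is to tell the driver which address to advertise instead of letting Akka pick the internal interface. The master URL and public IP below are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

// Advertise the externally reachable address rather than the internal 10.x.y.z interface.
val conf = new SparkConf()
  .setMaster("spark://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:7077") // placeholder
  .setAppName("remote-driver-test")
  .set("spark.driver.host", "203.0.113.10") // placeholder public IP of the driver machine
val sc = new SparkContext(conf)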
Re: Standalone spark cluster. Can't submit job programmatically - java.io.InvalidClassException
After wasting a lot of time, I've found the problem. Even though I don't use Hadoop/HDFS in my application, the Hadoop client still matters. The problem was the hadoop-client version: it was different from the Hadoop version Spark was built for. Spark's Hadoop version was 1.2.1, but my application pulled in 2.4. Once I changed the hadoop-client version to 1.2.1 in my app, I was able to execute Spark code on the cluster. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Standalone-spark-cluster-Can-t-submit-job-programmatically-java-io-InvalidClassException-tp13456p13688.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
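[Editor's sketch] For illustration, a build.sbt fragment showing the kind of alignment described above; the Spark and Hadoop versions are the ones from this message and would need to match your own cluster's build:

// build.sbt (sketch): keep hadoop-client on the version the cluster's Spark was built against.
libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"    % "1.0.2" % "provided",
  "org.apache.hadoop" %  "hadoop-client" % "1.2.1" // must match Spark's Hadoop build
)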
Error while running sparkSQL application in the cluster-mode environment
Hello, I tried to execute a simple Spark application using Spark SQL. On the first try it worked as I expected, but after that it doesn't run and shows stderr like below: Spark Executor Command: java -cp ::/opt/spark-1.0.2-bin-hadoop2/conf:/opt/spark-1.0.2-bin-hadoop2/lib/spark-assembly-1.0.2-hadoop2.4.0.jar:/opt/hadoop2/etc/hadoop:/opt/hadoop2/etc/hadoop -XX:MaxPermSize=128m -Xms14336M -Xmx14336M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@saturn00:35894/user/CoarseGrainedScheduler 9 saturn09 4 akka.tcp://sparkWorker@saturn09:45636/user/Worker app-20140908223656- 14/09/08 22:36:57 INFO spark.SecurityManager: Changing view acls to: root 14/09/08 22:36:57 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root) 14/09/08 22:36:57 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/09/08 22:36:57 INFO Remoting: Starting remoting 14/09/08 22:36:57 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@saturn09:44260] 14/09/08 22:36:57 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@saturn09:44260] 14/09/08 22:36:57 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@saturn00:35894/user/CoarseGrainedScheduler 14/09/08 22:36:57 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@saturn09:45636/user/Worker 14/09/08 22:36:57 INFO worker.WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@saturn09:45636/user/Worker 14/09/08 22:36:57 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver 14/09/08 22:36:57 INFO spark.SecurityManager: Changing view acls to: root 14/09/08 22:36:57 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root) 14/09/08 22:36:58 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/09/08 22:36:58 INFO Remoting: Starting remoting 14/09/08 22:36:58 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@saturn09:39880] 14/09/08 22:36:58 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@saturn09:39880] 14/09/08 22:36:58 INFO spark.SparkEnv: Connecting to MapOutputTracker: akka.tcp://spark@saturn00:35894/user/MapOutputTracker 14/09/08 22:36:58 INFO spark.SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@saturn00:35894/user/BlockManagerMaster 14/09/08 22:36:58 INFO storage.DiskBlockManager: Created local directory at /hadoop/spark/spark-local-20140908223658-5699 14/09/08 22:36:58 INFO storage.MemoryStore: MemoryStore started with capacity 4.0 GB. 14/09/08 22:36:58 INFO network.ConnectionManager: Bound socket to port 49090 with id = ConnectionManagerId(saturn09,49090) 14/09/08 22:36:58 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/09/08 22:36:58 INFO storage.BlockManagerMaster: Registered BlockManager 14/09/08 22:36:58 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-379704ff-05f2-4c93-8814-ffbe1cc8cd53 14/09/08 22:36:58 INFO spark.HttpServer: Starting HTTP Server 14/09/08 22:36:58 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/09/08 22:36:58 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:40257 [akka.tcp://spark@saturn00:35894] disassociated! Shutting down. Here, saturn00 is the master and there are 10 nodes in my cluster (saturn01~saturn10). At the last message of the error, what is the meaning of Driver Disassociated? How can I resolve this issue?
Thanks // Yoonmin Nam - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to scale large kafka topic
Hi, I'm building an application that reads events from a Kafka stream. In production we have 5 consumers that share 10 partitions. But with Spark Streaming's Kafka integration the master acts as the consumer and then distributes the tasks to workers, so I can have only 1 master acting as a consumer, and I need more because a single consumer means lag. Do you have any idea what I can do? Another interesting point: the master is not loaded at all; I can't get it above 10% CPU. I've tried to increase queued.max.message.chunks on the Kafka client to read more records, thinking it would speed up the reads, but I only get: ERROR consumer.ConsumerFetcherThread: [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73; ClientId: SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7] - PartitionFetchInfo(929838589,1048576),[IA2,6] - PartitionFetchInfo(929515796,1048576),[IA2,9] - PartitionFetchInfo(929577946,1048576),[IA2,8] - PartitionFetchInfo(930751599,1048576),[IA2,2] - PartitionFetchInfo(926457704,1048576),[IA2,5] - PartitionFetchInfo(930774385,1048576),[IA2,0] - PartitionFetchInfo(929913213,1048576),[IA2,3] - PartitionFetchInfo(929268891,1048576),[IA2,4] - PartitionFetchInfo(929949877,1048576),[IA2,1] - PartitionFetchInfo(930063114,1048576) java.lang.OutOfMemoryError: Java heap space Does anyone have ideas? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-large-kafka-topic-tp13691.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
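[Editor's sketch, not from the thread] For reference, the usual way to get more Kafka consumer parallelism with the receiver-based API is to create several input streams in the same consumer group and union them. The ZooKeeper quorum, group id and per-topic thread count are placeholders; "IA2" is the topic name from the error above:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(2))
val zkQuorum = "zk1:2181,zk2:2181" // placeholder
val groupId  = "spark-consumers"   // placeholder

// Each createStream call is one receiver running on a worker; several receivers
// in the same group split the topic's partitions between them, like plain consumers.
val numReceivers = 5
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, groupId, Map("IA2" -> 2))
}
val unified = ssc.union(streams)
unified.count().print()

ssc.start()
ssc.awaitTermination()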
Re: Solving Systems of Linear Equations Using Spark?
Durin, I have integrated ecos with spark which uses suitesparse under the hood for linear equation solvesI have exposed only the qp solver api in spark since I was comparing ip with proximal algorithms but we can expose suitesparse api as well...jni is used to load up ldl amd and ecos libraries. Please follow ecos section of my spark summit talk. We can discuss more but we can formulate interesting things like google's ceres solver's trust region formulation. http://spark-summit.org/2014/talk/quadratic-programing-solver-for-non-negative-matrix-factorization-with-spark Let me point you to the code so that you can take a look at it. Suitesparse (ldl and amd) is lgpl but ecos is gpl and therefore I was not sure how straightforward it will be to add the solver to mllib. Our legal was not convinced to add lgpl/gpl code in apache project. Could you also detail the usecases you are looking for ? You want a distributed lp / socp solver where each worker solves a partition of the constraint and the full objective...and you want to converge to a global solution using consensus ? Or your problem has more structure to partition the problem cleanly and don't need consensus step (which is what I implemented in the code) Thanks Deb On Sep 7, 2014 11:35 PM, Xiangrui Meng men...@gmail.com wrote: You can try LinearRegression with sparse input. It converges the least squares solution if the linear system is over-determined, while the convergence rate depends on the condition number. Applying standard scaling is popular heuristic to reduce the condition number. If you are interested in sparse direct methods as in SuiteSparse. I'm not aware of any effort to do so. -Xiangrui - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Solving Systems of Linear Equations Using Spark?
Xiangrui, Should I open up a JIRA for this ? Distributed lp/socp solver through ecos/ldl/amd ? I can open source it with gpl license in spark code as that's what our legal cleared (apache + gpl becomes gpl) and figure out the right way to call it...ecos is gpl but we can definitely use the jni version of ldl and amd which are lgpl... Let me know. Thanks. Deb On Sep 8, 2014 7:04 AM, Debasish Das debasish.da...@gmail.com wrote: Durin, I have integrated ecos with spark which uses suitesparse under the hood for linear equation solvesI have exposed only the qp solver api in spark since I was comparing ip with proximal algorithms but we can expose suitesparse api as well...jni is used to load up ldl amd and ecos libraries. Please follow ecos section of my spark summit talk. We can discuss more but we can formulate interesting things like google's ceres solver's trust region formulation. http://spark-summit.org/2014/talk/quadratic-programing-solver-for-non-negative-matrix-factorization-with-spark Let me point you to the code so that you can take a look at it. Suitesparse (ldl and amd) is lgpl but ecos is gpl and therefore I was not sure how straightforward it will be to add the solver to mllib. Our legal was not convinced to add lgpl/gpl code in apache project. Could you also detail the usecases you are looking for ? You want a distributed lp / socp solver where each worker solves a partition of the constraint and the full objective...and you want to converge to a global solution using consensus ? Or your problem has more structure to partition the problem cleanly and don't need consensus step (which is what I implemented in the code) Thanks Deb On Sep 7, 2014 11:35 PM, Xiangrui Meng men...@gmail.com wrote: You can try LinearRegression with sparse input. It converges the least squares solution if the linear system is over-determined, while the convergence rate depends on the condition number. Applying standard scaling is popular heuristic to reduce the condition number. If you are interested in sparse direct methods as in SuiteSparse. I'm not aware of any effort to do so. -Xiangrui - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Cannot run SimpleApp as regular Java app
Dear all: I am a brand new Spark user trying out the SimpleApp from the Quick Start page. Here is the code: object SimpleApp { def main(args: Array[String]) { val logFile = "/dev/spark-1.0.2-bin-hadoop2/README.md" // Should be some file on your system val conf = new SparkConf() .setAppName("Simple Application") .set("spark.executor.memory", "512m") .setMaster("spark://myhost.local:7077") .setJars(Seq("/spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar")) val sc = new SparkContext(conf) try { val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) } finally { sc.stop() } } } I am using Spark 1.0.2 and Scala 2.10.4. In spark-env.sh I have SPARK_WORKER_MEMORY=2g. I am trying to run this as a standalone Java app in my IDE. Note that this code *does* work when I either - Change the master to local (works running from IDE) - Run it using spark-submit The application/driver log is: 14/09/08 10:03:55 INFO spark.SecurityManager: Changing view acls to: eric 14/09/08 10:03:55 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(eric) 14/09/08 10:03:56 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/09/08 10:03:56 INFO Remoting: Starting remoting 14/09/08 10:03:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@10.0.1.5:61645] 14/09/08 10:03:56 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@10.0.1.5:61645] 14/09/08 10:03:56 INFO spark.SparkEnv: Registering MapOutputTracker 14/09/08 10:03:56 INFO spark.SparkEnv: Registering BlockManagerMaster 14/09/08 10:03:56 INFO storage.DiskBlockManager: Created local directory at /var/folders/j1/5rzyf1x97q9_7gj3mdc79t3cgn/T/spark-local-20140908100356-2496 14/09/08 10:03:56 INFO storage.MemoryStore: MemoryStore started with capacity 279.5 MB. 
14/09/08 10:03:56 INFO network.ConnectionManager: Bound socket to port 61646 with id = ConnectionManagerId(10.0.1.5,61646) 14/09/08 10:03:56 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/09/08 10:03:56 INFO storage.BlockManagerInfo: Registering block manager 10.0.1.5:61646 with 279.5 MB RAM 14/09/08 10:03:56 INFO storage.BlockManagerMaster: Registered BlockManager 14/09/08 10:03:56 INFO spark.HttpServer: Starting HTTP Server 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031 14/09/08 10:03:57 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:61647 14/09/08 10:03:57 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.0.1.5:61647 14/09/08 10:03:57 INFO spark.HttpFileServer: HTTP File server directory is /var/folders/j1/5rzyf1x97q9_7gj3mdc79t3cgn/T/spark-d5637279-5caa-4c14-a00f-650f1dd915bc 14/09/08 10:03:57 INFO spark.HttpServer: Starting HTTP Server 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031 14/09/08 10:03:57 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:61648 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031 14/09/08 10:03:57 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/09/08 10:03:57 INFO ui.SparkUI: Started SparkUI at http://10.0.1.5:4040 2014-09-08 10:03:57.567 java[58736:1703] Unable to load realm info from SCDynamicStore 14/09/08 10:03:57 INFO spark.SparkContext: Added JAR /spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar at http://10.0.1.5:61648/jars/spark-experiments-1.0-SNAPSHOT.jar with timestamp 1410185037723 14/09/08 10:03:57 INFO client.AppClient$ClientActor: Connecting to master spark://myhost.local:7077... 14/09/08 10:03:57 INFO storage.MemoryStore: ensureFreeSpace(32960) called with curMem=0, maxMem=293063884 14/09/08 10:03:57 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.2 KB, free 279.5 MB) 14/09/08 10:03:58 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140908100358-0002 14/09/08 10:03:58 INFO client.AppClient$ClientActor: Executor added: app-20140908100358-0002/0 on worker-20140908100129-10.0.1.5-61526 (10.0.1.5:61526) with 8 cores 14/09/08 10:03:58 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140908100358-0002/0 on hostPort 10.0.1.5:61526 with 8 cores, 512.0 MB RAM 14/09/08 10:03:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/09/08 10:03:58 WARN snappy.LoadSnappy: Snappy native library not loaded 14/09/08 10:03:58 INFO mapred.FileInputFormat: Total input paths to process : 1 14/09/08 10:03:58 INFO client.AppClient$ClientActor: Executor updated: app-20140908100358-0002/0 is now RUNNING 14/09/08 10:03:58 INFO spark.SparkContext: Starting job: count at SimpleApp.scala:16 14/09/08 10:03:58 INFO
Spark SQL on Cassandra
Hi, I am reading data from Cassandra through the DataStax spark-cassandra connector, converting it into JSON and then running Spark SQL on it. Refer to the code snippet below: step 1 val o_rdd = sc.cassandraTable[CassandraRDDWrapper]('keyspace', 'column_family') step 2 val tempObjectRDD = sc.parallelize(o_rdd.toArray.map(i => i), 100) step 3 val objectRDD = sqlContext.jsonRDD(tempObjectRDD) step 4 objectRDD.registerAsTable("objects") At step (2) I have to explicitly do a toArray because jsonRDD takes an RDD[String]. For me, calling toArray on the Cassandra RDD takes forever, as I have millions of records in Cassandra. Is there a better way of doing this? How can I optimize it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-on-Cassandra-tp13696.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
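[Editor's sketch, not from the thread] Since jsonRDD only needs an RDD[String], the Cassandra RDD can be mapped to JSON strings directly on the workers instead of collecting with toArray. `toJson` is a hypothetical serializer for CassandraRDDWrapper (e.g. via json4s or Jackson):

import org.apache.spark.sql.SQLContext

def toJson(row: CassandraRDDWrapper): String = ??? // placeholder serializer

val jsonStrings = o_rdd.map(toJson)           // stays distributed, no driver-side collect
val objectRDD   = sqlContext.jsonRDD(jsonStrings)
objectRDD.registerAsTable("objects")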
A problem for running MLLIB in amazon clound
I am running a very simple example using SVMWithSGD on Amazon EMR. I haven't gotten any result after a full hour. My instance type is m3.large and the instance count is 3. The dataset is the one provided with Apache Spark's MLlib: sample_svm_data. The number of iterations is 2, and all other options are left at their default values. Is there anyone who can help me out with this? Thanks, Hui
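[Editor's sketch] For reference, a minimal version of the run being described, following the MLlib documentation's parsing of sample_svm_data.txt; the data path is a placeholder for wherever the file lives on your EMR cluster (S3 or HDFS):

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val raw = sc.textFile("s3n://my-bucket/sample_svm_data.txt") // placeholder path
val parsed = raw.map { line =>
  val parts = line.split(' ')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(_.toDouble)))
}.cache()

val model = SVMWithSGD.train(parsed, 2) // 2 iterations, defaults otherwise
println(model.weights)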
groupBy gives non deterministic results
Hi, I have a key-value RDD called rdd below. After a groupBy, I tried to count rows. But the result is not unique; it is somehow non-deterministic. Here is the test code: val step1 = ligneReceipt_cleTable.persist val step2 = step1.groupByKey val s1size = step1.count val s2size = step2.count val t = step2 // rdd after groupBy val t1 = t.count val t2 = t.count val t3 = t.count val t4 = t.count val t5 = t.count val t6 = t.count val t7 = t.count val t8 = t.count println("s1size = " + s1size) println("s2size = " + s2size) println("1 = " + t1) println("2 = " + t2) println("3 = " + t3) println("4 = " + t4) println("5 = " + t5) println("6 = " + t6) println("7 = " + t7) println("8 = " + t8) Here are the results: s1size = 5338864 s2size = 5268001 1 = 5268002 2 = 5268001 3 = 5268001 4 = 5268002 5 = 5268001 6 = 5268002 7 = 5268002 8 = 5268001 Even if the difference is just one row, that's annoying. Any idea? Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to profile a spark application
Thank you Ted. regards Karthik On Mon, Sep 8, 2014 at 3:33 PM, Ted Yu yuzhih...@gmail.com wrote: See https://cwiki.apache.org/confluence/display/SPARK/Profiling+Spark+Applications+Using+YourKit On Sep 8, 2014, at 2:48 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, Can someone tell me how to profile a spark application. -Karthik
Re: distcp on ec2 standalone spark cluster
Tomer, Did you try start-all.sh? It worked for me the last time I tried using distcp, and it worked for this guy too http://stackoverflow.com/a/18083790/877069. Nick On Mon, Sep 8, 2014 at 3:28 AM, Tomer Benyamini tomer@gmail.com wrote: ~/ephemeral-hdfs/sbin/start-mapred.sh does not exist on spark-1.0.2; I restarted hdfs using ~/ephemeral-hdfs/sbin/stop-dfs.sh and ~/ephemeral-hdfs/sbin/start-dfs.sh, but still getting the same error when trying to run distcp: ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) Any idea? Thanks! Tomer On Sun, Sep 7, 2014 at 9:27 PM, Josh Rosen rosenvi...@gmail.com wrote: If I recall, you should be able to start Hadoop MapReduce using ~/ephemeral-hdfs/sbin/start-mapred.sh. On Sun, Sep 7, 2014 at 6:42 AM, Tomer Benyamini tomer@gmail.com wrote: Hi, I would like to copy log files from s3 to the cluster's ephemeral-hdfs. I tried to use distcp, but I guess mapred is not running on the cluster - I'm getting the exception below. Is there a way to activate it, or is there a spark alternative to distcp? Thanks, Tomer mapreduce.Cluster (Cluster.java:initialize(114)) - Failed to use org.apache.hadoop.mapred.LocalClientProtocolProvider due to error: Invalid mapreduce.jobtracker.address configuration value for LocalJobRunner : XXX:9001 ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: distcp on ec2 standalone spark cluster
Still no luck, even when running stop-all.sh followed by start-all.sh. On Mon, Sep 8, 2014 at 5:57 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Tomer, Did you try start-all.sh? It worked for me the last time I tried using distcp, and it worked for this guy too. Nick On Mon, Sep 8, 2014 at 3:28 AM, Tomer Benyamini tomer@gmail.com wrote: ~/ephemeral-hdfs/sbin/start-mapred.sh does not exist on spark-1.0.2; I restarted hdfs using ~/ephemeral-hdfs/sbin/stop-dfs.sh and ~/ephemeral-hdfs/sbin/start-dfs.sh, but still getting the same error when trying to run distcp: ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) Any idea? Thanks! Tomer On Sun, Sep 7, 2014 at 9:27 PM, Josh Rosen rosenvi...@gmail.com wrote: If I recall, you should be able to start Hadoop MapReduce using ~/ephemeral-hdfs/sbin/start-mapred.sh. On Sun, Sep 7, 2014 at 6:42 AM, Tomer Benyamini tomer@gmail.com wrote: Hi, I would like to copy log files from s3 to the cluster's ephemeral-hdfs. I tried to use distcp, but I guess mapred is not running on the cluster - I'm getting the exception below. Is there a way to activate it, or is there a spark alternative to distcp? Thanks, Tomer mapreduce.Cluster (Cluster.java:initialize(114)) - Failed to use org.apache.hadoop.mapred.LocalClientProtocolProvider due to error: Invalid mapreduce.jobtracker.address configuration value for LocalJobRunner : XXX:9001 ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: groupBy gives non deterministic results
Update: Just test with HashPartitioner(8) and count on each partition: List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657591*), (*6,658327*), (*7,658434*)), List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657594)*, (6,658326), (*7,658434*)), List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657592)*, (6,658326), (*7,658435*)), List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657591)*, (6,658326), (7,658434)), List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657592)*, (6,658326), (7,658435)), List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657592)*, (6,658326), (7,658435)), List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657592)*, (6,658326), (7,658435)), List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394), *(5,657591)*, (6,658326), (7,658435)) The result is not identical for each execution. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13702.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How do you perform blocking IO in apache spark job?
What if, when I traverse an RDD, I need to calculate values in the dataset by calling an external (blocking) service? How do you think that could be achieved? val values: Future[RDD[Double]] = Future sequence tasks I've tried to create a list of Futures, but as an RDD is not Traversable, Future.sequence is not suitable. I just wonder whether anyone has had such a problem, and how did you solve it? What I'm trying to achieve is parallelism on a single worker node, so I can call that external service 3000 times per second. Probably there is another solution, more suitable for Spark, like having multiple worker nodes on a single host. It's interesting to know how you cope with such a challenge. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-you-perform-blocking-IO-in-apache-spark-job-tp13704.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How do you perform blocking IO in apache spark job?
Hi, What does the external service provide? Data? Calculations? Can the service push data to you via Kafka and Spark Streaming? Can you fetch the necessary data from the service beforehand? The solution to your question depends on your answers. I would not recommend connecting to a blocking service during Spark job execution. What do you do if a node crashes? Is the order of service calls relevant for you? Best regards On Sep 8, 2014 17:31, DrKhu khudyakov@gmail.com wrote: What if, when I traverse RDD, I need to calculate values in dataset by calling external (blocking) service? How do you think that could be achieved? val values: Future[RDD[Double]] = Future sequence tasks I've tried to create a list of Futures, but as RDD id not Traversable, Future.sequence is not suitable. I just wonder, if anyone had such a problem, and how did you solve it? What I'm trying to achieve is to get a parallelism on a single worker node, so I can call that external service 3000 times per second. Probably, there is another solution, more suitable for spark, like having multiple working nodes on single host. It's interesting to know, how do you cope with such a challenge? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-you-perform-blocking-IO-in-apache-spark-job-tp13704.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: distcp on ec2 standalone spark cluster
what did you see in the log? was there anything related to mapreduce? can you log into your hdfs (data) node, use jps to list all java process and confirm whether there is a tasktracker process (or nodemanager) running with datanode process -- Ye Xianjin Sent with Sparrow (http://www.sparrowmailapp.com/?sig) On Monday, September 8, 2014 at 11:13 PM, Tomer Benyamini wrote: Still no luck, even when running stop-all.sh (http://stop-all.sh) followed by start-all.sh (http://start-all.sh). On Mon, Sep 8, 2014 at 5:57 PM, Nicholas Chammas nicholas.cham...@gmail.com (mailto:nicholas.cham...@gmail.com) wrote: Tomer, Did you try start-all.sh (http://start-all.sh)? It worked for me the last time I tried using distcp, and it worked for this guy too. Nick On Mon, Sep 8, 2014 at 3:28 AM, Tomer Benyamini tomer@gmail.com (mailto:tomer@gmail.com) wrote: ~/ephemeral-hdfs/sbin/start-mapred.sh (http://start-mapred.sh) does not exist on spark-1.0.2; I restarted hdfs using ~/ephemeral-hdfs/sbin/stop-dfs.sh (http://stop-dfs.sh) and ~/ephemeral-hdfs/sbin/start-dfs.sh (http://start-dfs.sh), but still getting the same error when trying to run distcp: ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name (http://mapreduce.framework.name) and the correspond server addresses. at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) Any idea? Thanks! Tomer On Sun, Sep 7, 2014 at 9:27 PM, Josh Rosen rosenvi...@gmail.com (mailto:rosenvi...@gmail.com) wrote: If I recall, you should be able to start Hadoop MapReduce using ~/ephemeral-hdfs/sbin/start-mapred.sh (http://start-mapred.sh). On Sun, Sep 7, 2014 at 6:42 AM, Tomer Benyamini tomer@gmail.com (mailto:tomer@gmail.com) wrote: Hi, I would like to copy log files from s3 to the cluster's ephemeral-hdfs. I tried to use distcp, but I guess mapred is not running on the cluster - I'm getting the exception below. Is there a way to activate it, or is there a spark alternative to distcp? Thanks, Tomer mapreduce.Cluster (Cluster.java:initialize(114)) - Failed to use org.apache.hadoop.mapred.LocalClientProtocolProvider due to error: Invalid mapreduce.jobtracker.address configuration value for LocalJobRunner : XXX:9001 ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name (http://mapreduce.framework.name) and the correspond server addresses. 
at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org (mailto:user-unsubscr...@spark.apache.org) For additional commands, e-mail: user-h...@spark.apache.org (mailto:user-h...@spark.apache.org) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org (mailto:user-unsubscr...@spark.apache.org) For additional commands, e-mail: user-h...@spark.apache.org (mailto:user-h...@spark.apache.org) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org (mailto:user-unsubscr...@spark.apache.org) For additional commands, e-mail: user-h...@spark.apache.org (mailto:user-h...@spark.apache.org)
Re: How do you perform blocking IO in apache spark job?
Hi Jörn, first of all, thanks for your intent to help. The external service is a native component that is stateless and performs the calculation based on the data I provide. The data is in an RDD. I have that component on each worker node, and I would like to get as much parallelism as possible on a single worker node. Using Scala Futures I can get it, at least as many threads as my machine allows. But how do I do the same on Spark? Is there a possibility to call that native component on each worker in multiple threads? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-you-perform-blocking-IO-in-apache-spark-job-tp13704p13707.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How do you perform blocking IO in apache spark job?
What is the driver-side Future for? Are you trying to make the remote Spark workers execute more requests to your service concurrently? it's not clear from your messages whether it's something like a web service, or just local native code. So the time spent in your processing -- whatever returns Double -- is mostly waiting for a blocking service to return? I assume the external service is not at capacity yet and can handle more concurrent requests, or else, there's no point in adding parallelism. First I'd figure out how many parallel requests the service can handle before starting to slow down; call it N. It won't help to make more than N requests in parallel. So first I'd make sure you really are not yet at that point. You can make more partitions with repartition(), to have at least N partitions. Then you want to make sure there are enough executors, with access to enough cores, to run N tasks concurrently on the cluster. That should maximize parallelism. You can indeed write remote functions that parallelize themselves with Future (not on the driver side) but I think ideally you get the parallelism from Spark, absent a reason not to. On Mon, Sep 8, 2014 at 4:30 PM, DrKhu khudyakov@gmail.com wrote: What if, when I traverse RDD, I need to calculate values in dataset by calling external (blocking) service? How do you think that could be achieved? val values: Future[RDD[Double]] = Future sequence tasks I've tried to create a list of Futures, but as RDD id not Traversable, Future.sequence is not suitable. I just wonder, if anyone had such a problem, and how did you solve it? What I'm trying to achieve is to get a parallelism on a single worker node, so I can call that external service 3000 times per second. Probably, there is another solution, more suitable for spark, like having multiple working nodes on single host. It's interesting to know, how do you cope with such a challenge? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-you-perform-blocking-IO-in-apache-spark-job-tp13704.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
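[Editor's sketch] A rough illustration of the two points above, assuming an existing rdd: RDD[Double] and a hypothetical blocking client call callService (both placeholders):

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical blocking request to the external service.
def callService(x: Double): Double = ??? // placeholder

// (a) Let Spark supply the parallelism: with at least N partitions and enough
// executor cores, N service calls run concurrently across the cluster.
val viaSpark = rdd.repartition(64).map(callService)

// (b) Add extra parallelism inside each task with Futures (on the workers, not the driver).
val viaFutures = rdd.mapPartitions { iter =>
  implicit val ec = ExecutionContext.global
  val futures = iter.map(x => Future(callService(x))).toList // fire off all calls in the partition
  futures.map(f => Await.result(f, 30.seconds)).iterator
}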
Re: distcp on ec2 standalone spark cluster
No tasktracker or nodemanager. This is what I see: On the master: org.apache.hadoop.yarn.server.resourcemanager.ResourceManager org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode org.apache.hadoop.hdfs.server.namenode.NameNode On the data node (slave): org.apache.hadoop.hdfs.server.datanode.DataNode On Mon, Sep 8, 2014 at 6:39 PM, Ye Xianjin advance...@gmail.com wrote: what did you see in the log? was there anything related to mapreduce? can you log into your hdfs (data) node, use jps to list all java process and confirm whether there is a tasktracker process (or nodemanager) running with datanode process -- Ye Xianjin Sent with Sparrow On Monday, September 8, 2014 at 11:13 PM, Tomer Benyamini wrote: Still no luck, even when running stop-all.sh followed by start-all.sh. On Mon, Sep 8, 2014 at 5:57 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Tomer, Did you try start-all.sh? It worked for me the last time I tried using distcp, and it worked for this guy too. Nick On Mon, Sep 8, 2014 at 3:28 AM, Tomer Benyamini tomer@gmail.com wrote: ~/ephemeral-hdfs/sbin/start-mapred.sh does not exist on spark-1.0.2; I restarted hdfs using ~/ephemeral-hdfs/sbin/stop-dfs.sh and ~/ephemeral-hdfs/sbin/start-dfs.sh, but still getting the same error when trying to run distcp: ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) Any idea? Thanks! Tomer On Sun, Sep 7, 2014 at 9:27 PM, Josh Rosen rosenvi...@gmail.com wrote: If I recall, you should be able to start Hadoop MapReduce using ~/ephemeral-hdfs/sbin/start-mapred.sh. On Sun, Sep 7, 2014 at 6:42 AM, Tomer Benyamini tomer@gmail.com wrote: Hi, I would like to copy log files from s3 to the cluster's ephemeral-hdfs. I tried to use distcp, but I guess mapred is not running on the cluster - I'm getting the exception below. Is there a way to activate it, or is there a spark alternative to distcp? Thanks, Tomer mapreduce.Cluster (Cluster.java:initialize(114)) - Failed to use org.apache.hadoop.mapred.LocalClientProtocolProvider due to error: Invalid mapreduce.jobtracker.address configuration value for LocalJobRunner : XXX:9001 ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. 
at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
If for YARN you use 'spark.yarn.jar', what is the LOCAL equivalent to that property ...
Hello friends: It was mentioned in another (Y.A.R.N.-centric) email thread that 'SPARK_JAR' was deprecated, and to use the 'spark.yarn.jar' property instead for YARN submission. For example: user$ pyspark [some-options] --driver-java-options spark.yarn.jar=hdfs://namenode:8020/path/to/spark-assembly-*.jar What is the equivalent property to use for the LOCAL MODE case? spark.jar? spark.local.jar? I searched for this, but can't find where the definitions for these exist (perhaps a pointer to that, too). :) For completeness/explicitness, I like to specify things like this on the CLI, even if there are default settings for them. Thank you! didata //
Re: If for YARN you use 'spark.yarn.jar', what is the LOCAL equivalent to that property ...
On Mon, Sep 8, 2014 at 9:35 AM, Dimension Data, LLC. subscripti...@didata.us wrote: user$ pyspark [some-options] --driver-java-options spark.yarn.jar=hdfs://namenode:8020/path/to/spark-assembly-*.jar This command line does not look correct. spark.yarn.jar is not a JVM command line option. You most probably need a -D before that. What is the equivalent property to use for the LOCAL MODE case? What do you mean local mode? --master local? A local file? That location is a URL, so set it to what makes sense in your case. See also: http://spark.apache.org/docs/latest/submitting-applications.html (Under Advanced Dependency Management.) -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
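For what it's worth, a hedged sketch of how the YARN-only property can also be set from application code rather than through JVM options; this only matters when the master is YARN, and the HDFS path is a placeholder:

    import org.apache.spark.{SparkConf, SparkContext}

    // spark.yarn.jar is read by the YARN client when the context starts;
    // local[*] and standalone masters simply ignore it.
    val conf = new SparkConf()
      .setAppName("YarnJarExample")
      .set("spark.yarn.jar", "hdfs://namenode:8020/path/to/spark-assembly.jar") // placeholder path
    val sc = new SparkContext(conf)

On the command line the same thing is usually expressed as --conf spark.yarn.jar=... (or, if it must go through --driver-java-options, with a -D prefix as noted above).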
Re: How do you perform blocking IO in apache spark job?
Thanks, Sean, I'll try to explain what I'm trying to do. The native component that I'm talking about is native code that I call using JNI. I wrote a small test where I traverse the collection and call the native component N (1000) times. From the result, I'm able to get 10 req/sec by calling the native component. And I would like to achieve the same result (not less) on a single node using Spark. Then I started a 1-node cluster and ran the same kind of code on it, providing partitions = 1000, but the response time was not the same; it was much worse. The operation filtered.top(10)(Ordering.Double) is blocking; as I understand it, at this point the closure inside the map transformation starts to execute, and the call to the native component blocks there. If I could make it non-blocking, I would expect an increase in performance. What do you think? How would you improve the code? Or which Spark configurations should I look at? (Sorry, I'm quite new to Spark) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-you-perform-blocking-IO-in-apache-spark-job-tp13704p13713.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming with Kafka, building project with 'sbt assembly' is extremely slow
I came across this: https://github.com/xerial/sbt-pack Until I found this, I was simply using the sbt-assembly plugin (sbt clean assembly) mn On Sep 4, 2014, at 2:46 PM, Aris arisofala...@gmail.com wrote: Thanks for answering Daniil - I have SBT version 0.13.5, is that an old version? Seems pretty up-to-date. It turns out I figured out a way around this entire problem: just use 'sbt package', and when using bin/spark-submit, pass it the --jars option and GIVE IT ALL THE JARS from the local ivy2 cache. Pretty inelegant, but at least I am able to develop, and when I want to make a super JAR with sbt assembly I can use the stupidly slow method. Here is the important snippet for grabbing all the JARs from the local ivy2 cache: --jars $(find ~/.ivy2/cache/ -iname *.jar | tr '\n' ,) Here's the entire running command - bin/spark-submit --master local[*] --jars $(find /home/data/.ivy2/cache/ -iname *.jar | tr '\n' ,) --class KafkaStreamConsumer ~/code_host/data/scala/streamingKafka/target/scala-2.10/streamingkafka_2.10-1.0.jar node1:2181 my-consumer-group aris-topic 1 This is fairly bad, but it works around sbt assembly being incredibly slow. On Tue, Sep 2, 2014 at 2:13 PM, Daniil Osipov daniil.osi...@shazam.com wrote: What version of sbt are you using? There is a bug in an early version of 0.13 that causes assembly to be extremely slow - make sure you're using the latest one. On Fri, Aug 29, 2014 at 1:30 PM, Aris wrote: Hi folks, I am trying to use Kafka with Spark Streaming, and it appears I cannot do the normal 'sbt package' as I do with other Spark applications, such as Spark alone or Spark with MLlib. I learned I have to build with the sbt-assembly plugin. OK, so here is my build.sbt file for my extremely simple test Kafka/Spark Streaming project. It takes almost 30 minutes to build! This is a CentOS Linux machine on SSDs with 4GB of RAM; it's never been slow for me. To compare, sbt assembly for the entire Spark project itself takes less than 10 minutes. At the bottom of this file I am trying to play with 'cacheOutput' options, because I read online that maybe I am calculating SHA-1 for all the *.class files in this super JAR. I also copied the mergeStrategy from Spark contributor TD's Spark Streaming tutorial from Spark Summit 2014. Again, is there some better way to build this JAR file, just using sbt package? This process is working, but very slow. Any help with speeding up this compilation is really appreciated!! Aris -

import AssemblyKeys._ // put this at the top of the file

name := "streamingKafka"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
)

assemblySettings

jarName in assembly := "streamingkafka-assembly.jar"

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "log4j.properties" => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

assemblyOption in assembly ~= { _.copy(cacheOutput = false) }
Spark-submit ClassNotFoundException with JAR!
Hi, I'm having problems with a ClassNotFoundException using this simple example:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import java.net.URLClassLoader
import scala.util.Marshal

class ClassToRoundTrip(val id: Int) extends scala.Serializable {
}

object RoundTripTester {
  def test(id: Int): ClassToRoundTrip = {
    // Get the current classpath and output. Can we see simpleapp jar?
    val cl = ClassLoader.getSystemClassLoader
    val urls = cl.asInstanceOf[URLClassLoader].getURLs
    urls.foreach(url => println("Executor classpath is: " + url.getFile))

    // Simply instantiating an instance of object and using it works fine.
    val testObj = new ClassToRoundTrip(id)
    println("testObj.id: " + testObj.id)

    val testObjBytes = Marshal.dump(testObj)
    val testObjRoundTrip = Marshal.load[ClassToRoundTrip](testObjBytes) // <-- ClassNotFoundException here
    testObjRoundTrip
  }
}

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)

    val cl = ClassLoader.getSystemClassLoader
    val urls = cl.asInstanceOf[URLClassLoader].getURLs
    urls.foreach(url => println("Driver classpath is: " + url.getFile))

    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
    distData.foreach(x => RoundTripTester.test(x))
  }
}

In local mode, submitting as per the docs generates a ClassNotFound exception on line 31, where the ClassToRoundTrip object is deserialized. Strangely, the earlier use on line 28 is okay:

spark-submit --class SimpleApp \
             --master local[4] \
             target/scala-2.10/simpleapp_2.10-1.0.jar

However, if I add extra parameters for --driver-class-path and --jars, it works fine on local:

spark-submit --class SimpleApp \
             --master local[4] \
             --driver-class-path /home/xxx/workspace/SimpleApp/target/scala-2.10/simpleapp_2.10-1.0.jar \
             --jars /home/xxx/workspace/SimpleApp/target/scala-2.10/SimpleApp.jar \
             target/scala-2.10/simpleapp_2.10-1.0.jar

However, submitting to a local dev master still generates the same issue:

spark-submit --class SimpleApp \
             --master spark://localhost.localdomain:7077 \
             --driver-class-path /home/xxx/workspace/SimpleApp/target/scala-2.10/simpleapp_2.10-1.0.jar \
             --jars /home/xxx/workspace/SimpleApp/target/scala-2.10/simpleapp_2.10-1.0.jar \
             target/scala-2.10/simpleapp_2.10-1.0.jar

I can see from the output that the JAR file is being fetched by the executor. Logs for one of the executors are here: stdout: http://pastebin.com/raw.php?i=DQvvGhKm stderr: http://pastebin.com/raw.php?i=MPZZVa0Q I'm using Spark 1.0.2. The ClassToRoundTrip class is included in the JAR. I have a workaround of copying the JAR to each of the machines and setting the spark.executor.extraClassPath parameter, but I would rather not have to do that. This is such a simple case, I must be doing something obviously wrong. Can anyone help? Thanks Peter - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: If for YARN you use 'spark.yarn.jar', what is the LOCAL equivalent to that property ...
On Mon, Sep 8, 2014 at 10:00 AM, Dimension Data, LLC. subscripti...@didata.us wrote: user$ export MASTER=local[nn] # Run spark shell on LOCAL CPU threads. user$ pyspark [someOptions] --driver-java-options -Dspark.*XYZ*.jar=' /usr/lib/spark/assembly/lib/spark-assembly-*.jar' My question is, what to replace '*XYZ*' with in that case. Ah, I see. There's no equivalent to that option in non-yarn mode, because either there is no need (e.g. in local mode everything is in the same machine) or the cluster backend doesn't support the feature (e.g. Spark Master does not have a distributed file cache like Yarn, at least as far as I know). That option is just to tell Yarn to use the distributed cache for the spark jar instead of auto-detecting where the jar is (which would incur in uploading the spark jar to all involved NMs). -- Marcelo
RE: prepending jars to the driver class path for spark-submit on YARN
I don't understand what you mean. Can you be more specific? From: Victor Tso-Guillen v...@paxata.com Sent: Saturday, September 06, 2014 5:13 PM To: Penny Espinoza Cc: Spark Subject: Re: prepending jars to the driver class path for spark-submit on YARN I ran into the same issue. What I did was use maven shade plugin to shade my version of httpcomponents libraries into another package. On Fri, Sep 5, 2014 at 4:33 PM, Penny Espinoza pesp...@societyconsulting.commailto:pesp...@societyconsulting.com wrote: Hey - I'm struggling with some dependency issues with org.apache.httpcomponents httpcore and httpclient when using spark-submit with YARN running Spark 1.0.2 on a Hadoop 2.2 cluster. I've seen several posts about this issue, but no resolution. The error message is this: Caused by: java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.init(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:114) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:99) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:85) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:93) at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26) at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96) at com.amazonaws.http.AmazonHttpClient.init(AmazonHttpClient.java:155) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:118) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:102) at com.amazonaws.services.s3.AmazonS3Client.init(AmazonS3Client.java:332) at com.oncue.rna.realtime.streaming.config.package$.transferManager(package.scala:76) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry.init(SchemaRegistry.scala:27) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry$lzycompute(SchemaRegistry.scala:46) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry(SchemaRegistry.scala:44) at com.oncue.rna.realtime.streaming.coders.KafkaAvroDecoder.init(KafkaAvroDecoder.scala:20) ... 17 more The apache httpcomponents libraries include the method above as of version 4.2. The Spark 1.0.2 binaries seem to include version 4.1. I can get this to work in my driver program by adding exclusions to force use of 4.1, but then I get the error in tasks even when using the -jars option of the spark-submit command. How can I get both the driver program and the individual tasks in my spark-streaming job to use the same version of this library so my job will run all the way through? thanks p
Re: How do you perform blocking IO in apache spark job?
Hi, So the external service itself creates threads and blocks until they finish execution? In this case you should not do the threading yourself but include the component via JNI directly in Spark; it will take care of the threading for you. Best regards Hi, Jörn, first of all, thanks for your intent to help. This external service is a native component that is stateless and performs the calculation based on the data I provide. The data is in an RDD. I have that component on each worker node, and I would like to get as much parallelism as possible on a single worker node. Using Scala futures I can get it, with at least as many threads as my machine allows. But how do I do the same in Spark? Is there a way to call that native component on each worker in multiple threads? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-you-perform-blocking-IO-in-apache-spark-job-tp13704p13707.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
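A minimal sketch of that suggestion, assuming the native component is exposed through a JNI wrapper (NativeLib below is hypothetical, and its stub body stands in for the real native call); the per-node concurrency then comes from Spark running several tasks at once rather than from application-level threads:

    import org.apache.spark.rdd.RDD

    // Hypothetical JNI wrapper; a real job would call System.loadLibrary and
    // declare a native method instead of this stub.
    object NativeLib {
      def compute(x: Double): Double = x + 1.0 // stand-in for the native call
    }

    def computeWithNative(input: RDD[Double], parallelism: Int): RDD[Double] =
      input.repartition(parallelism).mapPartitions { iter =>
        // Each task walks its partition sequentially; running `parallelism`
        // tasks concurrently gives the parallelism on a single worker node.
        iter.map(NativeLib.compute)
      }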
Re: distcp on ec2 standalone spark cluster
Well, this means you didn't start a compute cluster. Most likely the wrong value of mapreduce.jobtracker.address is why the slave node cannot start the node manager. (I am not familiar with the ec2 script, so I don't know whether the slave node has a node manager installed or not.) Can you check the Hadoop daemon log on the slave node to see whether the nodemanager was started but failed, or whether there is no nodemanager to start? The log file location defaults to /var/log/hadoop-xxx if my memory is correct. Sent from my iPhone On September 9, 2014, at 0:08, Tomer Benyamini tomer@gmail.com wrote: No tasktracker or nodemanager. This is what I see: On the master: org.apache.hadoop.yarn.server.resourcemanager.ResourceManager org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode org.apache.hadoop.hdfs.server.namenode.NameNode On the data node (slave): org.apache.hadoop.hdfs.server.datanode.DataNode On Mon, Sep 8, 2014 at 6:39 PM, Ye Xianjin advance...@gmail.com wrote: what did you see in the log? was there anything related to mapreduce? can you log into your hdfs (data) node, use jps to list all java process and confirm whether there is a tasktracker process (or nodemanager) running with datanode process -- Ye Xianjin Sent with Sparrow On Monday, September 8, 2014 at 11:13 PM, Tomer Benyamini wrote: Still no luck, even when running stop-all.sh followed by start-all.sh. On Mon, Sep 8, 2014 at 5:57 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Tomer, Did you try start-all.sh? It worked for me the last time I tried using distcp, and it worked for this guy too. Nick On Mon, Sep 8, 2014 at 3:28 AM, Tomer Benyamini tomer@gmail.com wrote: ~/ephemeral-hdfs/sbin/start-mapred.sh does not exist on spark-1.0.2; I restarted hdfs using ~/ephemeral-hdfs/sbin/stop-dfs.sh and ~/ephemeral-hdfs/sbin/start-dfs.sh, but still getting the same error when trying to run distcp: ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) Any idea? Thanks! Tomer On Sun, Sep 7, 2014 at 9:27 PM, Josh Rosen rosenvi...@gmail.com wrote: If I recall, you should be able to start Hadoop MapReduce using ~/ephemeral-hdfs/sbin/start-mapred.sh. On Sun, Sep 7, 2014 at 6:42 AM, Tomer Benyamini tomer@gmail.com wrote: Hi, I would like to copy log files from s3 to the cluster's ephemeral-hdfs. I tried to use distcp, but I guess mapred is not running on the cluster - I'm getting the exception below. Is there a way to activate it, or is there a spark alternative to distcp? Thanks, Tomer mapreduce.Cluster (Cluster.java:initialize(114)) - Failed to use org.apache.hadoop.mapred.LocalClientProtocolProvider due to error: Invalid mapreduce.jobtracker.address configuration value for LocalJobRunner : XXX:9001 ERROR tools.DistCp (DistCp.java:run(126)) - Exception encountered java.io.IOException: Cannot initialize Cluster.
Please check your configuration for mapreduce.framework.name and the correspond server addresses. at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:83) at org.apache.hadoop.mapreduce.Cluster.init(Cluster.java:76) at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352) at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146) at org.apache.hadoop.tools.DistCp.run(DistCp.java:118) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) at org.apache.hadoop.tools.DistCp.main(DistCp.java:374) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: prepending jars to the driver class path for spark-submit on YARN
When you submit the job to yarn with spark-submit, set --conf spark.yarn.user.classpath.first=true . On Mon, Sep 8, 2014 at 10:46 AM, Penny Espinoza pesp...@societyconsulting.com wrote: I don't understand what you mean. Can you be more specific? From: Victor Tso-Guillen v...@paxata.com Sent: Saturday, September 06, 2014 5:13 PM To: Penny Espinoza Cc: Spark Subject: Re: prepending jars to the driver class path for spark-submit on YARN I ran into the same issue. What I did was use maven shade plugin to shade my version of httpcomponents libraries into another package. On Fri, Sep 5, 2014 at 4:33 PM, Penny Espinoza pesp...@societyconsulting.com wrote: Hey - I’m struggling with some dependency issues with org.apache.httpcomponents httpcore and httpclient when using spark-submit with YARN running Spark 1.0.2 on a Hadoop 2.2 cluster. I’ve seen several posts about this issue, but no resolution. The error message is this: Caused by: java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.init(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:114) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:99) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:85) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:93) at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26) at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96) at com.amazonaws.http.AmazonHttpClient.init(AmazonHttpClient.java:155) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:118) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:102) at com.amazonaws.services.s3.AmazonS3Client.init(AmazonS3Client.java:332) at com.oncue.rna.realtime.streaming.config.package$.transferManager(package.scala:76) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry.init(SchemaRegistry.scala:27) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry$lzycompute(SchemaRegistry.scala:46) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry(SchemaRegistry.scala:44) at com.oncue.rna.realtime.streaming.coders.KafkaAvroDecoder.init(KafkaAvroDecoder.scala:20) ... 17 more The apache httpcomponents libraries include the method above as of version 4.2. The Spark 1.0.2 binaries seem to include version 4.1. I can get this to work in my driver program by adding exclusions to force use of 4.1, but then I get the error in tasks even when using the —jars option of the spark-submit command. How can I get both the driver program and the individual tasks in my spark-streaming job to use the same version of this library so my job will run all the way through? thanks p - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: A problem for running MLLIB in amazon clound
Could you attach the driver log? -Xiangrui On Mon, Sep 8, 2014 at 7:23 AM, Hui Li hli161...@gmail.com wrote: I am running a very simple example using the SVMWithSGD on Amazon EMR. I haven't got any result after one hour long. My instance-type is: m3.large instance-count is: 3 Dataset is the data provided by the MLLIB in apache: sample_svm_data The number of iteration is: 2 and all other options are defaulted value with the number of iterations equal to 2. Is there anyone who can help me out with this? Thanks, Hui - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
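For reference, a rough sketch of the kind of run being described, following the MLlib documentation's parsing pattern for sample_svm_data (the path, caching, and output line are placeholders, not taken from the original job):

    import org.apache.spark.mllib.classification.SVMWithSGD
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    val data = sc.textFile("hdfs:///data/sample_svm_data.txt") // placeholder path
    val parsed = data.map { line =>
      // Each line is a space-separated label followed by the feature values.
      val parts = line.split(' ').map(_.toDouble)
      LabeledPoint(parts.head, Vectors.dense(parts.tail))
    }.cache()
    val model = SVMWithSGD.train(parsed, numIterations = 2)
    println("weights: " + model.weights)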
RE: prepending jars to the driver class path for spark-submit on YARN
I have tried using the spark.files.userClassPathFirst option (which, incidentally, is documented now, but marked as experimental), but it just causes different errors. I am using spark-streaming-kafka. If I mark spark-core and spark-streaming as provided and also exclude them from the spark-streaming-kafka dependency, I get this error: 14/09/08 18:34:23 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassCastException java.lang.ClassCastException: cannot assign instance of com.oncue.rna.realtime.streaming.spark.BaseKafkaExtractorJob$$anonfun$getEventsStream$1 to fie ld org.apache.spark.rdd.MappedRDD.f of type scala.Function1 in instance of org.apache.spark.rdd.MappedRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at
Re: Crawler and Scraper with different priorities
Depending on what you want to do with the result of the scraping, Spark may not be the best framework for your use case. Take a look at a general Akka application. On Sun, Sep 7, 2014 at 12:15 AM, Sandeep Singh sand...@techaddict.me wrote: Hi all, I am Implementing a Crawler, Scraper. The It should be able to process the request for crawling scraping, within few seconds of submitting the job(around 1mil/sec), for rest I can take some time(scheduled evenly all over the day). What is the best way to implement this? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Crawler-and-Scraper-with-different-priorities-tp13645.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Input Field in Spark 1.1 Web UI
Is there more information on what the Input column on the Spark UI means? How is this computed? I am processing a fairly small (but zipped) file, and the value shown (in an inline screenshot that is not reproduced here) does not seem correct. Thanks, Arun
Re: Solving Systems of Linear Equations Using Spark?
I asked Tim whether he would change the license of SuiteSparse to an Apache-friendly license couple months ago, but the answer was no. So I don't think we can use SuiteSparse in MLlib through JNI. Please feel free to create JIRAs for distributed linear programming and SOCP solvers and run the discussion there. I'm very interested since I don't really know how to do linear programming in a distributed way. -Xiangrui On Mon, Sep 8, 2014 at 7:12 AM, Debasish Das debasish.da...@gmail.com wrote: Xiangrui, Should I open up a JIRA for this ? Distributed lp/socp solver through ecos/ldl/amd ? I can open source it with gpl license in spark code as that's what our legal cleared (apache + gpl becomes gpl) and figure out the right way to call it...ecos is gpl but we can definitely use the jni version of ldl and amd which are lgpl... Let me know. Thanks. Deb On Sep 8, 2014 7:04 AM, Debasish Das debasish.da...@gmail.com wrote: Durin, I have integrated ecos with spark which uses suitesparse under the hood for linear equation solvesI have exposed only the qp solver api in spark since I was comparing ip with proximal algorithms but we can expose suitesparse api as well...jni is used to load up ldl amd and ecos libraries. Please follow ecos section of my spark summit talk. We can discuss more but we can formulate interesting things like google's ceres solver's trust region formulation. http://spark-summit.org/2014/talk/quadratic-programing-solver-for-non-negative-matrix-factorization-with-spark Let me point you to the code so that you can take a look at it. Suitesparse (ldl and amd) is lgpl but ecos is gpl and therefore I was not sure how straightforward it will be to add the solver to mllib. Our legal was not convinced to add lgpl/gpl code in apache project. Could you also detail the usecases you are looking for ? You want a distributed lp / socp solver where each worker solves a partition of the constraint and the full objective...and you want to converge to a global solution using consensus ? Or your problem has more structure to partition the problem cleanly and don't need consensus step (which is what I implemented in the code) Thanks Deb On Sep 7, 2014 11:35 PM, Xiangrui Meng men...@gmail.com wrote: You can try LinearRegression with sparse input. It converges the least squares solution if the linear system is over-determined, while the convergence rate depends on the condition number. Applying standard scaling is popular heuristic to reduce the condition number. If you are interested in sparse direct methods as in SuiteSparse. I'm not aware of any effort to do so. -Xiangrui - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
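As a small, concrete illustration of the LinearRegression-with-sparse-input suggestion quoted above, a toy sketch (a tiny square system rather than a large over-determined one; the step size and iteration count are arbitrary, so the result is only approximate):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

    // Treat each equation a_i . x = b_i of A x = b as a training example (b_i, a_i).
    val system = sc.parallelize(Seq(
      LabeledPoint(7.0, Vectors.sparse(2, Array(0, 1), Array(2.0, 1.0))), // 2*x0 + 1*x1 = 7
      LabeledPoint(8.0, Vectors.sparse(2, Array(0, 1), Array(1.0, 3.0)))  // 1*x0 + 3*x1 = 8
    ))
    val model = LinearRegressionWithSGD.train(system, 200, 0.1) // numIterations, stepSize
    println(model.weights) // should end up close to the exact solution (2.6, 1.8)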
RE: prepending jars to the driver class path for spark-submit on YARN
?VIctor - Not sure what you mean. Can you provide more detail about what you did? From: Victor Tso-Guillen v...@paxata.com Sent: Saturday, September 06, 2014 5:13 PM To: Penny Espinoza Cc: Spark Subject: Re: prepending jars to the driver class path for spark-submit on YARN I ran into the same issue. What I did was use maven shade plugin to shade my version of httpcomponents libraries into another package. On Fri, Sep 5, 2014 at 4:33 PM, Penny Espinoza pesp...@societyconsulting.commailto:pesp...@societyconsulting.com wrote: Hey - I'm struggling with some dependency issues with org.apache.httpcomponents httpcore and httpclient when using spark-submit with YARN running Spark 1.0.2 on a Hadoop 2.2 cluster. I've seen several posts about this issue, but no resolution. The error message is this: Caused by: java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.init(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:114) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:99) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:85) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:93) at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26) at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96) at com.amazonaws.http.AmazonHttpClient.init(AmazonHttpClient.java:155) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:118) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:102) at com.amazonaws.services.s3.AmazonS3Client.init(AmazonS3Client.java:332) at com.oncue.rna.realtime.streaming.config.package$.transferManager(package.scala:76) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry.init(SchemaRegistry.scala:27) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry$lzycompute(SchemaRegistry.scala:46) at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry(SchemaRegistry.scala:44) at com.oncue.rna.realtime.streaming.coders.KafkaAvroDecoder.init(KafkaAvroDecoder.scala:20) ... 17 more The apache httpcomponents libraries include the method above as of version 4.2. The Spark 1.0.2 binaries seem to include version 4.1. I can get this to work in my driver program by adding exclusions to force use of 4.1, but then I get the error in tasks even when using the -jars option of the spark-submit command. How can I get both the driver program and the individual tasks in my spark-streaming job to use the same version of this library so my job will run all the way through? thanks p
saveAsHadoopFile into avro format
What is the right way of saving any PairRDD into Avro output format? GraphArray extends SpecificRecord etc. I have the following Java RDD: JavaPairRDD<GraphArray, NullWritable> pairRDD = ... and want to save it to Avro format: org.apache.hadoop.mapred.JobConf jc = new org.apache.hadoop.mapred.JobConf(); org.apache.avro.mapred.AvroJob.setOutputSchema(jc, GraphArray.getClassSchema()); org.apache.avro.mapred.AvroOutputFormat.setOutputPath(jc, new Path(outURI)); pairRDD.saveAsHadoopDataset(jc); The code above throws: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.io.NullWritable I also tried wrapping the keys and values with the AvroKey and AvroValue classes respectively. What am I doing wrong? Should I use JavaRDD (list) instead and try with a custom serializer? Thanks, - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: If for YARN you use 'spark.yarn.jar', what is the LOCAL equivalent to that property ...
On Mon, Sep 8, 2014 at 11:52 AM, Dimension Data, LLC. subscripti...@didata.us wrote: So just to clarify for me: When specifying 'spark.yarn.jar' as I did above, even if I don't use HDFS to create a RDD (e.g. do something simple like: 'sc.parallelize(range(100))'), it is still necessary to configure the HDFS location in each NM's '/etc/hadoop/conf/*', just so that they can access the Spark Jar in the YARN case? That's correct. In fact, I'm not aware of Yarn working at all without the HDFS configuration being in place (even if the default fs is not HDFS), but then I'm not a Yarn deployment expert. -- Marcelo
Re: Solving Systems of Linear Equations Using Spark?
Yup...this can be a spark community project...I saw a PR for that...interested users fine with lgpl/gpl code can make use of it... On Mon, Sep 8, 2014 at 12:37 PM, Xiangrui Meng men...@gmail.com wrote: I asked Tim whether he would change the license of SuiteSparse to an Apache-friendly license couple months ago, but the answer was no. So I don't think we can use SuiteSparse in MLlib through JNI. Please feel free to create JIRAs for distributed linear programming and SOCP solvers and run the discussion there. I'm very interested since I don't really know how to do linear programming in a distributed way. -Xiangrui On Mon, Sep 8, 2014 at 7:12 AM, Debasish Das debasish.da...@gmail.com wrote: Xiangrui, Should I open up a JIRA for this ? Distributed lp/socp solver through ecos/ldl/amd ? I can open source it with gpl license in spark code as that's what our legal cleared (apache + gpl becomes gpl) and figure out the right way to call it...ecos is gpl but we can definitely use the jni version of ldl and amd which are lgpl... Let me know. Thanks. Deb On Sep 8, 2014 7:04 AM, Debasish Das debasish.da...@gmail.com wrote: Durin, I have integrated ecos with spark which uses suitesparse under the hood for linear equation solvesI have exposed only the qp solver api in spark since I was comparing ip with proximal algorithms but we can expose suitesparse api as well...jni is used to load up ldl amd and ecos libraries. Please follow ecos section of my spark summit talk. We can discuss more but we can formulate interesting things like google's ceres solver's trust region formulation. http://spark-summit.org/2014/talk/quadratic-programing-solver-for-non-negative-matrix-factorization-with-spark Let me point you to the code so that you can take a look at it. Suitesparse (ldl and amd) is lgpl but ecos is gpl and therefore I was not sure how straightforward it will be to add the solver to mllib. Our legal was not convinced to add lgpl/gpl code in apache project. Could you also detail the usecases you are looking for ? You want a distributed lp / socp solver where each worker solves a partition of the constraint and the full objective...and you want to converge to a global solution using consensus ? Or your problem has more structure to partition the problem cleanly and don't need consensus step (which is what I implemented in the code) Thanks Deb On Sep 7, 2014 11:35 PM, Xiangrui Meng men...@gmail.com wrote: You can try LinearRegression with sparse input. It converges the least squares solution if the linear system is over-determined, while the convergence rate depends on the condition number. Applying standard scaling is popular heuristic to reduce the condition number. If you are interested in sparse direct methods as in SuiteSparse. I'm not aware of any effort to do so. -Xiangrui - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Records - Input Byte
Hi, I was reading the Spark Streaming paper, Discretized Streams: Fault-Tolerant Streaming Computation at Scale. I read that the performance evaluation used 100-byte input records in the Grep and WordCount tests. I don't have much experience and I'd like to know how I can control this value in my records (like words in an input file). Can anyone suggest something to start with? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Records-Input-Byte-tp13733.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
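One simple way to get fixed-size records for that kind of benchmark is to generate them; a hedged sketch (the record size, record count, and output path are arbitrary choices, not taken from the paper):

    val recordBytes = 100
    val payload = "x" * (recordBytes - 1) // 99 characters plus the trailing newline is ~100 bytes per line
    val records = sc.parallelize(1 to 1000000).map(_ => payload)
    records.saveAsTextFile("hdfs:///tmp/hundred-byte-records") // placeholder path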
Recommendations for performance
Hi, Let me start with: I am new to Spark (be gentle). I have a large data set in Parquet (~1.5B rows, 900 columns). Currently Impala takes ~1-2 seconds for the queries while Spark SQL is taking ~30 seconds. Here is what I am currently doing. I launch with SPARK_MEM=6g spark-shell and run:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
val parquetFile = sqlContext.parquetFile("hdfs:///user/ubuntu/parquet_data")
parquetFile.cache()
parquetFile.registerAsTable("parquet_table")
val test1 = sqlContext.sql("select COUNT(1) FROM parquet_table WHERE 1=1 AND ...")
test1.take(1)

My cluster has ~300 GB of memory across 5 x c3.8xlarge nodes. I am guessing not all the data is fitting in memory in this config. 1) How do I determine how much memory the data will need in memory? 2) How do I tell Spark to load the data in memory and keep it there? 3) When I go to host:4040/storage/ I do not see anything there. thanks, Manu
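A hedged aside on questions 2) and 3), assuming the table registered above: Spark SQL can also cache a registered table through the SQL layer, and the cached table should then appear under the Storage tab once a query has materialized it.

    // Cache through the SQL layer instead of (or in addition to) parquetFile.cache().
    sqlContext.cacheTable("parquet_table")
    // The cache is populated lazily, on the first query that scans the table.
    sqlContext.sql("select COUNT(1) FROM parquet_table").collect()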
Re: Spark SQL check if query is completed (pyspark)
You are probably not getting an error because the exception is happening inside of Hive. I'd still consider this a bug if you'd like to open a JIRA. On Mon, Sep 8, 2014 at 3:02 AM, jamborta jambo...@gmail.com wrote: thank you for the replies. I am running an insert on a join (INSERT OVERWRITE TABLE new_table select * from table1 as a join table2 as b on (a.key = b.key), The process does not have the right permission to write to that folder, so I get the following error printed: chgrp: `/user/x/y': No such file or directory chmod: `/user/x/y': No such file or directory and it returns an empty RDD without getting an exception. thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-check-if-query-is-completed-pyspark-tp13630p13685.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Querying a parquet file in s3 with an ec2 install
Hello all, I've been wrestling with this problem all day and any suggestions would be greatly appreciated. I'm trying to test reading a parquet file that's stored in s3 using a spark cluster deployed on ec2. The following works in the spark shell when run completely locally on my own machine (i.e. no --master option passed to the spark-shell command):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val p = parquetFile("s3n://[bucket]/path-to-parquet-dir/")
p.registerAsTable("s")
sql("select count(*) from s").collect

I have an ec2 deployment of spark (tried version 1.0.2 and 1.1.0-rc4) using the standalone cluster manager and deployed with the spark-ec2 script. Running the same code in a spark shell connected to the cluster, it basically hangs on the select statement. The workers/slaves simply time out and restart every 30 seconds when they hit what appears to be an activity timeout, as if there's no activity from the spark-shell (based on what I see in the stderr logs for the job, I assume this is expected behavior when connected from a spark-shell that's sitting idle). I see these messages about every 30 seconds: 14/09/08 17:43:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/09/08 17:43:09 INFO AppClient$ClientActor: Executor updated: app-20140908213842-0002/7 is now EXITED (Command exited with code 1) 14/09/08 17:43:09 INFO SparkDeploySchedulerBackend: Executor app-20140908213842-0002/7 removed: Command exited with code 1 14/09/08 17:43:09 INFO AppClient$ClientActor: Executor added: app-20140908213842-0002/8 on worker-20140908183422-ip-10-60-107-194.ec2.internal-53445 (ip-10-60-107-194.ec2.internal:53445) with 2 cores 14/09/08 17:43:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140908213842-0002/8 on hostPort ip-10-60-107-194.ec2.internal:53445 with 2 cores, 4.0 GB RAM 14/09/08 17:43:09 INFO AppClient$ClientActor: Executor updated: app-20140908213842-0002/8 is now RUNNING Eventually it fails with a: 14/09/08 17:44:16 INFO AppClient$ClientActor: Executor updated: app-20140908213842-0002/9 is now EXITED (Command exited with code 1) 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Executor app-20140908213842-0002/9 removed: Command exited with code 1 14/09/08 17:44:16 ERROR SparkDeploySchedulerBackend: Application has been killed. 
Reason: Master removed our application: FAILED 14/09/08 17:44:16 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 14/09/08 17:44:16 INFO TaskSchedulerImpl: Cancelling stage 1 14/09/08 17:44:16 INFO DAGScheduler: Failed to run collect at SparkPlan.scala:85 14/09/08 17:44:16 INFO SparkUI: Stopped Spark web UI at http://192.168.10.198:4040 14/09/08 17:44:16 INFO DAGScheduler: Stopping DAGScheduler 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Shutting down all executors 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Asking each executor to shut down org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: FAILED at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
Re: Spark SQL on Cassandra
I believe DataStax is working on better integration here, but until that is ready you can use the applySchema API. Basically you will convert the CassandraTable into and RDD of Row objects using a .map() and then you can call applySchema (provided by SQLContext) to get a SchemaRDD. More details will be available in the SQL Programming Guide for 1.1 (which will hopefully be published in the next day or two). You can see the raw version here: https://raw.githubusercontent.com/apache/spark/master/docs/sql-programming-guide.md Look for section: Programmatically Specifying the Schema On Mon, Sep 8, 2014 at 7:22 AM, gtinside gtins...@gmail.com wrote: Hi , I am reading data from Cassandra through datastax spark-cassandra connector converting it into JSON and then running spark-sql on it. Refer to the code snippet below : step 1 val o_rdd = sc.cassandraTable[CassandraRDDWrapper]( 'keyspace', 'column_family') step 2 val tempObjectRDD = sc.parallelize(o_rdd.toArray.map(i=i), 100) step 3 val objectRDD = sqlContext.jsonRDD(tempObjectRDD) step 4 objectRDD .registerAsTable(objects) At step (2) I have to explicitly do a toArray because jsonRDD takes in a RDD[String]. For me calling toArray on cassandra rdd takes forever as have million records in cassandra . Is there a better way of doing this ? How can I optimize it ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-on-Cassandra-tp13696.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
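A rough sketch of the applySchema route described above, assuming sc and sqlContext from a shell session and pretending each Cassandra row has already been mapped to a simple (String, Int) pair (the column names, types, and sample data are placeholders for whatever the connector actually returns):

    import org.apache.spark.sql._

    // Placeholder for rows pulled from the Cassandra table.
    val cassandraRows = sc.parallelize(Seq(("a", 1), ("b", 2)))

    // Describe the columns explicitly instead of going through JSON.
    val schema = StructType(Seq(
      StructField("key", StringType, nullable = false),
      StructField("value", IntegerType, nullable = true)))
    val rowRDD = cassandraRows.map { case (k, v) => Row(k, v) }

    val objectRDD = sqlContext.applySchema(rowRDD, schema)
    objectRDD.registerAsTable("objects")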
Re: Querying a parquet file in s3 with an ec2 install
How big is the data set? Does it work when you copy it to hdfs? -Manu On Mon, Sep 8, 2014 at 2:58 PM, Jim Carroll jimfcarr...@gmail.com wrote: Hello all, I've been wrestling with this problem all day and any suggestions would be greatly appreciated. I'm trying to test reading a parquet file that's stored in s3 using a spark cluster deployed on ec2. The following works in the spark shell when run completely locally on my own machine (i.e. no --master option passed to the spark-shell command): val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val p = parquetFile(s3n://[bucket]/path-to-parquet-dir/) p.registerAsTable(s) sql(select count(*) from s).collect I have an ec2 deployment of spark (tried version 1.0.2 and 1.1.0-rc4) using the standalone cluster manager and deployed with the spark-ec2 script. Running the same code in a spark shell connected to the cluster it basically hangs on the select statement. The workers/slaves simply time out and restart every 30 seconds when they hit what appears to be an activity timeout, as if there's no activity from the spark-shell (based on what I see in the stderr logs for the job, I assume this is expected behavior when connected from a spark-shell that's sitting idle). I see these messages about every 30 seconds: 14/09/08 17:43:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/09/08 17:43:09 INFO AppClient$ClientActor: Executor updated: app-20140908213842-0002/7 is now EXITED (Command exited with code 1) 14/09/08 17:43:09 INFO SparkDeploySchedulerBackend: Executor app-20140908213842-0002/7 removed: Command exited with code 1 14/09/08 17:43:09 INFO AppClient$ClientActor: Executor added: app-20140908213842-0002/8 on worker-20140908183422-ip-10-60-107-194.ec2.internal-53445 (ip-10-60-107-194.ec2.internal:53445) with 2 cores 14/09/08 17:43:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140908213842-0002/8 on hostPort ip-10-60-107-194.ec2.internal:53445 with 2 cores, 4.0 GB RAM 14/09/08 17:43:09 INFO AppClient$ClientActor: Executor updated: app-20140908213842-0002/8 is now RUNNING Eventually it fails with a: 14/09/08 17:44:16 INFO AppClient$ClientActor: Executor updated: app-20140908213842-0002/9 is now EXITED (Command exited with code 1) 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Executor app-20140908213842-0002/9 removed: Command exited with code 1 14/09/08 17:44:16 ERROR SparkDeploySchedulerBackend: Application has been killed. 
Reason: Master removed our application: FAILED 14/09/08 17:44:16 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 14/09/08 17:44:16 INFO TaskSchedulerImpl: Cancelling stage 1 14/09/08 17:44:16 INFO DAGScheduler: Failed to run collect at SparkPlan.scala:85 14/09/08 17:44:16 INFO SparkUI: Stopped Spark web UI at http://192.168.10.198:4040 14/09/08 17:44:16 INFO DAGScheduler: Stopping DAGScheduler 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Shutting down all executors 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Asking each executor to shut down org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: FAILED at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at
RE: SchemaRDD - Parquet - insertInto makes many files
https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark#HiveonSpark-NumberofTasks It would be great if something like hive.exec.reducers.bytes.per.reducer could be implemented. One idea: get the total size of all target blocks, then set the number of partitions from that. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-Parquet-insertInto-makes-many-files-tp13480p13740.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
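A minimal sketch of the idea suggested above, assuming a SparkContext sc, an HDFS input path, a SchemaRDD named records being written with insertInto, and an assumed target of ~256 MB per output file; note that on some versions coalesce on a SchemaRDD returns a plain RDD[Row], in which case the schema has to be re-applied before insertInto:

import org.apache.hadoop.fs.{FileSystem, Path}

// Emulate hive.exec.reducers.bytes.per.reducer: derive the number of output
// partitions from the total input size so insertInto does not create one tiny
// Parquet file per task.
val bytesPerPartition = 256L * 1024 * 1024   // assumed target size per output file
val fs = FileSystem.get(sc.hadoopConfiguration)
val inputBytes = fs.getContentSummary(new Path("/data/input")).getLength
val numPartitions = math.max(1, (inputBytes / bytesPerPartition).toInt)

// coalesce avoids a full shuffle when only reducing the partition count
records.coalesce(numPartitions).insertInto("target_table")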
Re: Low Level Kafka Consumer for Spark
Thanks TD. Someone already pointed out to me that /repartition(...)/ isn't the right way. You have to do /val partedStream = repartition(...)/. Would be nice to have it fixed in the docs. On Fri, Sep 5, 2014 at 10:44 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Some thoughts on this thread to clarify the doubts. 1. Driver recovery: The current version (1.1, to be released) does not recover the raw data that has been received but not processed. This is because when the driver dies, the executors die, and so does the raw data that was stored in them. Only for HDFS is the data not lost by driver recovery, as the data is already present reliably in HDFS. This is something we want to fix by Spark 1.2 (3 months from now). Regarding recovery by replaying the data from Kafka, it is possible but tricky. Our goal is to provide a strong guarantee, exactly-once semantics in all transformations. To guarantee this for all kinds of streaming computations, stateful and not-stateful alike, it requires that the data be replayed through Kafka in exactly the same order, and that the underlying blocks of data in Spark be regenerated in exactly the way they would have been had there been no driver failure. This is quite tricky to implement; it requires manipulation of ZooKeeper offsets, etc., which is hard to do with the high-level consumer that KafkaUtils uses. Dibyendu's low-level Kafka receiver may enable such approaches in the future. For now we definitely plan to solve the first problem very, very soon. 3. Repartitioning: I am trying to understand the repartition issue. One common mistake I have seen is that developers repartition a stream but do not use the repartitioned stream. WRONG: inputDStream.repartition(100) inputDStream.map(...).count().print() RIGHT: val repartitionedDStream = inputDStream.repartition(100) repartitionedDStream.map(...).count().print() (a runnable version of this example appears after this message). Not sure if this helps solve the problem that you are all facing. I am going to add this to the streaming programming guide to make sure this common mistake is avoided. TD On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi, Sorry for the little delay. As discussed in this thread, I have modified the Kafka-Spark-Consumer ( https://github.com/dibbhatt/kafka-spark-consumer) code to have a dedicated Receiver for every Topic Partition. You can see an example of how to create a Union of these receivers in consumer.kafka.client.Consumer.java. Thanks to Chris for suggesting this change. Regards, Dibyendu On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB rodrigo.boav...@aspect.com wrote: Just a comment on the recovery part. Is it correct to say that currently Spark Streaming's recovery design does not consider re-computations (upon metadata lineage recovery) that depend on blocks of data of the received stream? https://issues.apache.org/jira/browse/SPARK-1647 Just to illustrate a real use case (mine): - We have object states which have a Duration field per state which is incremented on every batch interval. This object state is also reset to 0 upon incoming state-changing events. Let's suppose there is at least one event since the last data checkpoint. This will lead to inconsistency upon driver recovery: the Duration field will get incremented from the data checkpoint version until the recovery moment, but the state change event will never be re-processed... so in the end we have the old state with the wrong Duration value.
To make things worse, let's imagine we're dumping the Duration increases somewhere... which means we're spreading the problem across our system. Re-computation awareness is something I've commented on in another thread and would rather treat separately: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205 Re-computations do occur, but the only RDDs that are recovered are the ones from the data checkpoint. This is what we've seen. It is not enough by itself to ensure recovery of computed data, and this partial recovery leads to inconsistency in some cases. Roger - I share the same question with you - I'm just not sure if the replicated data really gets persisted on every batch. The execution lineage is checkpointed, but if we have big chunks of data being consumed by the Receiver node on, let's say, a per-second basis, then having it persisted to HDFS every second could be a big challenge for keeping JVM performance - maybe that could be the reason why it's not really implemented... assuming it isn't. Dibyendu made a great effort with the offset-controlling code, but general state-consistent recovery feels to me like another big issue to address. I plan on diving into the Streaming code and trying to at least contribute some ideas. Some more insight from anyone on the dev team would be very appreciated. tnks, Rod
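A self-contained version of the WRONG/RIGHT example quoted above; this is only a sketch, and the socket source, host, port, and one-second batch interval are assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RepartitionExample extends App {
  val ssc = new StreamingContext(new SparkConf().setAppName("repartition-example"), Seconds(1))
  val inputDStream = ssc.socketTextStream("localhost", 9999)

  // WRONG: repartition returns a new DStream; the original, unrepartitioned
  // stream is still the one used by the pipeline below.
  inputDStream.repartition(100)
  inputDStream.map(_.length).count().print()

  // RIGHT: keep the stream returned by repartition and build the pipeline on it.
  val repartitionedDStream = inputDStream.repartition(100)
  repartitionedDStream.map(_.length).count().print()

  ssc.start()
  ssc.awaitTermination()
}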
Re: Querying a parquet file in s3 with an ec2 install
Mmm how many days worth of data/how deep is your data nesting? I suspect your running into a current issue with parquet (a fix is in master but I don't believe released yet..). It reads all the metadata to the submitter node as part of scheduling the job. This can cause long start times(timeouts too), and also requires a lot of memory so hence the OOM with lower memory. The newer one reads the metadata per file on the task reading that file. At least the hadoop stack is designed to do that on the mappers. With how Spark works I expect the same improvement there. On Mon, Sep 8, 2014 at 3:33 PM, Manu Mukerji manu...@gmail.com wrote: How big is the data set? Does it work when you copy it to hdfs? -Manu On Mon, Sep 8, 2014 at 2:58 PM, Jim Carroll jimfcarr...@gmail.com wrote: Hello all, I've been wrestling with this problem all day and any suggestions would be greatly appreciated. I'm trying to test reading a parquet file that's stored in s3 using a spark cluster deployed on ec2. The following works in the spark shell when run completely locally on my own machine (i.e. no --master option passed to the spark-shell command): val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val p = parquetFile(s3n://[bucket]/path-to-parquet-dir/) p.registerAsTable(s) sql(select count(*) from s).collect I have an ec2 deployment of spark (tried version 1.0.2 and 1.1.0-rc4) using the standalone cluster manager and deployed with the spark-ec2 script. Running the same code in a spark shell connected to the cluster it basically hangs on the select statement. The workers/slaves simply time out and restart every 30 seconds when they hit what appears to be an activity timeout, as if there's no activity from the spark-shell (based on what I see in the stderr logs for the job, I assume this is expected behavior when connected from a spark-shell that's sitting idle). I see these messages about every 30 seconds: 14/09/08 17:43:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/09/08 17:43:09 INFO AppClient$ClientActor: Executor updated: app-20140908213842-0002/7 is now EXITED (Command exited with code 1) 14/09/08 17:43:09 INFO SparkDeploySchedulerBackend: Executor app-20140908213842-0002/7 removed: Command exited with code 1 14/09/08 17:43:09 INFO AppClient$ClientActor: Executor added: app-20140908213842-0002/8 on worker-20140908183422-ip-10-60-107-194.ec2.internal-53445 (ip-10-60-107-194.ec2.internal:53445) with 2 cores 14/09/08 17:43:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140908213842-0002/8 on hostPort ip-10-60-107-194.ec2.internal:53445 with 2 cores, 4.0 GB RAM 14/09/08 17:43:09 INFO AppClient$ClientActor: Executor updated: app-20140908213842-0002/8 is now RUNNING Eventually it fails with a: 14/09/08 17:44:16 INFO AppClient$ClientActor: Executor updated: app-20140908213842-0002/9 is now EXITED (Command exited with code 1) 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Executor app-20140908213842-0002/9 removed: Command exited with code 1 14/09/08 17:44:16 ERROR SparkDeploySchedulerBackend: Application has been killed. 
Reason: Master removed our application: FAILED 14/09/08 17:44:16 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 14/09/08 17:44:16 INFO TaskSchedulerImpl: Cancelling stage 1 14/09/08 17:44:16 INFO DAGScheduler: Failed to run collect at SparkPlan.scala:85 14/09/08 17:44:16 INFO SparkUI: Stopped Spark web UI at http://192.168.10.198:4040 14/09/08 17:44:16 INFO DAGScheduler: Stopping DAGScheduler 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Shutting down all executors 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Asking each executor to shut down org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: FAILED at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236)
[Spark Streaming] java.lang.OutOfMemoryError: GC overhead limit exceeded
Hi guys, My Spark Streaming application hits a java.lang.OutOfMemoryError: GC overhead limit exceeded error in the Spark Streaming driver program. I have done the following to debug it: 1. Increased the driver memory from 1GB to 2GB; the error then came after 22 hrs. When the memory was 1GB, it came after 10 hrs. So I think it is a memory leak problem. 2. A few hours after starting the application, I killed all workers. The driver program kept running and also kept filling up the memory. I was thinking it was because of too many batches in the queue, but obviously it is not. Otherwise, after killing the workers (and of course the receiver), the memory usage should have gone down. 3. Ran the heap dump and the Leak Suspects report of Memory Analyzer in Eclipse and found that *One instance of org.apache.spark.storage.BlockManager loaded by sun.misc.Launcher$AppClassLoader @ 0x6c002fb90 occupies 1,477,177,296 (72.70%) bytes. The memory is accumulated in one instance of java.util.LinkedHashMap loaded by system class loader.* *Keywords*: *sun.misc.Launcher$AppClassLoader @ 0x6c002fb90*, *java.util.LinkedHashMap*, *org.apache.spark.storage.BlockManager* What my application mainly does is: 1. calculate the sum/count in a batch 2. get the average in the batch 3. store the result in DB 4. calculate the sum/count in a window 5. get the average/min/max in the window 6. store the result in DB 7. compare the current batch value with the previous batch value using updateStateByKey. Any hint what causes this leak? Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
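A minimal sketch of the kind of pipeline described in steps 1-7 above; the socket source, the "key value" input format, the 10-second batch and 60-second window intervals, and the checkpoint path are all assumptions, and the "store in DB" steps are replaced by print():

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BatchAndWindowAverages extends App {
  val ssc = new StreamingContext(new SparkConf().setAppName("averages"), Seconds(10))
  ssc.checkpoint("/tmp/streaming-checkpoint")   // required by updateStateByKey

  // Parse "key value" text lines into (key, value) pairs.
  val pairs = ssc.socketTextStream("localhost", 9999).map { line =>
    val Array(k, v) = line.split(" ", 2)
    (k, v.toDouble)
  }

  // 1-2. Per-batch sum/count and average.
  val batchAvg = pairs.mapValues(v => (v, 1L))
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    .mapValues { case (sum, count) => sum / count }

  // 4-5. Windowed sum/count and average over the last 60 seconds.
  val windowAvg = pairs.mapValues(v => (v, 1L))
    .reduceByKeyAndWindow((a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2), Seconds(60))
    .mapValues { case (sum, count) => sum / count }

  // 7. Compare the current batch average with the previous one via updateStateByKey;
  //    the state holds (previousAverage, delta).
  val delta = batchAvg.updateStateByKey[(Double, Double)] { (current: Seq[Double], state: Option[(Double, Double)]) =>
    val prev = state.map(_._1).getOrElse(0.0)
    val curr = current.headOption.getOrElse(prev)
    Some((curr, curr - prev))
  }

  // 3/6. In the real application these would be written to a database instead.
  batchAvg.print()
  windowAvg.print()
  delta.print()

  ssc.start()
  ssc.awaitTermination()
}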
Re: If for YARN you use 'spark.yarn.jar', what is the LOCAL equivalent to that property ...
On Mon, Sep 8, 2014 at 3:54 PM, Dimension Data, LLC. subscripti...@didata.us wrote: You're probably right about the above because, as seen *below* for pyspark (but probably for other Spark applications too), once '-Dspark.master=[yarn-client|yarn-cluster]' is specified, the app invocation doesn't even seem to respect the '-Dspark.yarn.jar=[file:/// | file:// |/...]' property setting. That's too bad because I have CDH5 Spark installed... If you're using CDH 5 (either 5.0 or 5.1), then spark.yarn.jar won't work. For CDH 5 you want to set the SPARK_JAR environment variable instead. The change that added spark.yarn.jar is part of Spark 1.1 (which will be part of CDH 5.2). -- Marcelo
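A brief command-line sketch of the two mechanisms contrasted above; the assembly jar location and the application jar path are assumptions, not CDH's actual layout:

# CDH 5.0/5.1 (Spark 1.0.x): point SPARK_JAR at the Spark assembly before submitting
export SPARK_JAR=hdfs:///user/spark/share/lib/spark-assembly.jar
spark-submit --master yarn-client my-app.jar

# Spark 1.1+ (CDH 5.2): the equivalent configuration property
spark-submit --master yarn-client --conf spark.yarn.jar=hdfs:///user/spark/share/lib/spark-assembly.jar my-app.jar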
Spark Web UI in Mesos mode
Hi, I am running Spark 1.0.2 on a cluster in Mesos mode. I am not able to access the Spark master Web UI at port 8080 but am able to access it at port 5050. Is 5050 the standard port? Also, in standalone mode there is a link to the Application detail UI directly from the master UI. I don't see that link on the master UI page in Mesos mode. Instead I have to explicitly go to port 18080 to access the application detail. I have set up the history server. Is there some way to access the application detail link in Mesos mode directly from the master UI page (like the standalone UI)? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Web-UI-in-Mesos-mode-tp13746.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Web UI in Mesos mode
Hi, Spark master web UI is only for standalone clusters, where cluster resources are managed by Spark, not other resource managers. Mesos master's default port is 5050. Within Mesos, a Spark application is considered as one of many frameworks, so there's no Spark-specific support like accessing application web UI from Mesos master web UI. - Wonha On Mon, Sep 8, 2014 at 4:45 PM, SK skrishna...@gmail.com wrote: Hi, I am running Spark 1.0.2 on a cluster in Mesos mode. I am not able to access the Spark master Web UI at port 8080 but am able to access it at port 5050. Is 5050 the standard port? Also, in the the standalone mode, there is a link to the Application detail UI directly from the master UI. I dont see that link in the master UI page in Mesos mode. Instead I have to explicitly go to port 18080 to access the application detail. I have set up the history server. Is there some way to access the application detail link in Mesos mode directly from the master UI page (like the standalone UI)? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Web-UI-in-Mesos-mode-tp13746.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
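A small sketch of the configuration implied above, assuming a hypothetical Mesos master host and an HDFS event-log directory: the application connects to the Mesos master on port 5050 and writes event logs so the history server (port 18080) can show the application detail UI afterwards:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("mesos://mesos-master.example.com:5050")   // Mesos master, not a Spark standalone master
  .setAppName("my-app")
  .set("spark.eventLog.enabled", "true")                // let the history server reconstruct the app UI
  .set("spark.eventLog.dir", "hdfs:///user/spark/applicationHistory")
val sc = new SparkContext(conf)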
Executor address issue: CANNOT FIND ADDRESS (Spark 0.9.1)
Hi, One of the executors in my Spark cluster shows a CANNOT FIND ADDRESS address for one of the stages, which failed. After that stage, I got cascading failures for all my stages :/ (stages that seem complete but still appear as active stages in the dashboard; incomplete or failed stages that are still in the active sections). Just a note that in the later stages there were no more CANNOT FIND ADDRESS issues. Did anybody get this address issue and find a solution? Could this problem explain the cascading failures? Thanks! Nicolas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-address-issue-CANNOT-FIND-ADDRESS-Spark-0-9-1-tp13748.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Is the structure for a jar file for running Spark applications the same as that for Hadoop
In a Hadoop jar there is a directory called lib; all non-provided third-party jars go there and are included in the classpath of the code. Do jars for Spark have the same structure? Another way to ask the question: if I have code to execute Spark and a jar built for Hadoop, can I simply use that jar?
Re: Multi-tenancy for Spark (Streaming) Applications
Hi, On Thu, Sep 4, 2014 at 10:33 AM, Tathagata Das tathagata.das1...@gmail.com wrote: In the current state of Spark Streaming, creating separate Java processes, each having a streaming context, is probably the best approach to dynamically adding and removing input sources. All of these should be able to use a YARN cluster for resource allocation. So, for example, I would write a server application that accepts a command like createNewInstance and then calls spark-submit, pushing my actual application to the YARN cluster? Or could I use spark-jobserver? Thanks Tobias
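A minimal sketch of the "server calls spark-submit per instance" idea discussed above, assuming hypothetical paths, class name, and arguments; it simply shells out via scala.sys.process, which is one way such a server could launch a new streaming application per request:

import scala.sys.process._

// Launch one independent streaming application per "createNewInstance" request.
// Returns a handle to the child spark-submit process without blocking.
def launchStreamingInstance(instanceId: String): Process =
  Seq(
    "/opt/spark/bin/spark-submit",
    "--master", "yarn-cluster",
    "--class", "com.example.MyStreamingApp",   // hypothetical application class
    "/opt/jobs/my-streaming-app.jar",          // hypothetical application jar
    instanceId
  ).run()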
Re: Spark streaming for synchronous API
Ron, On Tue, Sep 9, 2014 at 11:27 AM, Ron's Yahoo! zlgonza...@yahoo.com.invalid wrote: I’m trying to figure out how I can run Spark Streaming like an API. The goal is to have a synchronous REST API that runs the spark data flow on YARN. I guess I *may* develop something similar in the future. By a synchronous REST API, do you mean that submitting the job is synchronous and you would fetch the processing results via a different call? Or do you want to submit a job and get the processed data back as an HTTP stream? To begin with, is it even possible to have Spark Streaming run as a yarn job? I think it is very much possible to run Spark Streaming as a YARN job; at least it worked well with Mesos. Tobias
RE: Setting Kafka parameters in Spark Streaming
Hi Hemanth, I think there is a bug in this API in Spark 0.8.1, so you will hit this exception when using Java code with this API. This bug is fixed in the latest version, as you can see in the patch (https://github.com/apache/spark/pull/1508). But it's only for Kafka 0.8+; as you still use Kafka 0.7, you can modify the Spark code according to this patch and rebuild. I still highly recommend using the latest versions of Spark and Kafka, as there are lots of improvements in the streaming field. Thanks Jerry From: Hemanth Yamijala [mailto:yhema...@gmail.com] Sent: Tuesday, September 09, 2014 12:49 AM To: user@spark.apache.org Subject: Setting Kafka parameters in Spark Streaming Hi, I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the parameter fetch.message.max.bytes when creating the Kafka DStream. The only API that seems to allow this is the following: kafkaStream[T, D <: kafka.serializer.Decoder[_]](typeClass: Class[T], decoderClass: Class[D], kafkaParams: Map[String, String], topics: Map[String, Integer], storageLevel: StorageLevel) I tried to call it like so: context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK()) However, this is causing an exception like: java.lang.ClassCastException: java.lang.Object cannot be cast to kafka.serializer.Decoder at org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105) at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125) at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158) at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154) Can anyone provide help on how to set these parameters? Thanks Hemanth
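For reference, a sketch of how the consumer property is passed on a recent Spark (1.x) with Kafka 0.8, where KafkaUtils.createStream accepts arbitrary kafkaParams; an existing StreamingContext ssc is assumed, and the ZooKeeper address, group id, topic, and 10 MB limit are assumptions:

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Any Kafka consumer property, including fetch.message.max.bytes, goes into kafkaParams.
val kafkaParams = Map(
  "zookeeper.connect" -> "zk1.example.com:2181",
  "group.id" -> "my-consumer-group",
  "fetch.message.max.bytes" -> (10 * 1024 * 1024).toString)

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK)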
Re: Spark streaming for synchronous API
Tobias, Let me explain a little more. I want to create a synchronous REST API that will process some data that is passed in as some request. I would imagine that the Spark Streaming job on YARN is a long-running job that waits on requests from something. What that something is, is still not clear to me, but I would imagine that it's some queue. The goal is to be able to push a message onto a queue with some id, and then get the processed results back from Spark Streaming. The goal is for the REST API to be able to respond to lots of calls with low latency. Hope that clarifies things... Thanks, Ron On Sep 8, 2014, at 7:41 PM, Tobias Pfeiffer t...@preferred.jp wrote: Ron, On Tue, Sep 9, 2014 at 11:27 AM, Ron's Yahoo! zlgonza...@yahoo.com.invalid wrote: I’m trying to figure out how I can run Spark Streaming like an API. The goal is to have a synchronous REST API that runs the spark data flow on YARN. I guess I *may* develop something similar in the future. By a synchronous REST API, do you mean that submitting the job is synchronous and you would fetch the processing results via a different call? Or do you want to submit a job and get the processed data back as an HTTP stream? To begin with, is it even possible to have Spark Streaming run as a yarn job? I think it is very much possible to run Spark Streaming as a YARN job; at least it worked well with Mesos. Tobias
Re: Spark streaming for synchronous API
Hi, On Tue, Sep 9, 2014 at 12:59 PM, Ron's Yahoo! zlgonza...@yahoo.com wrote: I want to create a synchronous REST API that will process some data that is passed in as some request. I would imagine that the Spark Streaming Job on YARN is a long running job that waits on requests from something. What that something is is still not clear to me, but I would imagine that it’s some queue. The goal is to be able to push a message onto a queue with some id, and then get the processed results back from Spark Streaming. That is not exactly a Spark Streaming use case, I think. Spark Streaming pulls data from some source (like a queue), then processes all data collected in a certain interval in a mini-batch, and stores that data somewhere. It is not well suited for handling request-response cycles in a synchronous way; you might consider using plain Spark (without Streaming) for that. For example, you could use the unfiltered http://unfiltered.databinder.net/Unfiltered.html library and within request handling do some RDD operation, returning the output as HTTP response. This works fine as multiple threads can submit Spark jobs concurrently https://spark.apache.org/docs/latest/job-scheduling.html You could also check https://github.com/adobe-research/spindle -- that seems to be similar to what you are doing. The goal is for the REST API be able to respond to lots of calls with low latency. Hope that clarifies things... Note that low latency for lots of calls is maybe not something that Spark was built for. Even if you do close to nothing data processing, you may not get below 200ms or so due to the overhead of submitting jobs etc., from my experience. Tobias
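As a rough illustration of the "plain Spark behind a synchronous HTTP endpoint" approach above: the sketch below uses the JDK's built-in HTTP server rather than unfiltered just to stay self-contained, and the data path, port, and thread-pool size are assumptions. Each request triggers a Spark job and blocks until the result is ready; concurrent requests translate into concurrently submitted jobs.

import java.net.InetSocketAddress
import java.util.concurrent.Executors
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import org.apache.spark.{SparkConf, SparkContext}

object SyncSparkApi extends App {
  val sc = new SparkContext(new SparkConf().setAppName("sync-spark-api"))
  val data = sc.textFile("hdfs:///data/events").cache()   // hypothetical data set, cached for repeated queries

  val server = HttpServer.create(new InetSocketAddress(8080), 0)
  server.createContext("/count", new HttpHandler {
    def handle(exchange: HttpExchange): Unit = {
      val body = data.count().toString.getBytes("UTF-8")   // runs one Spark job per request
      exchange.sendResponseHeaders(200, body.length)
      exchange.getResponseBody.write(body)
      exchange.close()
    }
  })
  server.setExecutor(Executors.newFixedThreadPool(8))      // serve requests (and submit jobs) concurrently
  server.start()
}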
Re: How to profile a spark application
Hi Ted, Where do I find the licence keys that I need to copy to the licences directory? Thank you!! On Mon, Sep 8, 2014 at 8:25 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Thank you Ted. regards Karthik On Mon, Sep 8, 2014 at 3:33 PM, Ted Yu yuzhih...@gmail.com wrote: See https://cwiki.apache.org/confluence/display/SPARK/Profiling+Spark+Applications+Using+YourKit On Sep 8, 2014, at 2:48 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, Can someone tell me how to profile a Spark application? -Karthik
Re: Spark streaming for synchronous API
Hi Tobias, So I guess where I was coming from was the assumption that starting up a new job to be listening on a particular queue topic could be done asynchronously. For example, let’s say there’s a particular topic T1 in a Kafka queue. If I have a new set of requests coming from a particular client A, I was wondering if I could create a partition A. The streaming job is submitted to listen to T1.A and will write to a topic T2.A, which the REST endpoint would be listening on. It does seem a little contrived but the ultimate goal here is to get a bunch of messages from a queue, distribute to a bunch of Spark jobs that process and write back to another queue, which the REST endpoint synchronously waits on. Storm might be a better fit, but the background behind this question is that I want to reuse the same set of transformations for both batch and streaming, with the streaming use case represented by a REST call. In other words, the job submission would not be part of the equation so I would imagine the latency is limited to the processing, write back and consumption of the processed message by the original REST request. Let me know what you think… Thanks, Ron On Sep 8, 2014, at 9:28 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Tue, Sep 9, 2014 at 12:59 PM, Ron's Yahoo! zlgonza...@yahoo.com wrote: I want to create a synchronous REST API that will process some data that is passed in as some request. I would imagine that the Spark Streaming Job on YARN is a long running job that waits on requests from something. What that something is is still not clear to me, but I would imagine that it’s some queue. The goal is to be able to push a message onto a queue with some id, and then get the processed results back from Spark Streaming. That is not exactly a Spark Streaming use case, I think. Spark Streaming pulls data from some source (like a queue), then processes all data collected in a certain interval in a mini-batch, and stores that data somewhere. It is not well suited for handling request-response cycles in a synchronous way; you might consider using plain Spark (without Streaming) for that. For example, you could use the unfiltered http://unfiltered.databinder.net/Unfiltered.html library and within request handling do some RDD operation, returning the output as HTTP response. This works fine as multiple threads can submit Spark jobs concurrently https://spark.apache.org/docs/latest/job-scheduling.html You could also check https://github.com/adobe-research/spindle -- that seems to be similar to what you are doing. The goal is for the REST API be able to respond to lots of calls with low latency. Hope that clarifies things... Note that low latency for lots of calls is maybe not something that Spark was built for. Even if you do close to nothing data processing, you may not get below 200ms or so due to the overhead of submitting jobs etc., from my experience. Tobias
Re: Setting Kafka parameters in Spark Streaming
Thanks, Shao, for providing the necessary information. Hemanth On Tue, Sep 9, 2014 at 8:21 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi Hemanth, I think there is a bug in this API in Spark 0.8.1, so you will meet this exception when using Java code with this API, this bug is fixed in latest version, as you can see the patch ( https://github.com/apache/spark/pull/1508). But it’s only for Kafka 0.8+, as you still use kafka 0.7, you can modify the Spark code according to this patch and rebuild. Still highly recommend to use latest version of Spark and Kafka, there are lots of improvements in streaming field. Thanks Jerry *From:* Hemanth Yamijala [mailto:yhema...@gmail.com] *Sent:* Tuesday, September 09, 2014 12:49 AM *To:* user@spark.apache.org *Subject:* Setting Kafka parameters in Spark Streaming Hi, I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the parameter fetch.message.max.bytes when creating the Kafka DStream. The only API that seems to allow this is the following: kafkaStream[T, D : kafka.serializer.Decoder[_]](typeClass: Class[T], decoderClass: Class[D], kafkaParams: Map[String, String], topics: Map[String, Integer], storageLevel: StorageLevel) I tried to call this as so: context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK()) However, this is causing an exception like: java.lang.ClassCastException: java.lang.Object cannot be cast to kafka.serializer.Decoder at org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105) at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125) at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158) at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154) Can anyone provide help on how to set these parameters ? Thanks Hemanth
Re: Spark streaming for synchronous API
Hi, On Tue, Sep 9, 2014 at 2:02 PM, Ron's Yahoo! zlgonza...@yahoo.com wrote: So I guess where I was coming from was the assumption that starting up a new job to be listening on a particular queue topic could be done asynchronously. No, with the current state of Spark Streaming, all data sources and the processing pipeline must be fixed when you start your StreamingContext. You cannot add new data sources dynamically at the moment, see http://apache-spark-user-list.1001560.n3.nabble.com/Multi-tenancy-for-Spark-Streaming-Applications-td13398.html For example, let’s say there’s a particular topic T1 in a Kafka queue. If I have a new set of requests coming from a particular client A, I was wondering if I could create a partition A. The streaming job is submitted to listen to T1.A and will write to a topic T2.A, which the REST endpoint would be listening on. That doesn't seem like a good way to use Kafka. It may be possible, but I am pretty sure you should create a new topic T_A instead of a partition A in an existing topic. With some modifications of Spark Streaming's KafkaReceiver you *might* be able to get it to work as you imagine, but it was not meant to be that way, I think. Also, you will not get low latency, because Spark Streaming processes data in batches of fixed interval length (say, 1 second) and in the worst case your query will wait up to 1 second before processing even starts. If I understand correctly what you are trying to do (which I am not sure about), I would probably recommend to choose a bit of a different architecture; in particular given that you cannot dynamically add data sources. Tobias
Re: Crawler and Scraper with different priorities
Hi Daniil, I have to do some processing of the results, as well as push the data to the front end. Currently I'm using Akka for this application, but I was thinking maybe Spark Streaming would be a better fit, and I could also use MLlib for processing the results. Any specific reasons why Spark Streaming wouldn't be better than Akka? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Crawler-Scraper-with-different-priorities-tp13645p13763.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Setting Kafka parameters in Spark Streaming
As you mentioned in another mail that you hope to use the latest version of Spark with Kafka 0.7, there are some notes you should take care of: 1. Kafka 0.7 can only be compiled with Scala 2.8, while Spark is now compiled with Scala 2.10, and there is no binary compatibility between these two Scala versions. So you have to modify the Kafka code, as Spark previously did, to fix the Scala problem. 2. The high-level consumer API changed between Kafka 0.7 and 0.8, so you have to modify KafkaInputDStream in Spark Streaming. Thanks Jerry From: Hemanth Yamijala [mailto:yhema...@gmail.com] Sent: Tuesday, September 09, 2014 1:19 PM To: Shao, Saisai Cc: user@spark.apache.org Subject: Re: Setting Kafka parameters in Spark Streaming Thanks, Shao, for providing the necessary information. Hemanth On Tue, Sep 9, 2014 at 8:21 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi Hemanth, I think there is a bug in this API in Spark 0.8.1, so you will meet this exception when using Java code with this API, this bug is fixed in latest version, as you can see the patch (https://github.com/apache/spark/pull/1508). But it’s only for Kafka 0.8+, as you still use kafka 0.7, you can modify the Spark code according to this patch and rebuild. Still highly recommend to use latest version of Spark and Kafka, there are lots of improvements in streaming field. Thanks Jerry From: Hemanth Yamijala [mailto:yhema...@gmail.com] Sent: Tuesday, September 09, 2014 12:49 AM To: user@spark.apache.org Subject: Setting Kafka parameters in Spark Streaming Hi, I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the parameter fetch.message.max.bytes when creating the Kafka DStream. The only API that seems to allow this is the following: kafkaStream[T, D <: kafka.serializer.Decoder[_]](typeClass: Class[T], decoderClass: Class[D], kafkaParams: Map[String, String], topics: Map[String, Integer], storageLevel: StorageLevel) I tried to call this as so: context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK()) However, this is causing an exception like: java.lang.ClassCastException: java.lang.Object cannot be cast to kafka.serializer.Decoder at org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105) at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125) at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158) at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154) Can anyone provide help on how to set these parameters? Thanks Hemanth