Re: Matrix multiplication in spark
@sowen.. I am looking for distributed operations, especially very large sparse matrix x sparse matrix multiplication. What is the best way to implement this in Spark?
Re: SQL COUNT DISTINCT
Here is the link on jira: https://issues.apache.org/jira/browse/SPARK-4243
Re: Streaming window operations not producing output
Hi TD, I would like to run streaming 24/7 and am trying to use getOrCreate, but it's not working. Could you please help with this? http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-getOrCreate-tc18060.html
Re: sparse x sparse matrix multiplication
local matrix-matrix multiplication or distributed?

On Tue, Nov 4, 2014 at 11:58 PM, ll duy.huynh@gmail.com wrote:
what is the best way to implement a sparse x sparse matrix multiplication with spark?
Re: Matrix multiplication in spark
We are working on distributed block matrices. The main JIRA is at: https://issues.apache.org/jira/browse/SPARK-3434. The goal is to support basic distributed linear algebra (dense first, then sparse). -Xiangrui

On Wed, Nov 5, 2014 at 12:23 AM, ll duy.huynh@gmail.com wrote:
@sowen.. I am looking for distributed operations, especially very large sparse matrix x sparse matrix multiplication. What is the best way to implement this in Spark?
Re: java.io.NotSerializableException: org.apache.spark.SparkEnv
Hi, thanks for replying. I have posted my code at http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-getOrCreate-tc18060.html
add support for separate GC log files for different executors
Hey, guys. Here's my problem: while using standalone mode, I always use the following args for the executor:

-XX:+PrintGCDetails -XX:+PrintGCDateStamps -verbose:gc -Xloggc:/tmp/spark.executor.gc.log

But as we know, the HotSpot JVM does not support variable substitution in the -Xloggc parameter, which causes the GC log to be overwritten by later executors. May I create a patch that adds variable substitution before the worker forks a new executor, to avoid the GC log being overwritten?

First thoughts: configure the executor JVM args like this:

-XX:+PrintGCDetails -XX:+PrintGCDateStamps -verbose:gc -Xloggc:/tmp/spark.executor.%applicationId%.gc.log

This would replace %applicationId% with the current application ID and pass the final args to the java command line. We could support more variables, such as executorId. Thanks. -- haitao.yao
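To make the proposal concrete, here is a minimal Scala sketch of the kind of substitution the worker could apply to the executor JVM options before launching the process. The placeholder names and the helper are illustrative only and are not taken from the actual Worker code:

```scala
// Hypothetical helper: expand placeholders in the configured executor JVM options
// before the worker builds the final java command line.
def expandJavaOpts(opts: String, appId: String, executorId: String): String =
  opts
    .replace("%applicationId%", appId)
    .replace("%executorId%", executorId)

val configured = "-XX:+PrintGCDetails -verbose:gc " +
  "-Xloggc:/tmp/spark.executor.%applicationId%.%executorId%.gc.log"

// expandJavaOpts(configured, "app-20141105120000-0001", "3") would yield
//   -Xloggc:/tmp/spark.executor.app-20141105120000-0001.3.gc.log
println(expandJavaOpts(configured, "app-20141105120000-0001", "3"))
```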
Re: sparse x sparse matrix multiplication
Distributed. Something like CoordinateMatrix.multiply(CoordinateMatrix). Thanks Xiangrui!

On Wed, Nov 5, 2014 at 4:24 AM, Xiangrui Meng men...@gmail.com wrote:
local matrix-matrix multiplication or distributed?
Re: Matrix multiplication in spark
Ok great. When will this be ready?

On Wed, Nov 5, 2014 at 4:27 AM, Xiangrui Meng men...@gmail.com wrote:
We are working on distributed block matrices. The main JIRA is at: https://issues.apache.org/jira/browse/SPARK-3434. The goal is to support basic distributed linear algebra (dense first, then sparse). -Xiangrui
Re: sparse x sparse matrix multiplication
In case this won't be available anytime soon, what would be a good way to implement this multiplication feature in Spark?

On Wed, Nov 5, 2014 at 4:59 AM, Duy Huynh duy.huynh@gmail.com wrote:
Distributed. Something like CoordinateMatrix.multiply(CoordinateMatrix). Thanks Xiangrui!
Dynamically InferSchema From Hive and Create parquet file
Currently the createParquetFile method needs a bean class as one of its parameters:

javahiveContext.createParquetFile(XBean.class, IMPALA_TABLE_LOC, true, new Configuration()).registerTempTable(TEMP_TABLE_NAME);

Is it possible to dynamically infer the schema from Hive using the hive context and the table name, and then provide that schema? Regards, Madhu Jahagirdar
Change in the API for streamingcontext.actorStream?
Has there been a change in creating an input stream with an actor receiver? I was able to get it working with v1.0.1 but not with any version after that. I tried doing so with EchoActor and get serialization errors. I have also reported an issue about this: https://issues.apache.org/jira/browse/SPARK-4171. Can someone please guide me on what's wrong with my code?
Re: pass unique ID to mllib algorithms pyspark
Hi Xiangrui, thanks for the reply. Is this still due to be released in 1.2 (SPARK-3530 is still open)? Thanks,

On Wed, Nov 5, 2014 at 3:21 AM, Xiangrui Meng men...@gmail.com wrote:
The proposed new set of APIs (SPARK-3573, SPARK-3530) will address this issue. We carry over extra columns with training and prediction and then leverage Spark SQL's execution plan optimization to decide which columns are really needed. For the current set of APIs, we can add `predictOnValues` to models, which carries over the input keys. StreamingKMeans and StreamingLinearRegression implement this method. -Xiangrui

On Tue, Nov 4, 2014 at 2:30 AM, jamborta jambo...@gmail.com wrote:
Hi all, there are a few algorithms in pyspark where the prediction part is implemented in Scala (e.g. ALS, decision trees), so it is not very easy to manipulate the prediction methods. I think it is a very common scenario that the user would like to generate predictions for a dataset such that each predicted value is identifiable (e.g. has a unique id attached to it). This is not possible in the current implementation, as the predict functions take a feature vector and return the predicted values where, I believe, the order is not guaranteed, so there is no way to join them back with the original data the predictions are generated from. Is there a way around this at the moment? Thanks,
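For reference, the usual work-around today is to keep an id next to each feature vector and predict record-by-record, so the id is never separated from its prediction. A minimal Scala sketch of that pattern follows; the ids, the `data: RDD[Vector]` input and the already-trained model are assumptions, and the pyspark limitation described above (tree/ALS predict implemented on the Scala side) is exactly why this is harder to do from Python:

```scala
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.rdd.RDD

// data: feature vectors; model: a trained DecisionTreeModel (both assumed to exist)
def predictWithIds(data: RDD[Vector], model: DecisionTreeModel): RDD[(Long, Double)] = {
  // attach a stable id to every record, then predict per record so the
  // (id, prediction) pairing is preserved regardless of partition ordering
  val keyed: RDD[(Long, Vector)] = data.zipWithUniqueId().map(_.swap)
  keyed.mapValues(features => model.predict(features))
}
```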
using LogisticRegressionWithSGD.train in Python crashes with Broken pipe
I have a dataset comprised of ~200k labeled points whose features are SparseVectors with ~20M features. I take 5% of the data for a training set.

model = LogisticRegressionWithSGD.train(training_set)

fails with:

ERROR:py4j.java_gateway:Error while sending or receiving.
Traceback (most recent call last):
  File /cluster/home/roskarr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 472, in send_command
    self.socket.sendall(command.encode('utf-8'))
  File /cluster/home/roskarr/miniconda/lib/python2.7/socket.py, line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe

I'm at a loss as to where to begin to debug this... any suggestions? Thanks, Rok
Standalone Specify mem / cores defaults
Hi, the docs specify that we can control the amount of RAM / cores available via:

-c CORES, --cores CORES   Total CPU cores to allow Spark applications to use on the machine (default: all available); only on worker
-m MEM, --memory MEM      Total amount of memory to allow Spark applications to use on the machine, in a format like 1000M or 2G (default: your machine's total RAM minus 1 GB); only on worker

Omitting these values would cause them to take on defaults. Is there a way of specifying the default? Or is omitting the parameters the only way for them to take on default values? Will -c default and -m default work?

Thanks, Ashic.
Re: Standalone Specify mem / cores defaults
You can set those inside the spark-defaults.conf file under the conf directory inside your Spark installation. Thanks, Best Regards

On Wed, Nov 5, 2014 at 4:51 PM, Ashic Mahtab as...@live.com wrote:
Hi, the docs specify that we can control the amount of RAM / cores available via -c CORES / -m MEM (only on worker). Omitting these values would cause them to take on defaults. Is there a way of specifying the default?
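For reference, a sketch of where such defaults typically live in a standalone deployment; the values below are placeholders and this reflects the usual standalone setup rather than anything stated in the thread:

```
# conf/spark-env.sh on each worker -- daemon-level defaults that the
# -c / --cores and -m / --memory flags would otherwise override
SPARK_WORKER_CORES=8
SPARK_WORKER_MEMORY=30g

# conf/spark-defaults.conf -- per-application defaults
spark.cores.max        8
spark.executor.memory  4g
```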
Unsubscribe
Thanks Regards, Mrugen Deshmukh. (M.S. Software Engineering - San Jose State University) http://www.linkedin.com/in/mrugendeshmukh
why decision trees do binary split?
Hi, just wondering: what is the reason that the decision tree implementation in Spark always does binary splits? Thanks,
Re: I want to make clear the difference about executor-cores number.
If you go to your Spark job UI (probably on http://master-node:4040) and click on the Environment tab, you can check whether the settings are correctly picked up by Spark. Also, when you run the job, you can see the subtasks (Stages tab); inside a task you can check what resources are assigned to it. If you are not getting 4 cores assigned (where appropriate), it means something is wrong with your config.
Re: using LogisticRegressionWithSGD.train in Python crashes with Broken pipe
Hi Rok, you could try to debug it by first collecting your training_set to see if it gets you something back, before passing it to the train method. Then go through each line in the train method, as well as the serializer, and check where exactly it fails. Thanks,
Re: Unsubscribe
To unsubscribe, send an email to user-unsubscr...@spark.apache.org. Read more over here: https://spark.apache.org/community.html. Thanks, Best Regards
Re: Streaming window operations not producing output
Nothing in the log4j logs. I figured it out by comparing my code to the examples.

On Wed, Nov 5, 2014 at 4:17 AM, sivarani whitefeathers...@gmail.com wrote:
Hi TD, I would like to run streaming 24/7 and am trying to use getOrCreate, but it's not working. Could you please help with this?
Re: Spark on Yarn probably trying to load all the data to RAM
I have tried merging the files into one, and Spark is now working with RAM as I expected. Unfortunately, after doing this another problem appears: Spark running on YARN is scheduling all the work to only one worker node, as one big job. Is there some way to force Spark and YARN to schedule the work uniformly across the whole cluster?

I am running the job with the following command:

./spark/bin/spark-submit --master yarn-client --py-files /home/hadoop/my_pavkage.zip /home/hadoop/preprocessor.py

I have also tried to play with the --num-executors and --executor-cores options, but unfortunately I am not able to force Spark to run jobs on more than just one cluster node. Thank you in advance for any advice. Best regards, Jan

__

This is a crazy case: with a few million files, the scheduler will run out of memory. By default, each file becomes a partition, so you will have more than 1M partitions and also 1M tasks. coalesce() will reduce the number of tasks, but it cannot reduce the number of partitions of the original RDD. Could you pack the small files into bigger ones? Spark works much better with bigger files.

On Mon, Nov 3, 2014 at 11:46 AM, jan.zi...@centrum.cz wrote:
I have 3 datasets; in all the datasets the average file size is 10-12 KB. I am able to run my code on the dataset with 70K files, but I am not able to run it on the datasets with 1.1M and 3.8M files.

On Sun, Nov 2, 2014 at 1:35 AM, jan.zi...@centrum.cz wrote:
Hi, I am using Spark on YARN, particularly Spark in Python. I am trying to run:

myrdd = sc.textFile(s3n://mybucket/files/*/*/*.json)
(How many files do you have? and the average size of each file?)
myrdd.getNumPartitions()

Unfortunately it seems that Spark tries to load everything into RAM, or at least after a while of running everything slows down and then I get the errors in the log below. Everything works fine for datasets smaller than RAM, but I would expect Spark to do this without storing everything in RAM. So I would like to ask whether I'm missing some settings in Spark on YARN? Thank you in advance for any help.

14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-375] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded 14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded 11744,575: [Full GC 1194515K-1192839K(1365504K), 2,2367150 secs] 11746,814: [Full GC 1194507K-1193186K(1365504K), 2,1788150 secs] 11748,995: [Full GC 1194507K-1193278K(1365504K), 1,3511480 secs] 11750,347: [Full GC 1194507K-1193263K(1365504K), 2,2735350 secs] 11752,622: [Full GC 1194506K-1193192K(1365504K), 1,2700110 secs] Traceback (most recent call last): File stdin, line 1, in module File /home/hadoop/spark/python/pyspark/rdd.py, line 391, in getNumPartitions return self._jrdd.partitions().size() File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError14/11/01 22:07:07 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at NativeMethodAccessorImpl.java:-2 : An error occurred while calling o112.partitions. 
: java.lang.OutOfMemoryError: GC overhead limit exceeded 11753,896: [Full GC 1194506K-947839K(1365504K), 2,1483780 secs] 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded 14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-309] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 14/11/01 22:07:09 INFO Remoting: Remoting shut down 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down. 14/11/01 22:07:09 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871) 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871) 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to
Re: Spark Streaming getOrCreate
Sivarani, does your spark-master look like it's still up (i.e. if you check the UI)? I cannot tell if you see this error on the get or on the initial create. You can start debugging by dumping out the value of master in setMaster(master) -- especially if this failure is from the initial startup. From the error, it appears that your application is unable to (re?)connect to the master upon checkpoint restart -- I wonder if the reason it needs to restart is that the master went down...
Re: Spark on Yarn probably trying to load all the data to RAM
Ok, so the problem was solved: it was that the file was gzipped, and it looks like Spark does not support direct .gz file distribution to workers. Thank you very much for the suggestion to merge the files. Best regards, Jan

__

I have tried merging the files into one, and Spark is now working with RAM as I expected. Unfortunately, after doing this another problem appears: Spark running on YARN is scheduling all the work to only one worker node, as one big job.
Re: using LogisticRegressionWithSGD.train in Python crashes with Broken pipe
yes, the training set is fine, I've verified it.
Re: ERROR UserGroupInformation: PriviledgedActionException
I am running the same version of Spark on the server (master + worker) and on the client / driver. For the server I am using the spark-1.1.0-bin-hadoop1 binaries, and on the client I am using the same version:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-twitter_2.10</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-examples_2.10</artifactId>
  <version>1.1.0</version>
</dependency>

On Wed, Nov 5, 2014 at 6:32 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
Its more like you are having different versions of spark. Thanks, Best Regards

On Wed, Nov 5, 2014 at 3:05 AM, Saiph Kappa saiph.ka...@gmail.com wrote:
I set the host and port of the driver and now the error slightly changed:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/11/04 21:13:48 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT] 14/11/04 21:13:48 INFO SecurityManager: Changing view acls to: myuser,Myuser 14/11/04 21:13:48 INFO SecurityManager: Changing modify acls to: myuser,Myuser 14/11/04 21:13:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(myuser, Myuser); users with modify permissions: Set(myuser, Myuser) 14/11/04 21:13:48 INFO Slf4jLogger: Slf4jLogger started 14/11/04 21:13:48 INFO Remoting: Starting remoting 14/11/04 21:13:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@myserver:37456] 14/11/04 21:13:49 INFO Remoting: Remoting now listens on addresses: [akka.tcp://driverPropsFetcher@myserver:37456] 14/11/04 21:13:49 INFO Utils: Successfully started service 'driverPropsFetcher' on port 37456. 14/11/04 21:14:19 ERROR UserGroupInformation: PriviledgedActionException as:Myuser cause:java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] Exception in thread main java.lang.reflect.UndeclaredThrowableException: Unknown exception in doAs at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1134) at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:52) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:113) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:156) at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala) Caused by: java.security.PrivilegedActionException: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121) ... 
4 more Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:125) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:53) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:52) ... 7 more Any ideas? Thanks. On Tue, Nov 4, 2014 at 11:29 AM, Akhil Das ak...@sigmoidanalytics.com wrote: If you want to run the spark application from a remote machine, then you have to at least set the following configurations properly. *spark.driver.host* - points to the ip/host from where you are submitting the job (make sure you are able to ping this from the cluster) *spark.driver.port* - set it to a port number which is accessible from the spark cluster. You can look at more configuration options over here. http://spark.apache.org/docs/latest/configuration.html#networking Thanks Best Regards On Tue, Nov 4, 2014 at 6:07 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I am trying to submit a job to a spark cluster running on a single machine (1 master + 1 worker) with hadoop 1.0.4. I submit it in the
Starting Spark Master on CDH5.2/Spark v1.1.0 fails. Indication is: 'SCALA_HOME is not set'
Hello Friends: I was temporarily using a manual build of Spark v1.1.0, until Cloudera CDH5 RPMs were updated to that latest version. So now I'm back to using the CDH5.2 Spark v1.1.0 distribution. (That was just a preamble note for completeness. :) Now when I go to start the master as follows, it doesn't start, and the log file indicates that SCALA_HOME is not set. Here:

root@vps00# echo $SPARK_HOME
/usr/lib/spark
root@vps00# echo $SCALA_HOME
/home/user/APPS.d/SCALA.d/latest    -- this of course exists.
root@vps00# service spark-master start
Starting Spark master (spark-master): [ OK ]
root@vps00# jps
1684 Jps
root@vps00# cat /var/log/spark/spark-master.out
SCALA_HOME is not set

I tried both variations of '/etc/spark/conf/spark-env.sh' to set SCALA_HOME (meaning, near the bottom of that file where it says uncomment this and comment that), as well as simply hard-coding 'export SCALA_HOME=/home/user/APPS.d/SCALA.d/latest' in that file's second-to-last line. Same error.

SIDE NOTE: Although this is not the correct way to do it (because things don't get set up properly and will cause other problems), this works to start it:

root@vps00# /etc/init.d/spark-master start    --- old school way.
root@vps00# jps
2085 Jps
2013 Master

But again, doing it that way will miss sourcing some configuration files and lead to other issues. Does anyone know what might be wrong? Thank you, nmv
Re: Snappy and spark 1.1
Hi Guys, As part of debugging this native library error in our environment, it would be great if somebody can help me with this question. What kind of temp, scratch, and staging directories does Spark need and use on the slave nodes in the YARN cluster mode? Thanks, Aravind On Mon, Nov 3, 2014 at 4:11 PM, Aravind Srinivasan arav...@altiscale.com wrote: Team, We are running a build of spark 1.1.1 for hadoop 2.2. We can't get the code to read LZO or snappy files in YARN. It fails to find the native libs. I have tried many different ways of defining the lib path - LD_LIBRARY_PATH, --driver-class-path, spark.executor.extraLibraryPath in spark-defaults.conf, --driver-java-options, and SPARK_LIBRARY_PATH. But none of them seem to take effect. What am I missing? Or is this a known issue? The example below (HdfsTest) works with plain text on both cluster and local mode. LZO and snappy files work on local mode, but both fail in the YARN cluster mode LD_LIBRARY_PATH=/opt/hadoop/lib/native/ MASTER=yarn SPARK_EXAMPLES_JAR=./examples/target/spark-examples_2.10-1.1.1.jar ./bin/run-example HdfsTest /user/input/part-r-0.snappy Stack Trace: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 101-26-03.sc1.verticloud.com): ExecutorLostFailure (executor lost) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Thanks, Aravind
Re: Spark on Yarn probably trying to load all the data to RAM
Could you please give me an example, or send me a link, of how to use Hadoop CombineFileInputFormat? It sounds very interesting to me and it would probably save several hours of my pipeline computation. Merging the files is currently the bottleneck in my system.

__

Another potential option could be to use Hadoop CombineFileInputFormat with an input split size of, say, 512 MB or 1 GB. That way you don't need a preceding step and the I/O of first combining the files together.

On Nov 5, 2014 8:23 AM, jan.zi...@centrum.cz wrote:
Ok, so the problem was solved: it was that the file was gzipped, and it looks like Spark does not support direct .gz file distribution to workers. Thank you very much for the suggestion to merge the files. Best regards, Jan
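For what it's worth, here is a rough Scala sketch of reading many small text files through CombineTextInputFormat (the concrete text implementation of CombineFileInputFormat), so that several files are packed into each input split. The path and the 512 MB split size are placeholders, and it assumes Hadoop 2.x's CombineTextInputFormat is on the classpath; PySpark has a corresponding sc.newAPIHadoopFile call.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat

val hadoopConf = new Configuration(sc.hadoopConfiguration)
// upper bound on how much data gets packed into one combined split (~512 MB here)
hadoopConf.setLong("mapreduce.input.fileinputformat.split.maxsize", 512L * 1024 * 1024)

val combined = sc.newAPIHadoopFile(
  "s3n://mybucket/files/*/*/*.json",      // path taken from the original post
  classOf[CombineTextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf)

val lines = combined.map { case (_, text) => text.toString }
println(lines.partitions.length)          // far fewer partitions than input files
```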
Understanding spark operation pipeline and block storage
Hi, I would like to understand the pipeline of Spark's operations (transformations and actions) and some details on block storage. Let's consider the following code:

val rdd1 = sc.textFile("hdfs://...")
rdd1.map(func1).map(func2).count

For example, we have a file in HDFS of about 80 GB, already split into 32 files of 2.5 GB each.

q1) How many partitions will rdd1 have?
rule 1) Maybe 32, since there are 32 split files? Because in most cases this rule holds if the file is not big in size.
rule 2) Maybe more; I am not sure whether Spark's block store can contain a 2.5 GB partition. Is there some parameter specifying the block store size? AFAIK, the HDFS block size is used by Spark to read data from HDFS, so there will be (80 GB / hdfsBlockSize) partitions in rdd1, right? Usually the HDFS block size is 64 MB, so we will have 80 GB / 64 MB = 1280 partitions? Too many? Which criterion will it take: the number of split files or the HDFS block size?

q2) Here, func1 and func2 are sequentially added to the DAG. What's the workflow at the partition level?
option 1: Given a partition, func1 and func2 are applied to each element in this partition sequentially. After everything is done, we count the number of lines in the partition and send the count result to the driver. Then we take the next partition and do the same thing?
option 2: Or else, we apply func1 to all the partitions first, then apply func2 to all partitions that have had func1 applied, count the number of lines in each partition and send the results to the driver?
I have done some tests, and it seems that option 1 is correct. Can anyone confirm this? So in option 1, we have 1 job (count) which contains 3 stages: map(func1), map(func2), count.

q3) What if we run out of memory? Suppose we have 12 cores and 15 GB of memory in the cluster.
Case 1: For example, func1 takes one line of the file and creates a big object for each line, so a partition with func1 applied becomes a large partition. If we have 12 cores in the cluster, that means we may have 12 large partitions in memory. What if these partitions are much bigger than memory? What will happen? An OOM / heap size exception, etc.?
Case 2: Suppose the input is 80 GB, but we force the RDD to be repartitioned into 6 partitions, which is smaller than the number of cores. Normally each partition will be sent to a core, so all the input will be in memory. However, we have 15 GB of memory in the cluster. What will happen? An OOM exception? Then, could we just split the RDD into more partitions so that 80 GB / #partitions * 12 (which is the # of cores) < 15 GB (the memory size)? Meanwhile, we cannot split into too many partitions, which leads to some overhead in task distribution. If we read data from HDFS using the HDFS block size of 64 MB as the partition size, we get a formula like: 64 MB * # of cores < memory, which in most cases is true. Could this explain why reading from HDFS using the block size does not lead to OOM like case 2, even if the data is very big in size?

Sorry for making this post a bit long. Hope I make myself clear. Any help on any question will be appreciated. Thank you. Hao.
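Regarding q1, one way to settle it empirically is to ask the RDD itself; a minimal sketch (path and minPartitions value are placeholders). textFile takes minPartitions only as a hint, and the actual count comes from the Hadoop input splits, so for splittable files it is roughly total size / HDFS block size even when the hint is smaller:

```scala
val rdd1 = sc.textFile("hdfs://.../input", /* minPartitions = */ 32)
println(rdd1.partitions.length)   // actual number of splits chosen by the Hadoop InputFormat
```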
Any limitations of spark.shuffle.spill?
Hi, one question about the power of spark.shuffle.spill (I know this has been asked several times :-). Basically, when handling a (cached) dataset that doesn't fit in memory, Spark can spill it to disk. However, can I say that, when this is enabled, Spark can handle the situation faultlessly, no matter (1) how big the data set is (as compared to the available memory) and (2) how complex the detailed calculation being carried out is? Can spark.shuffle.spill handle this perfectly? Here we assume that (1) the disk space has no limitations and (2) the code is correctly written according to the functional requirements. The reason I ask is that, under such situations, I kept receiving warnings like FetchFailed when memory usage reaches the limit. Thanks, YC
Re: sparse x sparse matrix multiplication
You can use breeze for local sparse-sparse matrix multiplication and then define an RDD of sub-matrices RDD[(Int, Int, CSCMatrix[Double])] (blockRowId, blockColId, sub-matrix) and then use join and aggregateByKey to implement this feature, which is the same as in MapReduce. -Xiangrui
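To make that concrete, here is a rough sketch of the join-based approach described above (reduceByKey is used in place of aggregateByKey for brevity; block ids are assumed to be aligned so that A's column block k matches B's row block k, and the breeze version in use is assumed to support CSCMatrix multiplication and addition):

```scala
import breeze.linalg.CSCMatrix
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

type Block = (Int, Int, CSCMatrix[Double])   // (blockRowId, blockColId, sub-matrix)

def multiply(a: RDD[Block], b: RDD[Block]): RDD[((Int, Int), CSCMatrix[Double])] = {
  // key A's blocks by their column id and B's blocks by their row id,
  // so blocks that must be multiplied meet in the join
  val aByCol = a.map { case (i, k, m) => (k, (i, m)) }
  val bByRow = b.map { case (k, j, m) => (k, (j, m)) }

  aByCol.join(bByRow)
    .map { case (_, ((i, ma), (j, mb))) => ((i, j), ma * mb) }  // local partial products
    .reduceByKey(_ + _)                                         // sum partials per output block
}
```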
Re: using LogisticRegressionWithSGD.train in Python crashes with Broken pipe
Which Spark version did you use? Could you check the WebUI and attach the error message on executors? -Xiangrui

On Wed, Nov 5, 2014 at 8:23 AM, rok rokros...@gmail.com wrote:
yes, the training set is fine, I've verified it.
Re: Kafka Consumer in Spark Streaming
As suggested by Qiaou, I looked at the UI:

1) Under 'Stages' the only 'active' stage is: runJob at ReceiverTracker.scala:275
2) Under 'Executors', there's only 1 active task, but I don't see any output (or logs)
3) Under 'Streaming', there's one receiver called 'KafkaReciever-0', but 'Records in last batch' is 0.

Honestly, I think it's not connecting to my Kafka topic - possibly because I need to pass the following parameter: metadata.broker.list = machine:9092. But I don't know how to pass this to KafkaUtils.createStream(...). Could that be the problem?

On Tue, Nov 4, 2014 at 11:12 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
Your code doesn't trigger any action. How about the following?

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(60 * 1 * 1000));
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "machine:2181", 1, map);
JavaDStream<String> statuses = tweets.map(
    new Function<String, String>() {
        public String call(String status) {
            System.out.println(status);
            return status;
        }
    }
);
statuses.print();

Or you could use foreachRDD instead of map() if your intention is just printing. Thanks, Best Regards

On Wed, Nov 5, 2014 at 12:35 PM, Something Something mailinglist...@gmail.com wrote:
It's not local. My spark url is something like this: String sparkUrl = "spark://host name:7077";

On Tue, Nov 4, 2014 at 11:03 PM, Jain Rahul ja...@ivycomptech.com wrote:
I think you are running it locally. Do you have local[1] here for the master url? If yes, change it to local[2] or more threads. It may also be due to a topic name mismatch. sparkConf.setMaster("local[1]"); Regards, Rahul

From: Something Something mailinglist...@gmail.com
Date: Wednesday, November 5, 2014 at 12:23 PM
To: Shao, Saisai saisai.s...@intel.com
Cc: user@spark.apache.org
Subject: Re: Kafka Consumer in Spark Streaming

Added foreach as follows. Still don't see any output on my console. Would this go to the worker logs as Jerry indicated?

JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "mymachine:2181", 1, map);
JavaDStream<String> statuses = tweets.map(
    new Function<String, String>() {
        public String call(String status) {
            return status;
        }
    }
);
statuses.foreach(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        for (String str : stringJavaRDD.take(10)) {
            System.out.println("Message: " + str);
        }
        return null;
    }
});

On Tue, Nov 4, 2014 at 10:32 PM, Shao, Saisai saisai.s...@intel.com wrote:
If you're running in standalone mode, the log is under the SPARK_HOME/work/ directory. I'm not sure about YARN or Mesos; you can check the Spark documentation for the details. Thanks, Jerry

From: Something Something [mailto:mailinglist...@gmail.com]
Sent: Wednesday, November 05, 2014 2:28 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: Kafka Consumer in Spark Streaming

The Kafka broker definitely has messages coming in. But your #2 point is valid. Needless to say I am a newbie to Spark. I can't figure out where the 'executor' logs would be. How would I find them? 
All I see printed on my screen is this: 14/11/04 22:21:23 INFO Slf4jLogger: Slf4jLogger started 14/11/04 22:21:23 INFO Remoting: Starting remoting 14/11/04 22:21:24 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@mymachie:60743] 14/11/04 22:21:24 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@mymachine:60743] 14/11/04 22:21:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/11/04 22:21:24 INFO JniBasedUnixGroupsMappingWithFallback: Falling back to shell based --- Time: 141516852 ms --- --- Time: 141516852 ms --- Keeps repeating this... On Tue, Nov 4, 2014 at 10:14 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi, would you mind describing your problem a little more specific. 1. Is the Kafka broker currently has no data feed in? 2. This code will print the lines, but not in the driver side, the code is running in the executor side, so you can check the log in worker dir to see if there’s any printing logs under this folder. 3. Did you see any exceptions when running the app, this will
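On the question of passing consumer properties: the receiver-based KafkaUtils.createStream consumer is configured through ZooKeeper (zookeeper.connect, group.id) rather than metadata.broker.list, and there is an overload that accepts a map of Kafka parameters. A rough Scala sketch follows (topic name, group id and ZooKeeper address are placeholders; the Java API has a corresponding overload):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def createTweetStream(ssc: StreamingContext) = {
  // receiver-based consumer: configured via ZooKeeper, not metadata.broker.list
  val kafkaParams = Map(
    "zookeeper.connect" -> "machine:2181",   // placeholder ZK quorum
    "group.id"          -> "my-consumer-group")

  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Map("mytopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
}
```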
Re: Dynamically InferSchema From Hive and Create parquet file
That method is for creating a new directory to hold parquet data when there is no Hive metastore available, thus you have to specify the schema. If you've already created the table in the metastore, you can just query it using the sql method:

javahiveContext.sql("SELECT * FROM parquetTable");

You can also load the data as a SchemaRDD without using the metastore, since parquet is self-describing:

javahiveContext.parquetFile(".../path/to/parquetFiles").registerTempTable("parquetData")

On Wed, Nov 5, 2014 at 2:15 AM, Jahagirdar, Madhu madhu.jahagir...@philips.com wrote:
Is it possible that we dynamically infer the schema from Hive using the hive context and the table name, then give that schema?
Re: Matrix multiplication in spark
We are working on PRs to add block-partitioned matrix formats and dense matrix multiply methods. These should be out in the next few weeks or so. The sparse methods still need some research on partitioning schemes etc., and we will do that after the dense methods are in place. Thanks, Shivaram

On Wed, Nov 5, 2014 at 2:00 AM, Duy Huynh duy.huynh@gmail.com wrote:
Ok great. When will this be ready?
RE: Any Replicated RDD in Spark?
Nice. Then I have another question: if I have a file (or a set of files: part-0, part-1, ..., maybe a few hundred MB of CSV up to 1-2 GB, created by another program) and need to create a hashtable from it and later broadcast it to each node to allow lookups (map-side join), I have two options to do it:

1. I can just load the file in general code (open an InputStream, etc.), parse the content and then create the broadcast from it.
2. I can also create an RDD from these files in the standard way, run a map to parse it, then collect it as a map and wrap the result as a broadcast to push to all nodes again.

I think option 2 might be more consistent with Spark's concepts (and less code?). But how about the performance? The gain is that we can load and parse the data in parallel; the penalty is that after loading we need to collect and broadcast the result again. Please share your opinion. I am not sure what the best practice is here (in theory either way works, but in the real world, which one is better?). Regards, Shuai

-----Original Message-----
From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: Monday, November 03, 2014 4:15 PM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: Re: Any Replicated RDD in Spark?

You need to use broadcast followed by flatMap or mapPartitions to do map-side joins (in your map function, you can look at the hash table you broadcast and see what records match it). Spark SQL also does it by default for tables smaller than the spark.sql.autoBroadcastJoinThreshold setting (by default 10 KB, which is really small, but you can bump this up with set spark.sql.autoBroadcastJoinThreshold=100 for example). Matei

On Nov 3, 2014, at 1:03 PM, Shuai Zheng szheng.c...@gmail.com wrote:
Hi All, I have spent the last two years on Hadoop but am new to Spark. I am planning to move one of my existing systems to Spark to get some enhanced features. My question is: if I try to do a map-side join (something similar to the Replicated keyword in Pig), how can I do it? Is there any way to declare an RDD as replicated (meaning distribute it to all nodes so that each node has a full copy)? I know I can use accumulator to get this feature, but I am not sure what the best practice is. And if I use accumulator to broadcast the data set, can I then (after broadcast) convert it into an RDD and do the join? Regards, Shuai
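A rough sketch of option 2 as discussed above: parse the lookup files with Spark, collect the result on the driver, broadcast it, and do the join map-side with mapPartitions. The file layout, column positions and RDD names below are assumptions:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// bigRdd: the large keyed dataset to be joined (assumed to exist already)
def mapSideJoin(sc: SparkContext,
                bigRdd: RDD[(String, String)]): RDD[(String, (String, String))] = {
  // option 2: parse the lookup file in parallel, then collect and broadcast it
  val lookup = sc.textFile("hdfs:///data/lookup/part-*")   // placeholder path
    .map(_.split(','))
    .map(cols => (cols(0), cols(1)))                       // assumed (key, value) layout
    .collectAsMap()

  val lookupBc = sc.broadcast(lookup)

  bigRdd.mapPartitions { iter =>
    val table = lookupBc.value                             // one lookup per partition
    iter.flatMap { case (key, record) =>
      table.get(key).map(v => (key, (record, v)))          // drop unmatched keys
    }
  }
}
```

For files in the hundreds-of-MB range, the difference between option 1 and option 2 is usually dominated by parse time rather than the extra collect, so either tends to work; in both cases the join itself then happens without a shuffle.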
RE: Any Replicated RDD in Spark?
And another similar case: If I have get a RDD from previous step, but for next step it should be a map side join (so I need to broadcast this RDD to every nodes). What is the best way for me to do that? Collect RDD in driver first and create broadcast? Or any shortcut in spark for this? Thanks! -Original Message- From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, November 05, 2014 3:32 PM To: 'Matei Zaharia' Cc: 'user@spark.apache.org' Subject: RE: Any Replicated RDD in Spark? Nice. Then I have another question, if I have a file (or a set of files: part-0, part-1, might be a few hundreds MB csv to 1-2 GB, created by other program), need to create hashtable from it, later broadcast it to each node to allow query (map side join). I have two options to do it: 1, I can just load the file in a general code (open a inputstream, etc), parse content and then create the broadcast from it. 2, I also can use a standard way to create the RDD from these file, run the map to parse it, then collect it as map, wrap the result as broadcast to push to all nodes again. I think the option 2 might be more consistent with spark's concept (and less code?)? But how about the performance? The gain is can parallel load and parse the data, penalty is after load we need to collect and broadcast result again? Please share your opinion. I am not sure what is the best practice here (in theory, either way works, but in real world, which one is better?). Regards, Shuai -Original Message- From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent: Monday, November 03, 2014 4:15 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: Any Replicated RDD in Spark? You need to use broadcast followed by flatMap or mapPartitions to do map-side joins (in your map function, you can look at the hash table you broadcast and see what records match it). Spark SQL also does it by default for tables smaller than the spark.sql.autoBroadcastJoinThreshold setting (by default 10 KB, which is really small, but you can bump this up with set spark.sql.autoBroadcastJoinThreshold=100 for example). Matei On Nov 3, 2014, at 1:03 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I have spent last two years on hadoop but new to spark. I am planning to move one of my existing system to spark to get some enhanced features. My question is: If I try to do a map side join (something similar to Replicated key word in Pig), how can I do it? Is it anyway to declare a RDD as replicated (means distribute it to all nodes and each node will have a full copy)? I know I can use accumulator to get this feature, but I am not sure what is the best practice. And if I accumulator to broadcast the data set, can then (after broadcast) convert it into a RDD and do the join? Regards, Shuai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Logging from the Spark shell
Dear Spark users, I would like to run a long experiment using spark-shell. How can I log my intermediate results (numbers, strings) into some file on a master node? What are the best practices? It is NOT performance metrics of Spark that I want to log every X seconds. Instead, I would like to log some data that I receive after each iteration of my algorithm. Thanks, Alexander
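For what it's worth, a minimal sketch of one way to do this from spark-shell, assuming a file path like /tmp/experiment.log on the node where the shell runs is acceptable (the path and the metric name are made up for illustration). Since the shell driver runs on the machine where you started it, plain JVM file I/O works for driver-side results:

import java.io.{FileWriter, PrintWriter}

val log = new PrintWriter(new FileWriter("/tmp/experiment.log", true))  // open in append mode
for (iter <- 1 to 10) {
  val metric = 0.0                     // placeholder for whatever your algorithm produces this iteration
  log.println(s"iteration=$iter metric=$metric")
  log.flush()                          // flush each line so partial results survive a shell crash
}
log.close()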
AVRO specific records
How can I read/write AVRO specific records? I found several snippets using generic records, but nothing with specific records so far. Thanks, Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini
Partition sorting by Spark framework
I need to sort my RDD partitions, but a whole partition might not fit into memory, so I cannot just run Collections.sort() on it. Does Spark support sorting within partitions as part of its framework? I am working on version 1.1.0. I looked up a similar unanswered question: http://apache-spark-user-list.1001560.n3.nabble.com/sort-order-after-reduceByKey-groupByKey-td2959.html Thanks All!! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-tp18213.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: AVRO specific records
Hi Simone, Matt Massie put together a good tutorial on his blog. If you’re looking for more code using Avro, we use it pretty extensively in our genomics project. Our Avro schemas are here, and we have serialization code here. We use Parquet for storing the Avro records, but there is also an Avro HadoopInputFormat. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Nov 5, 2014, at 1:25 PM, Simone Franzini captainfr...@gmail.com wrote: How can I read/write AVRO specific records? I found several snippets using generic records, but nothing with specific records so far. Thanks, Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini
Re: AVRO specific records
Something like this works and is how I create an RDD of specific records. val avroRdd = sc.newAPIHadoopFile(twitter.avro, classOf[AvroKeyInputFormat[twitter_schema]], classOf[AvroKey[twitter_schema]], classOf[NullWritable], conf) (From https://github.com/julianpeeters/avro-scala-macro-annotation-examples/blob/master/spark/src/main/scala/AvroSparkScala.scala) Keep in mind you'll need to use the kryo serializer as well. From: Frank Austin Nothaft fnoth...@berkeley.edumailto:fnoth...@berkeley.edu Date: Wednesday, November 5, 2014 at 5:06 PM To: Simone Franzini captainfr...@gmail.commailto:captainfr...@gmail.com Cc: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: AVRO specific records Hi Simone, Matt Massie put together a good tutorial on his bloghttp://zenfractal.com/2013/08/21/a-powerful-big-data-trio/. If you’re looking for more code using Avro, we use it pretty extensively in our genomics project. Our Avro schemas are herehttps://github.com/bigdatagenomics/bdg-formats/blob/master/src/main/resources/avro/bdg.avdl, and we have serialization code herehttps://github.com/bigdatagenomics/adam/tree/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization. We use Parquet for storing the Avro records, but there is also an Avro HadoopInputFormat. Regards, Frank Austin Nothaft fnoth...@berkeley.edumailto:fnoth...@berkeley.edu fnoth...@eecs.berkeley.edumailto:fnoth...@eecs.berkeley.edu 202-340-0466 On Nov 5, 2014, at 1:25 PM, Simone Franzini captainfr...@gmail.commailto:captainfr...@gmail.com wrote: How can I read/write AVRO specific records? I found several snippets using generic records, but nothing with specific records so far. Thanks, Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
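Regarding the Kryo remark above, a minimal sketch of enabling it before creating the context; the property names are the standard Spark ones, while the registrator class is a hypothetical placeholder (registering your generated Avro classes is optional but keeps the serialized stream compact):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyAvroRegistrator")  // hypothetical registrator that registers the Avro classes
val sc = new SparkContext(conf)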
Question regarding sorting and grouping
Hi, I’m working on an use case using Spark streaming. I need to process a RDD of strings so that they will be grouped by IP and sorted by time. Could somebody tell me the right transformation? Input: 2014-10-23 08:18:38,904 [192.168.10.1] 2014-10-23 08:18:38,907 [192.168.10.1] ccc 2014-10-23 08:18:39,910 [192.168.102.1] 2014-10-23 08:18:38,934 [192.168.10.1] 2014-10-23 08:18:39,032 [192.168.102.1] 2014-10-23 08:18:38,149 [192.168.10.1] 2014-10-23 08:18:39,582 [192.168.102.1] 2014-10-23 08:18:38,691 [192.168.10.1] Expected result: Array(192.168.10.1, ArrayBuffer( 2014-10-23 08:18:38,149 [192.168.10.1] , 2014-10-23 08:18:38,904 [192.168.10.1] , 2014-10-23 08:18:38,907 [192.168.10.1] ccc, 2014-10-23 08:18:38,691 [192.168.10.1] , 2014-10-23 08:18:38,934 [192.168.10.1] )) (192.168.102.1, ArrayBuffer( 2014-10-23 08:18:39,032 [192.168.102.1] , 2014-10-23 08:18:39,582 [192.168.102.1] , 2014-10-23 08:18:39,910 [192.168.102.1] )) Thanks Ping
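One way to get that shape, sketched under the assumption that the IP can be pulled out with a simple regex and that sorting the raw lines lexicographically is equivalent to sorting by time (true here because the timestamp is the line prefix). With a DStream you would apply this inside transform or foreachRDD on each batch's RDD; `lines` below stands for that RDD[String]:

import org.apache.spark.SparkContext._   // pair-RDD functions on Spark 1.1

val ipPattern = """\[(\d+\.\d+\.\d+\.\d+)\]""".r

val grouped = lines
  .map(line => (ipPattern.findFirstMatchIn(line).map(_.group(1)).getOrElse("unknown"), line))
  .groupByKey()                          // (ip, Iterable[line])
  .mapValues(_.toSeq.sorted)             // lines start with the timestamp, so a plain string sort orders by time

grouped.collect().foreach(println)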
cache function is not working on RDD from parallelize
Hi, On a 5 node cluster, say I have data on the driver application node; I then call parallelize on the data and get an RDD back. However, when I call cache on the RDD, the RDD doesn't seem to get cached (I checked by timing count() on the supposedly cached RDD; it takes as long as it did before it was materialized). Does anyone have any idea about this? Thanks Edwin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cache-function-is-not-working-on-RDD-from-parallelize-tp18219.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
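For reference, a small timing sketch of the pattern being described (nothing here is specific to the poster's code; `data` stands for the local collection from the question):

val rdd = sc.parallelize(data).cache()
var t = System.currentTimeMillis
rdd.count()                                      // first action: computes the partitions and populates the cache
println("first count took " + (System.currentTimeMillis - t) + " ms")
t = System.currentTimeMillis
rdd.count()                                      // second action: should be served from the cached blocks
println("second count took " + (System.currentTimeMillis - t) + " ms")

If both counts take roughly the same time, my understanding is that this can be expected for parallelize: the collection lives in the partition objects shipped from the driver with every task, so caching saves recomputation but not the driver-to-executor transfer; the Storage tab of the web UI should still show the RDD as cached.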
Configuring custom input format
I'm trying to use a custom input format with SparkContext.newAPIHadoopRDD. Creating the new RDD works fine but setting up the configuration file via the static methods on input formats that require a Hadoop Job object is proving to be difficult. Trying to new up my own Job object with the SparkContext.hadoopConfiguration is throwing the exception on line 283 of this grepcode: http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hadoop/hadoop-mapreduce-client-core/2.5.0/org/apache/hadoop/mapreduce/Job.java#Job Looking in the SparkContext code, I'm seeing that it's newing up Job objects just fine using nothing but the configuraiton. Using SparkContext.textFile() appears to be working for me. Any ideas? Has anyone else run into this as well? Is it possible to have a method like SparkContext.getJob() or something similar? Thanks.
Re: Configuring custom input format
The closer I look @ the stack trace in the Scala shell, it appears to be the call to toString() that is causing the construction of the Job object to fail. Is there a ways to suppress this output since it appears to be hindering my ability to new up this object? On Wed, Nov 5, 2014 at 5:49 PM, Corey Nolet cjno...@gmail.com wrote: I'm trying to use a custom input format with SparkContext.newAPIHadoopRDD. Creating the new RDD works fine but setting up the configuration file via the static methods on input formats that require a Hadoop Job object is proving to be difficult. Trying to new up my own Job object with the SparkContext.hadoopConfiguration is throwing the exception on line 283 of this grepcode: http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hadoop/hadoop-mapreduce-client-core/2.5.0/org/apache/hadoop/mapreduce/Job.java#Job Looking in the SparkContext code, I'm seeing that it's newing up Job objects just fine using nothing but the configuraiton. Using SparkContext.textFile() appears to be working for me. Any ideas? Has anyone else run into this as well? Is it possible to have a method like SparkContext.getJob() or something similar? Thanks.
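For reference, a sketch of the workaround I'd try, assuming Hadoop 2.x (Job.getInstance) and a hypothetical input format MyInputFormat with a static configure-style method; the idea is to keep the shell from echoing (and hence calling toString on) the half-constructed Job by only returning the Configuration from the block:

import org.apache.hadoop.mapreduce.Job

val jobConf = {
  val job = Job.getInstance(sc.hadoopConfiguration)       // Hadoop 2.x factory method
  MyInputFormat.setInputPaths(job, "hdfs:///some/path")   // hypothetical static setup call on your input format
  job.getConfiguration                                     // only this value gets echoed by the REPL, and its toString is safe
}
val rdd = sc.newAPIHadoopRDD(jobConf, classOf[MyInputFormat], classOf[MyKey], classOf[MyValue])  // key/value classes are placeholders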
[SQL] PERCENTILE is not working
Hi all, I encounter this error when execute the query sqlContext.sql(select percentile(age, array(0, 0.5, 1)) from people).collect() java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer cannot be cast to [Ljava.lang.Object; at org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector.getListLength(StandardListObjectInspector.java:83) at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters$ListConverter.convert(ObjectInspectorConverters.java:259) at org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils$ConversionHelper.convertIfNecessary(GenericUDFUtils.java:349) at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge$GenericUDAFBridgeEvaluator.iterate(GenericUDAFBridge.java:170) at org.apache.spark.sql.hive.HiveUdafFunction.update(hiveUdfs.scala:342) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:135) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:128) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) Thanks, Kevin Paul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Breaking the previous large-scale sort record with Spark
Hi all, We are excited to announce that the benchmark entry has been reviewed by the Sort Benchmark committee and Spark has officially won the Daytona GraySort contest in sorting 100TB of data. Our entry tied with a UCSD research team building high performance systems and we jointly set a new world record. This is an important milestone for the project, as it validates the amount of engineering work put into Spark by the community. As Matei said, For an engine to scale from these multi-hour petabyte batch jobs down to 100-millisecond streaming and interactive queries is quite uncommon, and it's thanks to all of you folks that we are able to make this happen. Updated blog post: http://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html On Fri, Oct 10, 2014 at 7:54 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi folks, I interrupt your regularly scheduled user / dev list to bring you some pretty cool news for the project, which is that we've been able to use Spark to break MapReduce's 100 TB and 1 PB sort records, sorting data 3x faster on 10x fewer nodes. There's a detailed writeup at http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html. Summary: while Hadoop MapReduce held last year's 100 TB world record by sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on 206 nodes; and we also scaled up to sort 1 PB in 234 minutes. I want to thank Reynold Xin for leading this effort over the past few weeks, along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali Ghodsi. In addition, we'd really like to thank Amazon's EC2 team for providing the machines to make this possible. Finally, this result would of course not be possible without the many many other contributions, testing and feature requests from throughout the community. For an engine to scale from these multi-hour petabyte batch jobs down to 100-millisecond streaming and interactive queries is quite uncommon, and it's thanks to all of you folks that we are able to make this happen. Matei - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: with SparkStreeaming spark-submit, don't see output after ssc.start()
This problem turned out to be a cockpit error. I had the same class name defined in a couple different files, and didn't realize SBT was compiling them all together, and then executing the wrong one. Mea culpa. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/with-SparkStreeaming-spark-submit-don-t-see-output-after-ssc-start-tp17989p18224.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Any Replicated RDD in Spark?
If you start with an RDD, you do have to collect to the driver and broadcast to do this. Between the two options you listed, I think this one is simpler to implement, and there won't be a huge difference in performance, so you can go for it. Opening InputStreams to a distributed file system by hand can be a lot of code. Matei On Nov 5, 2014, at 12:37 PM, Shuai Zheng szheng.c...@gmail.com wrote: And another similar case: If I have get a RDD from previous step, but for next step it should be a map side join (so I need to broadcast this RDD to every nodes). What is the best way for me to do that? Collect RDD in driver first and create broadcast? Or any shortcut in spark for this? Thanks! -Original Message- From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, November 05, 2014 3:32 PM To: 'Matei Zaharia' Cc: 'user@spark.apache.org' Subject: RE: Any Replicated RDD in Spark? Nice. Then I have another question, if I have a file (or a set of files: part-0, part-1, might be a few hundreds MB csv to 1-2 GB, created by other program), need to create hashtable from it, later broadcast it to each node to allow query (map side join). I have two options to do it: 1, I can just load the file in a general code (open a inputstream, etc), parse content and then create the broadcast from it. 2, I also can use a standard way to create the RDD from these file, run the map to parse it, then collect it as map, wrap the result as broadcast to push to all nodes again. I think the option 2 might be more consistent with spark's concept (and less code?)? But how about the performance? The gain is can parallel load and parse the data, penalty is after load we need to collect and broadcast result again? Please share your opinion. I am not sure what is the best practice here (in theory, either way works, but in real world, which one is better?). Regards, Shuai -Original Message- From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent: Monday, November 03, 2014 4:15 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: Re: Any Replicated RDD in Spark? You need to use broadcast followed by flatMap or mapPartitions to do map-side joins (in your map function, you can look at the hash table you broadcast and see what records match it). Spark SQL also does it by default for tables smaller than the spark.sql.autoBroadcastJoinThreshold setting (by default 10 KB, which is really small, but you can bump this up with set spark.sql.autoBroadcastJoinThreshold=100 for example). Matei On Nov 3, 2014, at 1:03 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I have spent last two years on hadoop but new to spark. I am planning to move one of my existing system to spark to get some enhanced features. My question is: If I try to do a map side join (something similar to Replicated key word in Pig), how can I do it? Is it anyway to declare a RDD as replicated (means distribute it to all nodes and each node will have a full copy)? I know I can use accumulator to get this feature, but I am not sure what is the best practice. And if I accumulator to broadcast the data set, can then (after broadcast) convert it into a RDD and do the join? Regards, Shuai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
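A rough sketch of that collect-then-broadcast pattern; the names are illustrative (smallRdd is the keyed RDD you want replicated, bigRdd the large keyed side):

import org.apache.spark.SparkContext._        // pair-RDD functions on Spark 1.1

val lookup = smallRdd.collectAsMap()          // pull the small side back to the driver
val lookupBc = sc.broadcast(lookup)
val joined = bigRdd.mapPartitions { iter =>
  val table = lookupBc.value                  // one deserialized copy per executor
  iter.flatMap { case (k, v) => table.get(k).map(w => (k, (v, w))) }   // map-side (inner) join
}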
how to blend a DStream and a broadcast variable?
My use case has one large data stream (DS1) that obviously maps to a DStream. The processing of DS1 involves filtering it for any of a set of known values, which will change over time, though slowly by streaming standards. If the filter data were static, it seems to obviously map to a broadcast variable, but it's dynamic. (And I don't think it works to implement it as a DStream, because the new values need to be copied redundantly to all executors, not partitioned among the executors.) Looking at the Spark and Spark Streaming documentation, I have two questions: 1) There's no mention in the Spark Streaming Programming Guide of broadcast variables. Do they coexist properly? 2) Once I have a broadcast variable in place in the periodic function that Spark Streaming executes, how can I update its value? Obviously I can't literally update the value of that broadcast variable, which is immutable, but how can I get a new version of the variable established in all the executors? (And the other ever-present implicit question...) 3) Is there a better way to implement this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-blend-a-DStream-and-a-broadcast-variable-tp18227.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: AVRO specific records
You can also use the Kite SDK to read/write Avro records: https://github.com/kite-sdk/kite-examples/tree/master/spark - Anand On Wed, Nov 5, 2014 at 2:24 PM, Laird, Benjamin benjamin.la...@capitalone.com wrote: Something like this works and is how I create an RDD of specific records. val avroRdd = sc.newAPIHadoopFile(twitter.avro, classOf[AvroKeyInputFormat[twitter_schema]], classOf[AvroKey[twitter_schema]], classOf[NullWritable], conf) (From https://github.com/julianpeeters/avro-scala-macro-annotation-examples/blob/master/spark/src/main/scala/AvroSparkScala.scala) Keep in mind you'll need to use the kryo serializer as well. From: Frank Austin Nothaft fnoth...@berkeley.edu Date: Wednesday, November 5, 2014 at 5:06 PM To: Simone Franzini captainfr...@gmail.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: AVRO specific records Hi Simone, Matt Massie put together a good tutorial on his blog http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/. If you’re looking for more code using Avro, we use it pretty extensively in our genomics project. Our Avro schemas are here https://github.com/bigdatagenomics/bdg-formats/blob/master/src/main/resources/avro/bdg.avdl, and we have serialization code here https://github.com/bigdatagenomics/adam/tree/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization. We use Parquet for storing the Avro records, but there is also an Avro HadoopInputFormat. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Nov 5, 2014, at 1:25 PM, Simone Franzini captainfr...@gmail.com wrote: How can I read/write AVRO specific records? I found several snippets using generic records, but nothing with specific records so far. Thanks, Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini -- The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
How to trace/debug serialization?
In my spark job, I have a loop something like this: bla.foreachRDD(rdd => { //init some vars rdd.foreachPartition(partition => { //init some vars partition.foreach(kv => { ... I am seeing serialization errors (unread block data), because I think spark is trying to serialize the whole containing class. But I have been careful not to reference instance vars in the block. Is there a way to see exactly what class is failing serialization, and maybe how spark decided it needs to be serialized? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-trace-debug-serialization-tp18230.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Breaking the previous large-scale sort record with Spark
Congrats to everyone who helped make this happen. And if anyone has even more machines they'd like us to run on next year, let us know :). Matei On Nov 5, 2014, at 3:11 PM, Reynold Xin r...@databricks.com wrote: Hi all, We are excited to announce that the benchmark entry has been reviewed by the Sort Benchmark committee and Spark has officially won the Daytona GraySort contest in sorting 100TB of data. Our entry tied with a UCSD research team building high performance systems and we jointly set a new world record. This is an important milestone for the project, as it validates the amount of engineering work put into Spark by the community. As Matei said, For an engine to scale from these multi-hour petabyte batch jobs down to 100-millisecond streaming and interactive queries is quite uncommon, and it's thanks to all of you folks that we are able to make this happen. Updated blog post: http://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html On Fri, Oct 10, 2014 at 7:54 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Hi folks, I interrupt your regularly scheduled user / dev list to bring you some pretty cool news for the project, which is that we've been able to use Spark to break MapReduce's 100 TB and 1 PB sort records, sorting data 3x faster on 10x fewer nodes. There's a detailed writeup at http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html. Summary: while Hadoop MapReduce held last year's 100 TB world record by sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on 206 nodes; and we also scaled up to sort 1 PB in 234 minutes. I want to thank Reynold Xin for leading this effort over the past few weeks, along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali Ghodsi. In addition, we'd really like to thank Amazon's EC2 team for providing the machines to make this possible. Finally, this result would of course not be possible without the many many other contributions, testing and feature requests from throughout the community. For an engine to scale from these multi-hour petabyte batch jobs down to 100-millisecond streaming and interactive queries is quite uncommon, and it's thanks to all of you folks that we are able to make this happen. Matei - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to get Spark User List Digests only, but still be able to post questions ...
Hello Friends: I cringe to ask this off-topic question (so forgive me in advance). I'm trying to figure out how to receive only the digest email for this Spark User List, yet still be able to email questions to it. Subscribing to the 'user-dig...@spark.incubator.apache.org' alias does provide that digest, but that does not appear to let me email questions to either: [user|user-digest]@spark.incubator.apache.org. Mine attempts seem to get rejected. Does that scenario work for anyone? Conversely, subscribing 'u...@spark.incubator.apache.org' does let me email questions of course, but it does not provide the digest experience I seek. Perhaps -- once subscribed to this one -- there is a particular email alias that I can email to toggle the digest/non-digest behavior? I looked at all of the aliases available (the ones I could find), but didn't see any that offered this. I've been a long running member and enjoy this Spark list, but I need to shrink my INBOX in general, and things like this are helpful. =:) Thank you for the pointers (again, sorry for the off-topic), nmv -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-Spark-User-List-Digests-only-but-still-be-able-to-post-questions-tp18232.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SparkContext._lock Error
I'm using this system: Hadoop 1.0.4, Scala 2.9.3, Hive 0.9.0, with Spark 1.1.0. When importing pyspark, I'm getting this error: from pyspark.sql import * Traceback (most recent call last): File "<stdin>", line 1, in ? File "/path/spark-1.1.0/python/pyspark/__init__.py", line 63, in ? from pyspark.context import SparkContext File "/path/spark-1.1.0/python/pyspark/context.py", line 209 with SparkContext._lock: ^ SyntaxError: invalid syntax How do I fix it? Thank you,
Re: SparkContext._lock Error
What's the version of Python? 2.4? Davies On Wed, Nov 5, 2014 at 4:21 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: I’m using this system Hadoop 1.0.4 Scala 2.9.3 Hive 0.9.0 With spark 1.1.0. When importing pyspark, I’m getting this error: from pyspark.sql import * Traceback (most recent call last): File stdin, line 1, in ? File /path/spark-1.1.0/python/pyspark/__init__.py, line 63, in ? from pyspark.context import SparkContext File /path/spark-1.1.0/python/pyspark/context.py, line 209 with SparkContext._lock: ^ SyntaxError: invalid syntax How do I fix it? Thank you, - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL Hive Version
Hi, all, I noticed that when compiling the SparkSQL with profile hive-0.13.1, it will fetch the Hive version of 0.13.1a under groupId org.spark-project.hive, what's the difference with the one of org.apache.hive? And where can I get the source code for re-compiling? Thanks, Cheng Hao
RE: [SQL] PERCENTILE is not working
Which version are you using? I can reproduce that in the latest code, but with different exception. I've filed an bug https://issues.apache.org/jira/browse/SPARK-4263, can you also add some information there? Thanks, Cheng Hao -Original Message- From: Kevin Paul [mailto:kevinpaulap...@gmail.com] Sent: Thursday, November 6, 2014 7:09 AM To: user Subject: [SQL] PERCENTILE is not working Hi all, I encounter this error when execute the query sqlContext.sql(select percentile(age, array(0, 0.5, 1)) from people).collect() java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer cannot be cast to [Ljava.lang.Object; at org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector.getListLength(StandardListObjectInspector.java:83) at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters$ListConverter.convert(ObjectInspectorConverters.java:259) at org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils$ConversionHelper.convertIfNecessary(GenericUDFUtils.java:349) at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge$GenericUDAFBridgeEvaluator.iterate(GenericUDAFBridge.java:170) at org.apache.spark.sql.hive.HiveUdafFunction.update(hiveUdfs.scala:342) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:135) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:128) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) Thanks, Kevin Paul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
log4j logging control via sbt
I've tried to set the log4j logger to warn only via log4j properties file in cat src/test/resources/log4j.properties log4j.logger.org.apache.spark=WARN or in sbt via javaOptions += -Dlog4j.logger.org.apache.spark=WARN But the logger still gives me INFO messages to stdout when I run my tests via sbt test Is it the wrong option? I also tried javaOptions += -Dlog4j.rootLogger=warn but that doesn't seem to help either. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Errors in Spark streaming application due to HDFS append
Hi All, I’m trying to write streaming processed data in HDFS (Hadoop 2). The buffer is flushed and closed after each writing. The following errors occurred when opening the same file to append. I know for sure the error is caused by closing the file. Any idea? Here is the code to write HDFS def appendToFile(id: String, text: String): Unit = { println(Request to write + text.getBytes().length + bytes, MAX_BUF_SIZE: + LogConstants.MAX_BUF_SIZE) println(+++ Write to file id = + id) if (bufMap == null) { init } var fsout: FSDataOutputStream = null val filename = LogConstants.FILE_PATH + id try { fsout = getFSDOS(id, filename) println(Write + text.getBytes().length + of bytes in Text to [ + filename + ]) fsout.writeBytes(text) fsout.flush() //fsout.sync() //} catch { // case e: InterruptedException = } finally { if (fsout != null) fsout.close() } } Here are the errors observed: +++ Write to file id = 0 Wrote 129820 bytes +++ Write to file id = 0 14/11/05 18:01:35 ERROR Executor: Exception in task ID 998 14/11/05 18:01:35 ERROR Executor: Exception in task ID 998 org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): 0 at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:467) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipelineInternal(FSNamesystem.java:5969) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipeline(FSNamesystem.java:5932) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.updatePipeline(NameNodeRpcServer.java:651) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.updatePipeline(ClientNamenodeProtocolServerSideTranslatorPB.java:889) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1986) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1982) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1980) at org.apache.hadoop.ipc.Client.call(Client.java:1347) at org.apache.hadoop.ipc.Client.call(Client.java:1300) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) at com.sun.proxy.$Proxy11.updatePipeline(Unknown Source) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy11.updatePipeline(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.updatePipeline(ClientNamenodeProtocolTranslatorPB.java:791) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1047) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:520) 14/11/05 
18:01:36 ERROR TaskSetManager: Task 53.0:7 failed 1 times; aborting job 14/11/05 18:01:36 ERROR JobScheduler: Error running job streaming job 1415239295000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task 53.0:7 failed 1 times, most recent failure: Exception failure in TID 998 on host localhost: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): 0 at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:467) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipelineInternal(FSNamesystem.java:5969) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipeline(FSNamesystem.java:5932) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.updatePipeline(NameNodeRpcServer.java:651) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.updatePipeline(ClientNamenodeProtocolServerSideTranslatorPB.java:889) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585) at
Re: How to avoid use snappy compression when saveAsSequenceFile?
On Mon, Oct 27, 2014 at 7:37 PM, buring qyqb...@gmail.com wrote: Here is error log,I abstract as follows: INFO [binaryTest---main]: before first WARN [org.apache.spark.scheduler.TaskSetManager---Result resolver thread-0]: Lost task 0.0 in stage 0.0 (TID 0, spark-dev136): org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] null org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:236) org.xerial.snappy.Snappy.clinit(Snappy.java:48) Which OS are you running? The snappy version shipped with Spark has some issues with older OSes (like CentOS 5; I think SLES11 has the same problem). -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Task size variation while using Range Vs List
I noticed a behaviour where it was observed that, if i'm using val temp = sc.parallelize ( 1 to 10) temp.collect Task size will be in bytes let's say ( 1120 bytes). But if i change this to a for loop import scala.collection.mutable.ArrayBuffer val data= new ArrayBuffer[Integer]() for(i - 1 to 100)data+=i val distData = sc.parallelize(data) distData.collect Here the task size is in MB's 5000120 bytes. Any inputs here would be appreciated, this is really confusing 1) Why does the data travel from Driver to Executor every time an Action is performed ( i thought the data exists in the Executor's memory, and only the code is pushed from driver to executor ) ?? 2) Why does Range not increase the task size, where as any other collection increases the size exponentially ?? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Task-size-variation-while-using-Range-Vs-List-tp18243.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to trace/debug serialization?
From what i've observed, there are no debug logs while serialization takes place. You can see the source code if you want, TaskSetManager class has some functions for serialization. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-trace-debug-serialization-tp18230p18244.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [SQL] PERCENTILE is not working
Hello Kevin, https://issues.apache.org/jira/browse/SPARK-3891 will fix this bug. Thanks, Yin On Wed, Nov 5, 2014 at 8:06 PM, Cheng, Hao hao.ch...@intel.com wrote: Which version are you using? I can reproduce that in the latest code, but with different exception. I've filed an bug https://issues.apache.org/jira/browse/SPARK-4263, can you also add some information there? Thanks, Cheng Hao -Original Message- From: Kevin Paul [mailto:kevinpaulap...@gmail.com] Sent: Thursday, November 6, 2014 7:09 AM To: user Subject: [SQL] PERCENTILE is not working Hi all, I encounter this error when execute the query sqlContext.sql(select percentile(age, array(0, 0.5, 1)) from people).collect() java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer cannot be cast to [Ljava.lang.Object; at org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector.getListLength(StandardListObjectInspector.java:83) at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters$ListConverter.convert(ObjectInspectorConverters.java:259) at org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils$ConversionHelper.convertIfNecessary(GenericUDFUtils.java:349) at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge$GenericUDAFBridgeEvaluator.iterate(GenericUDAFBridge.java:170) at org.apache.spark.sql.hive.HiveUdafFunction.update(hiveUdfs.scala:342) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:135) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:128) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) Thanks, Kevin Paul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL Hive Version
The original spark-project hive-0.13.1 has some problem with packaging causing version conflicts, and hive-0.13.1a is repackaged to solve the problem. They share the same official hive source code release 0.13.1, with unnecessary package removed from the original official hive release package. You can refer to https://github.com/apache/spark/pull/2685 for the whole story. Thanks. Zhan Zhang Thanks. Zhan Zhang On Nov 5, 2014, at 4:47 PM, Cheng, Hao hao.ch...@intel.com wrote: Hi, all, I noticed that when compiling the SparkSQL with profile “hive-0.13.1”, it will fetch the Hive version of 0.13.1a under groupId “org.spark-project.hive”, what’s the difference with the one of “org.apache.hive”? And where can I get the source code for re-compiling? Thanks, Cheng Hao -- CONFIDENTIALITY NOTICE NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You.
JavaStreamingContextFactory checkpoint directory NotSerializableException
Dear All, I am getting java.io.NotSerializableException for the code below. If jssc.checkpoint(HDFS_CHECKPOINT_DIR); is commented out, there is no exception. Please help. JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { SparkConf sparkConf = new SparkConf().set("spark.cores.max", "3"); final JavaStreamingContext jssc = new JavaStreamingContext( sparkConf, new Duration(300)); final JavaHiveContext javahiveContext = new JavaHiveContext( jssc.sc()); javahiveContext.createParquetFile(Bean.class, IMPALA_TABLE_LOC, true, new Configuration()) .registerTempTable(TEMP_TABLE_NAME); // TODO create checkpoint directory for fault tolerance final JavaDStream<String> textFileStream = jssc .textFileStream(HDFS_FILE_LOC); textFileStream .foreachRDD(new Function2<JavaRDD<String>, Time, Void>() { @Override public Void call(JavaRDD<String> rdd, Time time) throws Exception { if (rdd != null) { if (rdd.count() > 0) { JavaSchemaRDD schRdd = javahiveContext .jsonRDD(rdd); schRdd.insertInto(TEMP_TABLE_NAME); } } return null; } }); jssc.checkpoint(HDFS_CHECKPOINT_DIR); return jssc; } }; // Get JavaStreamingContext from checkpoint data or create a new one JavaStreamingContext context = JavaStreamingContext.getOrCreate( HDFS_CHECKPOINT_DIR, contextFactory); context.start(); // Start the computation context.awaitTermination(); Regards, Vasu
Re: Task size variation while using Range Vs List
For 2), If the input is Range, Spark only needs the start value and the end value for each partition, so the overhead of Range is little. But for ArrayBuffer, Spark needs to serialize all of the data into the task. That's why it's huge in your case. For 1), Spark does not always travel the data to Executor. It's only sent the task. If creating a RDD from HDFS files, it only sent the file metadata in the task. However, parallelize(ArrayBuffer) is an exception, it needs to send the data in ArrayBuffer by design. When you call an second action in the driver on the same RDD, if the data is not persisted, Spark needs to load the data again. You can call RDD.cache to persist the RDD in the memory. Best Regards, Shixiong Zhu 2014-11-06 11:35 GMT+08:00 nsareen nsar...@gmail.com: I noticed a behaviour where it was observed that, if i'm using val temp = sc.parallelize ( 1 to 10) temp.collect Task size will be in bytes let's say ( 1120 bytes). But if i change this to a for loop import scala.collection.mutable.ArrayBuffer val data= new ArrayBuffer[Integer]() for(i - 1 to 100)data+=i val distData = sc.parallelize(data) distData.collect Here the task size is in MB's 5000120 bytes. Any inputs here would be appreciated, this is really confusing 1) Why does the data travel from Driver to Executor every time an Action is performed ( i thought the data exists in the Executor's memory, and only the code is pushed from driver to executor ) ?? 2) Why does Range not increase the task size, where as any other collection increases the size exponentially ?? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Task-size-variation-while-using-Range-Vs-List-tp18243.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
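A quick way to see the difference Shixiong describes is to compare the task sizes reported in the driver logs (Spark warns about very large task sizes) for these two, which carry the same logical data but ship very different closures:

sc.parallelize(1 to 1000000).count()             // Range: each task carries only its start/end values, so tasks stay tiny
sc.parallelize((1 to 1000000).toArray).count()   // materialized collection: each task carries its full slice of elements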
Re: Any limitations of spark.shuffle.spill?
Two limitations we found here: http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-in-quot-cogroup-quot-td17349.html Best Regards, Shixiong Zhu 2014-11-06 2:04 GMT+08:00 Yangcheng Huang yangcheng.hu...@huawei.com: Hi One question about the power of spark.shuffle.spill – (I know this has been asked several times :-) Basically, in handling a (cached) dataset that doesn’t fit in memory, Spark can spill it to disk. However, can I say that, when this is enabled, Spark can handle the situation faultlessly, no matter – (1)How big the data set is (as compared to the available memory) (2)How complex the detailed calculation is being carried out Can spark.shuffle.spill handle this perfectly? Here we assume that (1) the disk space has no limitations and (2) the code is correctly written according to the functional requirements. The reason to ask this is, under such situations, I kept receiving warnings like “FetchFailed”, if memory usage reaches the limit. Thanks YC
Re: Submiting Spark application through code
Thanks boss its working :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submiting-Spark-application-through-code-tp17452p18250.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming: foreachRDD network output
Any one, any luck? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-foreachRDD-network-output-tp15205p18251.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: log4j logging control via sbt
How about adding the following in your $SPARK_HOME/conf/log4j.properties file? # Set WARN to be logged to the console *log4j.rootCategory=WARN, console* log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Settings to quiet third party logs that are too verbose log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO Thanks Best Regards On Thu, Nov 6, 2014 at 6:52 AM, Simon Hafner reactorm...@gmail.com wrote: I've tried to set the log4j logger to warn only via log4j properties file in cat src/test/resources/log4j.properties log4j.logger.org.apache.spark=WARN or in sbt via javaOptions += -Dlog4j.logger.org.apache.spark=WARN But the logger still gives me INFO messages to stdout when I run my tests via sbt test Is it the wrong option? I also tried javaOptions += -Dlog4j.rootLogger=warn but that doesn't seem to help either. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
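If the INFO messages come from the JVM that sbt forks for the tests (rather than from a standalone Spark install), note that sbt's javaOptions only take effect for forked runs, so a sketch like the following in build.sbt, pointing log4j at the test properties file, is worth trying:

fork in Test := true
javaOptions in Test += "-Dlog4j.configuration=file:src/test/resources/log4j.properties"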
Re: How to trace/debug serialization?
This is more about mechanism of Scala compiler and Java serialization. By default, Java will serialize an object deeply and recursively. Secondly, how Scala compiler generates the byte codes does matter. I'm not a Scala expert. Here is just some observation: 1. If the function does not use any outer variable, it should be able to serialized. 2. If the function uses some outer variables in a Scala `object`, it does not require the outer `object` and variables be Serializable. 3. If the function uses some outer variables in a Scala `class` instance, this class should be Serializable because the function will have a field which refer to this outer class instance. 4. If the function uses some outer variables in a method, these variables should be Serializable because the function will have a field which refer to them. At last, javap is a friend to diagnose such serialization error. Best Regards, Shixiong Zhu 2014-11-06 7:56 GMT+08:00 ankits ankitso...@gmail.com: In my spark job, I have a loop something like this: bla.forEachRdd(rdd = { //init some vars rdd.forEachPartition(partiton = { //init some vars partition.foreach(kv = { ... I am seeing serialization errors (unread block data), because I think spark is trying to serialize the whole containing class. But I have been careful not to reference instance vars in the block. Is there a way to see exactly what class is failing serialization, and maybe how spark decided it needs to be serialized? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-trace-debug-serialization-tp18230.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
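One practical aid on top of this: the standard JVM flag -Dsun.io.serialization.extendedDebugInfo=true makes java.io.NotSerializableException include the full field chain that led to the offending object, which usually shows exactly which enclosing class got dragged into the closure. A sketch of passing it through spark-submit (the class and jar names below are placeholders; the flags themselves are standard Spark 1.x options):

spark-submit \
  --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true" \
  --conf spark.executor.extraJavaOptions=-Dsun.io.serialization.extendedDebugInfo=true \
  --class com.example.MyJob myjob.jar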
Number cores split up
Hi, I have a 2 node yarn cluster and I am using spark 1.1.0 to submit my tasks. As per the Spark documentation, the number of cores defaults to the maximum cores available. So does that mean each node creates as many threads as it has cores to process the work assigned to it? For example: List<Integer> data = new ArrayList<Integer>(); for(int i=0;i<1000;i++) data.add(i); JavaRDD<Integer> distData = sc.parallelize(data); distData = distData.map( new Function<Integer, Integer>() { @Override public Integer call(Integer arg0) throws Exception { return arg0*arg0; } } ); distData.count(); The above program divides my RDD into 2 partitions of 500 elements each and submits them to the executors. 1) So each executor will use all the cores of the node's CPU to process its 500-element partition, am I right? 2) If so, does that mean each executor uses multi-threading? Is the execution parallel or sequential on a node? 3) How do I check how many cores an executor is using to process my jobs? 4) Is there any way to control how the data is divided across nodes? Please give some clarity on the above. Thanks Regards, Naveen
Re: how to blend a DStream and a broadcast variable?
Broadcast vars should work fine in Spark streaming. Broadcast vars are immutable however. If you have some info to cache which might change from batch to batch, you should be able to load it at the start of your 'foreachRDD' method or equivalent. That's simple and works assuming your batch interval isn't so short and data so big that loading it every time is a burden. On Wed, Nov 5, 2014 at 11:30 PM, spr s...@yarcdata.com wrote: My use case has one large data stream (DS1) that obviously maps to a DStream. The processing of DS1 involves filtering it for any of a set of known values, which will change over time, though slowly by streaming standards. If the filter data were static, it seems to obviously map to a broadcast variable, but it's dynamic. (And I don't think it works to implement it as a DStream, because the new values need to be copied redundantly to all executors, not partitioned among the executors.) Looking at the Spark and Spark Streaming documentation, I have two questions: 1) There's no mention in the Spark Streaming Programming Guide of broadcast variables. Do they coexist properly? 2) Once I have a broadcast variable in place in the periodic function that Spark Streaming executes, how can I update its value? Obviously I can't literally update the value of that broadcast variable, which is immutable, but how can I get a new version of the variable established in all the executors? (And the other ever-present implicit question...) 3) Is there a better way to implement this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-blend-a-DStream-and-a-broadcast-variable-tp18227.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
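A sketch of what Sean describes; loadFilterValues() stands in for however you fetch the slowly-changing set (file, DB, etc.) and keyOf() for extracting the field you filter on, both hypothetical helpers:

dstream.foreachRDD { rdd =>
  val current: Set[String] = loadFilterValues()            // re-read the reference data at the start of each batch
  val currentBc = rdd.sparkContext.broadcast(current)      // broadcast a fresh copy for this batch
  val matched = rdd.filter(record => currentBc.value.contains(keyOf(record)))
  matched.foreach(r => println(r))                         // replace with your real output action
}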
Re: JavaStreamingContextFactory checkpoint directory NotSerializableException
You didn't say what isn't serializable or where the exception occurs, but, is it the same as this issue? https://issues.apache.org/jira/browse/SPARK-4196 On Thu, Nov 6, 2014 at 5:42 AM, Vasu C vasuc.bigd...@gmail.com wrote: Dear All, I am getting java.io.NotSerializableException for below code. if jssc.checkpoint(HDFS_CHECKPOINT_DIR); is blocked there is not exception Please help JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { SparkConf sparkConf = new SparkConf().set(spark.cores.max, 3); final JavaStreamingContext jssc = new JavaStreamingContext( sparkConf, new Duration(300)); final JavaHiveContext javahiveContext = new JavaHiveContext( jssc.sc()); javahiveContext.createParquetFile(Bean.class, IMPALA_TABLE_LOC, true, new Configuration()) .registerTempTable(TEMP_TABLE_NAME); // TODO create checkpoint directory for fault tolerance final JavaDStreamString textFileStream = jssc .textFileStream(HDFS_FILE_LOC); textFileStream .foreachRDD(new Function2JavaRDDString, Time, Void() { @Override public Void call(JavaRDDString rdd, Time time) throws Exception { if (rdd != null) { if (rdd.count() 0) { JavaSchemaRDD schRdd = javahiveContext .jsonRDD(rdd); schRdd.insertInto(TEMP_TABLE_NAME); } } return null; } }); jssc.checkpoint(HDFS_CHECKPOINT_DIR); return jssc; } }; // Get JavaStreamingContext from checkpoint data or create a new one JavaStreamingContext context = JavaStreamingContext.getOrCreate( HDFS_CHECKPOINT_DIR, contextFactory); context.start(); // Start the computation context.awaitTermination(); Regards, Vasu - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: sparse x sparse matrix multiplication
I think Xiangrui's ALS code implements certain aspects of it. You may want to check it out. Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center From: Xiangrui Meng men...@gmail.com To: Duy Huynh duy.huynh@gmail.com Cc: user u...@spark.incubator.apache.org Date: 11/05/2014 01:13 PM Subject: Re: sparse x sparse matrix multiplication You can use breeze for local sparse-sparse matrix multiplication and then define an RDD of sub-matrices RDD[(Int, Int, CSCMatrix[Double])] (blockRowId, blockColId, sub-matrix) and then use join and aggregateByKey to implement this feature, which is the same as in MapReduce. -Xiangrui - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
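Putting Xiangrui's suggestion into a rough sketch, assuming both matrices are already split into aligned blocks as RDD[(Int, Int, CSCMatrix[Double])] and that the breeze version on the classpath provides * and + for CSCMatrix (this is a sketch, not a tuned implementation):

import breeze.linalg.CSCMatrix
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// a: blocks of A keyed by (blockRow i, inner index k); b: blocks of B keyed by (inner index k, blockCol j)
def multiply(a: RDD[(Int, Int, CSCMatrix[Double])],
             b: RDD[(Int, Int, CSCMatrix[Double])]): RDD[((Int, Int), CSCMatrix[Double])] = {
  val left  = a.map { case (i, k, block) => (k, (i, block)) }
  val right = b.map { case (k, j, block) => (k, (j, block)) }
  left.join(right)                                               // pair up blocks sharing the inner index k
    .map { case (_, ((i, ab), (j, bb))) => ((i, j), ab * bb) }   // local sparse multiply per block pair
    .reduceByKey(_ + _)                                          // sum the partial products for each output block
}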
Unable to use HiveContext in spark-shell
I am connecting to a remote master using spark-shell. I then get the following error while trying to instantiate a HiveContext.

scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

error: bad symbolic reference. A signature in HiveContext.class refers to term hive in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling HiveContext.class.
error: while compiling: <console> during phase: erasure library version: version 2.10.4 compiler version: version 2.10.4

[lengthy REPL compiler tree dump omitted]

uncaught exception during compilation: scala.reflect.internal.Types$TypeError
scala.reflect.internal.Types$TypeError: bad symbolic reference. A signature in HiveContext.class refers to term conf in value org.apache.hadoop.hive which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling HiveContext.class.
That entry seems to have slain the compiler. Shall I replay your session? I can re-run each line except the last one. [y/n]

Thanks Tridib -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-use-HiveContext-in-spark-shell-tp18261.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org