Problems with JobScheduler
I am having a problem with the JobScheduler. I have executed the same code on two clusters. I read from three topics in Kafka with DirectStream, so I have three tasks. I have checked YARN and there are no other jobs launched. On the cluster where I have trouble I get these logs:
15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0 (TID 72, x, RACK_LOCAL, 14856 bytes)
15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0 (TID 74, x, RACK_LOCAL, 14860 bytes)
15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0 (TID 72) in 208 ms on x (1/3)
15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0 (TID 74) in 49 ms on x (2/3)
*15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958 ms*
*15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000 ms*
*15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959 ms*
*15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000 ms*
*15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960 ms*
*15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000 ms*
*15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961 ms*
*15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000 ms*
*15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962 ms*
*15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000 ms*
*15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963 ms*
*15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000 ms*
15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0 (TID 73) in 60373 ms on (3/3)
15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks have all completed, from pool
15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at MetricsSpark.scala:67) finished in 60.379 s
15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at MetricsSpark.scala:67, took 60.391761 s
15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job 143825821 ms.0 from job set of time 143825821 ms
15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time 143825821 ms (execution: 60.399 s)
15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job 1438258215000 ms.0 from job set of time 1438258215000 ms
There is *always* a minute of delay in the third task; when I execute the same code on the other cluster there isn't this delay in the JobScheduler. I checked the YARN configuration in both clusters and it seems the same.
The log from the cluster that is working well is:
15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks
15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0 (TID 279, xx, RACK_LOCAL, 14643 bytes)
15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0 (TID 280, x, RACK_LOCAL, 14639 bytes)
15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in memory on x:45132 (size: 1801.0 B, free: 530.3 MB)
15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage 93.0 (TID 281, xxx, RACK_LOCAL, 14647 bytes)
15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage 93.0 (TID 279) in 121 ms on (1/3)
15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in memory on x:49886 (size: 1801.0 B, free: 530.3 MB)
15/07/30 14:37:35 INFO TaskSetManager: Finished task 2.0 in stage 93.0 (TID 281) in 261 ms on xx (2/3)
15/07/30 14:37:35 INFO TaskSetManager: Finished task 1.0 in stage 93.0 (TID 280) in 519 ms on x (3/3)
15/07/30 14:37:35 INFO DAGScheduler: Stage 93 (foreachRDD at MetricsSpark.scala:67) finished in 0.522 s
15/07/30 14:37:35 INFO YarnScheduler: Removed TaskSet 93.0, whose tasks have all completed, from pool
15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at MetricsSpark.scala:67, took 0.531323 s
15/07/30 14:37:35 INFO JobScheduler: Finished job streaming job 1438259855000 ms.0 from job set of time 1438259855000 ms
15/07/30 14:37:35 INFO JobScheduler: Total delay: 0.548 s for time 1438259855000 ms (execution: 0.540 s)
15/07/30 14:37:35 INFO KafkaRDD: Removing RDD 184 from persistence list
Any clue about where I could take a look? The number of CPUs in YARN is enough. I am executing on YARN with the same options (--master yarn-server with 1g of memory in both).
Re: Spark and Speech Recognition
Oh... That was embarrassingly easy! Thank you, that was exactly the understanding of partitions that I needed. P

On Thu, Jul 30, 2015 at 6:35 AM, Simon Elliston Ball si...@simonellistonball.com wrote: You might also want to consider broadcasting the models to ensure you get one instance shared across the cores in each machine; otherwise the model will be serialised to each task and you'll get a copy per executor (roughly one per core in this instance). Simon Sent from my iPhone

On 30 Jul 2015, at 10:14, Akhil Das ak...@sigmoidanalytics.com wrote: Like this?
val data = sc.textFile("/sigmoid/audio/data/", 24).foreachPartition(urls => speechRecognizer(urls))
Here, 24 would be the total number of cores that you have on all the workers. Thanks Best Regards

On Wed, Jul 29, 2015 at 6:50 AM, Peter Wolf opus...@gmail.com wrote: Hello, I am writing a Spark application that uses speech recognition to transcribe a very large number of recordings. I need some help configuring Spark. My app is basically a transformation with no side effects: recording URL -> transcript. The input is a huge file with one URL per line, and the output is a huge file of transcripts. The speech recognizer is written in Java (Sphinx4), so it can be packaged as a JAR. The recognizer is very processor intensive, so you can't run too many on one machine -- perhaps one recognizer per core. The recognizer is also big -- maybe 1 GB. But most of the recognizer is immutable acoustic and language models that can be shared with other instances of the recognizer. So I want to run about one recognizer per core on each machine in my cluster. I want all recognizers on one machine to run within the same JVM and share the same models. How does one configure Spark for this sort of application? How does one control how Spark deploys the stages of the process? Can someone point me to an appropriate doc or keywords I should Google? Thanks Peter
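Putting Simon's broadcast suggestion and Akhil's partitioning suggestion together, a minimal sketch might look like the following. The SharedModels and Recognizer classes, the paths, and the partition count are placeholders rather than the real Sphinx4 API; mapPartitions is used instead of foreachPartition so the transcripts can actually be written back out:

import org.apache.spark.{SparkConf, SparkContext}

// Stand-ins for the real Sphinx4 pieces (hypothetical; replace with your own classes).
class SharedModels extends Serializable                        // the ~1 GB immutable models
class Recognizer(models: SharedModels) {
  def transcribe(url: String): String = s"transcript of $url"  // placeholder
}

object TranscribeJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("transcribe"))

    // Broadcast the large immutable models once per executor JVM instead of
    // serialising them into every task closure.
    val models = sc.broadcast(new SharedModels)

    val urls = sc.textFile("/sigmoid/audio/data/", 24)   // roughly one partition per core
    val transcripts = urls.mapPartitions { part =>
      // One recognizer per partition (i.e. per core), all sharing the broadcast models.
      val recognizer = new Recognizer(models.value)
      part.map(url => url + "\t" + recognizer.transcribe(url))
    }
    transcripts.saveAsTextFile("/sigmoid/audio/transcripts")
    sc.stop()
  }
}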
Re: Running Spark on user-provided Hadoop installation
Herman: For Pre-built with user-provided Hadoop, spark-1.4.1-bin-hadoop2.6.tgz, e.g., uses hadoop-2.6 profile which defines versions of projects Spark depends on. Hadoop cluster is used to provide storage (hdfs) and resource management (YARN). For the latter, please see: https://spark.apache.org/docs/latest/running-on-yarn.html Cheers On Thu, Jul 30, 2015 at 1:48 AM, hermansc herman.schis...@gmail.com wrote: Hi. I want to run Spark, and more specifically the Pre-build with user-provided Hadoop version from the downloads page, but I can't find any documentation on how to connect the two components together (namely Spark and Hadoop). I've had some success in settting SPARK_CLASSPATH to my hadoop distribution lib/ directory, containing jar files such as hadoop-core, hadoop-common etc. However, there seems to be many native libraries included in the assembly jar for Spark versions pre-built for Hadoop distributions (I'm specifically missing the libsnappy.so files) that are not by default included in distributions such as Cloudera Hadoop. Have anyone here actually tried to run Spark without Hadoop included in the assembly jar and/or have any more resources where I can read about the proper way of connecting them? As an aside, the spark-assembly jar in the Spark version pre-built for user-provided Hadoop distributions is named spark-assembly-1.4.0-hadoop2.2.0.jar, which doesn't make sense - it should be called spark-assembly-1.4.0-without-hadoop.jar :) -- Herman -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-on-user-provided-Hadoop-installation-tp24076.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Problems with JobScheduler
I read about the maxRatePerPartition parameter; I haven't set it. Could that be the problem?? Although that wouldn't explain why it doesn't work on only one of the clusters.

2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: They just share Kafka; the rest of the resources are independent. I tried stopping one cluster and running only the cluster that isn't working, but the same thing happens.

2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: [...]
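For context, that setting is spark.streaming.kafka.maxRatePerPartition; a minimal sketch of setting it for the direct stream, with an illustrative limit and app name:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("metrics")
  // Cap how many records the direct Kafka stream reads per partition per second;
  // if unset, each batch reads everything currently available on the topic.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
val ssc = new StreamingContext(conf, Seconds(5))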
Re: Spark on YARN
Thanks for the information, this fixed the issue. The issue was the Spark master memory; when I manually specified 1G for the master, it started working.

On 30 July 2015 at 14:26, Shao, Saisai saisai.s...@intel.com wrote: You'd better also check the nodemanager log; sometimes your memory usage exceeds the limit of the YARN container's configuration. I've met a similar problem before, here is the warning log in the nodemanager:
2015-07-07 17:06:07,141 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=17385,containerID=container_1436259427993_0001_02_01] is running beyond virtual memory limits. Current usage: 318.1 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container.
The default pmem-vmem ratio is 2.1, but it seems the executor requires more vmem when started, so the nodemanager will kill it. If you meet a similar problem, you could increase the configuration "yarn.nodemanager.vmem-pmem-ratio". Thanks Jerry

*From:* Jeff Zhang [mailto:zjf...@gmail.com] *Sent:* Thursday, July 30, 2015 4:36 PM *To:* Jeetendra Gangele *Cc:* user *Subject:* Re: Spark on YARN
15/07/30 12:13:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
The AM is killed somehow, maybe due to preemption. Does it always happen? The resource manager log would be helpful.

On Thu, Jul 30, 2015 at 4:17 PM, Jeetendra Gangele gangele...@gmail.com wrote: I can't see the application logs here. All the logs are going into stderr. Can anybody help here?

On 30 July 2015 at 12:21, Jeetendra Gangele gangele...@gmail.com wrote: I am running the command below; this is the default Spark Pi program, but it is not running. All the logs are going to stderr, but at the terminal the job succeeds. I guess there is some issue; the job is not launching at all.
/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster lib/spark-examples-1.4.1-hadoop2.6.0.jar 10
Complete log:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/tmp/nm-local-dir/usercache/hadoop/filecache/23/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/07/30 12:13:31 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
15/07/30 12:13:32 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1438090734187_0010_01
15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop
15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop
15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/07/30 12:13:33 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread
15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context initialization
15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context initialization ...
15/07/30 12:13:33 INFO spark.SparkContext: Running Spark version 1.4.1
15/07/30 12:13:33 WARN spark.SparkConf: SPARK_JAVA_OPTS was detected (set to '-Dspark.driver.port=53411'). This is deprecated in Spark 1.0+.
Please instead use: - ./spark-submit with conf/spark-defaults.conf to set defaults for an application - ./spark-submit with --driver-java-options to set -X options for a driver - spark.executor.extraJavaOptions to set -X options for executors - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker) 15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.executor.extraJavaOptions' to '-Dspark.driver.port=53411' as a work-around. 15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.driver.extraJavaOptions' to '-Dspark.driver.port=53411' as a work-around. 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 15/07/30 12:13:33 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/07/30 12:13:33 INFO Remoting: Starting remoting 15/07/30 12:13:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.21.1.77:53411] 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'sparkDriver' on port 53411. 15/07/30 12:13:34 INFO spark.SparkEnv: Registering MapOutputTracker
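For reference, the ratio Jerry mentions is a NodeManager setting in yarn-site.xml; raising it might look roughly like this (the value 4 is only an example, not a recommendation):

<property>
  <!-- Allow each container more virtual memory per unit of physical memory
       before the ContainersMonitor kills it -->
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>4</value>
</property>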
Re: Graceful shutdown for Spark Streaming
Yes, I was doing the same; if you mean that this is the correct way to do it, then I will verify it once more in my case.

On Thu, Jul 30, 2015 at 1:02 PM, Tathagata Das t...@databricks.com wrote: How is sleep not working? Are you doing streamingContext.start(); Thread.sleep(xxx); streamingContext.stop()?

On Wed, Jul 29, 2015 at 6:55 PM, anshu shukla anshushuk...@gmail.com wrote: If we want to stop the application after a fixed time period, how will that work? (How do I give the duration in the logic? In my case sleep(t.s.) is not working.) So I have been killing the CoarseGrained job on each slave with a script. Please suggest something.

On Thu, Jul 30, 2015 at 5:14 AM, Tathagata Das t...@databricks.com wrote: StreamingContext.stop(stopGracefully = true) stops the streaming context gracefully. Then you can safely terminate the Spark cluster. They are two different steps and need to be done separately, ensuring that the driver process has been completely terminated before the Spark cluster is terminated.

On Wed, Jul 29, 2015 at 6:43 AM, Michal Čizmazia mici...@gmail.com wrote: How can I initiate graceful shutdown from outside of the Spark Streaming driver process, both for the local and cluster modes of Spark Standalone as well as EMR? Does sbin/stop-all.sh stop the context gracefully? How is it done? Is there a signal sent to the driver process? For EMR, is there a way to terminate an EMR cluster with a Spark Streaming graceful shutdown? Thanks!

-- Thanks Regards, Anshu Shukla
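For a fixed-duration run, the sleep-then-stop pattern TD describes would look roughly like this (the duration is illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("demo"), Seconds(5))
// ... build the DStream pipeline here ...
ssc.start()

// Let the job run for a fixed period, then stop gracefully so in-flight
// batches finish before the streaming context and SparkContext are torn down.
Thread.sleep(10 * 60 * 1000L) // e.g. run for 10 minutes
ssc.stop(stopSparkContext = true, stopGracefully = true)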
Re: How to set log level in spark-submit ?
Did you use an absolute path in $path_to_file? I just tried this with spark-shell v1.4.1 and it worked for me. If the URL is wrong, you should see an error message from log4j that it can't find the file. For Windows it would be something like file:/c:/path/to/file, I believe. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Thu, Jul 30, 2015 at 4:41 AM, Alexander Krasheninnikov a.krasheninni...@corp.badoo.com wrote: I saw this example in the docs: --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file://$path_to_file but, unfortunately, it does not work for me.

On 30.07.2015 05:12, canan chen wrote: Yes, that should work. What I mean is: is there any option in the spark-submit command where I can specify the log level?

On Thu, Jul 30, 2015 at 10:05 AM, Jonathan Coveney jcove...@gmail.com wrote: Put a log4j.properties file in conf/. You can copy log4j.properties.template as a good base.

On Wednesday, July 29, 2015, canan chen ccn...@gmail.com wrote: Does anyone know how to set the log level in spark-submit? Thanks
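Combining the two suggestions, a sketch of what this could look like on the command line (the class name, jar, and file path are placeholders; note the absolute file: URL):

spark-submit \
  --class com.example.MyApp \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///home/me/conf/log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///home/me/conf/log4j.properties" \
  my-app.jar

Alternatively, copy conf/log4j.properties.template to conf/log4j.properties and change log4j.rootCategory to the level you want.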
Python version collision
Hi, I find the documentation about the configuration options rather confusing. There are a lot of files, and it is not too clear which one to modify; for example, spark-env vs spark-defaults. I am getting an error from a Python version collision:
File /root/spark/python/lib/pyspark.zip/pyspark/worker.py, line 64, in main (%d.%d % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.4, PySpark cannot run with different minor versions
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:315)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
But I have conf/spark-env.sh with: PYSPARK_PYTHON=python3.4 Also, I am not sure about the shebang line at the top of spark-env.sh, because sourcing it would make the env vars be defined in a subprocess, so I removed it; but anyway, I am having the same problem. Does anyone have experience using Python 3? And with Python 3 in a virtualenv? Also, as a matter of feedback, I find it rather difficult to deploy and develop apps because, although you may have an IPython notebook, I haven't found a way to include pyspark in my environment (with the rest of the virtualenv libraries). Thanks!
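In case it helps, the worker/driver mismatch usually means the executors and the driver resolve different interpreters; pinning both in conf/spark-env.sh on every node looks roughly like this (the virtualenv path is a placeholder):

export PYSPARK_PYTHON=/opt/venvs/py34/bin/python3.4          # interpreter the executors use
export PYSPARK_DRIVER_PYTHON=/opt/venvs/py34/bin/python3.4   # interpreter the driver uses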
Re: Spark Interview Questions
i have prepared some interview questions: http://www.knowbigdata.com/blog/interview-questions-apache-spark-part-1 http://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2 please provide your feedback. On Wed, Jul 29, 2015, 23:43 Pedro Rodriguez ski.rodrig...@gmail.com wrote: You might look at the edx course on Apache Spark or ML with Spark. There are probably some homework problems or quiz questions that might be relevant. I haven't looked at the course myself, but thats where I would go first. https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x -- Pedro Rodriguez PhD Student in Distributed Machine Learning | CU Boulder UC Berkeley AMPLab Alumni ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703 Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: help plz! how to use zipWithIndex to each subset of a RDD
Hi @rok, thanks I got it
Re: Problems with JobScheduler
They just share Kafka; the rest of the resources are independent. I tried stopping one cluster and running only the cluster that isn't working, but the same thing happens.

2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: [...]
Re: Twitter Connector-Spark Streaming
You can create a custom receiver and then inside it you can write your own piece of code to receive the data, filter it, etc., before giving it to Spark. Thanks Best Regards

On Thu, Jul 30, 2015 at 6:49 PM, Sadaf Khan sa...@platalytics.com wrote: okay :) then is there any way to fetch the tweets specific to my account? Thanks in anticipation :)

On Thu, Jul 30, 2015 at 6:17 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Owh, this one fetches the public tweets, not the ones specific to your account. Thanks Best Regards

On Thu, Jul 30, 2015 at 6:11 PM, Sadaf Khan sa...@platalytics.com wrote: yes. but can you please tell me how to mention a specific user account in the filter? I want to fetch my tweets, the tweets of my followers and the tweets of those whom I follow. So in short I want to fetch the tweets of my account only. Recently I have used
val tweets = TwitterUtils.createStream(ssc, atwitter, Nil, StorageLevel.MEMORY_AND_DISK_2)
Any response will be very much appreciated. :) Thanks.

On Thu, Jul 30, 2015 at 5:20 PM, Akhil Das ak...@sigmoidanalytics.com wrote: TwitterUtils.createStream takes a 3rd argument which is a filter; once you provide these, it will only fetch the matching tweets. Thanks Best Regards

On Thu, Jul 30, 2015 at 4:19 PM, Sadaf sa...@platalytics.com wrote: Hi. I am writing a Twitter connector using Spark Streaming, but it fetches random tweets. Is there any way to receive the tweets of a particular account? I made an app on Twitter and used the credentials as given below.
def managingCredentials(): Option[twitter4j.auth.Authorization] = {
  object auth {
    val config = new twitter4j.conf.ConfigurationBuilder()
      .setOAuthConsumerKey()
      .setOAuthConsumerSecret()
      .setOAuthAccessToken()
      .setOAuthAccessTokenSecret()
      .build
  }
  val twitter_auth = new TwitterFactory(auth.config)
  val a = new twitter4j.auth.OAuthAuthorization(auth.config)
  val atwitter: Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization())
  atwitter
}
Thanks :)
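For reference, the third argument Akhil mentions is a Seq of terms to track, so a sketch like the one below narrows the public stream to tweets containing those terms (the handles here are placeholders); it does not give you your own home timeline, which is why a custom receiver around something like twitter4j's user-stream API was suggested above:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.twitter.TwitterUtils

// Only tweets containing one of these strings will be delivered (placeholders).
val filters = Seq("@my_handle", "#my_hashtag")
val tweets = TwitterUtils.createStream(ssc, atwitter, filters, StorageLevel.MEMORY_AND_DISK_2)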
Re: Connection closed/reset by peers error
I am having the same issue. Have you found any resolution?
Re: Problems with JobScheduler
Just so I'm clear, the difference in timing you're talking about is this:
15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at MetricsSpark.scala:67, took 60.391761 s
15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at MetricsSpark.scala:67, took 0.531323 s
Are those jobs running on the same topicpartition?

On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I read about maxRatePerPartition parameter, I haven't set this parameter. Could it be the problem?? Although this wouldn't explain why it doesn't work in one of the clusters. [...]
Re: Problems with JobScheduler
I have three topics with one partition per topic, so each job runs on one topic.

2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org: Just so I'm clear, the difference in timing you're talking about is this:
15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at MetricsSpark.scala:67, took 60.391761 s
15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at MetricsSpark.scala:67, took 0.531323 s
Are those jobs running on the same topicpartition? [...]
Spark Master Build Git Commit Hash
Hi Spark users and developers, I wonder which git commit was used to build the latest master-nightly build found at: http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/? I downloaded the build but I couldn't find the information related to it. Thank you! Best Regards, Jerry
Re: Lost task - connection closed
I am getting the same error. Any resolution on this issue? Thank you
Re: Upgrade of Spark-Streaming application
You can't use checkpoints across code upgrades. That may or may not change in the future, but for now that's a limitation of spark checkpoints (regardless of whether you're using Kafka). Some options: - Start up the new job on a different cluster, then kill the old job once it's caught up to where the new job started. If you care about duplicate work, you should be doing idempotent / transactional writes anyway, which should take care of the overlap between the two. If you're doing batches, you may need to be a little more careful about handling batch boundaries - Store the offsets somewhere other than the checkpoint, and provide them on startup using the fromOffsets argument to createDirectStream On Thu, Jul 30, 2015 at 4:07 AM, Nicola Ferraro nibbi...@gmail.com wrote: Hi, I've read about the recent updates about spark-streaming integration with Kafka (I refer to the new approach without receivers). In the new approach, metadata are persisted in checkpoint folders on HDFS so that the SparkStreaming context can be recreated in case of failures. This means that the streaming application will restart from the where it exited and the message consuming process continues with new messages only. Also, if I manually stop the streaming process and recreate the context from checkpoint (using an approach similar to https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala), the behavior would be the same. Now, suppose I want to change something in the software and modify the processing pipeline. Can spark use the previous checkpoint to recreate the new application? Will I ever be able to upgrade the software without processing all the messages in Kafka again? Regards, Nicola
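A minimal sketch of Cody's second option, storing the offsets yourself and passing them back via the fromOffsets argument on restart (the loadOffsets/saveOffsets helpers are hypothetical, e.g. backed by ZooKeeper or a database, and ssc/kafkaParams are assumed to be defined as in the existing job):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Offsets persisted by the previous run of the old code: topic/partition -> next offset to read.
val fromOffsets: Map[TopicAndPartition, Long] = loadOffsets()   // hypothetical helper

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc,
  kafkaParams,
  fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
)

stream.foreachRDD { rdd =>
  // ... process rdd ...
  // Persist the offset ranges of this batch so the next (possibly upgraded) job can resume here.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  saveOffsets(ranges)                                            // hypothetical helper
}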
Re: Problems with JobScheduler
If the jobs are running on different topicpartitions, what's different about them? Is one of them 120x the throughput of the other, for instance? You should be able to eliminate cluster config as a difference by running the same topic partition on the different clusters and comparing the results. On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I have three topics with one partition each topic. So each jobs run about one topics. 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org: Just so I'm clear, the difference in timing you're talking about is this: 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at MetricsSpark.scala:67, took 60.391761 s 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at MetricsSpark.scala:67, took 0.531323 s Are those jobs running on the same topicpartition? On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I read about maxRatePerPartition parameter, I haven't set this parameter. Could it be the problem?? Although this wouldn't explain why it doesn't work in one of the clusters. 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: They just share the kafka, the rest of resources are independents. I tried to stop one cluster and execute just the cluster isn't working but it happens the same. 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: I have some problem with the JobScheduler. I have executed same code in two cluster. I read from three topics in Kafka with DirectStream so I have three tasks. I have check YARN and there aren't more jobs launched. The cluster where I have troubles I got this logs: 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0 (TID 72, x, RACK_LOCAL, 14856 bytes) 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0 (TID 73, xxx, RACK_LOCAL, 14852 bytes) 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB) 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on x:43477 (size: 1802.0 B, free: 530.3 MB) 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0 (TID 74, x, RACK_LOCAL, 14860 bytes) 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0 (TID 72) in 208 ms on x (1/3) 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0 (TID 74) in 49 ms on x (2/3) *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958 ms* *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000 ms* *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959 ms* *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000 ms* *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960 ms* *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000 ms* *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961 ms* *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000 ms* *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962 ms* *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000 ms* *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963 ms* *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000 ms* 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0 (TID 73) in 60373 ms on (3/3) 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks have all completed, from pool 15/07/30 14:33:59 
INFO DAGScheduler: Stage 24 (foreachRDD at MetricsSpark.scala:67) finished in 60.379 s 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at MetricsSpark.scala:67, took 60.391761 s 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job 143825821 ms.0 from job set of time 143825821 ms 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time 143825821 ms (execution: 60.399 s) 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job 1438258215000 ms.0 from job set of time 1438258215000 ms There are *always *a minute of delay in the third task, when I have executed same code in another cluster there isn't this delay in the JobScheduler. I checked the configuration in YARN in both clusters and it seems the same. The log in the cluster is working good is 15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks 15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0 (TID 279, xx, RACK_LOCAL, 14643 bytes) 15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0 (TID 280, x, RACK_LOCAL, 14639 bytes) 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in memory on x:45132
Re: Spark build/sbt assembly
Hi Akhil, Yes I did try to remove it, and i tried to build again. However that jar keeps getting recreated, whenever i run ./build/sbt assembly Thanks, Rahul P On Thu, Jul 30, 2015 at 12:38 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try removing this jar? build/sbt-launch-0.13.7.jar Thanks Best Regards On Tue, Jul 28, 2015 at 12:08 AM, Rahul Palamuttam rahulpala...@gmail.com wrote: Hi All, I hope this is the right place to post troubleshooting questions. I've been following the install instructions and I get the following error when running the following from Spark home directory $./build/sbt Using /usr/java/jdk1.8.0_20/ as default JAVA_HOME. Note, this will be overridden by -java-home if it is set. Attempting to fetch sbt Launching sbt from build/sbt-launch-0.13.7.jar Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar However when I run sbt assembly it compiles, with a couple of warnings, but it works none-the less. Is the build/sbt script deprecated? I do notice on one node it works but on the other it gives me the above error. Thanks, Rahul P -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-build-sbt-assembly-tp24012.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming Kafka could not find leader offset for Set()
Hi Cody sorry my bad you were right there was a typo in topicSet. When I corrected typo in topicSet it started working. Thanks a lot. Regards On Thu, Jul 30, 2015 at 7:43 PM, Cody Koeninger c...@koeninger.org wrote: Can you post the code including the values of kafkaParams and topicSet, ideally the relevant output of kafka-topics.sh --describe as well On Wed, Jul 29, 2015 at 11:39 PM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi thanks for the response. Like I already mentioned in the question kafka topic is valid and it has data I can see data in it using another kafka consumer. On Jul 30, 2015 7:31 AM, Cody Koeninger c...@koeninger.org wrote: The last time someone brought this up on the mailing list, the issue actually was that the topic(s) didn't exist in Kafka at the time the spark job was running. On Wed, Jul 29, 2015 at 6:17 PM, Tathagata Das t...@databricks.com wrote: There is a known issue that Kafka cannot return leader if there is not data in the topic. I think it was raised in another thread in this forum. Is that the issue? On Wed, Jul 29, 2015 at 10:38 AM, unk1102 umesh.ka...@gmail.com wrote: Hi I have Spark Streaming code which streams from Kafka topic it used to work fine but suddenly it started throwing the following exception Exception in thread main org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leader offsets for Set() at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413) at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413) at scala.util.Either.fold(Either.scala:97) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:412) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:528) at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) My Spark Streaming client code is very simple I just create one receiver using the following code and trying to print messages it consumed JavaPairInputDStreamString, String messages = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicSet); Kafka param is only one I specify kafka.ofset.reset=largest. Kafka topic has data I can see data using other Kafka consumers but above Spark Streaming code throws exception saying leader offset not found. I tried both smallest and largest offset. I wonder what happened this code used to work earlier. I am using Spark-Streaming 1.3.1 as it was working in this version I tried in 1.4.1 and same exception. Please guide. I am new to Spark thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-could-not-find-leader-offset-for-Set-tp24066.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming Kafka could not find leader offset for Set()
Can you post the code including the values of kafkaParams and topicSet, ideally the relevant output of kafka-topics.sh --describe as well On Wed, Jul 29, 2015 at 11:39 PM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi thanks for the response. Like I already mentioned in the question kafka topic is valid and it has data I can see data in it using another kafka consumer. On Jul 30, 2015 7:31 AM, Cody Koeninger c...@koeninger.org wrote: The last time someone brought this up on the mailing list, the issue actually was that the topic(s) didn't exist in Kafka at the time the spark job was running. On Wed, Jul 29, 2015 at 6:17 PM, Tathagata Das t...@databricks.com wrote: There is a known issue that Kafka cannot return leader if there is not data in the topic. I think it was raised in another thread in this forum. Is that the issue? On Wed, Jul 29, 2015 at 10:38 AM, unk1102 umesh.ka...@gmail.com wrote: Hi I have Spark Streaming code which streams from Kafka topic it used to work fine but suddenly it started throwing the following exception Exception in thread main org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leader offsets for Set() at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413) at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413) at scala.util.Either.fold(Either.scala:97) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:412) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:528) at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) My Spark Streaming client code is very simple I just create one receiver using the following code and trying to print messages it consumed JavaPairInputDStreamString, String messages = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicSet); Kafka param is only one I specify kafka.ofset.reset=largest. Kafka topic has data I can see data using other Kafka consumers but above Spark Streaming code throws exception saying leader offset not found. I tried both smallest and largest offset. I wonder what happened this code used to work earlier. I am using Spark-Streaming 1.3.1 as it was working in this version I tried in 1.4.1 and same exception. Please guide. I am new to Spark thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-could-not-find-leader-offset-for-Set-tp24066.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to register array class with Kryo in spark-defaults.conf
I register my class with Kyro in spark-defaults.conf as follow spark.serializer org.apache.spark.serializer.KryoSerializer spark.kryo.registrationRequired true spark.kryo.classesToRegister ltn.analytics.es.EsDoc But I got the following exception java.lang.IllegalArgumentException: Class is not registered: ltn.analytics.es.EsDoc[] Note: To register this class use: kryo.register(ltn.analytics.es.EsDoc[].class); at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442) at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79) at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565) at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:162) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) The error message seems to suggest that I should also register the array class EsDoc[]. So I add it to spark-defaults.conf as follow spark.kryo.classesToRegister ltn.analytics.es.EsDoc,ltn.analytics.es.EsDoc[] Then I got the following error org.apache.spark.SparkException: Failed to register classes with Kryo at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:101) at org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:153) at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:115) at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:200) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101) at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051) at ltn.analytics.index.Index.addDocuments(Index.scala:82) Please advise. Thanks. Ningjun
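One option that sidesteps the conf-file syntax question is to register the classes from code via a KryoRegistrator, where the array class can be written as classOf[Array[EsDoc]]. A sketch (the registrator's package and class name are made up):

package ltn.analytics.serialization   // hypothetical package

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import ltn.analytics.es.EsDoc

// Compile this into the application jar.
class EsDocKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[EsDoc])         // the element class
    kryo.register(classOf[Array[EsDoc]])  // the array class the exception asks for
  }
}

Then, in spark-defaults.conf, point Kryo at it instead of (or in addition to) spark.kryo.classesToRegister:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired true
spark.kryo.registrator ltn.analytics.serialization.EsDocKryoRegistrator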
Failed to load class for data source: org.apache.spark.sql.cassandra
Hey all, I'm running what should be a very straight-forward application of the Cassandra sql connector, and I'm getting an error: Exception in thread "main" java.lang.RuntimeException: Failed to load class for data source: org.apache.spark.sql.cassandra at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:220) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:233) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) at com.latticeengines.test.CassandraTest$.main(CassandraTest.scala:33) at com.latticeengines.test.CassandraTest.main(CassandraTest.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 15/07/30 15:34:47 INFO spark.SparkContext: Invoking stop() from shutdown hook My jar is shaded, so I assume this shouldn't happen? Here's the code I'm trying to run: object CassandraTest { def main(args: Array[String]) { println("Hello, scala!") var conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df = sqlContext .read .format("org.apache.spark.sql.cassandra") .options(Map( "table" -> "kv", "keyspace" -> "test")) .load() val w = Window.orderBy("value").rowsBetween(-2, 0) df.select(mean("value").over(w)) } }
RE: Failed to load class for data source: org.apache.spark.sql.cassandra
I'm submitting the application this way: spark-submit test-2.0.5-SNAPSHOT-jar-with-dependencies.jar I've confirmed that org.apache.spark.sql.cassandra and org.apache.cassandra classes are in the jar. Apologies for this relatively newbie question - I'm still new to both spark and scala. Thanks, Ben From: Benjamin Ross Sent: Thursday, July 30, 2015 3:45 PM To: user@spark.apache.org Subject: Failed to load class for data source: org.apache.spark.sql.cassandra Hey all, I'm running what should be a very straight-forward application of the Cassandra sql connector, and I'm getting an error: Exception in thread main java.lang.RuntimeException: Failed to load class for data source: org.apache.spark.sql.cassandra at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:220) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:233) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) at com.latticeengines.test.CassandraTest$.main(CassandraTest.scala:33) at com.latticeengines.test.CassandraTest.main(CassandraTest.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 15/07/30 15:34:47 INFO spark.SparkContext: Invoking stop() from shutdown hook My jar is shaded, so I assume this shouldn't happen? Here's the code I'm trying to run: object CassandraTest { def main(args: Array[String]) { println(Hello, scala!) var conf = new SparkConf(true).set(spark.cassandra.connection.host, 127.0.0.1) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df = sqlContext .read .format(org.apache.spark.sql.cassandra) .options(Map( table - kv, keyspace - test)) .load() val w = Window.orderBy(value).rowsBetween(-2, 0) df.select(mean(value).over(w)) } }
Re: Spark Master Build Git Commit Hash
Looking at: https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-NightlyBuilds Maven artifacts should be here: https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-core_2.10/1.5.0-SNAPSHOT/ though the jars were dated July 16th. FYI On Thu, Jul 30, 2015 at 12:11 PM, Jerry Lam chiling...@gmail.com wrote: Hi Ted, The problem is that I don't know if the build uses the commits happened on the same day or it is possible that it builds based on Jul 15th commits. Just a thought, it might be possible to replace SNAPSHOT with the git commit hash in the filename so people will know which one is based on. Thank you for your help! Jerry On Thu, Jul 30, 2015 at 11:10 AM, Ted Yu yuzhih...@gmail.com wrote: The files were dated 16-Jul-2015 Looks like nightly build either was not published, or published at a different location. You can download spark-1.5.0-SNAPSHOT.tgz and binary-search for the commits made on Jul 16th. There may be other ways of determining the latest commit. Cheers On Thu, Jul 30, 2015 at 7:39 AM, Jerry Lam chiling...@gmail.com wrote: Hi Spark users and developers, I wonder which git commit was used to build the latest master-nightly build found at: http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/ ? I downloaded the build but I couldn't find the information related to it. Thank you! Best Regards, Jerry
Re: Spark SQL DataFrame: Nullable column and filtering
Perhaps I'm missing what you are trying to accomplish, but if you'd like to avoid the null values do an inner join instead of an outer join. Additionally, I'm confused about how the result of joinedDF.filter(joinedDF( y).isNotNull).show still contains null values in the column y. This doesn't really have anything to do with nullable, which is only a hint to the system so that we can avoid null checking when we know that there are no null values. If you provide the full code i can try and see if this is a bug. On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne martin.se...@googlemail.com wrote: Dear Michael, dear all, motivation: object OtherEntities { case class Record( x:Int, a: String) case class Mapping( x: Int, y: Int ) val records = Seq( Record(1, hello), Record(2, bob)) val mappings = Seq( Mapping(2, 5) ) } Now I want to perform an *left outer join* on records and mappings (with the ON JOIN criterion on columns (recordDF(x) === mappingDF(x) shorthand is in *leftOuterJoinWithRemovalOfEqualColumn* val sqlContext = new SQLContext(sc) // used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ val recordDF= sc.parallelize(OtherEntities.records, 4).toDF() val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF() val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn( mappingDF, x) joinedDF.filter(joinedDF(y).isNotNull).show Currently, the output is +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ instead of +-+---+-+ |x| a|y| +-+---+-+ |2|bob|5| +-+---+-+ The last output can be achieved by the method of changing nullable=false to nullable=true described in my first post. *Thus, I need this schema modification as to make outer joins work.* Cheers and thanks, Martin 2015-07-30 20:23 GMT+02:00 Michael Armbrust mich...@databricks.com: We don't yet updated nullability information based on predicates as we don't actually leverage this information in many places yet. Why do you want to update the schema? On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 martin.se...@googlemail.com wrote: Hi all, 1. *Columns in dataframes can be nullable and not nullable. Having a nullable column of Doubles, I can use the following Scala code to filter all non-null rows:* val df = . // some code that creates a DataFrame df.filter( df(columnname).isNotNull() ) +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ root |-- x: integer (nullable = false) |-- a: string (nullable = true) |-- y: integer (nullable = true) And with the filter expression +-+---+-+ |x| a|y| +-+---+-+ |2|bob|5| +-+---+-+ Unfortunetaly and while this is a true for a nullable column (according to df.printSchema), it is not true for a column that is not nullable: +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ root |-- x: integer (nullable = false) |-- a: string (nullable = true) |-- y: integer (nullable = false) +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ such that the output is not affected by the filter. Is this intended? 2. *What is the cheapest (in sense of performance) to turn a non-nullable column into a nullable column? A came uo with this:* /** * Set, if a column is nullable. 
* @param df source DataFrame * @param cn is the column name to change * @param nullable is the flag to set, such that the column is either nullable or not */ def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = { val schema = df.schema val newSchema = StructType(schema.map { case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m) case y: StructField => y }) df.sqlContext.createDataFrame( df.rdd, newSchema) } Is there a cheaper solution? 3. *Any comments?* Cheers and thx in advance, Martin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [Parquet + Dataframes] Column names with spaces
You can't use these names due to limitations in parquet (and the library itself will silently generate corrupt files that can't be read, hence the error we throw). You can alias a column by df.select(df("old").alias("new")), which is essentially what withColumnRenamed does. Alias in this case means renaming. On Thu, Jul 30, 2015 at 11:49 AM, angelini alex.angel...@shopify.com wrote: Hi all, Our data has lots of human readable column names (names that include spaces), is it possible to use these with Parquet and Dataframes? When I try and write the Dataframe I get the following error: (I am using PySpark) `AnalysisException: Attribute name "Name with Space" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.` How can I alias that column name? `df['Name with Space'] = df['Name with Space'].alias('Name')` doesn't work as you can't assign to a dataframe column. `df.withColumnRenamed('Name with Space', 'Name')` overwrites the column and doesn't alias it. Any ideas? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-Dataframes-Column-names-with-spaces-tp24088.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
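For bulk clean-up before writing to Parquet, a minimal Scala sketch (the original question is PySpark, but the DataFrame API has the same shape) that renames every column containing a space; df and the output path are placeholders:

import org.apache.spark.sql.DataFrame

def sanitizeColumnNames(df: DataFrame): DataFrame =
  df.columns.foldLeft(df) { (cleaned, name) =>
    // withColumnRenamed returns a new DataFrame with the offending column renamed
    if (name.contains(" ")) cleaned.withColumnRenamed(name, name.replace(" ", "_")) else cleaned
  }

sanitizeColumnNames(df).write.parquet("/tmp/output.parquet")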
Re: unsubscribe
https://www.youtube.com/watch?v=JncgoPKklVE On Thu, Jul 30, 2015 at 1:30 PM, ziqiu...@accenture.com wrote: -- This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Where allowed by local law, electronic communications with Accenture and its affiliates, including e-mail and instant messaging (including content), may be scanned by our systems for the purposes of information security and assessment of internal compliance with Accenture policy. __ www.accenture.com
unsubscribe
This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Where allowed by local law, electronic communications with Accenture and its affiliates, including e-mail and instant messaging (including content), may be scanned by our systems for the purposes of information security and assessment of internal compliance with Accenture policy. __ www.accenture.com
Re: Problems with JobScheduler
Yes, and that is indeed the problem. It is trying to process all the data in Kafka, and therefore taking 60 seconds. You need to set the rate limits for that. On Thu, Jul 30, 2015 at 8:51 AM, Cody Koeninger c...@koeninger.org wrote: If you don't set it, there is no maximum rate; it will get everything from the end of the last batch to the maximum available offset. On Thu, Jul 30, 2015 at 10:46 AM, Guillermo Ortiz konstt2...@gmail.com wrote: The difference is that one receives more data than the other two. I can pass the topics through parameters, so I could execute the code with one topic at a time and figure out which one it is, although I guess it's the topic which gets the most data. Anyway, those delays in just one of the clusters are pretty weird, even when the other one is not running. I have seen the parameter spark.streaming.kafka.maxRatePerPartition; I haven't set any value for this parameter. How does it work if this parameter doesn't have a value? 2015-07-30 16:32 GMT+02:00 Cody Koeninger c...@koeninger.org: If the jobs are running on different topicpartitions, what's different about them? Is one of them 120x the throughput of the other, for instance? You should be able to eliminate cluster config as a difference by running the same topic partition on the different clusters and comparing the results. On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I have three topics with one partition each, so each job reads from one topic. 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org: Just so I'm clear, the difference in timing you're talking about is this: 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at MetricsSpark.scala:67, took 60.391761 s 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at MetricsSpark.scala:67, took 0.531323 s Are those jobs running on the same topicpartition? On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I read about the maxRatePerPartition parameter; I haven't set it. Could it be the problem? Although this wouldn't explain why it doesn't work in only one of the clusters. 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: They just share Kafka; the rest of the resources are independent. I tried stopping one cluster and running only the cluster that isn't working, but the same thing happens. 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: I have some problems with the JobScheduler. I have executed the same code in two clusters. I read from three topics in Kafka with DirectStream, so I have three tasks. I have checked YARN and there are no extra jobs launched.
The cluster where I have troubles I got this logs: 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0 (TID 72, x, RACK_LOCAL, 14856 bytes) 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0 (TID 73, xxx, RACK_LOCAL, 14852 bytes) 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB) 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on x:43477 (size: 1802.0 B, free: 530.3 MB) 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0 (TID 74, x, RACK_LOCAL, 14860 bytes) 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0 (TID 72) in 208 ms on x (1/3) 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0 (TID 74) in 49 ms on x (2/3) *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958 ms* *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000 ms* *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959 ms* *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000 ms* *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960 ms* *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000 ms* *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961 ms* *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000 ms* *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962 ms* *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000 ms* *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963 ms* *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000 ms* 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0 (TID 73) in 60373 ms on (3/3) 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks have all completed, from pool 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at MetricsSpark.scala:67) finished in 60.379 s 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at MetricsSpark.scala:67,
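For reference, a sketch of the rate limit Cody and TD mention, set on the SparkConf before the StreamingContext is created. The 1000 records/second figure is only an illustration; with the 5-second batches and three partitions seen in these logs it would cap a batch at 3 * 5 * 1000 = 15000 records:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MetricsSpark") // placeholder, matching the job name in the logs
  // Maximum records per second read from each Kafka partition by the direct stream.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")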
How do I specify the data types in a DF
Hello; I have a simple file of mobile IDFAs. They look like gregconv = ['00013FEE-7561-47F3-95BC-CA18D20BCF78', '000D9B97-2B54-4B80-AAA1-C1CB42CFBF3A', '000F9E1F-BC7E-47E1-BF68-C68F6D987B96'] I am trying to make this RDD into a data frame ConvRecord = Row("IDFA") gregconvdf = gregconv.map(lambda x: ConvRecord(*x)).toDF() I get the following error Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/homes/afarahat/aofspark/share/spark/python/pyspark/sql/context.py", line 60, in toDF return sqlContext.createDataFrame(self, schema, sampleRatio) File "/homes/afarahat/aofspark/share/spark/python/pyspark/sql/context.py", line 351, in createDataFrame _verify_type(row, schema) File "/homes/afarahat/aofspark/share/spark/python/pyspark/sql/types.py", line 1027, in _verify_type "length of fields (%d)" % (len(obj), len(dataType.fields))) ValueError: Length of object (36) does not match with length of fields (1) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-i-specify-the-data-types-in-a-DF-tp24090.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Does Spark Streaming need to list all the files in a directory?
Is this a known bottleneck for Spark Streaming's textFileStream? Does it need to list all the current files in a directory before it gets the new files? Say I have 500k files in a directory, does it list them all in order to get the new files?
Re: Does Spark Streaming need to list all the files in a directory?
The first time, it needs to list them. After that the list should be cached by the file stream implementation (as far as I remember). On Thu, Jul 30, 2015 at 3:55 PM, Brandon White bwwintheho...@gmail.com wrote: Is this a known bottle neck for Spark Streaming textFileStream? Does it need to list all the current files in a directory before he gets the new files? Say I have 500k files in a directory, does it list them all in order to get the new files?
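For context, a minimal sketch of the stream in question (the path, app name, and batch interval are placeholders); only files that appear in the monitored directory after the stream starts are treated as new data:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("FileStreamSketch")
val ssc = new StreamingContext(conf, Seconds(30))
// Monitors the directory and picks up newly created files in each batch.
val lines = ssc.textFileStream("hdfs:///data/incoming")
lines.count().print()
ssc.start()
ssc.awaitTermination()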
Parquet SaveMode.Append Trouble.
Hi, I am new to using Spark and Parquet files. Below is what I am trying to do, in spark-shell: val df = sqlContext.parquetFile("/data/LM/Parquet/Segment/pages/part-m-0.gz.parquet") Have also tried the command below: val df = sqlContext.read.format("parquet").load("/data/LM/Parquet/Segment/pages/part-m-0.gz.parquet") Now I have another existing parquet file to which I want to append this Parquet file data of df, so I use df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet", "parquet", SaveMode.Append) I also tried the command below: df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet", SaveMode.Append) and it throws me the error below: <console>:26: error: not found: value SaveMode df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet", "parquet", SaveMode.Append) Please help me, in case I am doing something wrong here. Regards, Satyajit.
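The "not found: value SaveMode" message suggests the import is simply missing in the shell; a sketch assuming Spark 1.4's DataFrameReader/DataFrameWriter API and keeping the paths from the post:

import org.apache.spark.sql.SaveMode

val df = sqlContext.read.parquet("/data/LM/Parquet/Segment/pages/part-m-0.gz.parquet")

// Append is normally pointed at the dataset directory rather than an individual part file.
df.write.mode(SaveMode.Append).parquet("/data/LM/Parquet/Segment/pages2/")

// The older API from the post should also work once SaveMode is imported:
// df.save("/data/LM/Parquet/Segment/pages2/", "parquet", SaveMode.Append)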
RE: Failed to load class for data source: org.apache.spark.sql.cassandra
If anyone's curious, the issue here is that I was using the 1.2.4 connector of the datastax spark Cassandra connector, rather than the 1.4.0-M1 pre-release. 1.2.4 doesn't fully support data frames, and it's presumably still only experimental in 1.4.0-M1. Ben From: Benjamin Ross Sent: Thursday, July 30, 2015 4:14 PM To: user@spark.apache.org Subject: RE: Failed to load class for data source: org.apache.spark.sql.cassandra I'm submitting the application this way: spark-submit test-2.0.5-SNAPSHOT-jar-with-dependencies.jar I've confirmed that org.apache.spark.sql.cassandra and org.apache.cassandra classes are in the jar. Apologies for this relatively newbie question - I'm still new to both spark and scala. Thanks, Ben From: Benjamin Ross Sent: Thursday, July 30, 2015 3:45 PM To: user@spark.apache.orgmailto:user@spark.apache.org Subject: Failed to load class for data source: org.apache.spark.sql.cassandra Hey all, I'm running what should be a very straight-forward application of the Cassandra sql connector, and I'm getting an error: Exception in thread main java.lang.RuntimeException: Failed to load class for data source: org.apache.spark.sql.cassandra at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:220) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:233) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) at com.latticeengines.test.CassandraTest$.main(CassandraTest.scala:33) at com.latticeengines.test.CassandraTest.main(CassandraTest.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 15/07/30 15:34:47 INFO spark.SparkContext: Invoking stop() from shutdown hook My jar is shaded, so I assume this shouldn't happen? Here's the code I'm trying to run: object CassandraTest { def main(args: Array[String]) { println(Hello, scala!) var conf = new SparkConf(true).set(spark.cassandra.connection.host, 127.0.0.1) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df = sqlContext .read .format(org.apache.spark.sql.cassandra) .options(Map( table - kv, keyspace - test)) .load() val w = Window.orderBy(value).rowsBetween(-2, 0) df.select(mean(value).over(w)) } }
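For anyone hitting the same error, a hedged example of pulling the DataFrame-capable connector in at submit time; the exact artifact coordinates for the 1.4.0-M1 pre-release are an assumption and should be checked against the connector's release notes:

spark-submit \
  --packages com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M1 \
  test-2.0.5-SNAPSHOT-jar-with-dependencies.jar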
Re: Spark SQL DataFrame: Nullable column and filtering
Dear Michael, dear all, distinguishing those records that have a match in mapping from those that don't is the crucial point. Record(x : Int, a: String) Mapping(x: Int, y: Int) Thus Record(1, hello) Record(2, bob) Mapping(2, 5) yield (2, bob, 5) on an inner join. BUT I'm also interested in (1, hello, null) as there is no counterpart in mapping (this is the left outer join part) I need to distinguish 1 and 2 because of later inserts (case 1, hello) or updates (case 2, bon). Cheers and thanks, Martin Am 30.07.2015 22:58 schrieb Michael Armbrust mich...@databricks.com: Perhaps I'm missing what you are trying to accomplish, but if you'd like to avoid the null values do an inner join instead of an outer join. Additionally, I'm confused about how the result of joinedDF.filter(joinedDF(y).isNotNull).show still contains null values in the column y. This doesn't really have anything to do with nullable, which is only a hint to the system so that we can avoid null checking when we know that there are no null values. If you provide the full code i can try and see if this is a bug. On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne martin.se...@googlemail.com wrote: Dear Michael, dear all, motivation: object OtherEntities { case class Record( x:Int, a: String) case class Mapping( x: Int, y: Int ) val records = Seq( Record(1, hello), Record(2, bob)) val mappings = Seq( Mapping(2, 5) ) } Now I want to perform an left outer join on records and mappings (with the ON JOIN criterion on columns (recordDF(x) === mappingDF(x) shorthand is in leftOuterJoinWithRemovalOfEqualColumn val sqlContext = new SQLContext(sc) // used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ val recordDF= sc.parallelize(OtherEntities.records, 4).toDF() val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF() val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn( mappingDF, x) joinedDF.filter(joinedDF(y).isNotNull).show Currently, the output is +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ instead of +-+---+-+ |x| a|y| +-+---+-+ |2|bob|5| +-+---+-+ The last output can be achieved by the method of changing nullable=false to nullable=true described in my first post. Thus, I need this schema modification as to make outer joins work. Cheers and thanks, Martin 2015-07-30 20:23 GMT+02:00 Michael Armbrust mich...@databricks.com: We don't yet updated nullability information based on predicates as we don't actually leverage this information in many places yet. Why do you want to update the schema? On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 martin.se...@googlemail.com wrote: Hi all, 1. *Columns in dataframes can be nullable and not nullable. Having a nullable column of Doubles, I can use the following Scala code to filter all non-null rows:* val df = . // some code that creates a DataFrame df.filter( df(columnname).isNotNull() ) +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ root |-- x: integer (nullable = false) |-- a: string (nullable = true) |-- y: integer (nullable = true) And with the filter expression +-+---+-+ |x| a|y| +-+---+-+ |2|bob|5| +-+---+-+ Unfortunetaly and while this is a true for a nullable column (according to df.printSchema), it is not true for a column that is not nullable: +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ root |-- x: integer (nullable = false) |-- a: string (nullable = true) |-- y: integer (nullable = false) +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ such that the output is not affected by the filter. Is this intended? 2. 
*What is the cheapest way (in terms of performance) to turn a non-nullable column into a nullable column? I came up with this:* /** * Set, if a column is nullable. * @param df source DataFrame * @param cn is the column name to change * @param nullable is the flag to set, such that the column is either nullable or not */ def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = { val schema = df.schema val newSchema = StructType(schema.map { case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m) case y: StructField => y }) df.sqlContext.createDataFrame( df.rdd, newSchema) } Is there a cheaper solution? 3. *Any comments?* Cheers and thx in advance, Martin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail:
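A short usage sketch of the helper above in the scenario from this thread, relaxing nullability on the join column before filtering (assumes joinedDF and setNullableStateOfColumn from the post are in scope):

// Mark y as nullable so the isNotNull filter behaves as expected after the left outer join.
val relaxedDF = setNullableStateOfColumn(joinedDF, "y", nullable = true)
relaxedDF.filter(relaxedDF("y").isNotNull).show()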
RE: Heatmap with Spark Streaming
Umesh, You can create a web-service in any of the languages supported by Spark and stream the result from this web-service to your D3-based client using Websocket or Server-Sent Events. For example, you can create a webservice using Play. This app will integrate with Spark streaming in the back-end. The front-end can be a D3-based or any Javascript app. Play makes it easy to stream data to a web client. So a client does not need to continuously poll data from the back-end server. Mohammed From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Thursday, July 30, 2015 2:07 AM To: UMESH CHAUDHARY Cc: user@spark.apache.org Subject: Re: Heatmap with Spark Streaming You can integrate it with any language (like php) and use ajax calls to update the charts. Thanks Best Regards On Thu, Jul 30, 2015 at 2:11 PM, UMESH CHAUDHARY umesh9...@gmail.commailto:umesh9...@gmail.com wrote: Thanks For the suggestion Akhil! I looked at https://github.com/mbostock/d3/wiki/Gallery to know more about d3, all examples described here are on static data, how we can update our heat map from updated data, if we store it in Hbase or Mysql. I mean, do we need to query back and fourth for it. Is there any pluggable and more quick component for heatmap with spark streaming? On Thu, Jul 30, 2015 at 1:23 PM, Akhil Das ak...@sigmoidanalytics.commailto:ak...@sigmoidanalytics.com wrote: You can easily push data to an intermediate storage from spark streaming (like HBase or a SQL/NoSQL DB etc) and then power your dashboards with d3 js. Thanks Best Regards On Tue, Jul 28, 2015 at 12:18 PM, UMESH CHAUDHARY umesh9...@gmail.commailto:umesh9...@gmail.com wrote: I have just started using Spark Streaming and done few POCs. It is fairly easy to implement. I was thinking of presenting the data using some smart graphing dashboarding tools e.g. Graphite or Grafna, but they don't have heat-maps. I also looked at Zeppelinhttp://zeppelin-project.org/ , but unable to found any heat-map functionality. Could you please suggest any data visualization tools using Heat-map and Spark streaming.
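The pattern described above (stream into an intermediate store, serve it to a D3/JavaScript front end) usually reduces to a foreachRDD that writes out each batch; a minimal sketch, where stream is the DStream feeding the heat map and writeToStore is a hypothetical stand-in for the HBase/MySQL/Redis client call:

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // Open one connection per partition and push the batch to the store
    // that the dashboard (or the Play/SSE endpoint) reads from.
    records.foreach(record => writeToStore(record)) // writeToStore is a placeholder
  }
}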
Losing files in hdfs after creating spark sql table
Hi, After I create a table in Spark SQL and load an HDFS file into it, the file no longer shows up when I do hadoop fs -ls. Is this expected? Thanks, Ron - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
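One likely explanation, offered as an assumption since the exact LOAD statement is not shown: if the file was loaded with Hive-style LOAD DATA INPATH, the file is moved (not copied) into the table's warehouse directory, so it disappears from its original location but still exists under the warehouse path. A quick check (the default warehouse location is an assumption and may differ in your setup; the table name is a placeholder):

hadoop fs -ls /user/hive/warehouse/<table_name>/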
Re: How RDD lineage works
You have to read the original Spark paper to understand how RDD lineage works. https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf On Thu, Jul 30, 2015 at 9:25 PM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at: core/src/test/scala/org/apache/spark/CheckpointSuite.scala Cheers On Thu, Jul 30, 2015 at 7:39 PM, bit1...@163.com bit1...@163.com wrote: Hi, I don't get a good understanding how RDD lineage works, so I would ask whether spark provides a unit test in the code base to illustrate how RDD lineage works. If there is, What's the class name is it? Thanks! -- bit1...@163.com
How RDD lineage works
Hi, I don't have a good understanding of how RDD lineage works, so I would like to ask whether Spark provides a unit test in the code base that illustrates how RDD lineage works. If there is, what is its class name? Thanks! bit1...@163.com
Re: Problem submiting an script .py against an standalone cluster.
Can you share the part of the code in your script where you create the SparkContext instance? On Thu, Jul 30, 2015 at 7:19 PM, fordfarline fordfarl...@gmail.com wrote: Hi All, I`m having an issue when lanching an app (python) against a stand alone cluster, but runs in local, as it doesn't reach the cluster. It's the first time i try the cluster, in local works ok. i made this: - /home/user/Spark/spark-1.3.0-bin-hadoop2.4/sbin/start-all.sh # Master and worker are up in localhost:8080/4040 - /home/user/Spark/spark-1.3.0-bin-hadoop2.4/bin/spark-submit --master spark://localhost:7077 Script.py * The script runs ok but in local :(i can check it in localhost:4040, but i don't see any job in cluster UI The only warning it's: WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.132 instead (on interface eth0) I set SPARK_LOCAL_IP=127.0.0.1 to solve this, al least de warning disappear, but the script keep executing in local not in cluster. I think it has something to do with my virtual server: - Host Server: Linux Mint - The Virtual Server (workstation 10) where runs Spark is Linux Mint as well. Any ideas what am i doing wrong? Thanks in advance for any suggestion, i getting mad on it!! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-submiting-an-script-py-against-an-standalone-cluster-tp24091.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo
Re: How RDD lineage works
Please take a look at: core/src/test/scala/org/apache/spark/CheckpointSuite.scala Cheers On Thu, Jul 30, 2015 at 7:39 PM, bit1...@163.com bit1...@163.com wrote: Hi, I don't get a good understanding how RDD lineage works, so I would ask whether spark provides a unit test in the code base to illustrate how RDD lineage works. If there is, What's the class name is it? Thanks! -- bit1...@163.com
Re: Problem submiting an script .py against an standalone cluster.
You might want to run spark-submit with option --deploy-mode cluster On Thursday, July 30, 2015 7:24 PM, Marcelo Vanzin van...@cloudera.com wrote: Can you share the part of the code in your script where you create the SparkContext instance? On Thu, Jul 30, 2015 at 7:19 PM, fordfarline fordfarl...@gmail.com wrote: Hi All, I`m having an issue when lanching an app (python) against a stand alone cluster, but runs in local, as it doesn't reach the cluster. It's the first time i try the cluster, in local works ok. i made this: - /home/user/Spark/spark-1.3.0-bin-hadoop2.4/sbin/start-all.sh # Master and worker are up in localhost:8080/4040 - /home/user/Spark/spark-1.3.0-bin-hadoop2.4/bin/spark-submit --master spark://localhost:7077 Script.py * The script runs ok but in local :( i can check it in localhost:4040, but i don't see any job in cluster UI The only warning it's: WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.132 instead (on interface eth0) I set SPARK_LOCAL_IP=127.0.0.1 to solve this, al least de warning disappear, but the script keep executing in local not in cluster. I think it has something to do with my virtual server: - Host Server: Linux Mint - The Virtual Server (workstation 10) where runs Spark is Linux Mint as well. Any ideas what am i doing wrong? Thanks in advance for any suggestion, i getting mad on it!! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-submiting-an-script-py-against-an-standalone-cluster-tp24091.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo
Re: Heatmap with Spark Streaming
I do suggest that non-Spark-related discussions be taken to a different forum, as they do not directly contribute to the contents of this user list. On Thu, Jul 30, 2015 at 8:52 PM, UMESH CHAUDHARY umesh9...@gmail.com wrote: Thanks for the valuable suggestion. I also started with a JAX-RS RESTful service with Angular. Since Play can help a lot in my scenario, I would prefer it along with Angular. Is this combination fine compared to d3?
Re: Re: How RDD lineage works
Thanks TD and Zhihong for the guide. I will check it bit1...@163.com From: Tathagata Das Date: 2015-07-31 12:27 To: Ted Yu CC: bit1...@163.com; user Subject: Re: How RDD lineage works You have to read the original Spark paper to understand how RDD lineage works. https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf On Thu, Jul 30, 2015 at 9:25 PM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at: core/src/test/scala/org/apache/spark/CheckpointSuite.scala Cheers On Thu, Jul 30, 2015 at 7:39 PM, bit1...@163.com bit1...@163.com wrote: Hi, I don't get a good understanding how RDD lineage works, so I would ask whether spark provides a unit test in the code base to illustrate how RDD lineage works. If there is, What's the class name is it? Thanks! bit1...@163.com
Re: Re: How RDD lineage works
The following is copied from the paper, is something related with rdd lineage. Is there a unit test that covers this scenario(rdd partition lost and recovery)? Thanks. If a partition of an RDD is lost, the RDD has enough information about how it was derived from other RDDs to recompute just that partition. Thus, lost data can be recovered, often quite quickly, without requiring costly replication. bit1...@163.com From: bit1...@163.com Date: 2015-07-31 13:11 To: Tathagata Das; yuzhihong CC: user Subject: Re: Re: How RDD lineage works Thanks TD and Zhihong for the guide. I will check it bit1...@163.com From: Tathagata Das Date: 2015-07-31 12:27 To: Ted Yu CC: bit1...@163.com; user Subject: Re: How RDD lineage works You have to read the original Spark paper to understand how RDD lineage works. https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf On Thu, Jul 30, 2015 at 9:25 PM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at: core/src/test/scala/org/apache/spark/CheckpointSuite.scala Cheers On Thu, Jul 30, 2015 at 7:39 PM, bit1...@163.com bit1...@163.com wrote: Hi, I don't get a good understanding how RDD lineage works, so I would ask whether spark provides a unit test in the code base to illustrate how RDD lineage works. If there is, What's the class name is it? Thanks! bit1...@163.com
Re: Spark Streaming Kafka could not find leader offset for Set()
I have run into similar excpetions ERROR DirectKafkaInputDStream: ArrayBuffer(java.net.SocketTimeoutException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([AdServe,1])) and the issue has happened on Kafka Side, where my broker offsets go out of sync, or do not return leader for this particular partition ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic AdServe --broker-list BROKER_IP --time -1 this shall return u valid offsets for all your kafka partitons On Thu, Jul 30, 2015 at 7:58 PM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi Cody sorry my bad you were right there was a typo in topicSet. When I corrected typo in topicSet it started working. Thanks a lot. Regards On Thu, Jul 30, 2015 at 7:43 PM, Cody Koeninger c...@koeninger.org wrote: Can you post the code including the values of kafkaParams and topicSet, ideally the relevant output of kafka-topics.sh --describe as well On Wed, Jul 29, 2015 at 11:39 PM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi thanks for the response. Like I already mentioned in the question kafka topic is valid and it has data I can see data in it using another kafka consumer. On Jul 30, 2015 7:31 AM, Cody Koeninger c...@koeninger.org wrote: The last time someone brought this up on the mailing list, the issue actually was that the topic(s) didn't exist in Kafka at the time the spark job was running. On Wed, Jul 29, 2015 at 6:17 PM, Tathagata Das t...@databricks.com wrote: There is a known issue that Kafka cannot return leader if there is not data in the topic. I think it was raised in another thread in this forum. Is that the issue? On Wed, Jul 29, 2015 at 10:38 AM, unk1102 umesh.ka...@gmail.com wrote: Hi I have Spark Streaming code which streams from Kafka topic it used to work fine but suddenly it started throwing the following exception Exception in thread main org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leader offsets for Set() at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413) at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413) at scala.util.Either.fold(Either.scala:97) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:412) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:528) at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) My Spark Streaming client code is very simple I just create one receiver using the following code and trying to print messages it consumed JavaPairInputDStreamString, String messages = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicSet); Kafka param is only one I specify kafka.ofset.reset=largest. Kafka topic has data I can see data using other Kafka consumers but above Spark Streaming code throws exception saying leader offset not found. I tried both smallest and largest offset. I wonder what happened this code used to work earlier. I am using Spark-Streaming 1.3.1 as it was working in this version I tried in 1.4.1 and same exception. Please guide. I am new to Spark thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-could-not-find-leader-offset-for-Set-tp24066.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Getting the number of slaves
To clarify, that is the number of executors requested by the SparkContext from the cluster manager. On Tue, Jul 28, 2015 at 5:18 PM, amkcom amk...@gmail.com wrote: try sc.getConf.getInt(spark.executor.instances, 1) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604p24043.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
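Another rough check from the driver, offered as a sketch: getExecutorMemoryStatus reports one entry per registered block manager (the driver plus each executor), so the count reflects what actually registered rather than what was requested, and it is approximate:

// Number of executors that have actually registered with the driver.
val registeredExecutors = sc.getExecutorMemoryStatus.size - 1
println(s"registered executors: $registeredExecutors")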
Re: Re: How RDD lineage works
https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/FailureSuite.scala This may help. On Thu, Jul 30, 2015 at 10:42 PM, bit1...@163.com bit1...@163.com wrote: The following is copied from the paper, is something related with rdd lineage. Is there a unit test that covers this scenario(rdd partition lost and recovery)? Thanks. If a partition of an RDD is lost, the RDD has enough information about how it was derived from other RDDs to recompute just that partition. Thus, lost data can be recovered, often quite quickly, without requiring costly replication. -- bit1...@163.com *From:* bit1...@163.com *Date:* 2015-07-31 13:11 *To:* Tathagata Das tathagata.das1...@gmail.com; yuzhihong yuzhih...@gmail.com *CC:* user user@spark.apache.org *Subject:* Re: Re: How RDD lineage works Thanks TD and Zhihong for the guide. I will check it -- bit1...@163.com *From:* Tathagata Das tathagata.das1...@gmail.com *Date:* 2015-07-31 12:27 *To:* Ted Yu yuzhih...@gmail.com *CC:* bit1...@163.com; user user@spark.apache.org *Subject:* Re: How RDD lineage works You have to read the original Spark paper to understand how RDD lineage works. https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf On Thu, Jul 30, 2015 at 9:25 PM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at: core/src/test/scala/org/apache/spark/CheckpointSuite.scala Cheers On Thu, Jul 30, 2015 at 7:39 PM, bit1...@163.com bit1...@163.com wrote: Hi, I don't get a good understanding how RDD lineage works, so I would ask whether spark provides a unit test in the code base to illustrate how RDD lineage works. If there is, What's the class name is it? Thanks! -- bit1...@163.com
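Besides the test suites, RDD.toDebugString prints the lineage of an RDD directly, which can be an easier first look; a minimal sketch (the input path is a placeholder):

val counts = sc.textFile("hdfs:///data/input.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
// Prints the chain of parent RDDs (the lineage) Spark would use to recompute a lost partition.
println(counts.toDebugString)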
Re: Spark SQL DataFrame: Nullable column and filtering
Dear Michael, dear all, motivation: object OtherEntities { case class Record( x:Int, a: String) case class Mapping( x: Int, y: Int ) val records = Seq( Record(1, hello), Record(2, bob)) val mappings = Seq( Mapping(2, 5) ) } Now I want to perform an *left outer join* on records and mappings (with the ON JOIN criterion on columns (recordDF(x) === mappingDF(x) shorthand is in *leftOuterJoinWithRemovalOfEqualColumn* val sqlContext = new SQLContext(sc) // used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ val recordDF= sc.parallelize(OtherEntities.records, 4).toDF() val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF() val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn( mappingDF, x) joinedDF.filter(joinedDF(y).isNotNull).show Currently, the output is +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ instead of +-+---+-+ |x| a|y| +-+---+-+ |2|bob|5| +-+---+-+ The last output can be achieved by the method of changing nullable=false to nullable=true described in my first post. *Thus, I need this schema modification as to make outer joins work.* Cheers and thanks, Martin 2015-07-30 20:23 GMT+02:00 Michael Armbrust mich...@databricks.com: We don't yet updated nullability information based on predicates as we don't actually leverage this information in many places yet. Why do you want to update the schema? On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 martin.se...@googlemail.com wrote: Hi all, 1. *Columns in dataframes can be nullable and not nullable. Having a nullable column of Doubles, I can use the following Scala code to filter all non-null rows:* val df = . // some code that creates a DataFrame df.filter( df(columnname).isNotNull() ) +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ root |-- x: integer (nullable = false) |-- a: string (nullable = true) |-- y: integer (nullable = true) And with the filter expression +-+---+-+ |x| a|y| +-+---+-+ |2|bob|5| +-+---+-+ Unfortunetaly and while this is a true for a nullable column (according to df.printSchema), it is not true for a column that is not nullable: +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ root |-- x: integer (nullable = false) |-- a: string (nullable = true) |-- y: integer (nullable = false) +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ such that the output is not affected by the filter. Is this intended? 2. *What is the cheapest (in sense of performance) to turn a non-nullable column into a nullable column? A came uo with this:* /** * Set, if a column is nullable. * @param df source DataFrame * @param cn is the column name to change * @param nullable is the flag to set, such that the column is either nullable or not */ def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = { val schema = df.schema val newSchema = StructType(schema.map { case StructField( c, t, _, m) if c.equals(cn) = StructField( c, t, nullable = nullable, m) case y: StructField = y }) df.sqlContext.createDataFrame( df.rdd, newSchema) } Is there a cheaper solution? 3. *Any comments?* Cheers and thx in advance, Martin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Master Build Git Commit Hash
Hi Ted, The problem is that I don't know if the build uses the commits happened on the same day or it is possible that it builds based on Jul 15th commits. Just a thought, it might be possible to replace SNAPSHOT with the git commit hash in the filename so people will know which one is based on. Thank you for your help! Jerry On Thu, Jul 30, 2015 at 11:10 AM, Ted Yu yuzhih...@gmail.com wrote: The files were dated 16-Jul-2015 Looks like nightly build either was not published, or published at a different location. You can download spark-1.5.0-SNAPSHOT.tgz and binary-search for the commits made on Jul 16th. There may be other ways of determining the latest commit. Cheers On Thu, Jul 30, 2015 at 7:39 AM, Jerry Lam chiling...@gmail.com wrote: Hi Spark users and developers, I wonder which git commit was used to build the latest master-nightly build found at: http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/ ? I downloaded the build but I couldn't find the information related to it. Thank you! Best Regards, Jerry
Re: Spark SQL Error
It seem an issue with the ES connector https://github.com/elastic/elasticsearch-hadoop/issues/482 Thanks Best Regards On Tue, Jul 28, 2015 at 6:14 AM, An Tran tra...@gmail.com wrote: Hello all, I am currently having an error with Spark SQL access Elasticsearch using Elasticsearch Spark integration. Below is the series of command I issued along with the stacktrace. I am unclear what the error could mean. I can print the schema correctly but error out if i try and display a few results. Can you guys point me in the right direction? scala sqlContext.read.format(org.elasticsearch.spark.sql).options(esOptions).load(reddit_comment_public-201507-v3/default).registerTempTable(reddit_comment) scala reddit_comment_df.printSchema root |-- data: struct (nullable = true) ||-- archived: boolean (nullable = true) ||-- author: string (nullable = true) ||-- author_flair_css_class: string (nullable = true) ||-- author_flair_text: string (nullable = true) ||-- body: string (nullable = true) ||-- body_html: string (nullable = true) ||-- controversiality: long (nullable = true) ||-- created: long (nullable = true) ||-- created_utc: long (nullable = true) ||-- distinguished: string (nullable = true) ||-- downs: long (nullable = true) ||-- edited: long (nullable = true) ||-- gilded: long (nullable = true) ||-- id: string (nullable = true) ||-- link_author: string (nullable = true) ||-- link_id: string (nullable = true) ||-- link_title: string (nullable = true) ||-- link_url: string (nullable = true) ||-- name: string (nullable = true) ||-- parent_id: string (nullable = true) ||-- replies: string (nullable = true) ||-- saved: boolean (nullable = true) ||-- score: long (nullable = true) ||-- score_hidden: boolean (nullable = true) ||-- subreddit: string (nullable = true) ||-- subreddit_id: string (nullable = true) ||-- ups: long (nullable = true) scala reddit_comment_df.show 15/07/27 20:38:31 INFO ScalaEsRowRDD: Reading from [reddit_comment_public-201507-v3/default] 15/07/27 20:38:31 INFO ScalaEsRowRDD: Discovered mapping {reddit_comment_public-201507-v3=[mappings=[default=[acquire_date=DATE, elasticsearch_date_partition_index=STRING, elasticsearch_language_partition_index=STRING, elasticsearch_type=STRING, source=[data=[archived=BOOLEAN, author=STRING, author_flair_css_class=STRING, author_flair_text=STRING, body=STRING, body_html=STRING, controversiality=LONG, created=LONG, created_utc=LONG, distinguished=STRING, downs=LONG, edited=LONG, gilded=LONG, id=STRING, link_author=STRING, link_id=STRING, link_title=STRING, link_url=STRING, name=STRING, parent_id=STRING, replies=STRING, saved=BOOLEAN, score=LONG, score_hidden=BOOLEAN, subreddit=STRING, subreddit_id=STRING, ups=LONG], kind=STRING], source_geo_location=GEO_POINT, source_id=STRING, source_language=STRING, source_time=DATE]]]} for [reddit_comment_public-201507-v3/default] 15/07/27 20:38:31 INFO SparkContext: Starting job: show at console:26 15/07/27 20:38:31 INFO DAGScheduler: Got job 13 (show at console:26) with 1 output partitions (allowLocal=false) 15/07/27 20:38:31 INFO DAGScheduler: Final stage: ResultStage 16(show at console:26) 15/07/27 20:38:31 INFO DAGScheduler: Parents of final stage: List() 15/07/27 20:38:31 INFO DAGScheduler: Missing parents: List() 15/07/27 20:38:31 INFO DAGScheduler: Submitting ResultStage 16 (MapPartitionsRDD[65] at show at console:26), which has no missing parents 15/07/27 20:38:31 INFO MemoryStore: ensureFreeSpace(7520) called with curMem=71364, maxMem=2778778828 15/07/27 20:38:31 INFO MemoryStore: Block broadcast_13 stored 
as values in memory (estimated size 7.3 KB, free 2.6 GB) 15/07/27 20:38:31 INFO MemoryStore: ensureFreeSpace(3804) called with curMem=78884, maxMem=2778778828 15/07/27 20:38:31 INFO MemoryStore: Block broadcast_13_piece0 stored as bytes in memory (estimated size 3.7 KB, free 2.6 GB) 15/07/27 20:38:31 INFO BlockManagerInfo: Added broadcast_13_piece0 in memory on 172.25.185.239:58296 (size: 3.7 KB, free: 2.6 GB) 15/07/27 20:38:31 INFO SparkContext: Created broadcast 13 from broadcast at DAGScheduler.scala:874 15/07/27 20:38:31 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 16 (MapPartitionsRDD[65] at show at console:26) 15/07/27 20:38:31 INFO TaskSchedulerImpl: Adding task set 16.0 with 1 tasks 15/07/27 20:38:31 INFO FairSchedulableBuilder: Added task set TaskSet_16 tasks to pool default 15/07/27 20:38:31 INFO TaskSetManager: Starting task 0.0 in stage 16.0 (TID 172, 172.25.185.164, ANY, 5085 bytes) 15/07/27 20:38:31 INFO BlockManagerInfo: Added broadcast_13_piece0 in memory on 172.25.185.164:50275 (size: 3.7 KB, free: 3.6 GB) 15/07/27 20:38:31 WARN TaskSetManager: Lost task 0.0 in stage 16.0 (TID 172, 172.25.185.164):
Re: streaming issue
What operation are you doing with streaming? Also can you look in the datanode logs and see whats going on? Thanks Best Regards On Tue, Jul 28, 2015 at 8:18 AM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Hi, I got a error when running spark streaming as below . java.lang.reflect.InvocationTargetException at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144) at org.apache.spark.scheduler.EventLoggingListener.onUnpersistRDD(EventLoggingListener.scala:175) at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:50) at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53) at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36) at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76) at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61) at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60) Caused by: java.io.IOException: All datanodes 10.153.192.159:50010 are bad. Aborting... at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1137) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:933) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:487) 15/07/28 02:01:10 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception java.lang.reflect.InvocationTargetException at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) at scala.Option.foreach(Option.scala:236) I had set the ulimit in /etc/security/limits.conf , but still get the same exception . can please some body help me to resolved this issue ? core file size (blocks, -c) 0 data seg size (kbytes, -d) unlimited scheduling priority (-e) 0 file size (blocks, -f) unlimited pending signals (-i) 264192 max locked memory (kbytes, -l) 32 max memory size (kbytes, -m) unlimited open files (-n) 65535 pipe size(512 bytes, -p) 8 POSIX message queues (bytes, -q) 819200 real-time priority (-r) 0 stack size (kbytes, -s) 10240 cpu time (seconds, -t) unlimited max user processes (-u) 34816 virtual memory (kbytes, -v) unlimited file locks (-x) unlimited Thanks .
Re: Spark and Speech Recognition
Like this? val data = sc.textFile("/sigmoid/audio/data/", 24).foreachPartition(urls => speechRecognizer(urls)) Let 24 be the total number of cores that you have on all the workers. Thanks Best Regards On Wed, Jul 29, 2015 at 6:50 AM, Peter Wolf opus...@gmail.com wrote: Hello, I am writing a Spark application to use speech recognition to transcribe a very large number of recordings. I need some help configuring Spark. My app is basically a transformation with no side effects: recording URL -> transcript. The input is a huge file with one URL per line, and the output is a huge file of transcripts. The speech recognizer is written in Java (Sphinx4), so it can be packaged as a JAR. The recognizer is very processor intensive, so you can't run too many on one machine -- perhaps one recognizer per core. The recognizer is also big -- maybe 1 GB. But most of the recognizer is immutable acoustic and language models that can be shared with other instances of the recognizer. So I want to run about one recognizer per core of each machine in my cluster. I want all recognizers on one machine to run within the same JVM and share the same models. How does one configure Spark for this sort of application? How does one control how Spark deploys the stages of the process? Can someone point me to an appropriate doc or keywords I should Google. Thanks Peter
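One common way to get "one recognizer per core, models shared within the executor JVM" is to keep the heavy models in a JVM-wide singleton and call them from mapPartitions; a sketch with placeholder types and paths, since the actual Sphinx4 classes are not shown in the thread:

// Loaded lazily, at most once per executor JVM; all tasks (cores) in that JVM share it.
object SharedRecognizer {
  lazy val models: Map[String, String] =
    Map("acoustic" -> "...", "language" -> "...") // placeholder for the ~1 GB Sphinx4 models
  def transcribe(url: String): String =
    s"transcript-of-$url" // placeholder; a real implementation would run Sphinx4 against `models`
}

val urls = sc.textFile("/path/to/urls.txt", 24) // aim for roughly one partition per available core
val transcripts = urls.mapPartitions(iter => iter.map(SharedRecognizer.transcribe))
transcripts.saveAsTextFile("/path/to/transcripts")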
Re: Spark on YARN
I can't see the application logs here. All the logs are going into stderr. can anybody help here? On 30 July 2015 at 12:21, Jeetendra Gangele gangele...@gmail.com wrote: I am running below command this is default spark PI program but this is not running all the log are going in stderr but at the terminal job is succeeding .I guess there are con issue job it not at all launching /bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster lib/spark-examples-1.4.1-hadoop2.6.0.jar 10 Complete log SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/hadoop/tmp/nm-local-dir/usercache/hadoop/filecache/23/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 15/07/30 12:13:31 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT] 15/07/30 12:13:32 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1438090734187_0010_01 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context initialization 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context initialization ... 15/07/30 12:13:33 INFO spark.SparkContext: Running Spark version 1.4.1 15/07/30 12:13:33 WARN spark.SparkConf: SPARK_JAVA_OPTS was detected (set to '-Dspark.driver.port=53411'). This is deprecated in Spark 1.0+. Please instead use: - ./spark-submit with conf/spark-defaults.conf to set defaults for an application - ./spark-submit with --driver-java-options to set -X options for a driver - spark.executor.extraJavaOptions to set -X options for executors - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker) 15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.executor.extraJavaOptions' to '-Dspark.driver.port=53411' as a work-around. 15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.driver.extraJavaOptions' to '-Dspark.driver.port=53411' as a work-around. 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 15/07/30 12:13:33 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/07/30 12:13:33 INFO Remoting: Starting remoting 15/07/30 12:13:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.21.1.77:53411] 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'sparkDriver' on port 53411. 
15/07/30 12:13:34 INFO spark.SparkEnv: Registering MapOutputTracker 15/07/30 12:13:34 INFO spark.SparkEnv: Registering BlockManagerMaster 15/07/30 12:13:34 INFO storage.DiskBlockManager: Created local directory at /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/blockmgr-2166bbd9-b1ed-41d1-bc95-92c6a7fbd36f 15/07/30 12:13:34 INFO storage.MemoryStore: MemoryStore started with capacity 246.0 MB 15/07/30 12:13:34 INFO spark.HttpFileServer: HTTP File server directory is /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/httpd-d1232310-5aa1-44e7-a99a-cc2ae614f89c 15/07/30 12:13:34 INFO spark.HttpServer: Starting HTTP Server 15/07/30 12:13:34 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/07/30 12:13:34 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52507 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'HTTP file server' on port 52507. 15/07/30 12:13:34 INFO spark.SparkEnv: Registering OutputCommitCoordinator 15/07/30 12:13:34 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/07/30 12:13:34 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/07/30 12:13:34 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:59596 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'SparkUI' on port 59596. 15/07/30 12:13:34 INFO ui.SparkUI: Started SparkUI at http://10.21.1.77:59596 15/07/30 12:13:34 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler 15/07/30
Re: Heatmap with Spark Streaming
Thanks for the suggestion Akhil! I looked at https://github.com/mbostock/d3/wiki/Gallery to learn more about d3; all the examples described there are on static data. How can we update our heat map as the data changes, if we store it in HBase or MySQL? I mean, do we need to query back and forth for it? Is there any pluggable and quicker component for a heatmap with Spark Streaming? On Thu, Jul 30, 2015 at 1:23 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can easily push data to an intermediate storage from spark streaming (like HBase or a SQL/NoSQL DB etc) and then power your dashboards with d3 js. Thanks Best Regards On Tue, Jul 28, 2015 at 12:18 PM, UMESH CHAUDHARY umesh9...@gmail.com wrote: I have just started using Spark Streaming and done a few POCs. It is fairly easy to implement. I was thinking of presenting the data using some smart graphing/dashboarding tools e.g. Graphite or Grafana, but they don't have heat-maps. I also looked at Zeppelin http://zeppelin-project.org/ , but was unable to find any heat-map functionality. Could you please suggest any data visualization tools with heat-maps that work with Spark Streaming.
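A rough sketch of the push-to-storage side that Akhil describes; the dashboard then only queries the store, so there is no back-and-forth with the streaming job itself. metricsStream and HeatmapStore are placeholders for your DStream and whatever HBase/MySQL client you use:

metricsStream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val conn = HeatmapStore.connect() // hypothetical: one connection per partition, not per record
    records.foreach(r => HeatmapStore.upsert(conn, r)) // hypothetical upsert keyed by heatmap cell
    conn.close()
  }
}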
Re: Writing streaming data to cassandra creates duplicates
Hi All, Can someone offer insights on this? On Wed, Jul 29, 2015 at 8:29 AM, Priya Ch learnings.chitt...@gmail.com wrote: Hi TD, Thanks for the info. I have a scenario like this. I am reading the data from a Kafka topic. Let's say Kafka has 3 partitions for the topic. In my streaming application, I would configure 3 receivers with 1 thread each such that they would receive 3 dstreams (from the 3 partitions of the Kafka topic) and I also implement a partitioner. Now there is a possibility of receiving messages with the same primary key twice or more: once at the time the message is created, and again whenever there is an update to any fields of the same message. If two messages M1 and M2 with the same primary key are read by 2 receivers, then even with the partitioner Spark would still end up processing them in parallel, as they are altogether in different dstreams. How do we address this situation? Thanks, Padma Ch On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das t...@databricks.com wrote: You have to partition the data in Spark Streaming by the primary key, and then make sure you insert data into Cassandra atomically per key, or per set of keys in the partition. You can use the combination of the (batch time, and partition Id) of the RDD inside foreachRDD as the unique id for the data you are inserting. This will guard against multiple attempts to run the task that inserts into Cassandra. See http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations TD On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch learnings.chitt...@gmail.com wrote: Hi All, I have a problem when writing streaming data to Cassandra. Our existing product is on an Oracle DB, in which locks are maintained while writing data so that duplicates in the DB are avoided. But as Spark has a parallel processing architecture, if more than 1 thread is trying to write the same data, i.e. with the same primary key, is there any scope for creating duplicates? If yes, how do we address this problem, either from the Spark or from the Cassandra side? Thanks, Padma Ch
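To illustrate TD's suggestion about using (batch time, partition id) as a unique id for idempotent output, a hedged sketch; the actual Cassandra write is left abstract (writeAtomically is a placeholder):

import org.apache.spark.TaskContext

messages.foreachRDD { (rdd, batchTime) =>
  rdd.foreachPartition { records =>
    // re-executions of this task produce the same uniqueId, so retries overwrite rather than duplicate
    val uniqueId = s"${batchTime.milliseconds}-${TaskContext.get.partitionId}"
    writeAtomically(uniqueId, records) // hypothetical: atomic per-key upsert into Cassandra
  }
}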
Re: Spark build/sbt assembly
Did you try removing this jar? build/sbt-launch-0.13.7.jar Thanks Best Regards On Tue, Jul 28, 2015 at 12:08 AM, Rahul Palamuttam rahulpala...@gmail.com wrote: Hi All, I hope this is the right place to post troubleshooting questions. I've been following the install instructions and I get the following error when running the following from Spark home directory $./build/sbt Using /usr/java/jdk1.8.0_20/ as default JAVA_HOME. Note, this will be overridden by -java-home if it is set. Attempting to fetch sbt Launching sbt from build/sbt-launch-0.13.7.jar Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar However when I run sbt assembly it compiles, with a couple of warnings, but it works none-the less. Is the build/sbt script deprecated? I do notice on one node it works but on the other it gives me the above error. Thanks, Rahul P -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-build-sbt-assembly-tp24012.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: help plz! how to use zipWithIndex to each subset of a RDD
zipWithIndex gives you global indices, which is not what you want. You'll want to use flatMap with a map function that iterates through each iterable and returns the (String, Int, String) tuple for each element. On Thu, Jul 30, 2015 at 4:13 AM, askformore [via Apache Spark User List] ml-node+s1001560n24071...@n3.nabble.com wrote: I have some data like this: RDD[(String, String)] = ((key-1, a), (key-1, b), (key-2, a), (key-2, c), (key-3, b), (key-4, d)) and I want to group the data by key and, for each group, add an index field to the group members, so that in the end I can transform the data to the below: RDD[(String, Int, String)] = ((key-1, 1, a), (key-1, 2, b), (key-2, 1, a), (key-2, 2, b), (key-3, 1, b), (key-4, 1, d)) I tried groupByKey first, and then I got an RDD[(String, Iterable[String])], but I don't know how to apply the zipWithIndex function to each Iterable... thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/help-plz-how-to-use-zipWithIndex-to-each-subset-of-a-RDD-tp24071p24074.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
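For example, a minimal sketch of that approach on the sample data, applying zipWithIndex locally to each group after groupByKey:

val rdd = sc.parallelize(Seq(("key-1","a"), ("key-1","b"), ("key-2","a"),
                             ("key-2","c"), ("key-3","b"), ("key-4","d")))

// group, index each group's members locally (1-based), then flatten back out
val indexed = rdd.groupByKey().flatMap { case (key, values) =>
  values.zipWithIndex.map { case (value, i) => (key, i + 1, value) }
}
// => (key-1,1,a), (key-1,2,b), (key-2,1,a), (key-2,2,c), (key-3,1,b), (key-4,1,d)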
Re: Heatmap with Spark Streaming
You can easily push data to an intermediate storage from spark streaming (like HBase or a SQL/NoSQL DB etc) and then power your dashboards with d3 js. Thanks Best Regards On Tue, Jul 28, 2015 at 12:18 PM, UMESH CHAUDHARY umesh9...@gmail.com wrote: I have just started using Spark Streaming and done a few POCs. It is fairly easy to implement. I was thinking of presenting the data using some smart graphing/dashboarding tools e.g. Graphite or Grafana, but they don't have heat-maps. I also looked at Zeppelin http://zeppelin-project.org/ , but was unable to find any heat-map functionality. Could you please suggest any data visualization tools with heat-maps that work with Spark Streaming.
Re: sc.parallelize(512k items) doesn't always use 64 executors
sc.parallelize takes a second parameter which is the total number of partitions, are you using that? Thanks Best Regards On Wed, Jul 29, 2015 at 9:27 PM, Kostas Kougios kostas.koug...@googlemail.com wrote: Hi, I do an sc.parallelize with a list of 512k items. But sometimes not all executors are used, i.e. they don't have work to do and nothing is logged after: 15/07/29 16:35:22 WARN internal.ThreadLocalRandom: Failed to generate a seed from SecureRandom within 3 seconds. Not enough entrophy? 15/07/29 16:35:22 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56477. 15/07/29 16:35:22 INFO netty.NettyBlockTransferService: Server created on 56477 15/07/29 16:35:22 INFO storage.BlockManagerMaster: Trying to register BlockManager 15/07/29 16:35:22 INFO storage.BlockManagerMaster: Registered BlockManager Any ideas why so? My last run has 3 of the 64 executors not used. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sc-parallelize-512k-items-doesn-t-always-use-64-executors-tp24062.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
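For example, a minimal sketch (items is assumed to be the 512k-element list from the question):

// the second argument is the number of partitions; with 64 executors you generally want at
// least 64 partitions (often 2-4x that) so every executor has work to pick up
val rdd = sc.parallelize(items, 64)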
Re: Spark on YARN
15/07/30 12:13:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM AM is killed somehow, may due to preemption. Does it always happen ? Resource manager log would be helpful. On Thu, Jul 30, 2015 at 4:17 PM, Jeetendra Gangele gangele...@gmail.com wrote: I can't see the application logs here. All the logs are going into stderr. can anybody help here? On 30 July 2015 at 12:21, Jeetendra Gangele gangele...@gmail.com wrote: I am running below command this is default spark PI program but this is not running all the log are going in stderr but at the terminal job is succeeding .I guess there are con issue job it not at all launching /bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster lib/spark-examples-1.4.1-hadoop2.6.0.jar 10 Complete log SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/hadoop/tmp/nm-local-dir/usercache/hadoop/filecache/23/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 15/07/30 12:13:31 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT] 15/07/30 12:13:32 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1438090734187_0010_01 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context initialization 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context initialization ... 15/07/30 12:13:33 INFO spark.SparkContext: Running Spark version 1.4.1 15/07/30 12:13:33 WARN spark.SparkConf: SPARK_JAVA_OPTS was detected (set to '-Dspark.driver.port=53411'). This is deprecated in Spark 1.0+. Please instead use: - ./spark-submit with conf/spark-defaults.conf to set defaults for an application - ./spark-submit with --driver-java-options to set -X options for a driver - spark.executor.extraJavaOptions to set -X options for executors - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker) 15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.executor.extraJavaOptions' to '-Dspark.driver.port=53411' as a work-around. 15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.driver.extraJavaOptions' to '-Dspark.driver.port=53411' as a work-around. 
15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 15/07/30 12:13:33 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/07/30 12:13:33 INFO Remoting: Starting remoting 15/07/30 12:13:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.21.1.77:53411] 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'sparkDriver' on port 53411. 15/07/30 12:13:34 INFO spark.SparkEnv: Registering MapOutputTracker 15/07/30 12:13:34 INFO spark.SparkEnv: Registering BlockManagerMaster 15/07/30 12:13:34 INFO storage.DiskBlockManager: Created local directory at /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/blockmgr-2166bbd9-b1ed-41d1-bc95-92c6a7fbd36f 15/07/30 12:13:34 INFO storage.MemoryStore: MemoryStore started with capacity 246.0 MB 15/07/30 12:13:34 INFO spark.HttpFileServer: HTTP File server directory is /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/httpd-d1232310-5aa1-44e7-a99a-cc2ae614f89c 15/07/30 12:13:34 INFO spark.HttpServer: Starting HTTP Server 15/07/30 12:13:34 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/07/30 12:13:34 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52507 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'HTTP file server' on port 52507. 15/07/30 12:13:34 INFO spark.SparkEnv: Registering OutputCommitCoordinator 15/07/30 12:13:34 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/07/30 12:13:34 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/07/30 12:13:34 INFO server.AbstractConnector: Started
Re: Problems with JobScheduler
The difference is that one recives more data than the others two. I can pass thought parameters the topics, so, I could execute the code trying with one topic and figure out with one is the topic, although I guess that it's the topics which gets more data. Anyway it's pretty weird those delays in just one of the cluster even if the another one is not running. I have seen the parameter spark.streaming.kafka.maxRatePerPartition, I haven't set any value for this parameter, how does it work if this parameter doesn't have a value? 2015-07-30 16:32 GMT+02:00 Cody Koeninger c...@koeninger.org: If the jobs are running on different topicpartitions, what's different about them? Is one of them 120x the throughput of the other, for instance? You should be able to eliminate cluster config as a difference by running the same topic partition on the different clusters and comparing the results. On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I have three topics with one partition each topic. So each jobs run about one topics. 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org: Just so I'm clear, the difference in timing you're talking about is this: 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at MetricsSpark.scala:67, took 60.391761 s 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at MetricsSpark.scala:67, took 0.531323 s Are those jobs running on the same topicpartition? On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I read about maxRatePerPartition parameter, I haven't set this parameter. Could it be the problem?? Although this wouldn't explain why it doesn't work in one of the clusters. 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: They just share the kafka, the rest of resources are independents. I tried to stop one cluster and execute just the cluster isn't working but it happens the same. 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: I have some problem with the JobScheduler. I have executed same code in two cluster. I read from three topics in Kafka with DirectStream so I have three tasks. I have check YARN and there aren't more jobs launched. 
The cluster where I have troubles I got this logs: 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0 (TID 72, x, RACK_LOCAL, 14856 bytes) 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0 (TID 73, xxx, RACK_LOCAL, 14852 bytes) 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB) 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on x:43477 (size: 1802.0 B, free: 530.3 MB) 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0 (TID 74, x, RACK_LOCAL, 14860 bytes) 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0 (TID 72) in 208 ms on x (1/3) 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0 (TID 74) in 49 ms on x (2/3) *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958 ms* *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000 ms* *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959 ms* *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000 ms* *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960 ms* *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000 ms* *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961 ms* *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000 ms* *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962 ms* *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000 ms* *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963 ms* *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000 ms* 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0 (TID 73) in 60373 ms on (3/3) 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks have all completed, from pool 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at MetricsSpark.scala:67) finished in 60.379 s 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at MetricsSpark.scala:67, took 60.391761 s 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job 143825821 ms.0 from job set of time 143825821 ms 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time 143825821 ms (execution: 60.399 s) 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job 1438258215000 ms.0 from job set of time 1438258215000 ms There are *always *a minute of delay in the third task, when I have executed same code in
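On the spark.streaming.kafka.maxRatePerPartition question above: it caps how many records per second each Kafka partition contributes to a batch of the direct stream; if it is left unset there is effectively no cap and each batch reads up to the latest available offsets. A minimal sketch of setting it (the app name and the value are only examples):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MetricsSpark")
  // example value only: at most 1000 records/second per Kafka partition per batch
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")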
How to control Spark Executors from getting Lost when using YARN client mode?
Hi I have one Spark job which runs fine locally with less data but when I schedule it on YARN to execute I keep on getting the following ERROR and slowly all executors gets removed from UI and my job fails 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on myhost1.com: remote Rpc client disassociated 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on myhost2.com: remote Rpc client disassociated I use the following command to schedule spark job in yarn-client mode ./spark-submit --class com.xyz.MySpark --conf spark.executor.extraJavaOptions=-XX:MaxPermSize=512M --driver-java-options -XX:MaxPermSize=512m --driver-memory 3g --master yarn-client --executor-memory 2G --executor-cores 8 --num-executors 12 /home/myuser/myspark-1.0.jar I dont know what is the problem please guide. I am new to Spark. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-Spark-Executors-from-getting-Lost-when-using-YARN-client-mode-tp24084.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: sc.parallelise to work more like a producer/consumer?
There is a workaround: sc.parallelize(items, items.size / 2). This way each executor will get a batch of 2 items at a time, simulating a producer-consumer. With / 4 it will get 4 items. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sc-parallelise-to-work-more-like-a-producer-consumer-tp24032p24085.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL DataFrame: Nullable column and filtering
Hi all, 1. *Columns in dataframes can be nullable and not nullable. Having a nullable column of Doubles, I can use the following Scala code to filter all non-null rows:* val df = . // some code that creates a DataFrame df.filter( df(columnname).isNotNull() ) +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ root |-- x: integer (nullable = false) |-- a: string (nullable = true) |-- y: integer (nullable = true) And with the filter expression +-+---+-+ |x| a|y| +-+---+-+ |2|bob|5| +-+---+-+ Unfortunetaly and while this is a true for a nullable column (according to df.printSchema), it is not true for a column that is not nullable: +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ root |-- x: integer (nullable = false) |-- a: string (nullable = true) |-- y: integer (nullable = false) +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ such that the output is not affected by the filter. Is this intended? 2. *What is the cheapest (in sense of performance) to turn a non-nullable column into a nullable column? A came uo with this:* /** * Set, if a column is nullable. * @param df source DataFrame * @param cn is the column name to change * @param nullable is the flag to set, such that the column is either nullable or not */ def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = { val schema = df.schema val newSchema = StructType(schema.map { case StructField( c, t, _, m) if c.equals(cn) = StructField( c, t, nullable = nullable, m) case y: StructField = y }) df.sqlContext.createDataFrame( df.rdd, newSchema) } Is there a cheaper solution? 3. *Any comments?* Cheers and thx in advance, Martin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
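For readability, here is the helper from the message above restated in runnable form (behaviour unchanged; it simply rebuilds the schema with the nullable flag flipped for one column):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

/** Set whether column `cn` of `df` is nullable. */
def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
  val newSchema = StructType(df.schema.map {
    case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
    case other => other
  })
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}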
Re: Spark SQL DataFrame: Nullable column and filtering
We don't yet updated nullability information based on predicates as we don't actually leverage this information in many places yet. Why do you want to update the schema? On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 martin.se...@googlemail.com wrote: Hi all, 1. *Columns in dataframes can be nullable and not nullable. Having a nullable column of Doubles, I can use the following Scala code to filter all non-null rows:* val df = . // some code that creates a DataFrame df.filter( df(columnname).isNotNull() ) +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ root |-- x: integer (nullable = false) |-- a: string (nullable = true) |-- y: integer (nullable = true) And with the filter expression +-+---+-+ |x| a|y| +-+---+-+ |2|bob|5| +-+---+-+ Unfortunetaly and while this is a true for a nullable column (according to df.printSchema), it is not true for a column that is not nullable: +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ root |-- x: integer (nullable = false) |-- a: string (nullable = true) |-- y: integer (nullable = false) +-+-++ |x|a| y| +-+-++ |1|hello|null| |2| bob| 5| +-+-++ such that the output is not affected by the filter. Is this intended? 2. *What is the cheapest (in sense of performance) to turn a non-nullable column into a nullable column? A came uo with this:* /** * Set, if a column is nullable. * @param df source DataFrame * @param cn is the column name to change * @param nullable is the flag to set, such that the column is either nullable or not */ def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = { val schema = df.schema val newSchema = StructType(schema.map { case StructField( c, t, _, m) if c.equals(cn) = StructField( c, t, nullable = nullable, m) case y: StructField = y }) df.sqlContext.createDataFrame( df.rdd, newSchema) } Is there a cheaper solution? 3. *Any comments?* Cheers and thx in advance, Martin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
TFIDF Transformation
Hello spark users, I hope your week is going fantastic! I am having some troubles with the TFIDF in MLlib and was wondering if anyone can point me to the right direction. The data ingestion and the initial term frequency count code taken from the example works fine (I am using the first example from this page: https://spark.apache.org/docs/1.2.0/mllib-feature-extraction.html). Below is my input data: WrappedArray((Frank, spent, Friday, afternoon, at, labs, test, test, test, test, test, test, test, test, test)) WrappedArray((we, are, testing, the, algorithm, with, us, test, test, test, test, test, test, test, test)) WrappedArray((hello, my, name, is, Hans, and, I, am, testing, TFIDF, test, test, test, test, test)) WrappedArray((TFIDF, is, an, amazing, algorithm, that, is, used, for, spam, filtering, and, search, test, test)) WrappedArray((Accenture, is, doing, great, test, test, test, test, test, test, test, test, test, test, test)) Here's the output: (1048576,[1065,1463,33868,34122,34252,337086,420523,603314,717226,767673,839152,876983],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,1.0,1.0,1.0,1.0]) (1048576,[1463,6313,33869,34122,118216,147517,162737,367946,583529,603314,605639,646109,876983,972879],[1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) (1048576,[20311,34122,340246,603314,778861,876983],[1.0,1.0,1.0,10.0,1.0,1.0]) (1048576,[33875,102986,154015,267598,360614,603314,690972,876983],[1.0,1.0,1.0,1.0,1.0,8.0,1.0,1.0]) (1048576,[1588,19537,34494,42230,603314,696550,839152,876983,972879],[1.0,1.0,1.0,1.0,7.0,1.0,1.0,1.0,1.0]) The problem I am having here is that the output from HashingTF is not ordered like the original sentence, I understand that the integer 603314 in the output stands for the word test in the input. But how would I programmatically translate the number back to the word so I know which words are most common? Please let me know your thoughts! I am not sure how helpful these are going to be but here are the things I've noticed when I was looking into the source code of TFIDF: 1. def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures) This line of code hashes the term into it's ASCII value and calculates 'ASCII' modulo 'numberFeatures'(which is defaulted 2^20) 2. Then def transform(document: Iterable[_]): Vector = { blah blah blah} --- This part of the code does the counting and spreads the current array into two separate ones using Vectors.sparse. Thanks in advance and I hope to hear from you soon! Best, Hans This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Where allowed by local law, electronic communications with Accenture and its affiliates, including e-mail and instant messaging (including content), may be scanned by our systems for the purposes of information security and assessment of internal compliance with Accenture policy. __ www.accenture.com
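One way to get back from hashed indices to terms is to hash every distinct term yourself with the same HashingTF and invert that mapping; a hedged sketch (collisions are possible, since HashingTF is a hashing trick, and documents is assumed to be the RDD[Seq[String]] from the MLlib example):

import org.apache.spark.mllib.feature.HashingTF

val hashingTF = new HashingTF() // same default numFeatures (2^20) used to build the TF vectors
val indexToTerm = documents
  .flatMap(identity)
  .distinct()
  .map(term => (hashingTF.indexOf(term), term))
  .collectAsMap()

indexToTerm.get(603314) // Some("test") for this data set, barring hash collisions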
Spark on YARN
I am running below command this is default spark PI program but this is not running all the log are going in stderr but at the terminal job is succeeding .I guess there are con issue job it not at all launching /bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster lib/spark-examples-1.4.1-hadoop2.6.0.jar 10 Complete log SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/hadoop/tmp/nm-local-dir/usercache/hadoop/filecache/23/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 15/07/30 12:13:31 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT] 15/07/30 12:13:32 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1438090734187_0010_01 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context initialization 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context initialization ... 15/07/30 12:13:33 INFO spark.SparkContext: Running Spark version 1.4.1 15/07/30 12:13:33 WARN spark.SparkConf: SPARK_JAVA_OPTS was detected (set to '-Dspark.driver.port=53411'). This is deprecated in Spark 1.0+. Please instead use: - ./spark-submit with conf/spark-defaults.conf to set defaults for an application - ./spark-submit with --driver-java-options to set -X options for a driver - spark.executor.extraJavaOptions to set -X options for executors - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker) 15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.executor.extraJavaOptions' to '-Dspark.driver.port=53411' as a work-around. 15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.driver.extraJavaOptions' to '-Dspark.driver.port=53411' as a work-around. 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 15/07/30 12:13:33 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/07/30 12:13:33 INFO Remoting: Starting remoting 15/07/30 12:13:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.21.1.77:53411] 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'sparkDriver' on port 53411. 
15/07/30 12:13:34 INFO spark.SparkEnv: Registering MapOutputTracker 15/07/30 12:13:34 INFO spark.SparkEnv: Registering BlockManagerMaster 15/07/30 12:13:34 INFO storage.DiskBlockManager: Created local directory at /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/blockmgr-2166bbd9-b1ed-41d1-bc95-92c6a7fbd36f 15/07/30 12:13:34 INFO storage.MemoryStore: MemoryStore started with capacity 246.0 MB 15/07/30 12:13:34 INFO spark.HttpFileServer: HTTP File server directory is /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/httpd-d1232310-5aa1-44e7-a99a-cc2ae614f89c 15/07/30 12:13:34 INFO spark.HttpServer: Starting HTTP Server 15/07/30 12:13:34 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/07/30 12:13:34 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52507 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'HTTP file server' on port 52507. 15/07/30 12:13:34 INFO spark.SparkEnv: Registering OutputCommitCoordinator 15/07/30 12:13:34 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/07/30 12:13:34 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/07/30 12:13:34 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:59596 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'SparkUI' on port 59596. 15/07/30 12:13:34 INFO ui.SparkUI: Started SparkUI at http://10.21.1.77:59596 15/07/30 12:13:34 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43354. 15/07/30 12:13:34 INFO netty.NettyBlockTransferService: Server created on 43354 15/07/30 12:13:34 INFO storage.BlockManagerMaster: Trying to register
Is it a Spark serialization bug?
Hi all, I have tried to use a lambda expression in a Spark task, and it throws a java.lang.IllegalArgumentException: Invalid lambda deserialization exception. This exception is thrown when there is code like transform(pRDD -> pRDD.map(t -> t._2)). The code snippet is below.

JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y) -> x+y);
JavaDStream<Integer> con = aggregate.transform((Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>>) pRDD -> pRDD.map((Function<Tuple2<String,Integer>,Integer>) t -> t._2));

JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y) -> x+y);
JavaDStream<Integer> con = aggregate.transform((Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>> & Serializable) pRDD -> pRDD.map((Function<Tuple2<String,Integer>,Integer> & Serializable) t -> t._2));

The above two options didn't work. Whereas if I pass the object f below as the argument instead of the lambda expression t -> t._2, it works.

Function f = new Function<Tuple2<String,Integer>,Integer>() {
  @Override
  public Integer call(Tuple2<String,Integer> paramT1) throws Exception {
    return paramT1._2;
  }
};

May I know what is the right format to express that function as a lambda expression?

public static void main(String[] args) {
  Function f = new Function<Tuple2<String,Integer>,Integer>() {
    @Override
    public Integer call(Tuple2<String,Integer> paramT1) throws Exception {
      return paramT1._2;
    }
  };
  JavaStreamingContext ssc = JavaStreamingFactory.getInstance();
  JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", );
  JavaDStream<String> words = lines.flatMap(s -> { return Arrays.asList(s.split(" ")); });
  JavaPairDStream<String,Integer> pairRDD = words.mapToPair(x -> new Tuple2<String,Integer>(x, 1));
  JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y) -> x+y);
  JavaDStream<Integer> con = aggregate.transform(
      (Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>>) pRDD -> pRDD.map(
          (Function<Tuple2<String,Integer>,Integer>) t -> t._2));
  // JavaDStream<Integer> con = aggregate.transform(pRDD -> pRDD.map(f)); It works
  con.print();
  ssc.start();
  ssc.awaitTermination();
}
Re: Graceful shutdown for Spark Streaming
How is sleep not working? Are you doing streamingContext.start(); Thread.sleep(xxx); streamingContext.stop()? On Wed, Jul 29, 2015 at 6:55 PM, anshu shukla anshushuk...@gmail.com wrote: If we want to stop the application after a fixed time period, how will that work? (How do we give the duration in the logic? In my case sleep(t.s.) is not working.) So I have been killing the CoarseGrained job on each slave with a script. Please suggest something. On Thu, Jul 30, 2015 at 5:14 AM, Tathagata Das t...@databricks.com wrote: StreamingContext.stop(stopGracefully = true) stops the streaming context gracefully. Then you can safely terminate the Spark cluster. They are two different steps and need to be done separately, ensuring that the driver process has been completely terminated before the Spark cluster is terminated. On Wed, Jul 29, 2015 at 6:43 AM, Michal Čizmazia mici...@gmail.com wrote: How can I initiate a graceful shutdown from outside of the Spark Streaming driver process? Both for the local and cluster mode of Spark Standalone as well as EMR. Does sbin/stop-all.sh stop the context gracefully? How is it done? Is there a signal sent to the driver process? For EMR, is there a way to terminate an EMR cluster with a Spark Streaming graceful shutdown? Thanks! -- Thanks Regards, Anshu Shukla
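For the fixed-duration case, a minimal sketch along the lines TD describes (the runtime value is just an example):

ssc.start()
// run for a fixed period, then shut down gracefully so in-flight batches finish first
ssc.awaitTerminationOrTimeout(30 * 60 * 1000L) // e.g. 30 minutes
ssc.stop(stopSparkContext = true, stopGracefully = true)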
Re: How to control Spark Executors from getting Lost when using YARN client mode?
What is your cluster configuration ( size and resources) ? If you do not have enough resources, then your executor will not run. Moreover allocating 8 cores to an executor is too much. If you have a cluster with four nodes running NodeManagers, each equipped with 4 cores and 8GB of memory, then an optimal configuration would be, --num-executors 8 --executor-cores 2 --executor-memory 2G Thanks, Ashwin On Thu, Jul 30, 2015 at 12:08 PM, unk1102 umesh.ka...@gmail.com wrote: Hi I have one Spark job which runs fine locally with less data but when I schedule it on YARN to execute I keep on getting the following ERROR and slowly all executors gets removed from UI and my job fails 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on myhost1.com: remote Rpc client disassociated 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on myhost2.com: remote Rpc client disassociated I use the following command to schedule spark job in yarn-client mode ./spark-submit --class com.xyz.MySpark --conf spark.executor.extraJavaOptions=-XX:MaxPermSize=512M --driver-java-options -XX:MaxPermSize=512m --driver-memory 3g --master yarn-client --executor-memory 2G --executor-cores 8 --num-executors 12 /home/myuser/myspark-1.0.jar I dont know what is the problem please guide. I am new to Spark. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-Spark-Executors-from-getting-Lost-when-using-YARN-client-mode-tp24084.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Thanks Regards, Ashwin Giridharan
Re: sc.parallelize(512k items) doesn't always use 64 executors
yes,thanks, that sorted out the issue. On 30/07/15 09:26, Akhil Das wrote: sc.parallelize takes a second parameter which is the total number of partitions, are you using that? Thanks Best Regards On Wed, Jul 29, 2015 at 9:27 PM, Kostas Kougios kostas.koug...@googlemail.com mailto:kostas.koug...@googlemail.com wrote: Hi, I do an sc.parallelize with a list of 512k items. But sometimes not all executors are used, i.e. they don't have work to do and nothing is logged after: 15/07/29 16:35:22 WARN internal.ThreadLocalRandom: Failed to generate a seed from SecureRandom within 3 seconds. Not enough entrophy? 15/07/29 16:35:22 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56477. 15/07/29 16:35:22 INFO netty.NettyBlockTransferService: Server created on 56477 15/07/29 16:35:22 INFO storage.BlockManagerMaster: Trying to register BlockManager 15/07/29 16:35:22 INFO storage.BlockManagerMaster: Registered BlockManager Any ideas why so? My last run has 3 of the 64 executors not used. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sc-parallelize-512k-items-doesn-t-always-use-64-executors-tp24062.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org mailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org mailto:user-h...@spark.apache.org
Re: Difference between RandomForestModel and RandomForestClassificationModel
Hi Praveen, In MLLib, the major difference is that RandomForestClassificationModel makes use of a newer API which utilizes ML pipelines. I can't say for certain if they will produce the same exact result for a given dataset, but I believe they should. Bryan On Wed, Jul 29, 2015 at 12:14 PM, praveen S mylogi...@gmail.com wrote: Hi Wanted to know what is the difference between RandomForestModel and RandomForestClassificationModel? in Mlib.. Will they yield the same results for a given dataset?
Re: HiveQL to SparkSQL
Thanks Jorn for the response and for the pointer questions to Hive optimization tips. I believe I have done the possible applicable things to improve hive query performance including but not limited to - running on TEZ, using partitioning, bucketing, using explain to make sure partition pruning is happening, using compression, using the best data types for join columns, denormalizing etc:. I am using Hive version - 0.13. The idea behind this POC is to find the strengths of SparkSQL over HiveQL and identify the use cases where SparkSQL can perform better than HiveQL other than the iterative use cases. In general, what would be the SparkSQL use scenarios? I am pretty sure someone have tried this before and compared performance...Any responses would be much appreciated. Thank you. On Wed, Jul 29, 2015 at 1:57 PM, Jörn Franke jornfra...@gmail.com wrote: What Hive Version are you using? Do you run it in on TEZ? Are you using the ORC Format? Do you use compression? Snappy? Do you use Bloom filters? Do you insert the data sorted on the right columns? Do you use partitioning? Did you increase the replication factor for often used tables or partitions? Do you use bucketing? Is your data model appropriate (join columns as int , use numeric data types where appropriate , dates as int...), dif you calculate statistics? Did you use indexes (compressed, ORC Format?) do you provide mapjoin hints? Did you do any other Hive optimization? Did you use explain to verify that only selected partitions, indexes, Bloom filters had been used? Did you verify that no other application has taken resources? What is the CPU level on namenode, hiveserver2? If it is high then you need Mord memory there! First rule is to get it Hive right before you think about in-memory. Caching will only help for iterative stuff. You may think about denormalizing the model even more to avoid joins as much as possible. Bigdata techguy bigdatatech...@gmail.com schrieb am Mi., 29.07.2015, 18:49: Hi All, I have a fairly complex HiveQL data processing which I am trying to convert to SparkSQL to improve performance. Below is what it does. Select around 100 columns including Aggregates From a FACT_TABLE Joined to the summary of the same FACT_TABLE Joined to 2 smaller DIMENSION tables. The data processing currently takes around an hour to complete processing. This is what I have tried so far. 1. Use hiveContext to query the DIMENSION tables, store it as DataFrames and registerTempTable. 2. Use hiveContext to query the summary of FACT_TABLE, store it as DataFrames and registerTempTable. 3. Use the Temp tables from above 2 steps to get the final RecordSet to another DataFrame. 4. Save the DataFrame from step 3 to Hive with InsertOverwrite using saveAsTable. Below are my questions. Any response would be much appreciated. Thanks. A. Is there a better approach? B. Does breaking down the big Hive query into multiple steps with multiple DataFrames expected to give better performance? C. Is there an opportunity to intermix RDD with SparkSQL in this case? D. Can the Caching of a DataFrame improve performance? E. Are there other suggestions to improve performance? Thank You for your time.
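A condensed sketch of the four steps described above, assuming Spark 1.4's DataFrame writer API; all table and column names are placeholders, and the caching of the small temp tables addresses question D:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// 1-2: pull the dimension tables and the fact summary into temp tables
hiveContext.sql("SELECT * FROM dim_table_1").registerTempTable("dim1")
hiveContext.sql("SELECT * FROM dim_table_2").registerTempTable("dim2")
hiveContext.sql("SELECT fact_key, SUM(measure) AS total FROM fact_table GROUP BY fact_key")
  .registerTempTable("fact_summary")

// optionally cache the smaller, reused tables
hiveContext.cacheTable("dim1")
hiveContext.cacheTable("dim2")

// 3: final join producing the ~100-column record set
val result = hiveContext.sql(
  """SELECT f.*, s.total, d1.attr, d2.attr
    |FROM fact_table f
    |JOIN fact_summary s ON f.fact_key = s.fact_key
    |JOIN dim1 d1 ON f.d1_key = d1.key
    |JOIN dim2 d2 ON f.d2_key = d2.key""".stripMargin)

// 4: overwrite the target Hive table
result.write.mode(SaveMode.Overwrite).saveAsTable("target_table")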
Re: apache-spark 1.3.0 and yarn integration and spring-boot as a container
You need to fix your configuration so that the ResourceManager hostname/URL is set... that address (0.0.0.0:8032) is the listen-on-any-interface default. On 30 Jul 2015, at 10:47, Nirav Patel npa...@xactlycorp.com wrote: 15/07/29 11:19:26 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 Future versions of Hadoop will give you more meaningful diagnostics messages here (HADOOP-9657, https://issues.apache.org/jira/browse/HADOOP-9657), but the fix is the same: get your RM hostname right. -steve
[Parquet + Dataframes] Column names with spaces
Hi all, Our data has lots of human readable column names (names that include spaces), is it possible to use these with Parquet and Dataframes? When I try and write the Dataframe I get the following error: (I am using PySpark) `AnalysisException: Attribute name Name with Space contains invalid character(s) among ,;{}()\n\t=. Please use alias to rename it.` How can I alias that column name? `df['Name with Space'] = df['Name with Space'].alias('Name')` doesn't work as you can't assign to a dataframe column. `df.withColumnRenamed('Name with Space', 'Name')` overwrites the column and doesn't alias it. Any ideas? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-Dataframes-Column-names-with-spaces-tp24088.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Cast Error DataFrame/RDD doing group by and case class
Hi, I have just started learning DataFrames in Spark and encountered the following error. I am creating the following: case class A(a1:String, a2:String, a3:String) case class B(b1:String, b2:String, b3:String) case class C(key:A, value:Seq[B]) Now I have a DF with the structure (key:{..}, value:{..}), i.e. case class C(key:A, value:B). I want to do a group by on this DF which results in (key, List(value1, value2, ...)) and return a DF after the operation. I am implementing it as follows: 1. val x = DF1.map(r => (r(0), r(1))).groupByKey -- the data in x comes out as expected. 2. val y = x.map{ case (k, v) => C(k.asInstanceOf[A], Seq(v.toSeq.asInstanceOf[B])) } So now when I do y.toDF.show I get the following error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 12, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $iwC$$iwC$A Thanks in advance. Regards, Rishabh.
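The cast fails because each element of a DataFrame row is a generic Row, not the original case class, so asInstanceOf[A] cannot work. One hedged way around it (a sketch only, assuming the struct field order matches the case classes and that sqlContext.implicits._ is in scope for toDF) is to rebuild A and B explicitly before grouping:

import org.apache.spark.sql.Row

val grouped = DF1.map { r =>
  // each struct column comes back as a nested Row; rebuild the case classes field by field
  val k = r.getAs[Row](0)
  val v = r.getAs[Row](1)
  (A(k.getString(0), k.getString(1), k.getString(2)),
   B(v.getString(0), v.getString(1), v.getString(2)))
}.groupByKey()

val y = grouped.map { case (key, values) => C(key, values.toSeq) }
y.toDF().show()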
Re: Spark and Speech Recognition
You might also want to consider broadcasting the models to ensure you get one instance shared across cores in each machine, otherwise the model will be serialised to each task and you'll get a copy per executor (roughly core in this instance) Simon Sent from my iPhone On 30 Jul 2015, at 10:14, Akhil Das ak...@sigmoidanalytics.com wrote: Like this? val data = sc.textFile(/sigmoid/audio/data/, 24).foreachPartition(urls = speachRecognizer(urls)) Let 24 be the total number of cores that you have on all the workers. Thanks Best Regards On Wed, Jul 29, 2015 at 6:50 AM, Peter Wolf opus...@gmail.com wrote: Hello, I am writing a Spark application to use speech recognition to transcribe a very large number of recordings. I need some help configuring Spark. My app is basically a transformation with no side effects: recording URL -- transcript. The input is a huge file with one URL per line, and the output is a huge file of transcripts. The speech recognizer is written in Java (Sphinx4), so it can be packaged as a JAR. The recognizer is very processor intensive, so you can't run too many on one machine-- perhaps one recognizer per core. The recognizer is also big-- maybe 1 GB. But, most of the recognizer is a immutable acoustic and language models that can be shared with other instances of the recognizer. So I want to run about one recognizer per core of each machine in my cluster. I want all recognizer on one machine to run within the same JVM and share the same models. How does one configure Spark for this sort of application? How does one control how Spark deploys the stages of the process. Can someone point me to an appropriate doc or keywords I should Google. Thanks Peter
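A minimal sketch of that broadcast idea, assuming the immutable Sphinx4 models can be loaded into a serializable object; Models, SpeechRecognizer and the paths are placeholders:

// load the large, immutable acoustic/language models once on the driver and broadcast them
val modelsBc = sc.broadcast(Models.load("/path/to/models")) // hypothetical loader

val transcripts = sc.textFile("/sigmoid/audio/data/", 24).mapPartitions { urls =>
  // each task builds a lightweight recognizer around the models already cached on the executor
  val recognizer = new SpeechRecognizer(modelsBc.value) // hypothetical wrapper
  urls.map(url => recognizer.transcribe(url))
}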
Twitter Connector-Spark Streaming
Hi. I am writing twitter connector using spark streaming. but it fetched the random tweets. Is there any way to receive the tweets of a particular account? I made an app on twitter and used the credentials as given below. def managingCredentials(): Option[twitter4j.auth.Authorization]= { object auth{ val config = new twitter4j.conf.ConfigurationBuilder() .setOAuthConsumerKey() .setOAuthConsumerSecret() .setOAuthAccessToken() .setOAuthAccessTokenSecret() .build } val twitter_auth = new TwitterFactory(auth.config) val a = new twitter4j.auth.OAuthAuthorization(auth.config) val atwitter : Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization()) atwitter } Thanks :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Twitter-Connector-Spark-Streaming-tp24078.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
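TwitterUtils' filters argument tracks keywords rather than accounts, so one hedged approach is to create the stream with the authorization from managingCredentials() and then filter the DStream by screen name; the handle below is a placeholder:

import org.apache.spark.streaming.twitter.TwitterUtils

val atwitter = managingCredentials()
val stream = TwitterUtils.createStream(ssc, atwitter)

// keep only tweets posted by one particular account
val accountTweets = stream.filter(_.getUser.getScreenName.equalsIgnoreCase("some_account"))
accountTweets.map(_.getText).print()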
Error SparkStreaming after a while executing.
I'm executing a job with Spark Streaming and got this error all times when the job has been executing for a while (usually hours of days). I have no idea why it's happening. 15/07/30 13:02:14 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception java.lang.reflect.InvocationTargetException at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144) at org.apache.spark.scheduler.EventLoggingListener.onJobEnd(EventLoggingListener.scala:169) at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:36) at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53) at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36) at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76) at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61) at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617) at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60) Caused by: java.io.IOException: Lease timeout of 0 seconds expired. at org.apache.hadoop.hdfs.DFSOutputStream.abort(DFSOutputStream.java:2192) at org.apache.hadoop.hdfs.DFSClient.closeAllFilesBeingWritten(DFSClient.java:935) at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:889) at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:417) at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:442) at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71) at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:298) at java.lang.Thread.run(Thread.java:745) 15/07/30 13:02:14 INFO SparkContext: Starting job: foreachRDD at MetricsSpark.scala:67 15/07/30 13:02:14 INFO DAGScheduler: Got job 5050 (foreachRDD at MetricsSpark.scala:67) with 3 output partitions (allowLocal=false) 15/07/30 13:02:14 INFO DAGScheduler: Final stage: Stage 5050(foreachRDD at MetricsSpark.scala:67) Sometimes this error happens, but it doesn't mean that Spark stops working forever. Because it looks like after this error Spark works correctly some iterations, but most of time just fails after this error producing this error all the time. The code just filters some records and index the record in ElasticSearch.after adding some new fields.
RE: Spark on YARN
You'd better also check the nodemanager log; sometimes it is because your memory usage exceeds the limit of the YARN container's configuration. I've met a similar problem before; here is the warning log from the nodemanager: 2015-07-07 17:06:07,141 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=17385,containerID=container_1436259427993_0001_02_01] is running beyond virtual memory limits. Current usage: 318.1 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container. The default vmem-pmem ratio is 2.1, but it seems the executor requires more vmem when it starts, so the nodemanager kills it. If you hit a similar problem, you could increase the configuration "yarn.nodemanager.vmem-pmem-ratio" (a yarn-site.xml sketch follows this thread). Thanks Jerry From: Jeff Zhang [mailto:zjf...@gmail.com] Sent: Thursday, July 30, 2015 4:36 PM To: Jeetendra Gangele Cc: user Subject: Re: Spark on YARN 15/07/30 12:13:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM The AM is killed somehow, maybe due to preemption. Does it always happen? The resource manager log would be helpful. On Thu, Jul 30, 2015 at 4:17 PM, Jeetendra Gangele gangele...@gmail.com wrote: I can't see the application logs here. All the logs are going into stderr. Can anybody help here? On 30 July 2015 at 12:21, Jeetendra Gangele gangele...@gmail.com wrote: I am running the command below; this is the default Spark Pi example program, but it is not running properly: all the logs are going to stderr, yet at the terminal the job reports success. I guess there is a configuration issue and the job is not actually launching. /bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster lib/spark-examples-1.4.1-hadoop2.6.0.jar 10 Complete log: SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/hadoop/tmp/nm-local-dir/usercache/hadoop/filecache/23/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 15/07/30 12:13:31 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT] 15/07/30 12:13:32 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1438090734187_0010_01 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context initialization 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context initialization ... 15/07/30 12:13:33 INFO spark.SparkContext: Running Spark version 1.4.1 15/07/30 12:13:33 WARN spark.SparkConf: SPARK_JAVA_OPTS was detected (set to '-Dspark.driver.port=53411'). This is deprecated in Spark 1.0+.
Please instead use: - ./spark-submit with conf/spark-defaults.conf to set defaults for an application - ./spark-submit with --driver-java-options to set -X options for a driver - spark.executor.extraJavaOptions to set -X options for executors - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker) 15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.executor.extraJavaOptions' to '-Dspark.driver.port=53411' as a work-around. 15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.driver.extraJavaOptions' to '-Dspark.driver.port=53411' as a work-around. 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 15/07/30 12:13:33 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/07/30 12:13:33 INFO Remoting: Starting remoting 15/07/30 12:13:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.21.1.77:53411] 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'sparkDriver' on port 53411. 15/07/30 12:13:34 INFO spark.SparkEnv: Registering MapOutputTracker 15/07/30 12:13:34 INFO spark.SparkEnv: Registering BlockManagerMaster 15/07/30 12:13:34 INFO storage.DiskBlockManager: Created local directory at
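As a sketch of that vmem-pmem suggestion (not tested against this particular cluster), the ratio is a nodemanager setting, so it goes into yarn-site.xml on the nodes and the nodemanagers need a restart; the value 4 below is just an illustrative choice, and disabling the vmem check entirely is the other common workaround:

<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>4</value>
</property>
<!-- or, instead, skip the virtual-memory check altogether -->
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>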
Re: Writing streaming data to cassandra creates duplicates
Hi, Just my two cents. I understand your problem is that you have messages with the same key in two different dstreams. What I would do is make a union of all the dstreams with StreamingContext.union or several calls to DStream.union, then create a pair dstream with the primary key as the key, and then use groupByKey or reduceByKey (or combineByKey, etc.) to combine the messages with the same primary key (a short sketch follows this thread). Hope that helps. Greetings, Juan 2015-07-30 10:50 GMT+02:00 Priya Ch learnings.chitt...@gmail.com: Hi All, Can someone throw insights on this? On Wed, Jul 29, 2015 at 8:29 AM, Priya Ch learnings.chitt...@gmail.com wrote: Hi TD, Thanks for the info. I have a scenario like this. I am reading the data from a Kafka topic. Let's say Kafka has 3 partitions for the topic. In my streaming application, I would configure 3 receivers with 1 thread each such that they would receive 3 dstreams (from the 3 partitions of the Kafka topic), and I also implement a partitioner. Now there is a possibility of receiving messages with the same primary key twice or more: once when the message is created, and again whenever any field of the same message is updated. If two messages M1 and M2 with the same primary key are read by 2 receivers, then even with the partitioner Spark would still end up processing them in parallel, as they are in different dstreams altogether. How do we address this situation? Thanks, Padma Ch On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das t...@databricks.com wrote: You have to partition the data in Spark Streaming by the primary key, and then make sure you insert data into Cassandra atomically per key, or per set of keys in the partition. You can use the combination of the (batch time, partition id) of the RDD inside foreachRDD as the unique id for the data you are inserting. This will guard against multiple attempts to run the task that inserts into Cassandra. See http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations TD On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch learnings.chitt...@gmail.com wrote: Hi All, I have a problem when writing streaming data to Cassandra. Our existing product is on an Oracle DB in which, while writing data, locks are maintained so that duplicates in the DB are avoided. But as Spark has a parallel processing architecture, if more than 1 thread is trying to write the same data, i.e. with the same primary key, is there any scope for creating duplicates? If yes, how do we address this problem, either from the Spark side or from the Cassandra side? Thanks, Padma Ch
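A minimal sketch of the union-then-reduce idea above, assuming a placeholder Message type that carries the primary key and an update timestamp; the Cassandra write itself is left out, and "latest update wins" is only one possible merge rule:

import org.apache.spark.streaming.dstream.DStream

case class Message(key: String, updatedAt: Long, payload: String)

def dedupe(streams: Seq[DStream[Message]]): DStream[(String, Message)] = {
  // Union all receiver dstreams so records sharing a primary key
  // end up in a single stream...
  val all = streams.reduce(_ union _)
  // ...then key by the primary key and, within each batch, keep only
  // the most recent message per key before writing it to Cassandra.
  all.map(m => (m.key, m))
     .reduceByKey((a, b) => if (a.updatedAt >= b.updatedAt) a else b)
}

This only de-duplicates within a batch; across batches, an idempotent write (e.g. an upsert keyed by the primary key) is still needed on the Cassandra side.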
Re: How to set log level in spark-submit ?
I saw such an example in the docs: --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file://$path_to_file but, unfortunately, it does not work for me. On 30.07.2015 05:12, canan chen wrote: Yes, that should work. What I mean is: is there any option in the spark-submit command where I can specify the log level? On Thu, Jul 30, 2015 at 10:05 AM, Jonathan Coveney jcove...@gmail.com wrote: Put a log4j.properties file in conf/. You can copy log4j.properties.template as a good base. On Wednesday, July 29, 2015, canan chen ccn...@gmail.com wrote: Anyone know how to set the log level in spark-submit? Thanks
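For what it's worth, a sketch of a spark-submit invocation along those lines (paths, class name and jar are placeholders, and this is not a verified fix for the poster's cluster): on YARN the properties file usually has to be shipped with --files so that both the driver and the executors can resolve it by its bare file name.

./bin/spark-submit \
  --master yarn-cluster \
  --files /local/path/log4j.properties \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --class com.example.MyApp myapp.jar

Inside that log4j.properties, the log level itself is the usual log4j setting, e.g. log4j.rootCategory=WARN, console, as in conf/log4j.properties.template.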
Re: Heatmap with Spark Streaming
You can integrate it with any language (like PHP) and use AJAX calls to update the charts; a small sketch of the Spark side follows this thread. Thanks Best Regards On Thu, Jul 30, 2015 at 2:11 PM, UMESH CHAUDHARY umesh9...@gmail.com wrote: Thanks for the suggestion Akhil! I looked at https://github.com/mbostock/d3/wiki/Gallery to learn more about d3. All the examples described there are on static data; how can we update our heat map from changing data if we store it in HBase or MySQL? I mean, do we need to query back and forth for it? Is there any pluggable and quicker component for a heatmap with Spark Streaming? On Thu, Jul 30, 2015 at 1:23 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can easily push data to an intermediate storage from Spark Streaming (like HBase or a SQL/NoSQL DB, etc.) and then power your dashboards with d3.js. Thanks Best Regards On Tue, Jul 28, 2015 at 12:18 PM, UMESH CHAUDHARY umesh9...@gmail.com wrote: I have just started using Spark Streaming and have done a few POCs. It is fairly easy to implement. I was thinking of presenting the data using some smart graphing/dashboarding tools, e.g. Graphite or Grafana, but they don't have heat-maps. I also looked at Zeppelin http://zeppelin-project.org/ , but was unable to find any heat-map functionality. Could you please suggest any data visualization tools with heat-maps for Spark Streaming.
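A minimal sketch of the Spark Streaming side of that intermediate-store approach; saveHeatmapCell is a hypothetical stub standing in for whatever HBase/MySQL write the d3 dashboard polls, not a Spark or HBase API:

import org.apache.spark.streaming.dstream.DStream

case class Cell(x: String, y: String, value: Long)

// Placeholder: replace with a real HBase/MySQL/Redis write.
def saveHeatmapCell(c: Cell): Unit = ()

def publish(counts: DStream[((String, String), Long)]): Unit = {
  counts.foreachRDD { rdd =>
    rdd.foreachPartition { part =>
      // In a real implementation, open one connection per partition;
      // here we just call the placeholder writer per cell.
      part.foreach { case ((x, y), v) => saveHeatmapCell(Cell(x, y, v)) }
    }
  }
}

The dashboard then only has to query that store (for example every few seconds via AJAX) and redraw the d3 heatmap, rather than talking to Spark directly.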
Upgrade of Spark-Streaming application
Hi, I've read about the recent updates to the Spark Streaming integration with Kafka (I'm referring to the new approach without receivers). In the new approach, metadata is persisted in checkpoint folders on HDFS so that the StreamingContext can be recreated in case of failure. This means that the streaming application will restart from where it exited and the message-consuming process continues with new messages only. Also, if I manually stop the streaming process and recreate the context from the checkpoint (using an approach similar to https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala), the behavior would be the same. Now, suppose I want to change something in the software and modify the processing pipeline. Can Spark use the previous checkpoint to recreate the new application? Will I ever be able to upgrade the software without processing all the messages in Kafka again? Regards, Nicola
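For reference, a minimal sketch of the checkpoint-recovery pattern in question (checkpoint path, app name and batch interval are placeholders). Checkpoint data is generally tied to the compiled application, so a modified pipeline usually cannot be restored from an old checkpoint; the common upgrade path is to track Kafka offsets yourself (e.g. in ZooKeeper or a database) and pass them to createDirectStream when the new version starts.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/app-checkpoint"   // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("UpgradableStreamingApp")  // placeholder
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)
  // build the Kafka direct stream and the processing pipeline here
  ssc
}

// Restores from the checkpoint if one exists (and matches the current code),
// otherwise builds a fresh context.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()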
How to perform basic statistics on a Json file to explore my numeric and non-numeric variables?
I've imported a JSON file which has this schema:

sqlContext.read.json(filename).printSchema

root
 |-- COL: long (nullable = true)
 |-- DATA: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Crate: string (nullable = true)
 |    |    |-- MLrate: string (nullable = true)
 |    |    |-- Nrout: string (nullable = true)
 |    |    |-- up: string (nullable = true)
 |-- IFAM: string (nullable = true)
 |-- KTM: long (nullable = true)

I'm new to Spark and I want to perform basic statistics like
* getting the min, max, mean, median and std of numeric variables
* getting the value frequencies of non-numeric variables.
My questions are:
- How do I change the type of my variables in my schema from 'string' to numeric? (Crate, MLrate and Nrout should be numeric variables.)
- How do I compute those basic statistics easily?
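One way to approach both questions with the DataFrame API, as a sketch rather than a verified answer for this exact dataset: explode the nested DATA array, cast the string columns, then use describe() and groupBy().count(). `filename` is the JSON path from the question; describe() gives count/mean/stddev/min/max but not the median, and on older Spark versions the explode step may need df.explode instead of the explode function.

import org.apache.spark.sql.functions.{col, explode}

val df = sqlContext.read.json(filename)

// Flatten the DATA array of structs and cast the string fields to double.
val flat = df.select(explode(df("DATA")).as("d"), df("IFAM"), df("KTM"))
  .select(
    col("d.Crate").cast("double").as("Crate"),
    col("d.MLrate").cast("double").as("MLrate"),
    col("d.Nrout").cast("double").as("Nrout"),
    col("IFAM"),
    col("KTM"))

// Basic statistics for the (now numeric) columns.
flat.describe("Crate", "MLrate", "Nrout").show()

// Value frequencies for a non-numeric column.
flat.groupBy("IFAM").count().orderBy(col("count").desc).show()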