Problems with JobScheduler

2015-07-30 Thread Guillermo Ortiz
I have a problem with the JobScheduler. I have executed the same code in two
clusters. I read from three topics in Kafka with DirectStream, so I have
three tasks.

I have checked YARN and there are no other jobs launched.

In the cluster where I have trouble I get these logs:

15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0 (TID
72, x, RACK_LOCAL, 14856 bytes)
15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0 (TID
73, xxx, RACK_LOCAL, 14852 bytes)
15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0 (TID
74, x, RACK_LOCAL, 14860 bytes)
15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0 (TID
72) in 208 ms on x (1/3)
15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0 (TID
74) in 49 ms on x (2/3)
*15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958 ms*
*15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000 ms*
*15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959 ms*
*15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000 ms*
*15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960 ms*
*15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000 ms*
*15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961 ms*
*15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000 ms*
*15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962 ms*
*15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000 ms*
*15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963 ms*
*15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000 ms*
15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0 (TID
73) in 60373 ms on (3/3)
15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks
have all completed, from pool
15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
MetricsSpark.scala:67) finished in 60.379 s
15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
MetricsSpark.scala:67, took 60.391761 s
15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
143825821 ms.0 from job set of time 143825821 ms
15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
143825821 ms (execution: 60.399 s)
15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
1438258215000 ms.0 from job set of time 1438258215000 ms

There is *always* a minute of delay in the third task. When I execute the
same code in the other cluster there isn't this delay in the
JobScheduler. I checked the YARN configuration in both clusters and it
seems the same.

The log in the cluster that is working well is:

15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks
15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0 (TID
279, xx, RACK_LOCAL, 14643 bytes)
15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0 (TID
280, x, RACK_LOCAL, 14639 bytes)
15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
memory on x:45132 (size: 1801.0 B, free: 530.3 MB)
15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage 93.0 (TID
281, xxx, RACK_LOCAL, 14647 bytes)
15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage 93.0 (TID
279) in 121 ms on  (1/3)
15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
memory on x:49886 (size: 1801.0 B, free: 530.3 MB)
15/07/30 14:37:35 INFO TaskSetManager: Finished task 2.0 in stage 93.0 (TID
281) in 261 ms on xx (2/3)
15/07/30 14:37:35 INFO TaskSetManager: Finished task 1.0 in stage 93.0 (TID
280) in 519 ms on x (3/3)
15/07/30 14:37:35 INFO DAGScheduler: Stage 93 (foreachRDD at
MetricsSpark.scala:67) finished in 0.522 s
15/07/30 14:37:35 INFO YarnScheduler: Removed TaskSet 93.0, whose tasks
have all completed, from pool
15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
MetricsSpark.scala:67, took 0.531323 s
15/07/30 14:37:35 INFO JobScheduler: Finished job streaming job
1438259855000 ms.0 from job set of time 1438259855000 ms
15/07/30 14:37:35 INFO JobScheduler: Total delay: 0.548 s for time
1438259855000 ms (execution: 0.540 s)
15/07/30 14:37:35 INFO KafkaRDD: Removing RDD 184 from persistence list

Any clue about where I could take a look? The number of CPUs in YARN is
enough. I execute on YARN with the same options (--master yarn-server with 1g
of memory in both).


Re: Spark and Speech Recognition

2015-07-30 Thread Peter Wolf
Oh...  That was embarrassingly easy!

Thank you, that was exactly the understanding of partitions that I needed.

P

On Thu, Jul 30, 2015 at 6:35 AM, Simon Elliston Ball 
si...@simonellistonball.com wrote:

 You might also want to consider broadcasting the models to ensure you get
 one instance shared across cores on each machine; otherwise the model will
 be serialised to each task and you'll get a copy per executor (roughly one
 per core in this instance).
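
 A minimal sketch of that pattern (loadModels, Recognizer and the paths are
 placeholders for your own Sphinx4 wrapper, not code from this thread):

 // Load the large, immutable models once on the driver and broadcast them,
 // so each executor holds one copy instead of one copy per task.
 val models = loadModels()                   // assumed helper returning the ~1 GB models
 val modelsBroadcast = sc.broadcast(models)

 val transcripts = sc.textFile("/path/to/urls.txt", numPartitions).mapPartitions { urls =>
   val recognizer = new Recognizer(modelsBroadcast.value)  // one recognizer per partition
   urls.map(url => recognizer.transcribe(url))
 }
 transcripts.saveAsTextFile("/path/to/transcripts")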

 Simon

 Sent from my iPhone

 On 30 Jul 2015, at 10:14, Akhil Das ak...@sigmoidanalytics.com wrote:

 Like this?

 val data = sc.textFile("/sigmoid/audio/data/", 24).foreachPartition(urls
 => speechRecognizer(urls))

 Let 24 be the total number of cores that you have on all the workers.

 Thanks
 Best Regards

 On Wed, Jul 29, 2015 at 6:50 AM, Peter Wolf opus...@gmail.com wrote:

 Hello, I am writing a Spark application to use speech recognition to
 transcribe a very large number of recordings.

 I need some help configuring Spark.

 My app is basically a transformation with no side effects: recording URL
 -> transcript.  The input is a huge file with one URL per line, and the
 output is a huge file of transcripts.

 The speech recognizer is written in Java (Sphinx4), so it can be packaged
 as a JAR.

 The recognizer is very processor intensive, so you can't run too many on
 one machine-- perhaps one recognizer per core.  The recognizer is also
 big-- maybe 1 GB.  But most of the recognizer is immutable acoustic and
 language models that can be shared with other instances of the recognizer.

 So I want to run about one recognizer per core of each machine in my
 cluster.  I want all recognizers on one machine to run within the same JVM
 and share the same models.

 How does one configure Spark for this sort of application?  How does one
 control how Spark deploys the stages of the process?  Can someone point me
 to an appropriate doc or keywords I should Google?

 Thanks
 Peter





Re: Running Spark on user-provided Hadoop installation

2015-07-30 Thread Ted Yu
Herman:
Regarding "Pre-built with user-provided Hadoop": spark-1.4.1-bin-hadoop2.6.tgz,
e.g., uses the hadoop-2.6 profile, which defines the versions of the projects
Spark depends on.

The Hadoop cluster is used to provide storage (HDFS) and resource management
(YARN).
For the latter, please see:
https://spark.apache.org/docs/latest/running-on-yarn.html
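
To wire such a build to an existing Hadoop installation, one common approach
is to set SPARK_DIST_CLASSPATH in conf/spark-env.sh so Spark picks up Hadoop's
jars (a minimal sketch; adjust to your layout):

export SPARK_DIST_CLASSPATH=$(hadoop classpath)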

Cheers

On Thu, Jul 30, 2015 at 1:48 AM, hermansc herman.schis...@gmail.com wrote:

 Hi.

 I want to run Spark, and more specifically the "Pre-built with user-provided
 Hadoop" version from the downloads page, but I can't find any documentation
 on how to connect the two components together (namely Spark and Hadoop).

 I've had some success in setting SPARK_CLASSPATH to my Hadoop distribution's
 lib/ directory, containing jar files such as hadoop-core, hadoop-common,
 etc.

 However, there seem to be many native libraries included in the assembly
 jar for Spark versions pre-built for Hadoop distributions (I'm specifically
 missing the libsnappy.so files) that are not by default included in
 distributions such as Cloudera Hadoop.

 Has anyone here actually tried to run Spark without Hadoop included in the
 assembly jar and/or have any more resources where I can read about the
 proper way of connecting them?

 As an aside, the spark-assembly jar in the Spark version pre-built for
 user-provided Hadoop distributions is named
 spark-assembly-1.4.0-hadoop2.2.0.jar, which doesn't make sense - it should
 be called spark-assembly-1.4.0-without-hadoop.jar :)

 --
 Herman



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-on-user-provided-Hadoop-installation-tp24076.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Problems with JobScheduler

2015-07-30 Thread Guillermo Ortiz
I read about the maxRatePerPartition parameter; I haven't set it. Could that
be the problem? Although that wouldn't explain why it doesn't work in only
one of the clusters.
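
For reference, the full property name is spark.streaming.kafka.maxRatePerPartition
(an upper bound on the number of records per second read from each Kafka partition
by the direct stream); it can be set, for example, as:

--conf spark.streaming.kafka.maxRatePerPartition=1000

with 1000 being only an illustrative value.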

2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 They just share the kafka, the rest of resources are independents. I tried
 to stop one cluster and execute just the cluster isn't working but it
 happens the same.

 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same code in
 two cluster. I read from three topics in Kafka with DirectStream so I have
 three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0
 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0
 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0
 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0
 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0
 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958
 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000
 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959
 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000
 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960
 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000
 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961
 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000
 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962
 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000
 ms*
 *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963
 ms*
 *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000
 ms*
 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0
 (TID 73) in 60373 ms on (3/3)
 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks
 have all completed, from pool
 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
 MetricsSpark.scala:67) finished in 60.379 s
 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s
 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
 143825821 ms.0 from job set of time 143825821 ms
 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
 143825821 ms (execution: 60.399 s)
 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
 1438258215000 ms.0 from job set of time 1438258215000 ms

 There are *always *a minute of delay in the third task, when I have
 executed same code in another cluster there isn't this delay in the
 JobScheduler. I checked the configuration in YARN in both clusters and it
 seems the same.

 The log in the cluster is working good is

 15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0
 (TID 279, xx, RACK_LOCAL, 14643 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0
 (TID 280, x, RACK_LOCAL, 14639 bytes)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:45132 (size: 1801.0 B, free: 530.3 MB)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage 93.0
 (TID 281, xxx, RACK_LOCAL, 14647 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage 93.0
 (TID 279) in 121 ms on  (1/3)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:49886 (size: 1801.0 B, free: 530.3 MB)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 2.0 in stage 93.0
 (TID 281) in 261 ms on xx (2/3)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 1.0 in stage 93.0
 (TID 280) in 519 ms on x (3/3)
 15/07/30 14:37:35 INFO DAGScheduler: Stage 93 (foreachRDD at
 MetricsSpark.scala:67) finished in 0.522 s
 15/07/30 14:37:35 INFO YarnScheduler: Removed TaskSet 93.0, whose tasks
 have all completed, from pool
 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s
 

Re: Spark on YARN

2015-07-30 Thread Jeetendra Gangele
Thanks for the information, this fixed the issue. The issue was with the
Spark master memory; when I specified 1G for the master manually, it started
working.

On 30 July 2015 at 14:26, Shao, Saisai saisai.s...@intel.com wrote:

  You’d better also check the NodeManager log; this sometimes happens because your
 memory usage exceeds the limit of the YARN container’s configuration.



 I’ve met a similar problem before; here is the warning log from the NodeManager:



 2015-07-07 17:06:07,141 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=17385,containerID=container_1436259427993_0001_02_01] is
 running beyond virtual memory limits. Current usage: 318.1 MB of 1 GB
 physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing
 container.



 The default pmem-vmem ratio is 2.1, but it seems the executor requires more vmem
 when it starts, so the NodeManager will kill it. If you hit a similar problem, you
 could increase the configuration “yarn.nodemanager.vmem-pmem-ratio”.
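
 For reference, that property lives in yarn-site.xml on the NodeManagers; a
 sketch (the value 4 is only an illustration):

 <property>
   <name>yarn.nodemanager.vmem-pmem-ratio</name>
   <value>4</value>
 </property>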



 Thanks

 Jerry



 *From:* Jeff Zhang [mailto:zjf...@gmail.com]
 *Sent:* Thursday, July 30, 2015 4:36 PM
 *To:* Jeetendra Gangele
 *Cc:* user
 *Subject:* Re: Spark on YARN



  15/07/30 12:13:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15:
 SIGTERM



 The AM is killed somehow, maybe due to preemption. Does it always happen?
 The ResourceManager log would be helpful.







 On Thu, Jul 30, 2015 at 4:17 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

  I can't see the application logs here. All the logs are going into
 stderr. Can anybody help here?



 On 30 July 2015 at 12:21, Jeetendra Gangele gangele...@gmail.com wrote:

  I am running the command below; this is the default Spark Pi program, but it
 is not running properly: all the logs are going to stderr, yet at the terminal
 the job is reported as succeeding. I guess there is some configuration issue
 and the job is not actually launching.



 /bin/spark-submit --class org.apache.spark.examples.SparkPi --master
 yarn-cluster lib/spark-examples-1.4.1-hadoop2.6.0.jar 10





 Complete log



 SLF4J: Class path contains multiple SLF4J bindings.

 SLF4J: Found binding in 
 [jar:file:/home/hadoop/tmp/nm-local-dir/usercache/hadoop/filecache/23/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

 SLF4J: Found binding in 
 [jar:file:/opt/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]

 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
 explanation.

 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

 15/07/30 12:13:31 INFO yarn.ApplicationMaster: Registered signal handlers for 
 [TERM, HUP, INT]

 15/07/30 12:13:32 INFO yarn.ApplicationMaster: ApplicationAttemptId: 
 appattempt_1438090734187_0010_01

 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop

 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop

 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication 
 disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
 with modify permissions: Set(hadoop)

 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Starting the user application 
 in a separate Thread

 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context 
 initialization

 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context 
 initialization ...

 15/07/30 12:13:33 INFO spark.SparkContext: Running Spark version 1.4.1

 15/07/30 12:13:33 WARN spark.SparkConf:

 SPARK_JAVA_OPTS was detected (set to '-Dspark.driver.port=53411').

 This is deprecated in Spark 1.0+.



 Please instead use:

  - ./spark-submit with conf/spark-defaults.conf to set defaults for an 
 application

  - ./spark-submit with --driver-java-options to set -X options for a driver

  - spark.executor.extraJavaOptions to set -X options for executors

  - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master 
 or worker)



 15/07/30 12:13:33 WARN spark.SparkConf: Setting 
 'spark.executor.extraJavaOptions' to '-Dspark.driver.port=53411' as a 
 work-around.

 15/07/30 12:13:33 WARN spark.SparkConf: Setting 
 'spark.driver.extraJavaOptions' to '-Dspark.driver.port=53411' as a 
 work-around.

 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop

 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop

 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication 
 disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
 with modify permissions: Set(hadoop)

 15/07/30 12:13:33 INFO slf4j.Slf4jLogger: Slf4jLogger started

 15/07/30 12:13:33 INFO Remoting: Starting remoting

 15/07/30 12:13:34 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp://sparkDriver@10.21.1.77:53411]

 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'sparkDriver' 
 on port 53411.

 15/07/30 12:13:34 INFO spark.SparkEnv: Registering MapOutputTracker

 

Re: Graceful shutdown for Spark Streaming

2015-07-30 Thread anshu shukla
Yes, I was doing the same. If you mean that this is the correct way to do it,
then I will verify it once more in my case.

On Thu, Jul 30, 2015 at 1:02 PM, Tathagata Das t...@databricks.com wrote:

 How is sleep not working? Are you doing

 streamingContext.start()
 Thread.sleep(xxx)
 streamingContext.stop()

 On Wed, Jul 29, 2015 at 6:55 PM, anshu shukla anshushuk...@gmail.com
 wrote:

 If we want to stop the application after a fixed time period, how will it
 work? (How do I give the duration in the logic? In my case sleep(t.s.) is not
 working.) So I used to kill the CoarseGrained job at each slave with a script.
 Please suggest something.

 On Thu, Jul 30, 2015 at 5:14 AM, Tathagata Das t...@databricks.com
 wrote:

 StreamingContext.stop(stopGracefully = true) stops the streaming context
 gracefully.
 Then you can safely terminate the Spark cluster. They are two different
 steps and need to be done separately, ensuring that the driver process has
 been completely terminated before the Spark cluster itself is terminated.
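
 A minimal sketch of that sequence (the surrounding application code and the
 condition that triggers the shutdown are assumed):

 ssc.start()
 // ... run until some external signal or condition says it is time to stop ...
 // finish processing everything already received, then stop the SparkContext too
 ssc.stop(stopSparkContext = true, stopGracefully = true)
 // only after the driver process has exited should the cluster itself be torn down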

 On Wed, Jul 29, 2015 at 6:43 AM, Michal Čizmazia mici...@gmail.com
 wrote:

 How to initiate graceful shutdown from outside of the Spark Streaming
 driver process? Both for the local and cluster mode of Spark Standalone as
 well as EMR.

 Does sbin/stop-all.sh stop the context gracefully? How is it done? Is
 there a signal sent to the driver process?

 For EMR, is there a way how to terminate an EMR cluster with Spark
 Streaming graceful shutdown?

 Thanks!






 --
 Thanks & Regards,
 Anshu Shukla





-- 
Thanks & Regards,
Anshu Shukla


Re: How to set log level in spark-submit ?

2015-07-30 Thread Dean Wampler
Did you use an absolute path in $path_to_file? I just tried this with
spark-shell v1.4.1 and it worked for me. If the URL is wrong, you should
see an error message from log4j that it can't find the file. For Windows it
would be something like file:/c:/path/to/file, I believe.
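
For example, with an absolute path, the full submit command would look
something like this (class and jar names are placeholders):

./bin/spark-submit \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:/absolute/path/to/log4j.properties" \
  --class com.example.Main \
  myapp.jar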

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Jul 30, 2015 at 4:41 AM, Alexander Krasheninnikov 
a.krasheninni...@corp.badoo.com wrote:

  I saw such an example in the docs:
 --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=
 file://$path_to_file
 but, unfortunately, it does not work for me.


 On 30.07.2015 05:12, canan chen wrote:

 Yes, that should work. What I mean is: is there any option in the spark-submit
 command that I can specify for the log level?


 On Thu, Jul 30, 2015 at 10:05 AM, Jonathan Coveney jcove...@gmail.com
 wrote:

 Put a log4j.properties file in conf/. You can copy
 log4j.properties.template as a good base
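
 For instance, copy the template and change the root logger line (WARN here is
 just an example):

 cp conf/log4j.properties.template conf/log4j.properties
 # then in conf/log4j.properties:
 log4j.rootCategory=WARN, console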


 On Wednesday, July 29, 2015, canan chen ccn...@gmail.com
 wrote:

 Does anyone know how to set the log level in spark-submit? Thanks






Python version collision

2015-07-30 Thread Javier Domingo Cansino
Hi,

I find the documentation about the configuration options rather confusing.
There are a lot of files, and it is not too clear which one to modify. For
example, spark-env vs spark-defaults.

I am getting an error from a Python version collision:

  File /root/spark/python/lib/pyspark.zip/pyspark/worker.py, line 64, in
main
(%d.%d % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver
3.4, PySpark cannot run with different minor versions

at
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
at
org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:315)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


But I have conf/spark-env.sh with:

PYSPARK_PYTHON=python3.4
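
From what I understand, the driver and the workers read different variables, so
presumably both need to point at the same interpreter on every node; something
like (assuming python3.4 is on the PATH of all machines):

export PYSPARK_PYTHON=python3.4          # interpreter used by the workers
export PYSPARK_DRIVER_PYTHON=python3.4   # interpreter used by the driver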

Also, I am not sure about the shebang line at the top of spark-env.sh, because
sourcing it would make the env vars be defined in a subprocess, so I removed
it; anyway, I am still having the same problem.

Does anyone have experience using Python 3? And with Python 3 in a virtualenv?

Also, as a matter of feedback, I find it rather difficult to deploy and
develop apps, because although you may have an IPython notebook, I haven't
found a way to include pyspark in my environment (with the rest of the
virtualenv libraries).

Thanks!


Re: Spark Interview Questions

2015-07-30 Thread Sandeep Giri
I have prepared some interview questions:
http://www.knowbigdata.com/blog/interview-questions-apache-spark-part-1
http://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2

Please provide your feedback.

On Wed, Jul 29, 2015, 23:43 Pedro Rodriguez ski.rodrig...@gmail.com wrote:

 You might look at the edx course on Apache Spark or ML with Spark. There
 are probably some homework problems or quiz questions that might be
 relevant. I haven't looked at the course myself, but that's where I would go
 first.


 https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x
 https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x

 --
 Pedro Rodriguez
 PhD Student in Distributed Machine Learning | CU Boulder
 UC Berkeley AMPLab Alumni

 ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
 Github: github.com/EntilZha | LinkedIn:
 https://www.linkedin.com/in/pedrorodriguezscience




Re: help plz! how to use zipWithIndex to each subset of a RDD

2015-07-30 Thread askformore
Hi @rok, thanks, I got it.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/help-plz-how-to-use-zipWithIndex-to-each-subset-of-a-RDD-tp24071p24080.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problems with JobScheduler

2015-07-30 Thread Guillermo Ortiz
They just share the Kafka cluster; the rest of the resources are independent. I
tried to stop one cluster and run only the cluster that isn't working, but the
same thing happens.

2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same code in
 two cluster. I read from three topics in Kafka with DirectStream so I have
 three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0
 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0
 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0
 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0
 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0
 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000 ms*
 *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963 ms*
 *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000 ms*
 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0
 (TID 73) in 60373 ms on (3/3)
 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks
 have all completed, from pool
 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
 MetricsSpark.scala:67) finished in 60.379 s
 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s
 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
 143825821 ms.0 from job set of time 143825821 ms
 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
 143825821 ms (execution: 60.399 s)
 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
 1438258215000 ms.0 from job set of time 1438258215000 ms

 There are *always *a minute of delay in the third task, when I have
 executed same code in another cluster there isn't this delay in the
 JobScheduler. I checked the configuration in YARN in both clusters and it
 seems the same.

 The log in the cluster is working good is

 15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0
 (TID 279, xx, RACK_LOCAL, 14643 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0
 (TID 280, x, RACK_LOCAL, 14639 bytes)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:45132 (size: 1801.0 B, free: 530.3 MB)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage 93.0
 (TID 281, xxx, RACK_LOCAL, 14647 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage 93.0
 (TID 279) in 121 ms on  (1/3)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:49886 (size: 1801.0 B, free: 530.3 MB)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 2.0 in stage 93.0
 (TID 281) in 261 ms on xx (2/3)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 1.0 in stage 93.0
 (TID 280) in 519 ms on x (3/3)
 15/07/30 14:37:35 INFO DAGScheduler: Stage 93 (foreachRDD at
 MetricsSpark.scala:67) finished in 0.522 s
 15/07/30 14:37:35 INFO YarnScheduler: Removed TaskSet 93.0, whose tasks
 have all completed, from pool
 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s
 15/07/30 14:37:35 INFO JobScheduler: Finished job streaming job
 1438259855000 ms.0 from job set of time 1438259855000 ms
 15/07/30 14:37:35 INFO JobScheduler: Total delay: 0.548 s for time
 1438259855000 ms (execution: 0.540 s)
 15/07/30 14:37:35 INFO 

Re: Twitter Connector-Spark Streaming

2015-07-30 Thread Akhil Das
You can create a custom receiver and then, inside it, write your own
piece of code to receive the data, filter it, etc. before giving it to Spark.
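
A rough skeleton of such a receiver (the actual twitter4j calls inside
receive() are left to you; the class name and polling interval are only
placeholders):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyTweetsReceiver(auth: twitter4j.auth.Authorization)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // receive on a separate thread so onStart() returns immediately
    new Thread("MyTweetsReceiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = { /* the receive() loop below checks isStopped() */ }

  private def receive(): Unit = {
    while (!isStopped()) {
      // use twitter4j with `auth` to fetch the tweets you care about
      // (your own timeline, followers, ...) and hand each one to Spark:
      // store(tweetText)
      Thread.sleep(1000) // placeholder polling interval
    }
  }
}

// wired in as: val tweets = ssc.receiverStream(new MyTweetsReceiver(atwitter.get))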

Thanks
Best Regards

On Thu, Jul 30, 2015 at 6:49 PM, Sadaf Khan sa...@platalytics.com wrote:

 Okay :)

 Then is there any way to fetch the tweets specific to my account?

 Thanks in anticipation :)

 On Thu, Jul 30, 2015 at 6:17 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Oh, this one fetches the public tweets, not the ones specific to your
 account.

 Thanks
 Best Regards

 On Thu, Jul 30, 2015 at 6:11 PM, Sadaf Khan sa...@platalytics.com
 wrote:

 Yes, but can you please tell me how to mention a specific user account
 in the filter?
 I want to fetch my tweets, the tweets of my followers, and the tweets of
 those whom I follow.
 So, in short, I want to fetch the tweets of my account only.

 Recently I have used:
 val tweets =
   TwitterUtils.createStream(ssc, atwitter, Nil, StorageLevel.MEMORY_AND_DISK_2)


 Any response will be very much appreciated. :)

 Thanks.


 On Thu, Jul 30, 2015 at 5:20 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 TwitterUtils.createStream takes a third argument, which is a list of filters;
 once you provide these, it will only fetch tweets matching them.
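
 For example (the keywords are only placeholders; the filters are track
 keywords applied to the public stream):

 val tweets = TwitterUtils.createStream(ssc, atwitter,
   Seq("spark", "#bigdata"), StorageLevel.MEMORY_AND_DISK_2)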

 Thanks
 Best Regards

 On Thu, Jul 30, 2015 at 4:19 PM, Sadaf sa...@platalytics.com wrote:

 Hi.
 I am writing a Twitter connector using Spark Streaming, but it fetches
 random tweets.
 Is there any way to receive the tweets of a particular account?

 I made an app on Twitter and used the credentials as given below.

 def managingCredentials(): Option[twitter4j.auth.Authorization] = {
   object auth {
     val config = new twitter4j.conf.ConfigurationBuilder()
       .setOAuthConsumerKey()
       .setOAuthConsumerSecret()
       .setOAuthAccessToken()
       .setOAuthAccessTokenSecret()
       .build
   }
   val twitter_auth = new TwitterFactory(auth.config)
   val a = new twitter4j.auth.OAuthAuthorization(auth.config)
   val atwitter: Option[twitter4j.auth.Authorization] =
     Some(twitter_auth.getInstance(a).getAuthorization())
   atwitter
 }

 Thanks :)




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Twitter-Connector-Spark-Streaming-tp24078.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: Connection closed/reset by peers error

2015-07-30 Thread firemonk9
I am having the same issue. Have you found any resolution?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Connection-closed-reset-by-peers-error-tp21459p24081.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problems with JobScheduler

2015-07-30 Thread Cody Koeninger
Just so I'm clear, the difference in timing you're talking about is this:

15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
MetricsSpark.scala:67, took 60.391761 s

15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
MetricsSpark.scala:67, took 0.531323 s


Are those jobs running on the same topicpartition?


On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz konstt2...@gmail.com
wrote:

 I read about maxRatePerPartition parameter, I haven't set this parameter.
 Could it be the problem?? Although this wouldn't explain why it doesn't
 work in one of the clusters.

 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 They just share the kafka, the rest of resources are independents. I
 tried to stop one cluster and execute just the cluster isn't working but it
 happens the same.

 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same code in
 two cluster. I read from three topics in Kafka with DirectStream so I have
 three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0
 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0
 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0
 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0
 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0
 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958
 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000
 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959
 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000
 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960
 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000
 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961
 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000
 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962
 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000
 ms*
 *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963
 ms*
 *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000
 ms*
 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0
 (TID 73) in 60373 ms on (3/3)
 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks
 have all completed, from pool
 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
 MetricsSpark.scala:67) finished in 60.379 s
 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s
 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
 143825821 ms.0 from job set of time 143825821 ms
 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
 143825821 ms (execution: 60.399 s)
 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
 1438258215000 ms.0 from job set of time 1438258215000 ms

 There are *always *a minute of delay in the third task, when I have
 executed same code in another cluster there isn't this delay in the
 JobScheduler. I checked the configuration in YARN in both clusters and it
 seems the same.

 The log in the cluster is working good is

 15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0
 (TID 279, xx, RACK_LOCAL, 14643 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0
 (TID 280, x, RACK_LOCAL, 14639 bytes)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:45132 (size: 1801.0 B, free: 530.3 MB)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage 93.0
 (TID 281, xxx, RACK_LOCAL, 14647 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage 93.0
 (TID 279) in 121 ms on  (1/3)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:49886 (size: 1801.0 B, free: 530.3 MB)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 2.0 in stage 93.0
 (TID 281) in 261 ms on xx (2/3)
 

Re: Problems with JobScheduler

2015-07-30 Thread Guillermo Ortiz
I have three topics with one partition each, so each job runs on one topic.

2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org:

 Just so I'm clear, the difference in timing you're talking about is this:

 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s

 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s


 Are those jobs running on the same topicpartition?


 On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I read about maxRatePerPartition parameter, I haven't set this
 parameter. Could it be the problem?? Although this wouldn't explain why it
 doesn't work in one of the clusters.

 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 They just share the kafka, the rest of resources are independents. I
 tried to stop one cluster and execute just the cluster isn't working but it
 happens the same.

 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same code in
 two cluster. I read from three topics in Kafka with DirectStream so I have
 three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0
 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0
 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0
 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0
 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0
 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958
 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000
 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959
 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000
 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960
 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000
 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961
 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000
 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962
 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000
 ms*
 *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963
 ms*
 *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000
 ms*
 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0
 (TID 73) in 60373 ms on (3/3)
 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks
 have all completed, from pool
 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
 MetricsSpark.scala:67) finished in 60.379 s
 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s
 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
 143825821 ms.0 from job set of time 143825821 ms
 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
 143825821 ms (execution: 60.399 s)
 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
 1438258215000 ms.0 from job set of time 1438258215000 ms

 There are *always *a minute of delay in the third task, when I have
 executed same code in another cluster there isn't this delay in the
 JobScheduler. I checked the configuration in YARN in both clusters and it
 seems the same.

 The log in the cluster is working good is

 15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0
 (TID 279, xx, RACK_LOCAL, 14643 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0
 (TID 280, x, RACK_LOCAL, 14639 bytes)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:45132 (size: 1801.0 B, free: 530.3 MB)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage 93.0
 (TID 281, xxx, RACK_LOCAL, 14647 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage 93.0
 (TID 279) in 121 ms on  (1/3)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:49886 

Spark Master Build Git Commit Hash

2015-07-30 Thread Jerry Lam
Hi Spark users and developers,

I wonder which git commit was used to build the latest master-nightly build
found at:
http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/?
I downloaded the build but I couldn't find the information related to it.
Thank you!

Best Regards,

Jerry


Re: Lost task - connection closed

2015-07-30 Thread firemonk9
I am getting the same error. Any resolution on this issue?

Thank you



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Lost-task-connection-closed-tp21361p24082.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Upgrade of Spark-Streaming application

2015-07-30 Thread Cody Koeninger
You can't use checkpoints across code upgrades.  That may or may not change
in the future, but for now that's a limitation of spark checkpoints
(regardless of whether you're using Kafka).

Some options:

- Start up the new job on a different cluster, then kill the old job once
it's caught up to where the new job started.  If you care about duplicate
work, you should be doing idempotent / transactional writes anyway, which
should take care of the overlap between the two.  If you're doing batches,
you may need to be a little more careful about handling batch boundaries

- Store the offsets somewhere other than the checkpoint, and provide them
on startup using the fromOffsets argument to createDirectStream
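
A sketch of that second option (ssc, kafkaParams and the offset-store helper
below are assumed to exist in your application):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// offsets previously saved by your job in ZK, a database, HDFS, ... (assumed helper)
val fromOffsets: Map[TopicAndPartition, Long] = loadOffsetsFromMyStore()

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc,
  kafkaParams,
  fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))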





On Thu, Jul 30, 2015 at 4:07 AM, Nicola Ferraro nibbi...@gmail.com wrote:

 Hi,
 I've read about the recent updates about spark-streaming integration with
 Kafka (I refer to the new approach without receivers).
 In the new approach, metadata are persisted in checkpoint folders on HDFS
 so that the SparkStreaming context can be recreated in case of failures.
 This means that the streaming application will restart from where it
 exited and the message-consuming process continues with new messages only.
 Also, if I manually stop the streaming process and recreate the context
 from checkpoint (using an approach similar to
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala),
 the behavior would be the same.

 Now, suppose I want to change something in the software and modify the
 processing pipeline.
 Can spark use the previous checkpoint to recreate the new application?
 Will I ever be able to upgrade the software without processing all the
 messages in Kafka again?

 Regards,
 Nicola



Re: Problems with JobScheduler

2015-07-30 Thread Cody Koeninger
If the jobs are running on different topicpartitions, what's different
about them?  Is one of them 120x the throughput of the other, for
instance?  You should be able to eliminate cluster config as a difference
by running the same topic partition on the different clusters and comparing
the results.

On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com
wrote:

 I have three topics with one partition each topic. So each jobs run about
 one topics.

 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org:

 Just so I'm clear, the difference in timing you're talking about is this:

 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s

 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s


 Are those jobs running on the same topicpartition?


 On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I read about maxRatePerPartition parameter, I haven't set this
 parameter. Could it be the problem?? Although this wouldn't explain why it
 doesn't work in one of the clusters.

 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 They just share the kafka, the rest of resources are independents. I
 tried to stop one cluster and execute just the cluster isn't working but it
 happens the same.

 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same code
 in two cluster. I read from three topics in Kafka with DirectStream so I
 have three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0
 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0
 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0
 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0
 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0
 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time
 143825958 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time
 1438259585000 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time
 143825959 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time
 1438259595000 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time
 143825960 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time
 1438259605000 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time
 143825961 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time
 1438259615000 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time
 143825962 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time
 1438259625000 ms*
 *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time
 143825963 ms*
 *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time
 1438259635000 ms*
 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0
 (TID 73) in 60373 ms on (3/3)
 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose
 tasks have all completed, from pool
 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
 MetricsSpark.scala:67) finished in 60.379 s
 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s
 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
 143825821 ms.0 from job set of time 143825821 ms
 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
 143825821 ms (execution: 60.399 s)
 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
 1438258215000 ms.0 from job set of time 1438258215000 ms

 There are *always *a minute of delay in the third task, when I have
 executed same code in another cluster there isn't this delay in the
 JobScheduler. I checked the configuration in YARN in both clusters and it
 seems the same.

 The log in the cluster is working good is

 15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0
 (TID 279, xx, RACK_LOCAL, 14643 bytes)
 15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0
 (TID 280, x, RACK_LOCAL, 14639 bytes)
 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
 memory on x:45132 

Re: Spark build/sbt assembly

2015-07-30 Thread Rahul Palamuttam
Hi Akhil,

Yes, I did try to remove it and then tried to build again.
However, that jar keeps getting recreated whenever I run ./build/sbt
assembly.

Thanks,

Rahul P

On Thu, Jul 30, 2015 at 12:38 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Did you try removing this jar? build/sbt-launch-0.13.7.jar

 Thanks
 Best Regards

 On Tue, Jul 28, 2015 at 12:08 AM, Rahul Palamuttam rahulpala...@gmail.com
  wrote:

 Hi All,

 I hope this is the right place to post troubleshooting questions.
 I've been following the install instructions, and I get the following error
 when running this from the Spark home directory:

 $./build/sbt
 Using /usr/java/jdk1.8.0_20/ as default JAVA_HOME.
 Note, this will be overridden by -java-home if it is set.
 Attempting to fetch sbt
 Launching sbt from build/sbt-launch-0.13.7.jar
 Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar

 However, when I run sbt assembly it compiles, with a couple of warnings,
 but it works nonetheless.
 Is the build/sbt script deprecated? I do notice that it works on one node,
 but on the other it gives me the above error.

 Thanks,

 Rahul P



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-build-sbt-assembly-tp24012.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark Streaming Kafka could not find leader offset for Set()

2015-07-30 Thread Umesh Kacha
Hi Cody, sorry, my bad, you were right: there was a typo in topicSet. When I
corrected the typo in topicSet, it started working. Thanks a lot.

Regards

On Thu, Jul 30, 2015 at 7:43 PM, Cody Koeninger c...@koeninger.org wrote:

 Can you post the code including the values of kafkaParams and topicSet,
 ideally the relevant output of kafka-topics.sh --describe as well

 On Wed, Jul 29, 2015 at 11:39 PM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi thanks for the response. Like I already mentioned in the question
 kafka topic is valid and it has data I can see data in it using another
 kafka consumer.
 On Jul 30, 2015 7:31 AM, Cody Koeninger c...@koeninger.org wrote:

 The last time someone brought this up on the mailing list, the issue
 actually was that the topic(s) didn't exist in Kafka at the time the spark
 job was running.





 On Wed, Jul 29, 2015 at 6:17 PM, Tathagata Das t...@databricks.com
 wrote:

 There is a known issue that Kafka cannot return leader if there is not
 data in the topic. I think it was raised in another thread in this forum.
 Is that the issue?

 On Wed, Jul 29, 2015 at 10:38 AM, unk1102 umesh.ka...@gmail.com
 wrote:

 Hi I have Spark Streaming code which streams from Kafka topic it used
 to work
 fine but suddenly it started throwing the following exception

 Exception in thread main org.apache.spark.SparkException:
 org.apache.spark.SparkException: Couldn't find leader offsets for Set()
 at

 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
 at scala.util.Either.fold(Either.scala:97)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:412)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:528)
 at

 org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
 My Spark Streaming client code is very simple I just create one
 receiver
 using the following code and trying to print messages it consumed

 JavaPairInputDStream<String, String> messages =
 KafkaUtils.createDirectStream(jssc,
 String.class,
 String.class,
 StringDecoder.class,
 StringDecoder.class,
 kafkaParams,
 topicSet);

 Kafka param is only one I specify kafka.ofset.reset=largest. Kafka
 topic has
 data I can see data using other Kafka consumers but above Spark
 Streaming
 code throws exception saying leader offset not found. I tried both
 smallest
 and largest offset. I wonder what happened this code used to work
 earlier. I
 am using Spark-Streaming 1.3.1 as it was working in this version I
 tried in
 1.4.1 and same exception. Please guide. I am new to Spark thanks in
 advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-could-not-find-leader-offset-for-Set-tp24066.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Spark Streaming Kafka could not find leader offset for Set()

2015-07-30 Thread Cody Koeninger
Can you post the code including the values of kafkaParams and topicSet,
ideally the relevant output of kafka-topics.sh --describe as well

On Wed, Jul 29, 2015 at 11:39 PM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi thanks for the response. Like I already mentioned in the question kafka
 topic is valid and it has data I can see data in it using another kafka
 consumer.
 On Jul 30, 2015 7:31 AM, Cody Koeninger c...@koeninger.org wrote:

 The last time someone brought this up on the mailing list, the issue
 actually was that the topic(s) didn't exist in Kafka at the time the spark
 job was running.





 On Wed, Jul 29, 2015 at 6:17 PM, Tathagata Das t...@databricks.com
 wrote:

 There is a known issue that Kafka cannot return leader if there is not
 data in the topic. I think it was raised in another thread in this forum.
 Is that the issue?

 On Wed, Jul 29, 2015 at 10:38 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I have Spark Streaming code which streams from Kafka topic it used
 to work
 fine but suddenly it started throwing the following exception

 Exception in thread main org.apache.spark.SparkException:
 org.apache.spark.SparkException: Couldn't find leader offsets for Set()
 at

 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
 at scala.util.Either.fold(Either.scala:97)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:412)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:528)
 at

 org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
 My Spark Streaming client code is very simple: I just create one receiver
 using the following code and try to print the messages it consumes.

 JavaPairInputDStream<String, String> messages =
         KafkaUtils.createDirectStream(jssc,
                 String.class,
                 String.class,
                 StringDecoder.class,
                 StringDecoder.class,
                 kafkaParams,
                 topicSet);

 The only Kafka param I specify is kafka.ofset.reset=largest. The Kafka topic
 has data and I can see it using other Kafka consumers, but the Spark
 Streaming code above throws an exception saying the leader offset was not
 found. I tried both smallest and largest offsets. I wonder what happened;
 this code used to work earlier. I am using Spark Streaming 1.3.1, where it
 was working, and I tried 1.4.1 with the same exception. Please guide. I am
 new to Spark; thanks in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-could-not-find-leader-offset-for-Set-tp24066.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






How to register array class with Kryo in spark-defaults.conf

2015-07-30 Thread Wang, Ningjun (LNG-NPV)
I register my class with Kryo in spark-defaults.conf as follows:

spark.serializer   
org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired true
spark.kryo.classesToRegister  ltn.analytics.es.EsDoc

But I got the following exception

java.lang.IllegalArgumentException: Class is not registered: 
ltn.analytics.es.EsDoc[]
Note: To register this class use: kryo.register(ltn.analytics.es.EsDoc[].class);
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
at 
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:162)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

The error message seems to suggest that I should also register the array class 
EsDoc[]. So I added it to spark-defaults.conf as follows:

spark.kryo.classesToRegister  ltn.analytics.es.EsDoc,ltn.analytics.es.EsDoc[]

Then I got the following error

org.apache.spark.SparkException: Failed to register classes with Kryo
at 
org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:101)
at 
org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:153)
at 
org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:115)
at 
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:200)
at 
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
at 
org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at 
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
at ltn.analytics.index.Index.addDocuments(Index.scala:82)
Please advise.

Thanks.
Ningjun
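
A workaround that is often suggested for array classes (a sketch, not taken from
this thread) is to register the classes programmatically, because the array class
can then be written as classOf[Array[...]], which the spark-defaults.conf syntax
cannot express:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  // Registers both the element class and its array class (EsDoc[]).
  .registerKryoClasses(Array(
    classOf[ltn.analytics.es.EsDoc],
    classOf[Array[ltn.analytics.es.EsDoc]]))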


Failed to load class for data source: org.apache.spark.sql.cassandra

2015-07-30 Thread Benjamin Ross
Hey all,
I'm running what should be a very straight-forward application of the Cassandra 
sql connector, and I'm getting an error:

Exception in thread main java.lang.RuntimeException: Failed to load class for 
data source: org.apache.spark.sql.cassandra
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:220)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:233)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
at com.latticeengines.test.CassandraTest$.main(CassandraTest.scala:33)
at com.latticeengines.test.CassandraTest.main(CassandraTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/07/30 15:34:47 INFO spark.SparkContext: Invoking stop() from shutdown hook

My jar is shaded, so I assume this shouldn't happen?

Here's the code I'm trying to run:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.mean

object CassandraTest {
  def main(args: Array[String]) {
    println("Hello, scala!")

    var conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "kv", "keyspace" -> "test"))
      .load()
    val w = Window.orderBy("value").rowsBetween(-2, 0)
    df.select(mean("value").over(w))
  }
}



RE: Failed to load class for data source: org.apache.spark.sql.cassandra

2015-07-30 Thread Benjamin Ross
I'm submitting the application this way:
spark-submit  test-2.0.5-SNAPSHOT-jar-with-dependencies.jar

I've confirmed that org.apache.spark.sql.cassandra and org.apache.cassandra 
classes are in the jar.

Apologies for this relatively newbie question - I'm still new to both spark and 
scala.
Thanks,
Ben


From: Benjamin Ross
Sent: Thursday, July 30, 2015 3:45 PM
To: user@spark.apache.org
Subject: Failed to load class for data source: org.apache.spark.sql.cassandra

Hey all,
I'm running what should be a very straight-forward application of the Cassandra 
sql connector, and I'm getting an error:

Exception in thread main java.lang.RuntimeException: Failed to load class for 
data source: org.apache.spark.sql.cassandra
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:220)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:233)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
at com.latticeengines.test.CassandraTest$.main(CassandraTest.scala:33)
at com.latticeengines.test.CassandraTest.main(CassandraTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/07/30 15:34:47 INFO spark.SparkContext: Invoking stop() from shutdown hook

My jar is shaded, so I assume this shouldn't happen?

Here's the code I'm trying to run:
object CassandraTest {
  def main(args: Array[String]) {
    println("Hello, scala!")

    var conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "kv", "keyspace" -> "test"))
      .load()
    val w = Window.orderBy("value").rowsBetween(-2, 0)
    df.select(mean("value").over(w))
  }
}



Re: Spark Master Build Git Commit Hash

2015-07-30 Thread Ted Yu
Looking at:
https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-NightlyBuilds

Maven artifacts should be here:
https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-core_2.10/1.5.0-SNAPSHOT/

though the jars were dated July 16th.

FYI

On Thu, Jul 30, 2015 at 12:11 PM, Jerry Lam chiling...@gmail.com wrote:

 Hi Ted,

 The problem is that I don't know whether the build uses the commits from
 the same day, or whether it could be based on the Jul 15th commits.
 Just a thought: it might be possible to replace SNAPSHOT with the git
 commit hash in the filename so people will know which commit a build is based on.

 Thank you for your help!

 Jerry

 On Thu, Jul 30, 2015 at 11:10 AM, Ted Yu yuzhih...@gmail.com wrote:

 The files were dated 16-Jul-2015
 Looks like nightly build either was not published, or published at a
 different location.

 You can download spark-1.5.0-SNAPSHOT.tgz and binary-search for the
 commits made on Jul 16th.
 There may be other ways of determining the latest commit.

 Cheers

 On Thu, Jul 30, 2015 at 7:39 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi Spark users and developers,

 I wonder which git commit was used to build the latest master-nightly
 build found at:
 http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/
 ?
 I downloaded the build but I couldn't find the information related to
 it. Thank you!

 Best Regards,

 Jerry






Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-30 Thread Michael Armbrust
Perhaps I'm missing what you are trying to accomplish, but if you'd like to
avoid the null values do an inner join instead of an outer join.

Additionally, I'm confused about how the result of
joinedDF.filter(joinedDF("y").isNotNull).show still contains null values in
the column "y". This doesn't really have anything to do with nullable, which
is only a hint to the system so that we can avoid null checking when we know
that there are no null values. If you provide the full code I can try and
see if this is a bug.

On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne martin.se...@googlemail.com
wrote:

 Dear Michael, dear all,

 motivation:

 object OtherEntities {

   case class Record( x:Int, a: String)
   case class Mapping( x: Int, y: Int )

   val records = Seq( Record(1, "hello"), Record(2, "bob"))
   val mappings = Seq( Mapping(2, 5) )
 }

 Now I want to perform a *left outer join* on records and mappings (with the
 ON JOIN criterion on columns recordDF("x") === mappingDF("x")); the shorthand
 is in *leftOuterJoinWithRemovalOfEqualColumn*:

 val sqlContext = new SQLContext(sc)
 // used to implicitly convert an RDD to a DataFrame.
 import sqlContext.implicits._

 val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
 val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()

 val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn( mappingDF, "x")

 joinedDF.filter(joinedDF("y").isNotNull).show


 Currently, the output is

 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 instead of

 +---+---+---+
 |  x|  a|  y|
 +---+---+---+
 |  2|bob|  5|
 +---+---+---+

 The last output can be achieved by the method of changing nullable=false
 to nullable=true described in my first post.

 *Thus, I need this schema modification as to make outer joins work.*

 Cheers and thanks,

 Martin



 2015-07-30 20:23 GMT+02:00 Michael Armbrust mich...@databricks.com:

 We don't yet updated nullability information based on predicates as we
 don't actually leverage this information in many places yet.  Why do you
 want to update the schema?

 On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 
 martin.se...@googlemail.com wrote:

 Hi all,

 1. *Columns in dataframes can be nullable and not nullable. Having a
 nullable column of Doubles, I can use the following Scala code to filter
 all
 non-null rows:*

   val df = ... // some code that creates a DataFrame
   df.filter( df("columnname").isNotNull() )

 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = true)

 And with the filter expression
 +---+---+---+
 |  x|  a|  y|
 +---+---+---+
 |  2|bob|  5|
 +---+---+---+


 Unfortunately, while this is true for a nullable column (according to
 df.printSchema), it is not true for a column that is not nullable:

 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = false)

 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 such that the output is not affected by the filter. Is this intended?


 2. *What is the cheapest way (in terms of performance) to turn a non-nullable
 column into a nullable column? I came up with this:*

   /**
    * Set, if a column is nullable.
    * @param df source DataFrame
    * @param cn is the column name to change
    * @param nullable is the flag to set, such that the column is either
    *                 nullable or not
    */
   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

     val schema = df.schema
     val newSchema = StructType(schema.map {
       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame( df.rdd, newSchema)
   }

 Is there a cheaper solution?

 3. *Any comments?*

 Cheers and thx in advance,

 Martin






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: [Parquet + Dataframes] Column names with spaces

2015-07-30 Thread Michael Armbrust
You can't use these names due to limitations in Parquet (the library itself
will silently generate corrupt files that can't be read, hence the error we
throw).

You can alias a column by df.select(df("old").alias("new")), which is
essentially what withColumnRenamed does.  Alias in this case means renaming.
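
As a concrete sketch (the column name and output path below are placeholders,
and the thread itself uses PySpark, where the same method names apply):

val cleaned = df.withColumnRenamed("Name with Space", "Name")
cleaned.write.parquet("/tmp/cleaned.parquet")   // Spark 1.4+ writer API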

On Thu, Jul 30, 2015 at 11:49 AM, angelini alex.angel...@shopify.com
wrote:

 Hi all,

 Our data has lots of human readable column names (names that include
 spaces), is it possible to use these with Parquet and Dataframes?

 When I try and write the Dataframe I get the following error:

 (I am using PySpark)

 `AnalysisException: Attribute name "Name with Space" contains invalid
 character(s) among " ,;{}()\n\t=". Please use alias to rename it.`

 How can I alias that column name?

 `df['Name with Space'] = df['Name with Space'].alias('Name')` doesn't work
 as you can't assign to a dataframe column.

 `df.withColumnRenamed('Name with Space', 'Name')` overwrites the column and
 doesn't alias it.

 Any ideas?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-Dataframes-Column-names-with-spaces-tp24088.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: unsubscribe

2015-07-30 Thread Brandon White
https://www.youtube.com/watch?v=JncgoPKklVE

On Thu, Jul 30, 2015 at 1:30 PM, ziqiu...@accenture.com wrote:






unsubscribe

2015-07-30 Thread ziqiu.li






Re: Problems with JobScheduler

2015-07-30 Thread Tathagata Das
Yes, and that is indeed the problem. It is trying to process all the data
in Kafka, and therefore taking 60 seconds. You need to set the rate limits
for that.
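
A minimal sketch of the rate limit in question (the app name and the value
1000 are placeholders to be tuned to the topic's real throughput):

val conf = new SparkConf()
  .setAppName("MetricsSpark")
  // Caps each Kafka partition at this many records per second per batch,
  // so a backlogged partition cannot stretch a single batch to 60 seconds.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")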

On Thu, Jul 30, 2015 at 8:51 AM, Cody Koeninger c...@koeninger.org wrote:

 If you don't set it, there is no maximum rate, it will get everything from
 the end of the last batch to the maximum available offset

 On Thu, Jul 30, 2015 at 10:46 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 The difference is that one receives more data than the other two. I can
 pass the topics as parameters, so I could run the code with one topic at a
 time and figure out which topic it is, although I guess it is the topic
 that gets the most data.

 Anyway, it's pretty weird to see those delays in just one of the clusters,
 even when the other one is not running.
 I have seen the parameter spark.streaming.kafka.maxRatePerPartition; I
 haven't set any value for it. How does it behave if this parameter doesn't
 have a value?

 2015-07-30 16:32 GMT+02:00 Cody Koeninger c...@koeninger.org:

 If the jobs are running on different topicpartitions, what's different
 about them?  Is one of them 120x the throughput of the other, for
 instance?  You should be able to eliminate cluster config as a difference
 by running the same topic partition on the different clusters and comparing
 the results.

 On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I have three topics with one partition each, so each job runs over one
 topic.

 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org:

 Just so I'm clear, the difference in timing you're talking about is
 this:

 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s

 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s


 Are those jobs running on the same topicpartition?


 On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz konstt2...@gmail.com
  wrote:

 I read about the maxRatePerPartition parameter; I haven't set it. Could
 that be the problem? Although this wouldn't explain why it only misbehaves
 in one of the clusters.

 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 They just share Kafka; the rest of the resources are independent. I
 tried stopping one cluster and running only the cluster that isn't working,
 but the same thing happens.

 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same
 code in two cluster. I read from three topics in Kafka with 
 DirectStream so
 I have three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage
 24.0 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage
 24.0 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0
 in memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0
 in memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage
 24.0 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage
 24.0 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage
 24.0 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time
 143825958 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time
 1438259585000 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time
 143825959 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time
 1438259595000 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time
 143825960 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time
 1438259605000 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time
 143825961 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time
 1438259615000 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time
 143825962 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time
 1438259625000 ms*
 *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time
 143825963 ms*
 *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time
 1438259635000 ms*
 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage
 24.0 (TID 73) in 60373 ms on (3/3)
 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose
 tasks have all completed, from pool
 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
 MetricsSpark.scala:67) finished in 60.379 s
 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, 

How do I specify the data types in a DF

2015-07-30 Thread afarahat
Hello,
I have a simple file of mobile IDFAs. They look like:
gregconv = ['00013FEE-7561-47F3-95BC-CA18D20BCF78',
'000D9B97-2B54-4B80-AAA1-C1CB42CFBF3A',
'000F9E1F-BC7E-47E1-BF68-C68F6D987B96']
I am trying to make this RDD into a data frame

ConvRecord = Row("IDFA")

gregconvdf = gregconv.map(lambda x: ConvRecord(*x)).toDF()
I get the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/sql/context.py",
line 60, in toDF
    return sqlContext.createDataFrame(self, schema, sampleRatio)
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/sql/context.py",
line 351, in createDataFrame
    _verify_type(row, schema)
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/sql/types.py",
line 1027, in _verify_type
    "length of fields (%d)" % (len(obj), len(dataType.fields)))
ValueError: Length of object (36) does not match with length of fields (1)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-i-specify-the-data-types-in-a-DF-tp24090.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Does Spark Streaming need to list all the files in a directory?

2015-07-30 Thread Brandon White
Is this a known bottleneck for Spark Streaming textFileStream? Does it
need to list all the current files in a directory before it gets the new
files? Say I have 500k files in a directory: does it list them all in order
to get the new files?


Re: Does Spark Streaming need to list all the files in a directory?

2015-07-30 Thread Tathagata Das
The first time, it needs to list them. After that, the list should be
cached by the file stream implementation (as far as I remember).


On Thu, Jul 30, 2015 at 3:55 PM, Brandon White bwwintheho...@gmail.com
wrote:

 Is this a known bottleneck for Spark Streaming textFileStream? Does it
 need to list all the current files in a directory before it gets the new
 files? Say I have 500k files in a directory: does it list them all in order
 to get the new files?



Parquet SaveMode.Append Trouble.

2015-07-30 Thread satyajit vegesna
Hi,

I am new to using Spark and Parquet files,

Below is what i am trying to do, on Spark-shell,

val df =
  sqlContext.parquetFile("/data/LM/Parquet/Segment/pages/part-m-0.gz.parquet")

I have also tried the command below:

val df = sqlContext.read.format("parquet")
  .load("/data/LM/Parquet/Segment/pages/part-m-0.gz.parquet")

Now I have another existing Parquet file to which I want to append the data
of this DataFrame df, so I use:

df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet", "parquet",
  SaveMode.Append)

I also tried the command below:

df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet",
  SaveMode.Append)

and it throws me the error below:

<console>:26: error: not found: value SaveMode

df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet", "parquet",
  SaveMode.Append)

Please help me in case I am doing something wrong here.

Regards,
Satyajit.
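
The "not found: value SaveMode" message is what the shell prints when SaveMode
has not been imported; a minimal sketch of one likely fix (same path as above):

import org.apache.spark.sql.SaveMode

df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet", "parquet",
  SaveMode.Append)

// On Spark 1.4+ the DataFrameWriter form is an equivalent alternative:
// df.write.mode(SaveMode.Append).parquet("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet")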


RE: Failed to load class for data source: org.apache.spark.sql.cassandra

2015-07-30 Thread Benjamin Ross
If anyone's curious, the issue here is that I was using version 1.2.4 of the 
DataStax Spark Cassandra connector rather than the 1.4.0-M1 pre-release. 1.2.4 
doesn't fully support DataFrames, and DataFrame support is presumably still 
experimental in 1.4.0-M1.

Ben
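
For anyone hitting the same thing, a build.sbt sketch of pulling in the
DataFrame-capable connector (the coordinates below are the usual DataStax
ones; pick the version that matches your Spark release):

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0-M1"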


From: Benjamin Ross
Sent: Thursday, July 30, 2015 4:14 PM
To: user@spark.apache.org
Subject: RE: Failed to load class for data source: 
org.apache.spark.sql.cassandra

I'm submitting the application this way:
spark-submit  test-2.0.5-SNAPSHOT-jar-with-dependencies.jar

I've confirmed that org.apache.spark.sql.cassandra and org.apache.cassandra 
classes are in the jar.

Apologies for this relatively newbie question - I'm still new to both spark and 
scala.
Thanks,
Ben


From: Benjamin Ross
Sent: Thursday, July 30, 2015 3:45 PM
To: user@spark.apache.org
Subject: Failed to load class for data source: org.apache.spark.sql.cassandra

Hey all,
I'm running what should be a very straight-forward application of the Cassandra 
sql connector, and I'm getting an error:

Exception in thread main java.lang.RuntimeException: Failed to load class for 
data source: org.apache.spark.sql.cassandra
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:220)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:233)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
at com.latticeengines.test.CassandraTest$.main(CassandraTest.scala:33)
at com.latticeengines.test.CassandraTest.main(CassandraTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/07/30 15:34:47 INFO spark.SparkContext: Invoking stop() from shutdown hook

My jar is shaded, so I assume this shouldn't happen?

Here's the code I'm trying to run:
object CassandraTest {
  def main(args: Array[String]) {
    println("Hello, scala!")

    var conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "kv", "keyspace" -> "test"))
      .load()
    val w = Window.orderBy("value").rowsBetween(-2, 0)
    df.select(mean("value").over(w))
  }
}



Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-30 Thread Martin Senne
Dear Michael, dear all,

distinguishing those records that have a match in mapping from those that
don't is the crucial point.

Record(x : Int,  a: String)
Mapping(x: Int, y: Int)

Thus

Record(1, "hello")
Record(2, "bob")
Mapping(2, 5)

yield (2, "bob", 5) on an inner join.
BUT I'm also interested in (1, "hello", null), as there is no counterpart in
mapping (this is the left outer join part).

I need to distinguish 1 and 2 because of later inserts (case 1, "hello") or
updates (case 2, "bob").

Cheers and thanks,

Martin
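
A minimal sketch of the same distinction with the stock DataFrame API
(leftOuterJoinWithRemovalOfEqualColumn is the thread author's own helper;
column names as in the thread):

val joined = recordDF.join(mappingDF, recordDF("x") === mappingDF("x"), "left_outer")
  .select(recordDF("x"), recordDF("a"), mappingDF("y"))

joined.filter(joined("y").isNotNull).show()  // matched rows, e.g. (2, bob, 5)
joined.filter(joined("y").isNull).show()     // unmatched rows, e.g. (1, hello, null)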
On 30.07.2015 at 22:58, Michael Armbrust mich...@databricks.com wrote:

 Perhaps I'm missing what you are trying to accomplish, but if you'd like
to avoid the null values do an inner join instead of an outer join.

 Additionally, I'm confused about how the result
of joinedDF.filter(joinedDF("y").isNotNull).show still contains null values
in the column "y". This doesn't really have anything to do with nullable,
which is only a hint to the system so that we can avoid null checking when
we know that there are no null values. If you provide the full code I can
try and see if this is a bug.

 On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne 
martin.se...@googlemail.com wrote:

 Dear Michael, dear all,

 motivation:

 object OtherEntities {

   case class Record( x:Int, a: String)
   case class Mapping( x: Int, y: Int )

   val records = Seq( Record(1, "hello"), Record(2, "bob"))
   val mappings = Seq( Mapping(2, 5) )
 }

 Now I want to perform a left outer join on records and mappings (with
the ON JOIN criterion on columns recordDF("x") === mappingDF("x")); the
shorthand is in leftOuterJoinWithRemovalOfEqualColumn:

 val sqlContext = new SQLContext(sc)
 // used to implicitly convert an RDD to a DataFrame.
 import sqlContext.implicits._

 val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
 val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()

 val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn(
mappingDF, "x")

 joinedDF.filter(joinedDF("y").isNotNull).show


 Currently, the output is


 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 instead of

 +---+---+---+
 |  x|  a|  y|
 +---+---+---+
 |  2|bob|  5|
 +---+---+---+

 The last output can be achieved by the method of changing nullable=false
to nullable=true described in my first post.

 Thus, I need this schema modification as to make outer joins work.

 Cheers and thanks,

 Martin



 2015-07-30 20:23 GMT+02:00 Michael Armbrust mich...@databricks.com:

 We don't yet updated nullability information based on predicates as we
don't actually leverage this information in many places yet.  Why do you
want to update the schema?

 On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 
martin.se...@googlemail.com wrote:

 Hi all,

 1. *Columns in dataframes can be nullable and not nullable. Having a
 nullable column of Doubles, I can use the following Scala code to
filter all
 non-null rows:*

   val df = ... // some code that creates a DataFrame
   df.filter( df("columnname").isNotNull() )

 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = true)

 And with the filter expression
 +---+---+---+
 |  x|  a|  y|
 +---+---+---+
 |  2|bob|  5|
 +---+---+---+


 Unfortunately, while this is true for a nullable column (according to
 df.printSchema), it is not true for a column that is not nullable:

 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = false)

 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 such that the output is not affected by the filter. Is this intended?


 2. *What is the cheapest way (in terms of performance) to turn a non-nullable
 column into a nullable column? I came up with this:*

   /**
    * Set, if a column is nullable.
    * @param df source DataFrame
    * @param cn is the column name to change
    * @param nullable is the flag to set, such that the column is either
    *                 nullable or not
    */
   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

     val schema = df.schema
     val newSchema = StructType(schema.map {
       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame( df.rdd, newSchema)
   }

 Is there a cheaper solution?

 3. *Any comments?*

 Cheers and thx in advance,

 Martin






 --
 View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
 Sent from the Apache Spark User List mailing list archive at
Nabble.com.

 -
 To unsubscribe, e-mail: 

RE: Heatmap with Spark Streaming

2015-07-30 Thread Mohammed Guller
Umesh,
You can create a web service in any of the languages supported by Spark and 
stream the result from this web service to your D3-based client using WebSockets 
or Server-Sent Events.

For example, you can create a web service using Play. This app will integrate 
with Spark Streaming on the back end. The front end can be D3-based or any 
JavaScript app. Play makes it easy to stream data to a web client, so a client 
does not need to continuously poll data from the back-end server.

Mohammed

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Thursday, July 30, 2015 2:07 AM
To: UMESH CHAUDHARY
Cc: user@spark.apache.org
Subject: Re: Heatmap with Spark Streaming

You can integrate it with any language (like php) and use ajax calls to update 
the charts.

Thanks
Best Regards

On Thu, Jul 30, 2015 at 2:11 PM, UMESH CHAUDHARY 
umesh9...@gmail.com wrote:
Thanks for the suggestion, Akhil!
I looked at https://github.com/mbostock/d3/wiki/Gallery to learn more about D3. 
All the examples described there use static data; how can we update our heat map 
from new data if we store it in HBase or MySQL? I mean, do we need to query back 
and forth for it?
Is there any more pluggable and quicker heatmap component for Spark Streaming?

On Thu, Jul 30, 2015 at 1:23 PM, Akhil Das 
ak...@sigmoidanalytics.com wrote:
You can easily push data to an intermediate storage from spark streaming (like 
HBase or a SQL/NoSQL DB etc) and then power your dashboards with d3 js.
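
A rough sketch of that pattern (stream is assumed to be a DStream of events,
and saveToStore is a hypothetical stand-in for the HBase/MySQL write that the
D3 dashboard later reads):

stream.foreachRDD { (rdd, time) =>
  // Aggregate on the cluster, then push the small result to the store
  // backing the heatmap.
  val counts = rdd.map(event => (event, 1L)).reduceByKey(_ + _).collect()
  counts.foreach { case (event, n) => saveToStore(time.milliseconds, event, n) }
}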

Thanks
Best Regards

On Tue, Jul 28, 2015 at 12:18 PM, UMESH CHAUDHARY 
umesh9...@gmail.com wrote:
I have just started using Spark Streaming and have done a few POCs. It is fairly 
easy to implement. I was thinking of presenting the data using some smart graphing 
and dashboarding tools, e.g. Graphite or Grafana, but they don't have heat maps. I 
also looked at Zeppelin (http://zeppelin-project.org/), but was unable to find any 
heat-map functionality. Could you please suggest data visualization tools with 
heat maps for Spark Streaming?







Losing files in hdfs after creating spark sql table

2015-07-30 Thread Ron Gonzalez

Hi,
  After I create a table in Spark SQL and load an HDFS file into it, the 
file no longer shows up when I do hadoop fs -ls.

  Is this expected?

Thanks,
Ron

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How RDD lineage works

2015-07-30 Thread Tathagata Das
You have to read the original Spark paper to understand how RDD lineage
works.
https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

On Thu, Jul 30, 2015 at 9:25 PM, Ted Yu yuzhih...@gmail.com wrote:

 Please take a look at:
 core/src/test/scala/org/apache/spark/CheckpointSuite.scala

 Cheers

 On Thu, Jul 30, 2015 at 7:39 PM, bit1...@163.com bit1...@163.com wrote:

 Hi,

 I don't get a good understanding how RDD lineage works, so I would ask
 whether spark provides a unit test in the code base to illustrate how RDD
 lineage works.
 If there is, What's the class name is it?
 Thanks!

 --
 bit1...@163.com





How RDD lineage works

2015-07-30 Thread bit1...@163.com
Hi,

I don't have a good understanding of how RDD lineage works, so I would like to ask 
whether Spark provides a unit test in the code base that illustrates how RDD lineage works.
If there is, what's the class name?
Thanks!



bit1...@163.com


Re: Problem submiting an script .py against an standalone cluster.

2015-07-30 Thread Marcelo Vanzin
Can you share the part of the code in your script where you create the
SparkContext instance?

On Thu, Jul 30, 2015 at 7:19 PM, fordfarline fordfarl...@gmail.com wrote:

 Hi All,

 I'm having an issue when launching an app (Python) against a standalone
 cluster: it runs locally instead, as it doesn't reach the cluster.
 It's the first time I try the cluster; locally it works OK.

 i made this:

 - /home/user/Spark/spark-1.3.0-bin-hadoop2.4/sbin/start-all.sh # Master
 and
 worker are up in localhost:8080/4040
 - /home/user/Spark/spark-1.3.0-bin-hadoop2.4/bin/spark-submit --master
 spark://localhost:7077 Script.py
 * The script runs OK, but locally :( I can check it in
 localhost:4040, but I don't see any job in the cluster UI.

 The only warning it's:
 WARN Utils: Your hostname, localhost resolves to a loopback address:
 127.0.0.1; using 192.168.1.132 instead (on interface eth0)

 I set SPARK_LOCAL_IP=127.0.0.1 to solve this; at least the warning
 disappears,
 but the script keeps executing locally, not on the cluster.

 I think it has something to do with my virtual server:
 - Host Server: Linux Mint
 - The Virtual Server (workstation 10) where runs Spark is Linux Mint as
 well.

 Any ideas what I am doing wrong?

 Thanks in advance for any suggestions; this is driving me mad!!




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Problem-submiting-an-script-py-against-an-standalone-cluster-tp24091.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Marcelo


Re: How RDD lineage works

2015-07-30 Thread Ted Yu
Please take a look at:
core/src/test/scala/org/apache/spark/CheckpointSuite.scala

Cheers

On Thu, Jul 30, 2015 at 7:39 PM, bit1...@163.com bit1...@163.com wrote:

 Hi,

 I don't get a good understanding how RDD lineage works, so I would ask
 whether spark provides a unit test in the code base to illustrate how RDD
 lineage works.
 If there is, What's the class name is it?
 Thanks!

 --
 bit1...@163.com



Re: Problem submiting an script .py against an standalone cluster.

2015-07-30 Thread Anh Hong
You might want to run spark-submit with option --deploy-mode cluster
 


 On Thursday, July 30, 2015 7:24 PM, Marcelo Vanzin van...@cloudera.com 
wrote:
   

 Can you share the part of the code in your script where you create the 
SparkContext instance?
On Thu, Jul 30, 2015 at 7:19 PM, fordfarline fordfarl...@gmail.com wrote:

Hi All,

I`m having an issue when lanching an app (python) against a stand alone
cluster, but runs in local, as it doesn't reach the cluster.
It's the first time i try the cluster, in local works ok.

i made this:

- /home/user/Spark/spark-1.3.0-bin-hadoop2.4/sbin/start-all.sh # Master and
worker are up in localhost:8080/4040
- /home/user/Spark/spark-1.3.0-bin-hadoop2.4/bin/spark-submit --master
spark://localhost:7077 Script.py
           * The script runs ok but in local :(    i can check it in
localhost:4040, but i don't see any job in cluster UI

The only warning it's:
WARN Utils: Your hostname, localhost resolves to a loopback address:
127.0.0.1; using 192.168.1.132 instead (on interface eth0)

I set SPARK_LOCAL_IP=127.0.0.1 to solve this, al least de warning disappear,
but the script keep executing in local not in cluster.

I think it has something to do with my virtual server:
- Host Server: Linux Mint
- The Virtual Server (workstation 10) where runs Spark is Linux Mint as
well.

Any ideas what am i doing wrong?

Thanks in advance for any suggestion, i getting mad on it!!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-submiting-an-script-py-against-an-standalone-cluster-tp24091.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





-- 
Marcelo


  

Re: Heatmap with Spark Streaming

2015-07-30 Thread Tathagata Das
I do suggest that the non-Spark-related discussion be taken to a different
forum, as it does not directly contribute to the contents of this user
list.

On Thu, Jul 30, 2015 at 8:52 PM, UMESH CHAUDHARY umesh9...@gmail.com
wrote:

 Thanks for the valuable suggestion.
 I also started with a JAX-RS RESTful service with Angular.
 Since Play can help a lot in my scenario, I would prefer it along with
 Angular. Is this combination fine compared to D3?



Re: Re: How RDD lineage works

2015-07-30 Thread bit1...@163.com
Thanks TD and Zhihong for the guide. I will check it




bit1...@163.com
 
From: Tathagata Das
Date: 2015-07-31 12:27
To: Ted Yu
CC: bit1...@163.com; user
Subject: Re: How RDD lineage works
You have to read the original Spark paper to understand how RDD lineage works. 
https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

On Thu, Jul 30, 2015 at 9:25 PM, Ted Yu yuzhih...@gmail.com wrote:
Please take a look at:
core/src/test/scala/org/apache/spark/CheckpointSuite.scala

Cheers

On Thu, Jul 30, 2015 at 7:39 PM, bit1...@163.com bit1...@163.com wrote:
Hi,

I don't get a good understanding how RDD lineage works, so I would ask whether 
spark provides a unit test in the code base to illustrate how RDD lineage works.
If there is, What's the class name is it? 
Thanks!



bit1...@163.com




Re: Re: How RDD lineage works

2015-07-30 Thread bit1...@163.com
The following is copied from the paper; it is related to RDD lineage. 
Is there a unit test that covers this scenario (an RDD partition being lost and recovered)?
Thanks.

If a partition of an RDD is lost, the RDD has enough information about how it 
was derived from other RDDs to recompute 
just that partition. Thus, lost data can be recovered, often quite quickly, 
without requiring costly replication.



bit1...@163.com
 
From: bit1...@163.com
Date: 2015-07-31 13:11
To: Tathagata Das; yuzhihong
CC: user
Subject: Re: Re: How RDD lineage works
Thanks TD and Zhihong for the guide. I will check it




bit1...@163.com
 
From: Tathagata Das
Date: 2015-07-31 12:27
To: Ted Yu
CC: bit1...@163.com; user
Subject: Re: How RDD lineage works
You have to read the original Spark paper to understand how RDD lineage works. 
https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

On Thu, Jul 30, 2015 at 9:25 PM, Ted Yu yuzhih...@gmail.com wrote:
Please take a look at:
core/src/test/scala/org/apache/spark/CheckpointSuite.scala

Cheers

On Thu, Jul 30, 2015 at 7:39 PM, bit1...@163.com bit1...@163.com wrote:
Hi,

I don't get a good understanding how RDD lineage works, so I would ask whether 
spark provides a unit test in the code base to illustrate how RDD lineage works.
If there is, What's the class name is it? 
Thanks!



bit1...@163.com




Re: Spark Streaming Kafka could not find leader offset for Set()

2015-07-30 Thread gaurav sharma
 I have run into similar exceptions:

ERROR DirectKafkaInputDStream: ArrayBuffer(java.net.SocketTimeoutException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([AdServe,1]))

In my case the issue happened on the Kafka side, where my broker offsets went
out of sync, or no leader was returned for this particular partition.

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic AdServe
--broker-list BROKER_IP --time -1

This shall return you valid offsets for all your Kafka partitions.


On Thu, Jul 30, 2015 at 7:58 PM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi Cody, sorry, my bad, you were right: there was a typo in topicSet. When I
 corrected the typo in topicSet it started working. Thanks a lot.

 Regards

 On Thu, Jul 30, 2015 at 7:43 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Can you post the code including the values of kafkaParams and topicSet,
 ideally the relevant output of kafka-topics.sh --describe as well

 On Wed, Jul 29, 2015 at 11:39 PM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi, thanks for the response. Like I already mentioned in the question,
 the Kafka topic is valid and it has data; I can see the data in it using another
 Kafka consumer.
 On Jul 30, 2015 7:31 AM, Cody Koeninger c...@koeninger.org wrote:

 The last time someone brought this up on the mailing list, the issue
 actually was that the topic(s) didn't exist in Kafka at the time the spark
 job was running.





 On Wed, Jul 29, 2015 at 6:17 PM, Tathagata Das t...@databricks.com
 wrote:

 There is a known issue that Kafka cannot return a leader if there is no
 data in the topic. I think it was raised in another thread in this forum.
 Is that the issue?

 On Wed, Jul 29, 2015 at 10:38 AM, unk1102 umesh.ka...@gmail.com
 wrote:

 Hi I have Spark Streaming code which streams from Kafka topic it used
 to work
 fine but suddenly it started throwing the following exception

 Exception in thread main org.apache.spark.SparkException:
 org.apache.spark.SparkException: Couldn't find leader offsets for
 Set()
 at

 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
 at scala.util.Either.fold(Either.scala:97)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:412)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:528)
 at

 org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
 My Spark Streaming client code is very simple: I just create one receiver
 using the following code and try to print the messages it consumes.

 JavaPairInputDStream<String, String> messages =
         KafkaUtils.createDirectStream(jssc,
                 String.class,
                 String.class,
                 StringDecoder.class,
                 StringDecoder.class,
                 kafkaParams,
                 topicSet);

 The only Kafka param I specify is kafka.ofset.reset=largest. The Kafka topic
 has data and I can see it using other Kafka consumers, but the Spark
 Streaming code above throws an exception saying the leader offset was not
 found. I tried both smallest and largest offsets. I wonder what happened;
 this code used to work earlier. I am using Spark Streaming 1.3.1, where it
 was working, and I tried 1.4.1 with the same exception. Please guide. I am
 new to Spark; thanks in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-could-not-find-leader-offset-for-Set-tp24066.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: Getting the number of slaves

2015-07-30 Thread Tathagata Das
To clarify, that is the number of executors requested by the SparkContext
from the cluster manager.
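
A rough runtime alternative sometimes used (not from this thread) is to count
the executors currently registered with the driver; the driver itself appears
in the map, hence the -1:

val registeredExecutors = sc.getExecutorMemoryStatus.size - 1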

On Tue, Jul 28, 2015 at 5:18 PM, amkcom amk...@gmail.com wrote:

 try sc.getConf.getInt("spark.executor.instances", 1)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604p24043.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Re: How RDD lineage works

2015-07-30 Thread Tathagata Das
https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/FailureSuite.scala
This may help.
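
For a quick look at lineage outside the test suites, a minimal sketch
(assuming an existing SparkContext named sc):

val base    = sc.parallelize(1 to 1000, 4)
val derived = base.map(_ * 2).filter(_ % 3 == 0)

// Prints the chain of parent RDDs that Spark would use to recompute a
// lost partition of `derived`.
println(derived.toDebugString)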

On Thu, Jul 30, 2015 at 10:42 PM, bit1...@163.com bit1...@163.com wrote:

 The following is copied from the paper, is something related with rdd
 lineage. Is there a unit test that covers this scenario(rdd partition lost
 and recovery)?
 Thanks.

 If a partition of an RDD is lost, the RDD has enough information about how
 it was derived from other RDDs to recompute
 just that partition. Thus, lost data can be recovered, often quite
 quickly, without requiring costly replication.

 --
 bit1...@163.com


 *From:* bit1...@163.com
 *Date:* 2015-07-31 13:11
 *To:* Tathagata Das tathagata.das1...@gmail.com; yuzhihong
 yuzhih...@gmail.com
 *CC:* user user@spark.apache.org
 *Subject:* Re: Re: How RDD lineage works
 Thanks TD and Zhihong for the guide. I will check it


 --
 bit1...@163.com


 *From:* Tathagata Das tathagata.das1...@gmail.com
 *Date:* 2015-07-31 12:27
 *To:* Ted Yu yuzhih...@gmail.com
 *CC:* bit1...@163.com; user user@spark.apache.org
 *Subject:* Re: How RDD lineage works
 You have to read the original Spark paper to understand how RDD lineage
 works.
 https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

 On Thu, Jul 30, 2015 at 9:25 PM, Ted Yu yuzhih...@gmail.com wrote:

 Please take a look at:
 core/src/test/scala/org/apache/spark/CheckpointSuite.scala

 Cheers

 On Thu, Jul 30, 2015 at 7:39 PM, bit1...@163.com bit1...@163.com wrote:

 Hi,

 I don't get a good understanding how RDD lineage works, so I would ask
 whether spark provides a unit test in the code base to illustrate how RDD
 lineage works.
 If there is, What's the class name is it?
 Thanks!

 --
 bit1...@163.com






Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-30 Thread Martin Senne
Dear Michael, dear all,

motivation:

object OtherEntities {

  case class Record( x:Int, a: String)
  case class Mapping( x: Int, y: Int )

  val records = Seq( Record(1, "hello"), Record(2, "bob"))
  val mappings = Seq( Mapping(2, 5) )
}

Now I want to perform a *left outer join* on records and mappings
(with the ON JOIN criterion on columns recordDF("x") ===
mappingDF("x")); the shorthand is in
*leftOuterJoinWithRemovalOfEqualColumn*:

val sqlContext = new SQLContext(sc)
// used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()

val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn( mappingDF, "x")

joinedDF.filter(joinedDF("y").isNotNull).show


Currently, the output is

+---+-----+----+
|  x|    a|   y|
+---+-----+----+
|  1|hello|null|
|  2|  bob|   5|
+---+-----+----+

instead of

+---+---+---+
|  x|  a|  y|
+---+---+---+
|  2|bob|  5|
+---+---+---+

The last output can be achieved by the method of changing nullable=false to
nullable=true described in my first post.

*Thus, I need this schema modification as to make outer joins work.*

Cheers and thanks,

Martin



2015-07-30 20:23 GMT+02:00 Michael Armbrust mich...@databricks.com:

 We don't yet updated nullability information based on predicates as we
 don't actually leverage this information in many places yet.  Why do you
 want to update the schema?

 On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 
 martin.se...@googlemail.com wrote:

 Hi all,

 1. *Columns in dataframes can be nullable and not nullable. Having a
 nullable column of Doubles, I can use the following Scala code to filter
 all
 non-null rows:*

   val df = ... // some code that creates a DataFrame
   df.filter( df("columnname").isNotNull() )

 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = true)

 And with the filter expression
 +---+---+---+
 |  x|  a|  y|
 +---+---+---+
 |  2|bob|  5|
 +---+---+---+


 Unfortunately, while this is true for a nullable column (according to
 df.printSchema), it is not true for a column that is not nullable:

 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = false)

 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 such that the output is not affected by the filter. Is this intended?


 2. *What is the cheapest way (in terms of performance) to turn a non-nullable
 column into a nullable column? I came up with this:*

   /**
    * Set, if a column is nullable.
    * @param df source DataFrame
    * @param cn is the column name to change
    * @param nullable is the flag to set, such that the column is either
    *                 nullable or not
    */
   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

     val schema = df.schema
     val newSchema = StructType(schema.map {
       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame( df.rdd, newSchema)
   }

 Is there a cheaper solution?

 3. *Any comments?*

 Cheers and thx in advance,

 Martin






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark Master Build Git Commit Hash

2015-07-30 Thread Jerry Lam
Hi Ted,

The problem is that I don't know whether the build uses the commits from
the same day, or whether it could be based on the Jul 15th commits.
Just a thought: it might be possible to replace SNAPSHOT with the git
commit hash in the filename so people will know which commit a build is based on.

Thank you for your help!

Jerry

On Thu, Jul 30, 2015 at 11:10 AM, Ted Yu yuzhih...@gmail.com wrote:

 The files were dated 16-Jul-2015
 Looks like nightly build either was not published, or published at a
 different location.

 You can download spark-1.5.0-SNAPSHOT.tgz and binary-search for the
 commits made on Jul 16th.
 There may be other ways of determining the latest commit.

 Cheers

 On Thu, Jul 30, 2015 at 7:39 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi Spark users and developers,

 I wonder which git commit was used to build the latest master-nightly
 build found at:
 http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/
 ?
 I downloaded the build but I couldn't find the information related to it.
 Thank you!

 Best Regards,

 Jerry





Re: Spark SQL Error

2015-07-30 Thread Akhil Das
It seem an issue with the ES connector
https://github.com/elastic/elasticsearch-hadoop/issues/482

Thanks
Best Regards

On Tue, Jul 28, 2015 at 6:14 AM, An Tran tra...@gmail.com wrote:

 Hello all,

 I am currently getting an error with Spark SQL accessing Elasticsearch using
 the Elasticsearch Spark integration.  Below is the series of commands I issued
 along with the stacktrace.  I am unclear what the error could mean.  I can
 print the schema correctly but error out if I try to display a few
 results.  Can you guys point me in the right direction?

 scala>
 sqlContext.read.format("org.elasticsearch.spark.sql").options(esOptions).load("reddit_comment_public-201507-v3/default").registerTempTable("reddit_comment")


 scala> reddit_comment_df.printSchema

 root

  |-- data: struct (nullable = true)

  ||-- archived: boolean (nullable = true)

  ||-- author: string (nullable = true)

  ||-- author_flair_css_class: string (nullable = true)

  ||-- author_flair_text: string (nullable = true)

  ||-- body: string (nullable = true)

  ||-- body_html: string (nullable = true)

  ||-- controversiality: long (nullable = true)

  ||-- created: long (nullable = true)

  ||-- created_utc: long (nullable = true)

  ||-- distinguished: string (nullable = true)

  ||-- downs: long (nullable = true)

  ||-- edited: long (nullable = true)

  ||-- gilded: long (nullable = true)

  ||-- id: string (nullable = true)

  ||-- link_author: string (nullable = true)

  ||-- link_id: string (nullable = true)

  ||-- link_title: string (nullable = true)

  ||-- link_url: string (nullable = true)

  ||-- name: string (nullable = true)

  ||-- parent_id: string (nullable = true)

  ||-- replies: string (nullable = true)

  ||-- saved: boolean (nullable = true)

  ||-- score: long (nullable = true)

  ||-- score_hidden: boolean (nullable = true)

  ||-- subreddit: string (nullable = true)

  ||-- subreddit_id: string (nullable = true)

  ||-- ups: long (nullable = true)



 scala> reddit_comment_df.show

 15/07/27 20:38:31 INFO ScalaEsRowRDD: Reading from
 [reddit_comment_public-201507-v3/default]

 15/07/27 20:38:31 INFO ScalaEsRowRDD: Discovered mapping
 {reddit_comment_public-201507-v3=[mappings=[default=[acquire_date=DATE,
 elasticsearch_date_partition_index=STRING,
 elasticsearch_language_partition_index=STRING, elasticsearch_type=STRING,
 source=[data=[archived=BOOLEAN, author=STRING,
 author_flair_css_class=STRING, author_flair_text=STRING, body=STRING,
 body_html=STRING, controversiality=LONG, created=LONG, created_utc=LONG,
 distinguished=STRING, downs=LONG, edited=LONG, gilded=LONG, id=STRING,
 link_author=STRING, link_id=STRING, link_title=STRING, link_url=STRING,
 name=STRING, parent_id=STRING, replies=STRING, saved=BOOLEAN, score=LONG,
 score_hidden=BOOLEAN, subreddit=STRING, subreddit_id=STRING, ups=LONG],
 kind=STRING], source_geo_location=GEO_POINT, source_id=STRING,
 source_language=STRING, source_time=DATE]]]} for
 [reddit_comment_public-201507-v3/default]

 15/07/27 20:38:31 INFO SparkContext: Starting job: show at console:26

 15/07/27 20:38:31 INFO DAGScheduler: Got job 13 (show at console:26)
 with 1 output partitions (allowLocal=false)

 15/07/27 20:38:31 INFO DAGScheduler: Final stage: ResultStage 16(show at
 console:26)

 15/07/27 20:38:31 INFO DAGScheduler: Parents of final stage: List()

 15/07/27 20:38:31 INFO DAGScheduler: Missing parents: List()

 15/07/27 20:38:31 INFO DAGScheduler: Submitting ResultStage 16
 (MapPartitionsRDD[65] at show at console:26), which has no missing parents

 15/07/27 20:38:31 INFO MemoryStore: ensureFreeSpace(7520) called with
 curMem=71364, maxMem=2778778828

 15/07/27 20:38:31 INFO MemoryStore: Block broadcast_13 stored as values in
 memory (estimated size 7.3 KB, free 2.6 GB)

 15/07/27 20:38:31 INFO MemoryStore: ensureFreeSpace(3804) called with
 curMem=78884, maxMem=2778778828

 15/07/27 20:38:31 INFO MemoryStore: Block broadcast_13_piece0 stored as
 bytes in memory (estimated size 3.7 KB, free 2.6 GB)

 15/07/27 20:38:31 INFO BlockManagerInfo: Added broadcast_13_piece0 in
 memory on 172.25.185.239:58296 (size: 3.7 KB, free: 2.6 GB)

 15/07/27 20:38:31 INFO SparkContext: Created broadcast 13 from broadcast
 at DAGScheduler.scala:874

 15/07/27 20:38:31 INFO DAGScheduler: Submitting 1 missing tasks from
 ResultStage 16 (MapPartitionsRDD[65] at show at console:26)

 15/07/27 20:38:31 INFO TaskSchedulerImpl: Adding task set 16.0 with 1 tasks

 15/07/27 20:38:31 INFO FairSchedulableBuilder: Added task set TaskSet_16
 tasks to pool default

 15/07/27 20:38:31 INFO TaskSetManager: Starting task 0.0 in stage 16.0
 (TID 172, 172.25.185.164, ANY, 5085 bytes)

 15/07/27 20:38:31 INFO BlockManagerInfo: Added broadcast_13_piece0 in
 memory on 172.25.185.164:50275 (size: 3.7 KB, free: 3.6 GB)

 15/07/27 20:38:31 WARN TaskSetManager: Lost task 0.0 in stage 16.0 (TID
 172, 172.25.185.164): 

Re: streaming issue

2015-07-30 Thread Akhil Das
What operation are you doing with the streaming job? Also, can you look in the
datanode logs and see what's going on?

Thanks
Best Regards

On Tue, Jul 28, 2015 at 8:18 AM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:

 Hi,
 I got an error when running Spark Streaming, as below.

 java.lang.reflect.InvocationTargetException

 at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)

 at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at 
 org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)

 at 
 org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
 at scala.Option.foreach(Option.scala:236)

 at 
 org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)

 at 
 org.apache.spark.scheduler.EventLoggingListener.onUnpersistRDD(EventLoggingListener.scala:175)
 at 
 org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:50)


 at 
 org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)

 at 
 org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)

 at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)

 at 
 org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)

 at 
 org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)

 at 
 org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)

 at 
 org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)

 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)

 at 
 org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
 Caused by: java.io.IOException: All datanodes 10.153.192.159:50010
  are bad. Aborting...

 at 
 org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1137)

 at 
 org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:933)

 at 
 org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:487)

 15/07/28 02:01:10 ERROR LiveListenerBus: Listener EventLoggingListener threw 
 an exception
 java.lang.reflect.InvocationTargetException

 at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)

 at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at 
 org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)

 at 
 org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
 at scala.Option.foreach(Option.scala:236)

 I had set the ulimit in /etc/security/limits.conf, but I still get the
 same exception. Can somebody please help me resolve this issue?
 core file size  (blocks, -c) 0
 data seg size   (kbytes, -d) unlimited
 scheduling priority (-e) 0
 file size   (blocks, -f) unlimited
 pending signals (-i) 264192
 max locked memory   (kbytes, -l) 32
 max memory size (kbytes, -m) unlimited
 open files  (-n) 65535
 pipe size(512 bytes, -p) 8
 POSIX message queues (bytes, -q) 819200
 real-time priority  (-r) 0
 stack size  (kbytes, -s) 10240
 cpu time   (seconds, -t) unlimited
 max user processes  (-u) 34816
 virtual memory  (kbytes, -v) unlimited
 file locks  (-x) unlimited

 Thanks .



Re: Spark and Speech Recognition

2015-07-30 Thread Akhil Das
Like this?

val data = sc.textFile("/sigmoid/audio/data/", 24).foreachPartition(urls => speechRecognizer(urls))

Let 24 be the total number of cores that you have on all the workers.
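
For the shared-models requirement, a minimal sketch could look like the
following. This is only an illustration: SharedModel, loadModels and transcribe
are hypothetical placeholders, not the Sphinx4 API. The lazy val is initialized
once per executor JVM, so every task (one per core) running in that JVM shares
the same immutable models.

object SharedModel {
  // initialized once per executor JVM; all tasks running in that JVM share it
  lazy val models: Array[Byte] = loadModels()
  def loadModels(): Array[Byte] = Array.fill(1024)(0.toByte)  // placeholder for ~1 GB of immutable models
  def transcribe(url: String): String =                       // placeholder for the real recognizer call
    s"transcript-of-$url-using-${models.length}-model-bytes"
}

val urls = sc.textFile("/sigmoid/audio/data/", 24)
val transcripts = urls.mapPartitions { iter =>
  iter.map(url => url + "\t" + SharedModel.transcribe(url))
}
transcripts.saveAsTextFile("/sigmoid/audio/transcripts")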

Thanks
Best Regards

On Wed, Jul 29, 2015 at 6:50 AM, Peter Wolf opus...@gmail.com wrote:

 Hello, I am writing a Spark application to use speech recognition to
 transcribe a very large number of recordings.

 I need some help configuring Spark.

 My app is basically a transformation with no side effects: recording URL
 -- transcript.  The input is a huge file with one URL per line, and the
 output is a huge file of transcripts.

 The speech recognizer is written in Java (Sphinx4), so it can be packaged
 as a JAR.

 The recognizer is very processor intensive, so you can't run too many on
 one machine-- perhaps one recognizer per core.  The recognizer is also
 big -- maybe 1 GB.  But most of the recognizer consists of immutable acoustic and
 language models that can be shared with other instances of the recognizer.

 So I want to run about one recognizer per core of each machine in my
 cluster.  I want all recognizer on one machine to run within the same JVM
 and share the same models.

 How does one configure Spark for this sort of application?  How does one
 control how Spark deploys the stages of the process?  Can someone point me
 to an appropriate doc, or keywords I should Google?

 Thanks
 Peter



Re: Spark on YARN

2015-07-30 Thread Jeetendra Gangele
I can't see the application logs here. All the logs are going into stderr.
Can anybody help here?

On 30 July 2015 at 12:21, Jeetendra Gangele gangele...@gmail.com wrote:

 I am running the command below; it is the default Spark Pi program. But it is
 not running: all the logs are going to stderr, yet at the terminal the job
 reports success. I guess there is some configuration issue and the job is not
 actually launching.

 /bin/spark-submit --class org.apache.spark.examples.SparkPi --master
 yarn-cluster lib/spark-examples-1.4.1-hadoop2.6.0.jar 10


 Complete log

 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in 
 [jar:file:/home/hadoop/tmp/nm-local-dir/usercache/hadoop/filecache/23/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in 
 [jar:file:/opt/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
 explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 15/07/30 12:13:31 INFO yarn.ApplicationMaster: Registered signal handlers for 
 [TERM, HUP, INT]
 15/07/30 12:13:32 INFO yarn.ApplicationMaster: ApplicationAttemptId: 
 appattempt_1438090734187_0010_01
 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop
 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop
 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication 
 disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
 with modify permissions: Set(hadoop)
 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Starting the user application 
 in a separate Thread
 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context 
 initialization
 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context 
 initialization ...
 15/07/30 12:13:33 INFO spark.SparkContext: Running Spark version 1.4.1
 15/07/30 12:13:33 WARN spark.SparkConf:
 SPARK_JAVA_OPTS was detected (set to '-Dspark.driver.port=53411').
 This is deprecated in Spark 1.0+.

 Please instead use:
  - ./spark-submit with conf/spark-defaults.conf to set defaults for an 
 application
  - ./spark-submit with --driver-java-options to set -X options for a driver
  - spark.executor.extraJavaOptions to set -X options for executors
  - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master 
 or worker)

 15/07/30 12:13:33 WARN spark.SparkConf: Setting 
 'spark.executor.extraJavaOptions' to '-Dspark.driver.port=53411' as a 
 work-around.
 15/07/30 12:13:33 WARN spark.SparkConf: Setting 
 'spark.driver.extraJavaOptions' to '-Dspark.driver.port=53411' as a 
 work-around.
 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop
 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop
 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication 
 disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
 with modify permissions: Set(hadoop)
 15/07/30 12:13:33 INFO slf4j.Slf4jLogger: Slf4jLogger started
 15/07/30 12:13:33 INFO Remoting: Starting remoting
 15/07/30 12:13:34 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp://sparkDriver@10.21.1.77:53411]
 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'sparkDriver' 
 on port 53411.
 15/07/30 12:13:34 INFO spark.SparkEnv: Registering MapOutputTracker
 15/07/30 12:13:34 INFO spark.SparkEnv: Registering BlockManagerMaster
 15/07/30 12:13:34 INFO storage.DiskBlockManager: Created local directory at 
 /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/blockmgr-2166bbd9-b1ed-41d1-bc95-92c6a7fbd36f
 15/07/30 12:13:34 INFO storage.MemoryStore: MemoryStore started with capacity 
 246.0 MB
 15/07/30 12:13:34 INFO spark.HttpFileServer: HTTP File server directory is 
 /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/httpd-d1232310-5aa1-44e7-a99a-cc2ae614f89c
 15/07/30 12:13:34 INFO spark.HttpServer: Starting HTTP Server
 15/07/30 12:13:34 INFO server.Server: jetty-8.y.z-SNAPSHOT
 15/07/30 12:13:34 INFO server.AbstractConnector: Started 
 SocketConnector@0.0.0.0:52507
 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'HTTP file 
 server' on port 52507.
 15/07/30 12:13:34 INFO spark.SparkEnv: Registering OutputCommitCoordinator
 15/07/30 12:13:34 INFO ui.JettyUtils: Adding filter: 
 org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
 15/07/30 12:13:34 INFO server.Server: jetty-8.y.z-SNAPSHOT
 15/07/30 12:13:34 INFO server.AbstractConnector: Started 
 SelectChannelConnector@0.0.0.0:59596
 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'SparkUI' on 
 port 59596.
 15/07/30 12:13:34 INFO ui.SparkUI: Started SparkUI at http://10.21.1.77:59596
 15/07/30 12:13:34 INFO cluster.YarnClusterScheduler: Created 
 YarnClusterScheduler
 15/07/30 

Re: Heatmap with Spark Streaming

2015-07-30 Thread UMESH CHAUDHARY
Thanks for the suggestion, Akhil!
I looked at https://github.com/mbostock/d3/wiki/Gallery to learn more about
d3; all the examples described there are on static data. How can we update our
heat map from new data if we store it in HBase or MySQL? I mean, do we need to
query back and forth for it?
Is there any pluggable and quicker component for heat maps with Spark
Streaming?

On Thu, Jul 30, 2015 at 1:23 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can easily push data to an intermediate storage from spark streaming
 (like HBase or a SQL/NoSQL DB etc) and then power your dashboards with d3
 js.

 Thanks
 Best Regards

 On Tue, Jul 28, 2015 at 12:18 PM, UMESH CHAUDHARY umesh9...@gmail.com
 wrote:

 I have just started using Spark Streaming and done a few POCs. It is fairly
 easy to implement. I was thinking of presenting the data using some smart
 graphing and dashboarding tools, e.g. Graphite or Grafana, but they don't have
 heat maps. I also looked at Zeppelin http://zeppelin-project.org/ ,
 but was unable to find any heat-map functionality. Could you please suggest
 any data visualization tools with heat-map support for Spark Streaming?






Re: Writing streaming data to cassandra creates duplicates

2015-07-30 Thread Priya Ch
Hi All,

 Can someone throw insights on this ?

On Wed, Jul 29, 2015 at 8:29 AM, Priya Ch learnings.chitt...@gmail.com
wrote:



 Hi TD,

  Thanks for the info. I have the scenario like this.

  I am reading the data from a Kafka topic. Let's say Kafka has 3 partitions
 for the topic. In my streaming application, I would configure 3 receivers
 with 1 thread each, such that they receive 3 DStreams (from the 3 partitions
 of the Kafka topic), and I also implement a partitioner. Now there is a
 possibility of receiving messages with the same primary key twice or more:
 once when the message is created, and again whenever there is an update to
 any field of the same message.

 If two messages M1 and M2 with the same primary key are read by 2 receivers,
 then even with the partitioner Spark would still end up processing them in
 parallel, as they are in different DStreams altogether. How do we address
 this situation?

 Thanks,
 Padma Ch

 On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das t...@databricks.com
 wrote:

 You have to partition that data on the Spark Streaming by the primary
 key, and then make sure insert data into Cassandra atomically per key, or
 per set of keys in the partition. You can use the combination of the (batch
 time, and partition Id) of the RDD inside foreachRDD as the unique id for
 the data you are inserting. This will guard against multiple attempts to
 run the task that inserts into Cassandra.

 See
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations
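
 A rough sketch of that idea, assuming dstream is the DStream of messages coming
 from Kafka; Message and writeRow are hypothetical placeholders for the actual
 record class and the Cassandra write (e.g. via the spark-cassandra-connector):

 import org.apache.spark.{HashPartitioner, TaskContext}

 case class Message(primaryKey: String, payload: String)              // placeholder record type
 def writeRow(key: String, msg: Message, uniqueId: String): Unit = () // placeholder idempotent upsert

 dstream.foreachRDD { (rdd, batchTime) =>
   rdd.map(msg => (msg.primaryKey, msg))                              // key the data by primary key
      .partitionBy(new HashPartitioner(rdd.sparkContext.defaultParallelism))
      .foreachPartition { iter =>
        // (batch time, partition id) is unique for this piece of output, so a
        // re-run of the task overwrites the same rows instead of duplicating them
        val uniqueId = s"${batchTime.milliseconds}-${TaskContext.get.partitionId}"
        iter.foreach { case (key, msg) => writeRow(key, msg, uniqueId) }
      }
 }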

 TD

 On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch learnings.chitt...@gmail.com
 wrote:

 Hi All,

  I have a problem when writing streaming data to Cassandra. Our existing
 product is on an Oracle DB, in which, while writing data, locks are maintained
 such that duplicates in the DB are avoided.

 But as Spark has a parallel processing architecture, if more than 1 thread
 is trying to write the same data, i.e. with the same primary key, is there any
 scope for creating duplicates? If yes, how do we address this problem, either
 from the Spark side or from the Cassandra side?

 Thanks,
 Padma Ch







Re: Spark build/sbt assembly

2015-07-30 Thread Akhil Das
Did you try removing this jar? build/sbt-launch-0.13.7.jar

Thanks
Best Regards

On Tue, Jul 28, 2015 at 12:08 AM, Rahul Palamuttam rahulpala...@gmail.com
wrote:

 Hi All,

 I hope this is the right place to post troubleshooting questions.
 I've been following the install instructions and I get the following error
 when running the following from Spark home directory

 $./build/sbt
 Using /usr/java/jdk1.8.0_20/ as default JAVA_HOME.
 Note, this will be overridden by -java-home if it is set.
 Attempting to fetch sbt
 Launching sbt from build/sbt-launch-0.13.7.jar
 Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar

 However, when I run sbt assembly it compiles, with a couple of warnings, and
 it works nonetheless.
 Is the build/sbt script deprecated? I notice that on one node it works, but on
 the other it gives me the above error.

 Thanks,

 Rahul P







Re: help plz! how to use zipWithIndex to each subset of a RDD

2015-07-30 Thread rok
zipWithIndex gives you global indices, which is not what you want. You'll
want to use flatMap with a map function that iterates through each iterable
and returns the (String, Int, String) tuple for each element.
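
For example, a minimal sketch of that approach (using the sample data from the
question below):

val data = sc.parallelize(Seq(
  ("key-1", "a"), ("key-1", "b"), ("key-2", "a"),
  ("key-2", "c"), ("key-3", "b"), ("key-4", "d")))

val indexed = data.groupByKey().flatMap { case (key, values) =>
  // zipWithIndex here is the local Scala collection method, applied per group
  values.zipWithIndex.map { case (value, i) => (key, i + 1, value) }
}
// indexed: RDD[(String, Int, String)], e.g. ("key-1", 1, "a"), ("key-1", 2, "b"), ...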

On Thu, Jul 30, 2015 at 4:13 AM, askformore [via Apache Spark User List] 
ml-node+s1001560n24071...@n3.nabble.com wrote:

 I have some data like this: RDD[(String, String)] = ((key-1, a),
 (key-1, b), (key-2, a), (key-2, c), (key-3, b), (key-4, d)), and I want
 to group the data by key and, for each group, add an index field to the
 group members, so that in the end I can transform the data to: RDD[(String,
 Int, String)] = ((key-1, 1, a), (key-1, 2, b), (key-2, 1, a), (key-2, 2, b),
 (key-3, 1, b), (key-4, 1, d)). I tried groupByKey first, and then I got an
 RDD[(String, Iterable[String])], but I don't know how to apply the
 zipWithIndex function to each Iterable... thanks.







Re: Heatmap with Spark Streaming

2015-07-30 Thread Akhil Das
You can easily push data to an intermediate storage from spark streaming
(like HBase or a SQL/NoSQL DB etc) and then power your dashboards with d3
js.

Thanks
Best Regards

On Tue, Jul 28, 2015 at 12:18 PM, UMESH CHAUDHARY umesh9...@gmail.com
wrote:

 I have just started using Spark Streaming and done a few POCs. It is fairly
 easy to implement. I was thinking of presenting the data using some smart
 graphing and dashboarding tools, e.g. Graphite or Grafana, but they don't have
 heat maps. I also looked at Zeppelin http://zeppelin-project.org/ , but
 was unable to find any heat-map functionality. Could you please suggest any
 data visualization tools with heat-map support for Spark Streaming?





Re: sc.parallelize(512k items) doesn't always use 64 executors

2015-07-30 Thread Akhil Das
sc.parallelize takes a second parameter which is the total number of
partitions, are you using that?
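
For example (a minimal sketch), with 64 executors you might create at least 64
partitions explicitly:

val items = (1 to 512000).toList
// numSlices (the second argument) controls how many partitions, and therefore
// how many parallel tasks, the collection is split into
val rdd = sc.parallelize(items, 64)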

Thanks
Best Regards

On Wed, Jul 29, 2015 at 9:27 PM, Kostas Kougios 
kostas.koug...@googlemail.com wrote:

 Hi, I do an sc.parallelize with a list of 512k items. But sometimes not all
 executors are used, i.e. they don't have work to do and nothing is logged
 after:

 15/07/29 16:35:22 WARN internal.ThreadLocalRandom: Failed to generate a
 seed
 from SecureRandom within 3 seconds. Not enough entrophy?
 15/07/29 16:35:22 INFO util.Utils: Successfully started service
 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56477.
 15/07/29 16:35:22 INFO netty.NettyBlockTransferService: Server created on
 56477
 15/07/29 16:35:22 INFO storage.BlockManagerMaster: Trying to register
 BlockManager
 15/07/29 16:35:22 INFO storage.BlockManagerMaster: Registered BlockManager

 Any ideas why so? My last run has 3 of the 64 executors not used.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/sc-parallelize-512k-items-doesn-t-always-use-64-executors-tp24062.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark on YARN

2015-07-30 Thread Jeff Zhang
 15/07/30 12:13:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15:
SIGTERM

The AM is killed somehow, maybe due to preemption. Does it always happen?
The resource manager log would be helpful.



On Thu, Jul 30, 2015 at 4:17 PM, Jeetendra Gangele gangele...@gmail.com
wrote:

 I can't see the application logs here. All the logs are going into stderr.
 can anybody help here?

 On 30 July 2015 at 12:21, Jeetendra Gangele gangele...@gmail.com wrote:

 I am running below command this is default spark PI program but this is
 not running all the log are going in stderr but at the terminal job is
 succeeding .I guess there are con issue job it not at all launching

 /bin/spark-submit --class org.apache.spark.examples.SparkPi --master
 yarn-cluster lib/spark-examples-1.4.1-hadoop2.6.0.jar 10


 Complete log

 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in 
 [jar:file:/home/hadoop/tmp/nm-local-dir/usercache/hadoop/filecache/23/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in 
 [jar:file:/opt/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
 explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 15/07/30 12:13:31 INFO yarn.ApplicationMaster: Registered signal handlers 
 for [TERM, HUP, INT]
 15/07/30 12:13:32 INFO yarn.ApplicationMaster: ApplicationAttemptId: 
 appattempt_1438090734187_0010_01
 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop
 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop
 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: 
 authentication disabled; ui acls disabled; users with view permissions: 
 Set(hadoop); users with modify permissions: Set(hadoop)
 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Starting the user application 
 in a separate Thread
 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context 
 initialization
 15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context 
 initialization ...
 15/07/30 12:13:33 INFO spark.SparkContext: Running Spark version 1.4.1
 15/07/30 12:13:33 WARN spark.SparkConf:
 SPARK_JAVA_OPTS was detected (set to '-Dspark.driver.port=53411').
 This is deprecated in Spark 1.0+.

 Please instead use:
  - ./spark-submit with conf/spark-defaults.conf to set defaults for an 
 application
  - ./spark-submit with --driver-java-options to set -X options for a driver
  - spark.executor.extraJavaOptions to set -X options for executors
  - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master 
 or worker)

 15/07/30 12:13:33 WARN spark.SparkConf: Setting 
 'spark.executor.extraJavaOptions' to '-Dspark.driver.port=53411' as a 
 work-around.
 15/07/30 12:13:33 WARN spark.SparkConf: Setting 
 'spark.driver.extraJavaOptions' to '-Dspark.driver.port=53411' as a 
 work-around.
 15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop
 15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop
 15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: 
 authentication disabled; ui acls disabled; users with view permissions: 
 Set(hadoop); users with modify permissions: Set(hadoop)
 15/07/30 12:13:33 INFO slf4j.Slf4jLogger: Slf4jLogger started
 15/07/30 12:13:33 INFO Remoting: Starting remoting
 15/07/30 12:13:34 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp://sparkDriver@10.21.1.77:53411]
 15/07/30 12:13:34 INFO util.Utils: Successfully started service 
 'sparkDriver' on port 53411.
 15/07/30 12:13:34 INFO spark.SparkEnv: Registering MapOutputTracker
 15/07/30 12:13:34 INFO spark.SparkEnv: Registering BlockManagerMaster
 15/07/30 12:13:34 INFO storage.DiskBlockManager: Created local directory at 
 /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/blockmgr-2166bbd9-b1ed-41d1-bc95-92c6a7fbd36f
 15/07/30 12:13:34 INFO storage.MemoryStore: MemoryStore started with 
 capacity 246.0 MB
 15/07/30 12:13:34 INFO spark.HttpFileServer: HTTP File server directory is 
 /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/httpd-d1232310-5aa1-44e7-a99a-cc2ae614f89c
 15/07/30 12:13:34 INFO spark.HttpServer: Starting HTTP Server
 15/07/30 12:13:34 INFO server.Server: jetty-8.y.z-SNAPSHOT
 15/07/30 12:13:34 INFO server.AbstractConnector: Started 
 SocketConnector@0.0.0.0:52507
 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'HTTP file 
 server' on port 52507.
 15/07/30 12:13:34 INFO spark.SparkEnv: Registering OutputCommitCoordinator
 15/07/30 12:13:34 INFO ui.JettyUtils: Adding filter: 
 org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
 15/07/30 12:13:34 INFO server.Server: jetty-8.y.z-SNAPSHOT
 15/07/30 12:13:34 INFO server.AbstractConnector: Started 
 

Re: Problems with JobScheduler

2015-07-30 Thread Guillermo Ortiz
The difference is that one receives more data than the other two. I can
pass the topics through parameters, so I could execute the code with one
topic at a time and figure out which topic it is, although I guess it's the
topic which gets more data.

Anyway, those delays in just one of the clusters are pretty weird, even when
the other one is not running.
I have seen the parameter spark.streaming.kafka.maxRatePerPartition; I
haven't set any value for it. How does it work if this parameter doesn't
have a value?
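
(As far as I understand, when spark.streaming.kafka.maxRatePerPartition is not
set there is no per-partition rate cap. If a limit is wanted, it can be set on
the SparkConf; the value below is only an example.)

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("MetricsSpark")
  // example limit: max records per second read from each Kafka partition
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")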

2015-07-30 16:32 GMT+02:00 Cody Koeninger c...@koeninger.org:

 If the jobs are running on different topicpartitions, what's different
 about them?  Is one of them 120x the throughput of the other, for
 instance?  You should be able to eliminate cluster config as a difference
 by running the same topic partition on the different clusters and comparing
 the results.

 On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I have three topics, with one partition per topic. So each job runs on
 one topic.

 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org:

 Just so I'm clear, the difference in timing you're talking about is this:

 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s

 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s


 Are those jobs running on the same topicpartition?


 On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I read about the maxRatePerPartition parameter; I haven't set it. Could that
 be the problem? Although this wouldn't explain why it doesn't work in one of
 the clusters.

 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 They just share the Kafka; the rest of the resources are independent. I
 tried stopping one cluster and executing only the cluster that isn't working,
 but the same thing happens.

 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same code
 in two cluster. I read from three topics in Kafka with DirectStream so I
 have three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage
 24.0 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage
 24.0 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
 memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage
 24.0 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage
 24.0 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage
 24.0 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time
 143825958 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time
 1438259585000 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time
 143825959 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time
 1438259595000 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time
 143825960 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time
 1438259605000 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time
 143825961 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time
 1438259615000 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time
 143825962 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time
 1438259625000 ms*
 *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time
 143825963 ms*
 *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time
 1438259635000 ms*
 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage
 24.0 (TID 73) in 60373 ms on (3/3)
 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose
 tasks have all completed, from pool
 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
 MetricsSpark.scala:67) finished in 60.379 s
 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s
 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
 143825821 ms.0 from job set of time 143825821 ms
 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
 143825821 ms (execution: 60.399 s)
 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
 1438258215000 ms.0 from job set of time 1438258215000 ms

 There are *always *a minute of delay in the third task, when I have
 executed same code in 

How to control Spark Executors from getting Lost when using YARN client mode?

2015-07-30 Thread unk1102
Hi, I have one Spark job which runs fine locally with less data, but when I
schedule it on YARN to execute, I keep on getting the following ERROR, and
slowly all executors get removed from the UI and my job fails:

15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on
myhost1.com: remote Rpc client disassociated
15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on
myhost2.com: remote Rpc client disassociated
I use the following command to schedule the Spark job in yarn-client mode:

 ./spark-submit --class com.xyz.MySpark --conf
spark.executor.extraJavaOptions=-XX:MaxPermSize=512M --driver-java-options
-XX:MaxPermSize=512m --driver-memory 3g --master yarn-client
--executor-memory 2G --executor-cores 8 --num-executors 12 
/home/myuser/myspark-1.0.jar

I don't know what the problem is, please guide me. I am new to Spark. Thanks in
advance.






Re: sc.parallelise to work more like a producer/consumer?

2015-07-30 Thread Kostas Kougios
There is a workaround:

sc.parallelize(items, items.size / 2)

This way each executor will get a batch of 2 items at a time, simulating a
producer-consumer. With / 4 it will get 4 items at a time.






Spark SQL DataFrame: Nullable column and filtering

2015-07-30 Thread martinibus77
Hi all,

1. Columns in dataframes can be nullable or not nullable. Having a
nullable column of Doubles, I can use the following Scala code to filter all
non-null rows:

  val df = ...  // some code that creates a DataFrame
  df.filter( df("columnname").isNotNull() )

+---+-----+----+
|  x|    a|   y|
+---+-----+----+
|  1|hello|null|
|  2|  bob|   5|
+---+-----+----+

root
 |-- x: integer (nullable = false)
 |-- a: string (nullable = true)
 |-- y: integer (nullable = true)

And with the filter expression:
+---+---+---+
|  x|  a|  y|
+---+---+---+
|  2|bob|  5|
+---+---+---+


Unfortunately, while this is true for a nullable column (according to
df.printSchema), it is not true for a column that is not nullable:


+---+-----+----+
|  x|    a|   y|
+---+-----+----+
|  1|hello|null|
|  2|  bob|   5|
+---+-----+----+

root
 |-- x: integer (nullable = false)
 |-- a: string (nullable = true)
 |-- y: integer (nullable = false)

+---+-----+----+
|  x|    a|   y|
+---+-----+----+
|  1|hello|null|
|  2|  bob|   5|
+---+-----+----+

such that the output is not affected by the filter. Is this intended?


2. What is the cheapest way (in terms of performance) to turn a non-nullable
column into a nullable column?
I came up with this:

  /**
   * Set whether a column is nullable.
   * @param df source DataFrame
   * @param cn the column name to change
   * @param nullable the flag to set, such that the column is either nullable or not
   */
  def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
      case y: StructField => y
    })
    df.sqlContext.createDataFrame( df.rdd, newSchema)
  }
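
For example (a hypothetical usage against the DataFrame above):

  val dfNullable = setNullableStateOfColumn(df, "y", nullable = true)
  dfNullable.printSchema()  // y should now show nullable = true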

Is there a cheaper solution?

3. Any comments?

Cheers and thx in advance,

Martin









Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-30 Thread Michael Armbrust
We don't yet update nullability information based on predicates, as we
don't actually leverage this information in many places yet.  Why do you
want to update the schema?

On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 martin.se...@googlemail.com
wrote:

 Hi all,

 1. *Columns in dataframes can be nullable and not nullable. Having a
 nullable column of Doubles, I can use the following Scala code to filter
 all
 non-null rows:*

   val df = . // some code that creates a DataFrame
   df.filter( df(columnname).isNotNull() )

 +-+-++
 |x|a|   y|
 +-+-++
 |1|hello|null|
 |2|  bob|   5|
 +-+-++

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = true)

 And with the filter expression
 +-+---+-+
 |x|  a|y|
 +-+---+-+
 |2|bob|5|
 +-+---+-+


 Unfortunetaly and while this is a true for a nullable column (according to
 df.printSchema), it is not true for a column that is not nullable:


 +-+-++
 |x|a|   y|
 +-+-++
 |1|hello|null|
 |2|  bob|   5|
 +-+-++

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = false)

 +-+-++
 |x|a|   y|
 +-+-++
 |1|hello|null|
 |2|  bob|   5|
 +-+-++

 such that the output is not affected by the filter. Is this intended?


 2. *What is the cheapest (in sense of performance) to turn a non-nullable
 column into a nullable column?
 A came uo with this:*

   /**
* Set, if a column is nullable.
* @param df source DataFrame
* @param cn is the column name to change
* @param nullable is the flag to set, such that the column is either
 nullable or not
*/
   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable:
 Boolean) : DataFrame = {

 val schema = df.schema
 val newSchema = StructType(schema.map {
   case StructField( c, t, _, m) if c.equals(cn) = StructField( c, t,
 nullable = nullable, m)
   case y: StructField = y
 })
 df.sqlContext.createDataFrame( df.rdd, newSchema)
   }

 Is there a cheaper solution?

 3. *Any comments?*

 Cheers and thx in advance,

 Martin










TFIDF Transformation

2015-07-30 Thread ziqiu.li
Hello spark users,

I hope your week is going fantastic! I am having some trouble with TFIDF
in MLlib and was wondering if anyone can point me in the right direction.

The data ingestion and the initial term frequency count code taken from the 
example works fine (I am using the first example from this page: 
https://spark.apache.org/docs/1.2.0/mllib-feature-extraction.html).

Below is my input data:

WrappedArray((Frank,  spent,  Friday,  afternoon,  at,  labs,  test,  test,  
test,  test,  test,  test,  test,  test,  test))
WrappedArray((we,  are,  testing,  the,  algorithm,  with,  us,  test,  test,  
test,  test,  test,  test,  test,  test))
WrappedArray((hello,  my,  name,  is,  Hans,  and,  I,  am,  testing,  TFIDF,  
test,  test,  test,  test,  test))
WrappedArray((TFIDF,  is,  an,  amazing,  algorithm,  that,  is,  used,  for,  
spam,  filtering,  and,  search,  test,  test))
WrappedArray((Accenture,  is,  doing,  great,  test,  test,  test,  test,  
test,  test,  test,  test,  test,  test,  test))

Here's the output:

(1048576,[1065,1463,33868,34122,34252,337086,420523,603314,717226,767673,839152,876983],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,1.0,1.0,1.0,1.0])
(1048576,[1463,6313,33869,34122,118216,147517,162737,367946,583529,603314,605639,646109,876983,972879],[1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])
(1048576,[20311,34122,340246,603314,778861,876983],[1.0,1.0,1.0,10.0,1.0,1.0])
(1048576,[33875,102986,154015,267598,360614,603314,690972,876983],[1.0,1.0,1.0,1.0,1.0,8.0,1.0,1.0])
(1048576,[1588,19537,34494,42230,603314,696550,839152,876983,972879],[1.0,1.0,1.0,1.0,7.0,1.0,1.0,1.0,1.0])

The problem I am having here is that the output from HashingTF is not ordered
like the original sentence. I understand that the integer 603314 in the
output stands for the word "test" in the input. But how would I
programmatically translate the number back to the word, so I know which words
are most common? Please let me know your thoughts!

I am not sure how helpful these are going to be, but here are the things I've
noticed when looking into the source code of TFIDF:

1. def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures) 
This line of code hashes the term (term.## is its hash code) and takes it modulo
numFeatures (which defaults to 2^20).
2. Then def transform(document: Iterable[_]): Vector = { blah blah blah } --- 
This part of the code does the counting and spreads the result into two separate
arrays (indices and values) via Vectors.sparse.
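
One possible way (not from MLlib itself) to map hashed indices back to terms is
to hash every distinct term in the corpus with the same HashingTF and invert the
result; collisions are possible, so each index maps to a set of terms. A minimal
sketch, using a couple of the sentences above as sample data:

import org.apache.spark.mllib.feature.HashingTF

val documents = sc.parallelize(Seq(
  Seq("hello", "my", "name", "is", "Hans", "test", "test"),
  Seq("TFIDF", "is", "an", "amazing", "algorithm", "test")))

val hashingTF = new HashingTF()   // default 2^20 = 1048576 features, as in the output above
val indexToTerms: Map[Int, Set[String]] =
  documents.flatMap(identity)
           .distinct()
           .map(term => (hashingTF.indexOf(term), term))
           .groupByKey()
           .mapValues(_.toSet)
           .collect()
           .toMap
// indexToTerms(hashingTF.indexOf("test")) should contain "test"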


Thanks in advance and I hope to hear from you soon!
Best,
Hans






Spark on YARN

2015-07-30 Thread Jeetendra Gangele
I am running the command below; it is the default Spark Pi program. But it is not
running: all the logs are going to stderr, yet at the terminal the job reports
success. I guess there is some configuration issue and the job is not actually launching.

/bin/spark-submit --class org.apache.spark.examples.SparkPi --master
yarn-cluster lib/spark-examples-1.4.1-hadoop2.6.0.jar 10


Complete log

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/home/hadoop/tmp/nm-local-dir/usercache/hadoop/filecache/23/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/07/30 12:13:31 INFO yarn.ApplicationMaster: Registered signal
handlers for [TERM, HUP, INT]
15/07/30 12:13:32 INFO yarn.ApplicationMaster: ApplicationAttemptId:
appattempt_1438090734187_0010_01
15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop
15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop
15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/07/30 12:13:33 INFO yarn.ApplicationMaster: Starting the user
application in a separate Thread
15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark
context initialization
15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark
context initialization ...
15/07/30 12:13:33 INFO spark.SparkContext: Running Spark version 1.4.1
15/07/30 12:13:33 WARN spark.SparkConf:
SPARK_JAVA_OPTS was detected (set to '-Dspark.driver.port=53411').
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with conf/spark-defaults.conf to set defaults for an
application
 - ./spark-submit with --driver-java-options to set -X options for a driver
 - spark.executor.extraJavaOptions to set -X options for executors
 - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons
(master or worker)

15/07/30 12:13:33 WARN spark.SparkConf: Setting
'spark.executor.extraJavaOptions' to '-Dspark.driver.port=53411' as a
work-around.
15/07/30 12:13:33 WARN spark.SparkConf: Setting
'spark.driver.extraJavaOptions' to '-Dspark.driver.port=53411' as a
work-around.
15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop
15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop
15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/07/30 12:13:33 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/30 12:13:33 INFO Remoting: Starting remoting
15/07/30 12:13:34 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkDriver@10.21.1.77:53411]
15/07/30 12:13:34 INFO util.Utils: Successfully started service
'sparkDriver' on port 53411.
15/07/30 12:13:34 INFO spark.SparkEnv: Registering MapOutputTracker
15/07/30 12:13:34 INFO spark.SparkEnv: Registering BlockManagerMaster
15/07/30 12:13:34 INFO storage.DiskBlockManager: Created local
directory at 
/home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/blockmgr-2166bbd9-b1ed-41d1-bc95-92c6a7fbd36f
15/07/30 12:13:34 INFO storage.MemoryStore: MemoryStore started with
capacity 246.0 MB
15/07/30 12:13:34 INFO spark.HttpFileServer: HTTP File server
directory is 
/home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/httpd-d1232310-5aa1-44e7-a99a-cc2ae614f89c
15/07/30 12:13:34 INFO spark.HttpServer: Starting HTTP Server
15/07/30 12:13:34 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/30 12:13:34 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:52507
15/07/30 12:13:34 INFO util.Utils: Successfully started service 'HTTP
file server' on port 52507.
15/07/30 12:13:34 INFO spark.SparkEnv: Registering OutputCommitCoordinator
15/07/30 12:13:34 INFO ui.JettyUtils: Adding filter:
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
15/07/30 12:13:34 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/07/30 12:13:34 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:59596
15/07/30 12:13:34 INFO util.Utils: Successfully started service
'SparkUI' on port 59596.
15/07/30 12:13:34 INFO ui.SparkUI: Started SparkUI at http://10.21.1.77:59596
15/07/30 12:13:34 INFO cluster.YarnClusterScheduler: Created
YarnClusterScheduler
15/07/30 12:13:34 INFO util.Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port
43354.
15/07/30 12:13:34 INFO netty.NettyBlockTransferService: Server created on 43354
15/07/30 12:13:34 INFO storage.BlockManagerMaster: Trying to register

Is it Spark Serialization bug ?

2015-07-30 Thread Subshiri S
Hi all, I have tried to use a lambda expression in a Spark task, and it throws
a java.lang.IllegalArgumentException: Invalid lambda deserialization
exception. This exception is thrown when there is code like
transform(pRDD -> pRDD.map(t -> t._2)). The code snippet is below.

JavaPairDStream<String, Integer> aggregate = pairRDD.reduceByKey((x, y) -> x + y);
JavaDStream<Integer> con = aggregate.transform(
    (Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>) pRDD -> pRDD.map(
        (Function<Tuple2<String, Integer>, Integer>) t -> t._2));

JavaPairDStream<String, Integer> aggregate = pairRDD.reduceByKey((x, y) -> x + y);
JavaDStream<Integer> con = aggregate.transform(
    (Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>> & Serializable) pRDD -> pRDD.map(
        (Function<Tuple2<String, Integer>, Integer> & Serializable) t -> t._2));

The above two options didn't work. Whereas if I pass the object f below as
the argument instead of the lambda expression t -> t._2, it works.

Function f = new Function<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer call(Tuple2<String, Integer> paramT1) throws Exception {
        return paramT1._2;
    }
};

May I know what the right format is to express these functions as lambda
expressions?

public static void main(String[] args) {

    Function f = new Function<Tuple2<String, Integer>, Integer>() {

        @Override
        public Integer call(Tuple2<String, Integer> paramT1) throws Exception {
            return paramT1._2;
        }

    };

    JavaStreamingContext ssc = JavaStreamingFactory.getInstance();

    JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", );
    JavaDStream<String> words = lines.flatMap(s -> { return Arrays.asList(s.split(" ")); });
    JavaPairDStream<String, Integer> pairRDD = words.mapToPair(x -> new Tuple2<String, Integer>(x, 1));
    JavaPairDStream<String, Integer> aggregate = pairRDD.reduceByKey((x, y) -> x + y);
    JavaDStream<Integer> con = aggregate.transform(
        (Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>) pRDD -> pRDD.map(
            (Function<Tuple2<String, Integer>, Integer>) t -> t._2));
    // JavaDStream<Integer> con = aggregate.transform(pRDD -> pRDD.map(f)); It works
    con.print();

    ssc.start();
    ssc.awaitTermination();

}


Re: Graceful shutdown for Spark Streaming

2015-07-30 Thread Tathagata Das
How is sleep not working? Are you doing

streamingContext.start()
Thread.sleep(xxx)
streamingContext.stop()
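
A minimal sketch of that pattern for a fixed run time (the socket source and the
10-minute timeout are just examples); awaitTerminationOrTimeout returns once the
timeout elapses, after which the context can be stopped gracefully:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("fixed-duration-streaming")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)  // any input stream works here
lines.count().print()
ssc.start()
ssc.awaitTerminationOrTimeout(10 * 60 * 1000L)       // run for roughly ten minutes
ssc.stop(stopSparkContext = true, stopGracefully = true)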

On Wed, Jul 29, 2015 at 6:55 PM, anshu shukla anshushuk...@gmail.com
wrote:

 If we want to stop the application after a fixed time period, how will that
 work? (How do I give the duration in the logic? In my case sleep(t.s.) is not
 working.) So far I have been killing the CoarseGrained job on each slave with
 a script. Please suggest something.

 On Thu, Jul 30, 2015 at 5:14 AM, Tathagata Das t...@databricks.com
 wrote:

 StreamingContext.stop(stopGracefully = true) stops the streaming context
 gracefully.
 Then you can safely terminate the Spark cluster. They are two different
 steps and need to be done separately, ensuring that the driver process has
 been completely terminated before the Spark cluster is terminated.

 On Wed, Jul 29, 2015 at 6:43 AM, Michal Čizmazia mici...@gmail.com
 wrote:

 How to initiate graceful shutdown from outside of the Spark Streaming
 driver process? Both for the local and cluster mode of Spark Standalone as
 well as EMR.

 Does sbin/stop-all.sh stop the context gracefully? How is it done? Is
 there a signal sent to the driver process?

 For EMR, is there a way to terminate an EMR cluster with a Spark
 Streaming graceful shutdown?

 Thanks!






 --
 Thanks  Regards,
 Anshu Shukla



Re: How to control Spark Executors from getting Lost when using YARN client mode?

2015-07-30 Thread Ashwin Giridharan
What is your cluster configuration (size and resources)?

If you do not have enough resources, then your executors will not run.
Moreover, allocating 8 cores to an executor is too much.

If you have a cluster with four nodes running NodeManagers, each equipped
with 4 cores and 8 GB of memory,
then an optimal configuration would be:

--num-executors 8 --executor-cores 2 --executor-memory 2G
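
Plugged into the submit command from your question, that would look roughly
like this (class, jar path and the other options left unchanged):

./spark-submit --class com.xyz.MySpark --conf
spark.executor.extraJavaOptions=-XX:MaxPermSize=512M --driver-java-options
-XX:MaxPermSize=512m --driver-memory 3g --master yarn-client
--executor-memory 2G --executor-cores 2 --num-executors 8
/home/myuser/myspark-1.0.jar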

Thanks,
Ashwin

On Thu, Jul 30, 2015 at 12:08 PM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I have one Spark job which runs fine locally with less data but when I
 schedule it on YARN to execute I keep on getting the following ERROR and
 slowly all executors gets removed from UI and my job fails

 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on
 myhost1.com: remote Rpc client disassociated
 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on
 myhost2.com: remote Rpc client disassociated
 I use the following command to schedule spark job in yarn-client mode

  ./spark-submit --class com.xyz.MySpark --conf
 spark.executor.extraJavaOptions=-XX:MaxPermSize=512M
 --driver-java-options
 -XX:MaxPermSize=512m --driver-memory 3g --master yarn-client
 --executor-memory 2G --executor-cores 8 --num-executors 12
 /home/myuser/myspark-1.0.jar

 I dont know what is the problem please guide. I am new to Spark. Thanks in
 advance.







-- 
Thanks  Regards,
Ashwin Giridharan


Re: sc.parallelize(512k items) doesn't always use 64 executors

2015-07-30 Thread Konstantinos Kougios

Yes, thanks, that sorted out the issue.

On 30/07/15 09:26, Akhil Das wrote:
sc.parallelize takes a second parameter which is the total number of 
partitions, are you using that?


Thanks
Best Regards

On Wed, Jul 29, 2015 at 9:27 PM, Kostas Kougios 
kostas.koug...@googlemail.com 
wrote:


Hi, I do an sc.parallelize with a list of 512k items. But
sometimes not all
executors are used, i.e. they don't have work to do and nothing is
logged
after:

15/07/29 16:35:22 WARN internal.ThreadLocalRandom: Failed to
generate a seed
from SecureRandom within 3 seconds. Not enough entrophy?
15/07/29 16:35:22 INFO util.Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port
56477.
15/07/29 16:35:22 INFO netty.NettyBlockTransferService: Server
created on
56477
15/07/29 16:35:22 INFO storage.BlockManagerMaster: Trying to register
BlockManager
15/07/29 16:35:22 INFO storage.BlockManagerMaster: Registered
BlockManager

Any ideas why so? My last run has 3 of the 64 executors not used.









Re: Difference between RandomForestModel and RandomForestClassificationModel

2015-07-30 Thread Bryan Cutler
Hi Praveen,

In MLLib, the major difference is that RandomForestClassificationModel
makes use of a newer API which utilizes ML pipelines.  I can't say for
certain if they will produce the same exact result for a given dataset, but
I believe they should.

Bryan

On Wed, Jul 29, 2015 at 12:14 PM, praveen S mylogi...@gmail.com wrote:

 Hi,
 I wanted to know: what is the difference between
 RandomForestModel and RandomForestClassificationModel
 in MLlib? Will they yield the same results for a given dataset?



Re: HiveQL to SparkSQL

2015-07-30 Thread Bigdata techguy
Thanks Jörn for the response and for the pointer questions on Hive
optimization tips.

I believe I have done the possible and applicable things to improve Hive
query performance, including but not limited to: running on TEZ, using
partitioning, bucketing, using explain to make sure partition pruning is
happening, using compression, using the best data types for join columns,
denormalizing, etc. I am using Hive version 0.13.

The idea behind this POC is to find the strengths of SparkSQL over HiveQL
and identify the use cases where SparkSQL can perform better than HiveQL,
other than the iterative use cases. In general, what would be the
SparkSQL use scenarios?

I am pretty sure someone has tried this before and compared
performance... Any responses would be much appreciated. Thank you.


On Wed, Jul 29, 2015 at 1:57 PM, Jörn Franke jornfra...@gmail.com wrote:

 What Hive version are you using? Do you run it on TEZ? Are you using
 the ORC format? Do you use compression? Snappy? Do you use Bloom filters?
 Do you insert the data sorted on the right columns? Do you use
 partitioning? Did you increase the replication factor for often-used tables
 or partitions? Do you use bucketing? Is your data model appropriate (join
 columns as int, use numeric data types where appropriate, dates as
 int...), did you calculate statistics? Did you use indexes (compressed, ORC
 format)? Do you provide mapjoin hints? Did you do any other Hive
 optimization? Did you use explain to verify that only the selected partitions,
 indexes, and Bloom filters had been used?
 Did you verify that no other application has taken resources? What is the
 CPU level on the namenode and hiveserver2? If it is high then you need more
 memory there!

 The first rule is to get Hive right before you think about in-memory.
 Caching will only help for iterative stuff. You may think about
 denormalizing the model even more to avoid joins as much as possible.

 Bigdata techguy bigdatatech...@gmail.com schrieb am Mi., 29.07.2015,
 18:49:

 Hi All,

 I have a fairly complex HiveQL data processing which I am trying to
 convert to SparkSQL to improve performance. Below is what it does.

 Select around 100 columns including Aggregates
 From a FACT_TABLE
 Joined to the summary of the same FACT_TABLE
 Joined to 2 smaller DIMENSION tables.

 The data processing currently takes around an hour to complete
 processing.

 This is what I have tried so far.

 1. Use hiveContext to query the DIMENSION tables, store it as DataFrames
 and registerTempTable.

 2.  Use hiveContext to query the summary of FACT_TABLE, store it as
 DataFrames and registerTempTable.

 3. Use the Temp tables from above 2 steps to get the final RecordSet to
 another DataFrame.

 4. Save the DataFrame from step 3 to Hive with InsertOverwrite using
 saveAsTable.

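 A minimal sketch of the four steps above (table, column and path names are
 hypothetical; adjust to your schema, and the writer API shown is the
 Spark 1.4-style one):

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.hive.HiveContext

 val sc = new SparkContext(new SparkConf().setAppName("HiveQLToSparkSQL"))
 val hiveContext = new HiveContext(sc)

 // 1. Load the dimension tables and register them as temp tables
 hiveContext.sql("SELECT * FROM dim_table_1").registerTempTable("dim1_tmp")
 hiveContext.sql("SELECT * FROM dim_table_2").registerTempTable("dim2_tmp")

 // 2. Pre-aggregate the fact table and register the summary
 hiveContext.sql(
   "SELECT key_col, SUM(amount) AS total_amount FROM fact_table GROUP BY key_col")
   .registerTempTable("fact_summary_tmp")

 // 3. Join the temp tables to build the final record set
 val result = hiveContext.sql(
   """SELECT f.key_col, d1.attr1, d2.attr2, s.total_amount
     |FROM fact_table f
     |JOIN fact_summary_tmp s ON f.key_col = s.key_col
     |JOIN dim1_tmp d1 ON f.dim1_id = d1.id
     |JOIN dim2_tmp d2 ON f.dim2_id = d2.id""".stripMargin)

 // 4. Write the result back to Hive, overwriting the target table
 result.write.mode("overwrite").saveAsTable("result_table")
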
 Below are my questions. Any response would be much appreciated. Thanks.

 A. Is there a better approach?
 B. Is breaking down the big Hive query into multiple steps with multiple
 DataFrames expected to give better performance?
 C. Is there an opportunity to intermix RDD with SparkSQL in this case?
 D. Can the Caching of a DataFrame improve performance?
 E. Are there other suggestions to improve performance?

 Thank You for your time.




TFIDF Transformation

2015-07-30 Thread hans ziqiu li
Hello spark users!

I am having some trouble with TFIDF in MLlib and was wondering if
anyone can point me in the right direction.

The data ingestion and the initial term frequency count code taken from the
example works fine (I am using the first example from this page:
https://spark.apache.org/docs/1.2.0/mllib-feature-extraction.html).

Below is my input data: 

WrappedArray((Frank,  spent,  Friday,  afternoon,  at,  labs,  test,  test, 
test,  test,  test,  test,  test,  test,  test))
WrappedArray((we,  are,  testing,  the,  algorithm,  with,  us,  test, 
test,  test,  test,  test,  test,  test,  test))
WrappedArray((hello,  my,  name,  is,  Hans,  and,  I,  am,  testing, 
TFIDF,  test,  test,  test,  test,  test))
WrappedArray((TFIDF,  is,  an,  amazing,  algorithm,  that,  is,  used, 
for,  spam,  filtering,  and,  search,  test,  test))
WrappedArray((Accenture,  is,  doing,  great,  test,  test,  test,  test, 
test,  test,  test,  test,  test,  test,  test))

Here’s the output:

(1048576,[1065,1463,33868,34122,34252,337086,420523,603314,717226,767673,839152,876983],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,1.0,1.0,1.0,1.0])
(1048576,[1463,6313,33869,34122,118216,147517,162737,367946,583529,603314,605639,646109,876983,972879],[1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])
(1048576,[20311,34122,340246,603314,778861,876983],[1.0,1.0,1.0,10.0,1.0,1.0])
(1048576,[33875,102986,154015,267598,360614,603314,690972,876983],[1.0,1.0,1.0,1.0,1.0,8.0,1.0,1.0])
(1048576,[1588,19537,34494,42230,603314,696550,839152,876983,972879],[1.0,1.0,1.0,1.0,7.0,1.0,1.0,1.0,1.0])

The problem I am having here is that the output from HashingTF is not
ordered like the original sentence. I understand that the integer “603314”
in the output stands for the word “ test” in the input, but how would I
programmatically translate the number back to the word so I know which words
are most common? Please let me know your thoughts!

I am not sure how helpful these are going to be but here are the things I’ve
noticed when I was looking into the source code of TFIDF:

1. def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures)
   This line of code hashes the term (via its hash code, term.##) and takes
   that value modulo numFeatures (which defaults to 2^20).
2. Then def transform(document: Iterable[_]): Vector = { blah blah blah }
   This part of the code does the counting and spreads the result into two
   separate arrays (indices and values) using Vectors.sparse.
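
One way to get back from hash indices to words (a sketch, not the only way:
`docs` stands for the RDD[Seq[String]] that was fed into HashingTF, and note
that hash collisions can map two different words to the same index):

import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.rdd.RDD

val hashingTF = new HashingTF()   // default numFeatures = 2^20

// Re-hash every distinct term to build an index -> term lookup table.
def indexToTerm(docs: RDD[Seq[String]]): Map[Int, String] =
  docs.flatMap(identity)
      .distinct()
      .map(term => (hashingTF.indexOf(term), term))
      .collect()
      .toMap

// e.g. indexToTerm(docs).get(603314)   // the token that hashed to 603314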


Thanks in advance and I hope to hear from you soon!
Best,
Hans




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TFIDF-Transformation-tp24086.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: apache-spark 1.3.0 and yarn integration and spring-boot as a container

2015-07-30 Thread Steve Loughran
you need to fix your configuration so that the resource manager hostname/URL is
set... the 0.0.0.0 address there is just the "listen on any interface" placeholder.
On 30 Jul 2015, at 10:47, Nirav Patel npa...@xactlycorp.com wrote:

15/07/29 11:19:26 INFO client.RMProxy: Connecting to ResourceManager at
/0.0.0.0:8032

Future versions of Hadoop will give you more meaningful diagnostics messages
here (HADOOP-9657, https://issues.apache.org/jira/browse/HADOOP-9657), but
the fix is the same: get your RM hostname right.
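
For reference, the usual client-side setting looks something like this in
yarn-site.xml (the hostname below is only a placeholder):

<property>
  <name>yarn.resourcemanager.hostname</name>
  <value>rm-host.example.com</value>
</property>
<!-- or spell out the full address; 8032 is the default RM port -->
<property>
  <name>yarn.resourcemanager.address</name>
  <value>rm-host.example.com:8032</value>
</property>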

-steve


[Parquet + Dataframes] Column names with spaces

2015-07-30 Thread angelini
Hi all,

Our data has lots of human readable column names (names that include
spaces), is it possible to use these with Parquet and Dataframes?

When I try to write the DataFrame I get the following error:

(I am using PySpark)

`AnalysisException: Attribute name Name with Space contains invalid
character(s) among  ,;{}()\n\t=. Please use alias to rename it.`

How can I alias that column name?

`df['Name with Space'] = df['Name with Space'].alias('Name')` doesn't work
as you can't assign to a dataframe column.

`df.withColumnRenamed('Name with Space', 'Name')` overwrites the column and
doesn't alias it.

Any ideas?

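One workaround that is often suggested is to rename every offending column
before writing (shown here as a Scala sketch; the PySpark calls
withColumnRenamed / write.parquet are analogous, and the output path is
illustrative):

import org.apache.spark.sql.DataFrame

def sanitizeColumns(df: DataFrame): DataFrame =
  df.columns.foldLeft(df) { (cur, name) =>
    if (name.contains(" ")) cur.withColumnRenamed(name, name.replace(" ", "_"))
    else cur
  }

// sanitizeColumns(df).write.parquet("/tmp/output")
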
Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-Dataframes-Column-names-with-spaces-tp24088.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Cast Error DataFrame/RDD doing group by and case class

2015-07-30 Thread Rishabh Bhardwaj
Hi,
I have just started learning DataFrames in Spark and encountered the following
error:

I am creating the following :
case class A(a1: String, a2: String, a3: String)
case class B(b1: String, b2: String, b3: String)
case class C(key: A, value: Seq[B])


Now I have to build a DF with the structure
(key: {..}, value: {..}), i.e. case class C(key: A, value: B).

I want to do a group by on this DF which results in
(key: List{value1, value2, ..}) and return a DF after the operation.

I am implementing the following:

1. val x = DF1.map(r => (r(0), r(1))).groupByKey
The data in x comes as expected.

2. val y = x.map { case (k, v) => C(k.asInstanceOf[A], Seq(v.toSeq.asInstanceOf[B])) }
So now when I do y.toDF.show I get the following error:


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage
10.0 (TID 12, localhost): *java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be
cast to $iwC$$iwC$A*

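For what it's worth, here is a sketch of one way around the cast: rows coming
out of a DataFrame are generic Row objects, so they cannot be cast to a case
class directly; the field positions below assume the schema described above
and are only illustrative.

import org.apache.spark.sql.Row

val y = DF1.map { r =>
  val keyRow   = r.getAs[Row](0)   // nested struct backing case class A
  val valueRow = r.getAs[Row](1)   // nested struct backing case class B
  (A(keyRow.getString(0), keyRow.getString(1), keyRow.getString(2)),
   B(valueRow.getString(0), valueRow.getString(1), valueRow.getString(2)))
}.groupByKey()
 .map { case (k, vs) => C(k, vs.toSeq) }

// y.toDF().show()   // requires import sqlContext.implicits._
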

Thanks in advance.

Regards,
Rishabh.


Re: Spark and Speech Recognition

2015-07-30 Thread Simon Elliston Ball
You might also want to consider broadcasting the models to ensure you get one
instance shared across cores on each machine; otherwise the model will be
serialised out with each task and you'll end up with a copy per task (roughly
one per core in this instance).

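A rough sketch of that pattern (sc is the SparkContext; the model loader, the
Recognizer class, the partition count and the paths are hypothetical stand-ins
for the Sphinx4 pieces):

val models = loadAcousticAndLanguageModels()        // the ~1 GB immutable part
val modelsBc = sc.broadcast(models)                 // one copy per executor JVM

sc.textFile("/path/to/urls.txt", 24)                // ~one partition per core
  .mapPartitions { urls =>
    val recognizer = new Recognizer(modelsBc.value) // shares the broadcast models
    urls.map(url => recognizer.transcribe(url))
  }
  .saveAsTextFile("/path/to/transcripts")
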
Simon 

Sent from my iPhone

 On 30 Jul 2015, at 10:14, Akhil Das ak...@sigmoidanalytics.com wrote:
 
 Like this?
 
 val data = sc.textFile("/sigmoid/audio/data/", 24).foreachPartition(urls =>
 speachRecognizer(urls))
 
 Let 24 be the total number of cores that you have on all the workers.
 
 Thanks
 Best Regards
 
 On Wed, Jul 29, 2015 at 6:50 AM, Peter Wolf opus...@gmail.com wrote:
 Hello, I am writing a Spark application to use speech recognition to 
 transcribe a very large number of recordings.
 
 I need some help configuring Spark.
 
 My app is basically a transformation with no side effects: recording URL -- 
 transcript.  The input is a huge file with one URL per line, and the output 
 is a huge file of transcripts.  
 
 The speech recognizer is written in Java (Sphinx4), so it can be packaged as 
 a JAR.
 
 The recognizer is very processor intensive, so you can't run too many on one 
 machine-- perhaps one recognizer per core.  The recognizer is also big-- 
 maybe 1 GB. But most of the recognizer is immutable acoustic and
 language models that can be shared with other instances of the recognizer.
 
 So I want to run about one recognizer per core of each machine in my 
 cluster.  I want all recognizer on one machine to run within the same JVM 
 and share the same models.
 
 How does one configure Spark for this sort of application? How does one
 control how Spark deploys the stages of the process? Can someone point me
 to an appropriate doc or keywords I should Google.
 
 Thanks
 Peter 
 


Twitter Connector-Spark Streaming

2015-07-30 Thread Sadaf
Hi.
I am writing a Twitter connector using Spark Streaming, but it fetches
random tweets.
Is there any way to receive the tweets of a particular account?

I made an app on twitter and used the credentials as given below.

 def managingCredentials(): Option[twitter4j.auth.Authorization] = {
   object auth {
     val config = new twitter4j.conf.ConfigurationBuilder()
       .setOAuthConsumerKey()
       .setOAuthConsumerSecret()
       .setOAuthAccessToken()
       .setOAuthAccessTokenSecret()
       .build
   }
   val twitter_auth = new TwitterFactory(auth.config)
   val a = new twitter4j.auth.OAuthAuthorization(auth.config)
   val atwitter: Option[twitter4j.auth.Authorization] =
     Some(twitter_auth.getInstance(a).getAuthorization())
   atwitter
 }

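One common workaround is to create the stream with your authorization and
then filter the statuses by user (a sketch; TwitterUtils comes from the
spark-streaming-twitter module, ssc is your StreamingContext, and the screen
name below is hypothetical):

import org.apache.spark.streaming.twitter.TwitterUtils

val atwitter = managingCredentials()
val stream = TwitterUtils.createStream(ssc, atwitter)
val oneAccount = stream.filter(_.getUser.getScreenName == "target_account")
oneAccount.map(_.getText).print()
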
Thanks :)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Twitter-Connector-Spark-Streaming-tp24078.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Error SparkStreaming after a while executing.

2015-07-30 Thread Guillermo Ortiz
I'm executing a job with Spark Streaming and I get this error every time the
job has been running for a while (usually hours or days).

I have no idea why it's happening.

15/07/30 13:02:14 ERROR LiveListenerBus: Listener EventLoggingListener
threw an exception
java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at
org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
at
org.apache.spark.scheduler.EventLoggingListener.onJobEnd(EventLoggingListener.scala:169)
at
org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:36)
at
org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at
org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at
org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
at
org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
Caused by: java.io.IOException: Lease timeout of 0 seconds expired.
at
org.apache.hadoop.hdfs.DFSOutputStream.abort(DFSOutputStream.java:2192)
at
org.apache.hadoop.hdfs.DFSClient.closeAllFilesBeingWritten(DFSClient.java:935)
at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:889)
at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:417)
at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:442)
at
org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71)
at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:298)
at java.lang.Thread.run(Thread.java:745)
15/07/30 13:02:14 INFO SparkContext: Starting job: foreachRDD at
MetricsSpark.scala:67
15/07/30 13:02:14 INFO DAGScheduler: Got job 5050 (foreachRDD at
MetricsSpark.scala:67) with 3 output partitions (allowLocal=false)
15/07/30 13:02:14 INFO DAGScheduler: Final stage: Stage 5050(foreachRDD at
MetricsSpark.scala:67)

Sometimes this error happens but it doesn't mean that Spark stops working
forever: it looks like after this error Spark works correctly for some
iterations, but most of the time it just fails after this error, producing
it all the time.

The code just filters some records and indexes them in Elasticsearch after
adding some new fields.


RE: Spark on YARN

2015-07-30 Thread Shao, Saisai
You’d better also check the NodeManager log; sometimes the container is killed
because your memory usage exceeds the limit of the YARN container configuration.

I’ve met similar problem before, here is the warning log in nodemanager:

2015-07-07 17:06:07,141 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=17385,containerID=container_1436259427993_0001_02_01] is 
running beyond virtual memory limits. Current usage: 318.1 MB of 1 GB physical 
memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container.

The default pmem-vmem ratio is 2.1, but it seems the executor requires more vmem
when it starts, so the NodeManager will kill it. If you hit a similar problem, you
could increase the configuration “yarn.nodemanager.vmem-pmem-ratio”.

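For example (yarn-site.xml on the NodeManagers; the value is only illustrative,
and the check can also be switched off entirely):

<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>4</value>
</property>
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>
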
Thanks
Jerry

From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Thursday, July 30, 2015 4:36 PM
To: Jeetendra Gangele
Cc: user
Subject: Re: Spark on YARN

 15/07/30 12:13:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM

The AM is killed somehow, maybe due to preemption. Does it always happen? The
ResourceManager log would be helpful.



On Thu, Jul 30, 2015 at 4:17 PM, Jeetendra Gangele gangele...@gmail.com wrote:
I can't see the application logs here. All the logs are going into stderr. Can
anybody help here?

On 30 July 2015 at 12:21, Jeetendra Gangele gangele...@gmail.com wrote:
I am running the command below; it is the default Spark Pi example, but it is
not running: all the logs are going into stderr, yet at the terminal the job
is reported as succeeding. I guess there is some issue and the job is not
launching at all.

/bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
yarn-cluster lib/spark-examples-1.4.1-hadoop2.6.0.jar 10


Complete log


SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in 
[jar:file:/home/hadoop/tmp/nm-local-dir/usercache/hadoop/filecache/23/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in 
[jar:file:/opt/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

15/07/30 12:13:31 INFO yarn.ApplicationMaster: Registered signal handlers for 
[TERM, HUP, INT]

15/07/30 12:13:32 INFO yarn.ApplicationMaster: ApplicationAttemptId: 
appattempt_1438090734187_0010_01

15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop

15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop

15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
with modify permissions: Set(hadoop)

15/07/30 12:13:33 INFO yarn.ApplicationMaster: Starting the user application in 
a separate Thread

15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context 
initialization

15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context 
initialization ...

15/07/30 12:13:33 INFO spark.SparkContext: Running Spark version 1.4.1

15/07/30 12:13:33 WARN spark.SparkConf:

SPARK_JAVA_OPTS was detected (set to '-Dspark.driver.port=53411').

This is deprecated in Spark 1.0+.



Please instead use:

 - ./spark-submit with conf/spark-defaults.conf to set defaults for an 
application

 - ./spark-submit with --driver-java-options to set -X options for a driver

 - spark.executor.extraJavaOptions to set -X options for executors

 - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or 
worker)



15/07/30 12:13:33 WARN spark.SparkConf: Setting 
'spark.executor.extraJavaOptions' to '-Dspark.driver.port=53411' as a 
work-around.

15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.driver.extraJavaOptions' 
to '-Dspark.driver.port=53411' as a work-around.

15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop

15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop

15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
with modify permissions: Set(hadoop)

15/07/30 12:13:33 INFO slf4j.Slf4jLogger: Slf4jLogger started

15/07/30 12:13:33 INFO Remoting: Starting remoting

15/07/30 12:13:34 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@10.21.1.77:53411]

15/07/30 12:13:34 INFO util.Utils: Successfully started service 'sparkDriver' 
on port 53411.

15/07/30 12:13:34 INFO spark.SparkEnv: Registering MapOutputTracker

15/07/30 12:13:34 INFO spark.SparkEnv: Registering BlockManagerMaster

15/07/30 12:13:34 INFO storage.DiskBlockManager: Created local directory at 

Re: Writing streaming data to cassandra creates duplicates

2015-07-30 Thread Juan Rodríguez Hortalá
Hi,

Just my two cents. I understand your problem is that you have messages with
the same key in two different dstreams. What I would do is make a union of
all the dstreams with StreamingContext.union or several calls to
DStream.union, then create a pair dstream with the primary key as the key,
and then use groupByKey or reduceByKey (or combineByKey, etc.) to combine the
messages with the same primary key.

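A minimal sketch of that approach (the message type, its version field and
the merge rule are hypothetical; ssc and the three input dstreams are assumed
to exist already):

case class Message(key: String, payload: String, version: Long)

val unioned = ssc.union(Seq(stream1, stream2, stream3))   // all receivers
val deduped = unioned
  .map(m => (m.key, m))
  .reduceByKey((a, b) => if (a.version >= b.version) a else b)  // keep latest
// deduped.foreachRDD(rdd => ...)  // write each key once to Cassandra
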
Hope that helps.

Greetings,

Juan


2015-07-30 10:50 GMT+02:00 Priya Ch learnings.chitt...@gmail.com:

 Hi All,

  Can someone throw insights on this ?

 On Wed, Jul 29, 2015 at 8:29 AM, Priya Ch learnings.chitt...@gmail.com
 wrote:



 Hi TD,

  Thanks for the info. I have the scenario like this.

  I am reading the data from kafka topic. Let's say kafka has 3 partitions
 for the topic. In my streaming application, I would configure 3 receivers
 with 1 thread each such that they would receive 3 dstreams (from 3
 partitions of the kafka topic) and also implement a partitioner. Now there is a
 possibility of receiving messages with the same primary key twice or more: once
 at the time the message is created, and again whenever there is an update to
 any field of the same message.

 If two messages M1 and M2 with the same primary key are read by 2 receivers,
 then even with the partitioner Spark would still end up processing them in
 parallel, as they are altogether in different dstreams. How do we address
 this situation?

 Thanks,
 Padma Ch

 On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das t...@databricks.com
 wrote:

 You have to partition the data in Spark Streaming by the primary
 key, and then make sure you insert data into Cassandra atomically per key, or
 per set of keys in the partition. You can use the combination of the (batch
 time, partition id) of the RDD inside foreachRDD as the unique id for
 the data you are inserting. This will guard against multiple attempts to
 run the task that inserts into Cassandra.

 See
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations

 TD

 On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch learnings.chitt...@gmail.com
  wrote:

 Hi All,

  I have a problem when writing streaming data to cassandra. Our existing
 product is on an Oracle DB, in which, while writing data, locks are maintained
 such that duplicates in the DB are avoided.

 But as Spark has a parallel processing architecture, if more than 1
 thread is trying to write the same data, i.e. with the same primary key, is
 there any scope to create duplicates? If yes, how to address this problem,
 either from the Spark or the Cassandra side?
 from spark or from cassandra side ?

 Thanks,
 Padma Ch








Re: How to set log level in spark-submit ?

2015-07-30 Thread Alexander Krasheninnikov

I saw such an example in the docs:
--conf 
spark.driver.extraJavaOptions=-Dlog4j.configuration=file://$path_to_file

but, unfortunately, it does not work for me.

On 30.07.2015 05:12, canan chen wrote:
Yes, that should work. What I mean is: is there any option in the
spark-submit command that I can use to specify the log level?



On Thu, Jul 30, 2015 at 10:05 AM, Jonathan Coveney jcove...@gmail.com wrote:


Put a log4j.properties file in conf/. You can copy
log4j.properties.template as a good base

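For example, a minimal conf/log4j.properties (based on the bundled
log4j.properties.template; adjust the level as needed):

log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n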

On Wednesday, July 29, 2015, canan chen ccn...@gmail.com wrote:

Anyone know how to set log level in spark-submit ? Thanks






Re: Heatmap with Spark Streaming

2015-07-30 Thread Akhil Das
You can integrate it with any language (like PHP) and use AJAX calls to
update the charts.

Thanks
Best Regards

On Thu, Jul 30, 2015 at 2:11 PM, UMESH CHAUDHARY umesh9...@gmail.com
wrote:

 Thanks for the suggestion Akhil!
 I looked at https://github.com/mbostock/d3/wiki/Gallery to know more
 about d3; all the examples described there are on static data. How can we
 update our heat map from new data if we store it in HBase or MySQL? I mean,
 do we need to query back and forth for it?
 Is there any pluggable and quicker component for heatmaps with Spark
 Streaming?

 On Thu, Jul 30, 2015 at 1:23 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 You can easily push data to an intermediate storage from spark streaming
 (like HBase or a SQL/NoSQL DB etc) and then power your dashboards with d3
 js.

 Thanks
 Best Regards

 On Tue, Jul 28, 2015 at 12:18 PM, UMESH CHAUDHARY umesh9...@gmail.com
 wrote:

 I have just started using Spark Streaming and done a few POCs. It is
 fairly easy to implement. I was thinking of presenting the data using some
 smart graphing and dashboarding tools, e.g. Graphite or Grafana, but they
 don't have heat-maps. I also looked at Zeppelin http://zeppelin-project.org/,
 but was unable to find any heat-map functionality. Could you please suggest
 any data visualization tools using heat-maps with Spark Streaming.







Upgrade of Spark-Streaming application

2015-07-30 Thread Nicola Ferraro
Hi,
I've read about the recent updates to the Spark Streaming integration with
Kafka (I refer to the new approach without receivers).
In the new approach, metadata are persisted in checkpoint folders on HDFS
so that the SparkStreaming context can be recreated in case of failures.
This means that the streaming application will restart from where it
exited and the message-consuming process continues with new messages only.
Also, if I manually stop the streaming process and recreate the context
from checkpoint (using an approach similar to
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala),
the behavior would be the same.

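For reference, the recreate-from-checkpoint pattern referred to above looks
roughly like this (checkpoint path, app name and batch interval are
illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("MyStreamingApp")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint("/hdfs/path/checkpoints")
  // ... build the Kafka direct stream and processing pipeline here ...
  ssc
}

val ssc = StreamingContext.getOrCreate("/hdfs/path/checkpoints", createContext _)
ssc.start()
ssc.awaitTermination()
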
Now, suppose I want to change something in the software and modify the
processing pipeline.
Can spark use the previous checkpoint to recreate the new application? Will
I ever be able to upgrade the software without processing all the messages
in Kafka again?

Regards,
Nicola


How to perform basic statistics on a Json file to explore my numeric and non-numeric variables?

2015-07-30 Thread SparknewUser
I've imported a Json file which has this schema :

sqlContext.read.json(filename).printSchema
root
 |-- COL: long (nullable = true)
 |-- DATA: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Crate: string (nullable = true)
 |    |    |-- MLrate: string (nullable = true)
 |    |    |-- Nrout: string (nullable = true)
 |    |    |-- up: string (nullable = true)
 |-- IFAM: string (nullable = true)
 |-- KTM: long (nullable = true)


I'm new to Spark and I want to perform basic statistics like:
  * getting the min, max, mean, median and std of numeric variables
  * getting the value frequencies of non-numeric variables.

My questions are:
- How do I change the type of my variables in my schema from 'string' to
'numeric'? (Crate, MLrate and Nrout should be numeric variables.)
- How do I do those basic statistics easily?

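One possible way to do both (a sketch following the schema above: the DATA
array is flattened first with explode, and describe() gives count / mean /
stddev / min / max, though not the median):

import org.apache.spark.sql.functions._

val df = sqlContext.read.json(filename)
val flat = df.select(col("COL"), col("IFAM"), col("KTM"),
                     explode(col("DATA")).as("d"))
val typed = flat.select(
  col("COL"), col("IFAM"), col("KTM"),
  col("d.Crate").cast("double").as("Crate"),
  col("d.MLrate").cast("double").as("MLrate"),
  col("d.Nrout").cast("double").as("Nrout"))

typed.describe("Crate", "MLrate", "Nrout").show()  // basic numeric statistics
typed.groupBy("IFAM").count().show()               // value frequencies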






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-perform-basic-statistics-on-a-Json-file-to-explore-my-numeric-and-non-numeric-variables-tp24077.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
