Calrification on Spark-Hadoop Configuration

2015-10-01 Thread Vinoth Sankar
Hi,

I'm new to Spark. For my application I need to overwrite Hadoop
configurations (Can't change Configurations in Hadoop as it might affect my
regular HDFS), so that Namenode IPs gets automatically resolved.What are
the ways to do so. I tried giving "spark.hadoop.dfs.ha.namenodes.nn",
"spark.hadoop.dfs.namenode.rpc-address.nn",
"spark.hadoop.dfs.namenode.http-address.nn" and other core-site & hdfs-site
conf properties in SparkConf Object. But still i get UnknownHostException.

Regards
Vinoth Sankar


automatic start of streaming job on failure on YARN

2015-10-01 Thread Jeetendra Gangele
We've a streaming application running on yarn and we would like to ensure
that is up running 24/7.

Is there a way to tell yarn to automatically restart a specific application
on failure?

There is property yarn.resourcemanager.am.max-attempts which is default set
to 2 setting it to bigger value is the solution? Also I did observed this
does not seems to work my application is failing and not starting
automatically.

Mesos has this build in support wondering why yarn is lacking here?



Regards

jeetendra


How to connect HadoopHA from spark

2015-10-01 Thread Vinoth Sankar
Hi,

How do i connect HadoopHA from SPARK. I tried overwriting hadoop
configurations from sparkCong. But Still I'm getting UnknownHostException
with following trace

java.lang.IllegalArgumentException: java.net.UnknownHostException: ABC at
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)
at
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:240)
at
org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:144)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:579) at
org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:524) at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:146)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431) at
org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413) at
org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) at
org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1521) at
org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1528) at
org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:747) at
org.apache.spark.deploy.master.Master.removeApplication(Master.scala:710)
at
org.apache.spark.deploy.master.Master.finishApplication(Master.scala:688)
at
org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:432)
at
org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:432)
at scala.Option.foreach(Option.scala:236) at
org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1.applyOrElse(Master.scala:432)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at
org.apache.spark.deploy.master.Master.aroundReceive(Master.scala:52) at
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at
akka.actor.ActorCell.invoke(ActorCell.scala:487) at
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at
akka.dispatch.Mailbox.run(Mailbox.scala:220) at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.net.UnknownHostException: ABC ... 38 more


Regards

Vinoth Sankar


Re: How to connect HadoopHA from spark

2015-10-01 Thread Adam McElwee
Do you have all of the required HDFS HA config options in your override?

I think these are the minimum required for HA:
dfs.nameservices
dfs.ha.namenodes.{nameservice ID}
dfs.namenode.rpc-address.{nameservice ID}.{name node ID}

On Thu, Oct 1, 2015 at 7:22 AM, Vinoth Sankar  wrote:

> Hi,
>
> How do i connect HadoopHA from SPARK. I tried overwriting hadoop
> configurations from sparkCong. But Still I'm getting UnknownHostException
> with following trace
>
> java.lang.IllegalArgumentException: java.net.UnknownHostException: ABC at
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)
> at
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:240)
> at
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:144)
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:579) at
> org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:524) at
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:146)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431) at
> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413) at
> org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) at
> org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1521) at
> org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1528) at
> org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:747) at
> org.apache.spark.deploy.master.Master.removeApplication(Master.scala:710)
> at
> org.apache.spark.deploy.master.Master.finishApplication(Master.scala:688)
> at
> org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:432)
> at
> org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:432)
> at scala.Option.foreach(Option.scala:236) at
> org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1.applyOrElse(Master.scala:432)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
> at
> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at
> org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at
> org.apache.spark.deploy.master.Master.aroundReceive(Master.scala:52) at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at
> akka.actor.ActorCell.invoke(ActorCell.scala:487) at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at
> akka.dispatch.Mailbox.run(Mailbox.scala:220) at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.net.UnknownHostException: ABC ... 38 more
>
>
> Regards
>
> Vinoth Sankar
>
>
>


Re: Submitting with --deploy-mode cluster: uploading the jar

2015-10-01 Thread Christophe Schmitz
I am using standalone deployment, with spark 1.4.1

When I submit the job, I get no error at the submission terminal. Then I
check the webui, I can find the driver section which has a my driver
submission, with this error:  java.io.FileNotFoundException ... which point
the full path of my jar as it is on my computer (where I run spark-submit).
Now if I create this full path on the worker(s) with my jar inside, the
application get processed.


Thanks!


On Thu, Oct 1, 2015 at 10:51 AM, Saisai Shao  wrote:

> Are you running on standalone deploy mode, what Spark version are you
> running?
>
> Can you explain a little more specifically what exception occurs, how to
> provide the jar to Spark?
>
> I tried in my local machine with command:
>
> ./bin/spark-submit --verbose --master spark://hw12100.local:7077
> --deploy-mode cluster --class org.apache.spark.examples.SparkPi
> examples/target/scala-2.10/spark-examples-1.5.1-hadoop2.7.1.jar
>
> Seems Spark will upload this examples jar automatically, don't need to
> handle it manually.
>
> Thanks
> Saisai
>
>
>
> On Thu, Oct 1, 2015 at 8:36 AM, Christophe Schmitz 
> wrote:
>
>> Hi Saisai
>>
>> I am using this command:
>> spark-submit --deploy-mode cluster --properties-file file.conf --class
>> myclass test-assembly-1.0.jar
>>
>> The application start only if I manually copy test-assembly-1.0.jar in
>> all the worer (or the master, I don't remember) and provide the full path
>> of the file.
>>
>> On the other hand with --deploy-mode client I don't need to do that, but
>> then I need to accept incoming connection in my client to serve the jar
>> (which is not possible behind a firewall I don't control)
>>
>> Thanks,
>>
>> Christophe
>>
>>
>> On Wed, Sep 30, 2015 at 5:19 PM, Saisai Shao 
>> wrote:
>>
>>> As I remembered you don't need to upload application jar manually, Spark
>>> will do it for you when you use Spark submit. Would you mind posting out
>>> your command of Spark submit?
>>>
>>>
>>> On Wed, Sep 30, 2015 at 3:13 PM, Christophe Schmitz >> > wrote:
>>>
 Hi there,

 I am trying to use the "--deploy-mode cluster" option to submit my job
 (spark 1.4.1). When I do that, the spark-driver (on the cluster) is looking
 for my application jar. I can manually copy my application jar on all the
 workers, but I was wondering if there is a way to submit the application
 jar when running spark-submit.

 Thanks!

>>>
>>>
>>
>


Re: Calrification on Spark-Hadoop Configuration

2015-10-01 Thread Sabarish Sasidharan
You can point to your custom HADOOP_CONF_DIR in your spark-env.sh

Regards
Sab
On 01-Oct-2015 5:22 pm, "Vinoth Sankar"  wrote:

> Hi,
>
> I'm new to Spark. For my application I need to overwrite Hadoop
> configurations (Can't change Configurations in Hadoop as it might affect my
> regular HDFS), so that Namenode IPs gets automatically resolved.What are
> the ways to do so. I tried giving "spark.hadoop.dfs.ha.namenodes.nn",
> "spark.hadoop.dfs.namenode.rpc-address.nn",
> "spark.hadoop.dfs.namenode.http-address.nn" and other core-site & hdfs-site
> conf properties in SparkConf Object. But still i get UnknownHostException.
>
> Regards
> Vinoth Sankar
>


Re: Kafka Direct Stream

2015-10-01 Thread Adrian Tanase
On top of that you could make the topic part of the key (e.g. keyBy in 
.transform or manually emitting a tuple) and use one of the .xxxByKey operators 
for the processing.

If you have a stable, domain specific list of topics (e.g. 3-5 named topics) 
and the processing is really different, I would also look at filtering by topic 
and saving as different Dstreams in your code.

Either way you need to start with Cody’s tip in order to extract the topic name.

-adrian

From: Cody Koeninger
Date: Thursday, October 1, 2015 at 5:06 PM
To: Udit Mehta
Cc: user
Subject: Re: Kafka Direct Stream

You can get the topic for a given partition from the offset range.  You can 
either filter using that; or just have a single rdd and match on topic when 
doing mapPartitions or foreachPartition (which I think is a better idea)

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta 
> wrote:
Hi,

I am using spark direct stream to consume from multiple topics in Kafka. I am 
able to consume fine but I am stuck at how to separate the data for each topic 
since I need to process data differently depending on the topic.
I basically want to split the RDD consisting on N topics into N RDD's each 
having 1 topic.

Any help would be appreciated.

Thanks in advance,
Udit



Re: Kafka Direct Stream

2015-10-01 Thread Cody Koeninger
You can get the topic for a given partition from the offset range.  You can
either filter using that; or just have a single rdd and match on topic when
doing mapPartitions or foreachPartition (which I think is a better idea)

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta  wrote:

> Hi,
>
> I am using spark direct stream to consume from multiple topics in Kafka. I
> am able to consume fine but I am stuck at how to separate the data for each
> topic since I need to process data differently depending on the topic.
> I basically want to split the RDD consisting on N topics into N RDD's each
> having 1 topic.
>
> Any help would be appreciated.
>
> Thanks in advance,
> Udit
>


Re: Lost leader exception in Kafka Direct for Streaming

2015-10-01 Thread Cody Koeninger
Did you check you kafka broker logs to see what was going on during that
time?

The direct stream will handle normal leader loss / rebalance by retrying
tasks.

But the exception you got indicates that something with kafka was wrong,
such that offsets were being re-used.

ie. your job already processed up through beginning offset 15027734702

but when asking kafka for the highest available offsets, it returns ending
offset 15027725493

which is lower, in other words kafka lost messages.  This might happen
because you lost a leader and recovered from a replica that wasn't in sync,
or someone manually screwed up a topic, or ... ?

If you really want to just blindly "recover" from this situation (even
though something is probably wrong with your data), the most
straightforward thing to do is monitor and restart your job.




On Wed, Sep 30, 2015 at 4:31 PM, swetha  wrote:

>
> Hi,
>
> I see this sometimes in our Kafka Direct approach in our Streaming job. How
> do we make sure that the job recovers from such errors and works normally
> thereafter?
>
> 15/09/30 05:14:18 ERROR KafkaRDD: Lost leader for topic x_stream partition
> 19,  sleeping for 200ms
> 15/09/30 05:14:18 ERROR KafkaRDD: Lost leader for topic x_stream partition
> 5,  sleeping for 200ms
>
> Followed by every task failing with something like this:
>
> 15/09/30 05:26:20 ERROR Executor: Exception in task 4.0 in stage 84281.0
> (TID 818804)
> kafka.common.NotLeaderForPartitionException
>
> And:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 15
> in stage 84958.0 failed 4 times, most recent failure: Lost task 15.3 in
> stage 84958.0 (TID 819461, 10.227.68.102): java.lang.AssertionError:
> assertion failed: Beginning offset 15027734702 is after the ending offset
> 15027725493 for topic hubble_stream partition 12. You either provided an
> invalid fromOffset, or the Kafka topic has been damaged
>
>
> Thanks,
> Swetha
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Lost-leader-exception-in-Kafka-Direct-for-Streaming-tp24891.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Deploying spark-streaming application on production

2015-10-01 Thread Jeetendra Gangele
Ya Also I think I need to enable the checkpointing and rather then building
the lineage DAG need to store the RDD data into HDFS.

On 23 September 2015 at 01:04, Adrian Tanase  wrote:

> btw I re-read the docs and I want to clarify that reliable receiver + WAL
> gives you at least once, not exactly once semantics.
>
> Sent from my iPhone
>
> On 21 Sep 2015, at 21:50, Adrian Tanase  wrote:
>
> I'm wondering, isn't this the canonical use case for WAL + reliable
> receiver?
>
> As far as I know you can tune Mqtt server to wait for ack on messages (qos
> level 2?).
> With some support from the client libray you could achieve exactly once
> semantics on the read side, if you ack message only after writing it to
> WAL, correct?
>
> -adrian
>
> Sent from my iPhone
>
> On 21 Sep 2015, at 12:35, Petr Novak  wrote:
>
> In short there is no direct support for it in Spark AFAIK. You will either
> manage it in MQTT or have to add another layer of indirection - either
> in-memory based (observable streams, in-mem db) or disk based (Kafka, hdfs
> files, db) which will keep you unprocessed events.
>
> Now realizing, there is support for backpressure in v1.5.0 but I don't
> know if it could be exploited aka I don't know if it is possible to
> decouple event reading into memory and actual processing code in Spark
> which could be swapped on the fly. Probably not without some custom built
> facility for it.
>
> Petr
>
> On Mon, Sep 21, 2015 at 11:26 AM, Petr Novak  wrote:
>
>> I should read my posts at least once to avoid so many typos. Hopefully
>> you are brave enough to read through.
>>
>> Petr
>>
>> On Mon, Sep 21, 2015 at 11:23 AM, Petr Novak 
>> wrote:
>>
>>> I think you would have to persist events somehow if you don't want to
>>> miss them. I don't see any other option there. Either in MQTT if it is
>>> supported there or routing them through Kafka.
>>>
>>> There is WriteAheadLog in Spark but you would have decouple stream MQTT
>>> reading and processing into 2 separate job so that you could upgrade the
>>> processing one assuming the reading one would be stable (without changes)
>>> across versions. But it is problematic because there is no easy way how to
>>> share DStreams between jobs - you would have develop your own facility for
>>> it.
>>>
>>> Alternatively the reading job could could save MQTT event in its the
>>> most raw form into files - to limit need to change code - and then the
>>> processing job would work on top of it using Spark streaming based on
>>> files. I this is inefficient and can get quite complex if you would like to
>>> make it reliable.
>>>
>>> Basically either MQTT supports prsistence (which I don't know) or there
>>> is Kafka for these use case.
>>>
>>> Another option would be I think to place observable streams in between
>>> MQTT and Spark streaming with bakcpressure as far as you could perform
>>> upgrade till buffers fills up.
>>>
>>> I'm sorry that it is not thought out well from my side, it is just a
>>> brainstorm but it might lead you somewhere.
>>>
>>> Regards,
>>> Petr
>>>
>>> On Mon, Sep 21, 2015 at 10:09 AM, Jeetendra Gangele <
>>> gangele...@gmail.com> wrote:
>>>
 Hi All,

 I have an spark streaming application with batch (10 ms) which is
 reading the MQTT channel and dumping the data from MQTT to HDFS.

 So suppose if I have to deploy new application jar(with changes in
 spark streaming application) what is the best way to deploy, currently I am
 doing as below

 1.killing the running streaming app using yarn application -kill ID
 2. and then starting the application again

 Problem with above approach is since we are not persisting the events
 in MQTT we will miss the events for the period of deploy.

 how to handle this case?

 regards
 jeeetndra

>>>


Re: Lost leader exception in Kafka Direct for Streaming

2015-10-01 Thread Adrian Tanase
This also happened to me in extreme recovery scenarios – e.g. Killing 4 out of 
a  7 machine cluster.

I’d put my money on recovering from an out of sync replica, although I haven’t 
done extensive testing around it.

-adrian

From: Cody Koeninger
Date: Thursday, October 1, 2015 at 5:18 PM
To: swetha
Cc: "user@spark.apache.org"
Subject: Re: Lost leader exception in Kafka Direct for Streaming

Did you check you kafka broker logs to see what was going on during that time?

The direct stream will handle normal leader loss / rebalance by retrying tasks.

But the exception you got indicates that something with kafka was wrong, such 
that offsets were being re-used.

ie. your job already processed up through beginning offset 15027734702

but when asking kafka for the highest available offsets, it returns ending 
offset 15027725493

which is lower, in other words kafka lost messages.  This might happen because 
you lost a leader and recovered from a replica that wasn't in sync, or someone 
manually screwed up a topic, or ... ?

If you really want to just blindly "recover" from this situation (even though 
something is probably wrong with your data), the most straightforward thing to 
do is monitor and restart your job.




On Wed, Sep 30, 2015 at 4:31 PM, swetha 
> wrote:

Hi,

I see this sometimes in our Kafka Direct approach in our Streaming job. How
do we make sure that the job recovers from such errors and works normally
thereafter?

15/09/30 05:14:18 ERROR KafkaRDD: Lost leader for topic x_stream partition
19,  sleeping for 200ms
15/09/30 05:14:18 ERROR KafkaRDD: Lost leader for topic x_stream partition
5,  sleeping for 200ms

Followed by every task failing with something like this:

15/09/30 05:26:20 ERROR Executor: Exception in task 4.0 in stage 84281.0
(TID 818804)
kafka.common.NotLeaderForPartitionException

And:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 15
in stage 84958.0 failed 4 times, most recent failure: Lost task 15.3 in
stage 84958.0 (TID 819461, 10.227.68.102): java.lang.AssertionError:
assertion failed: Beginning offset 15027734702 is after the ending offset
15027725493 for topic hubble_stream partition 12. You either provided an
invalid fromOffset, or the Kafka topic has been damaged


Thanks,
Swetha




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Lost-leader-exception-in-Kafka-Direct-for-Streaming-tp24891.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org




Re: How to connect HadoopHA from spark

2015-10-01 Thread Ted Yu
Have you setup HADOOP_CONF_DIR in spark-env.sh correctly ?

Cheers

On Thu, Oct 1, 2015 at 5:22 AM, Vinoth Sankar  wrote:

> Hi,
>
> How do i connect HadoopHA from SPARK. I tried overwriting hadoop
> configurations from sparkCong. But Still I'm getting UnknownHostException
> with following trace
>
> java.lang.IllegalArgumentException: java.net.UnknownHostException: ABC at
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)
> at
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:240)
> at
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:144)
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:579) at
> org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:524) at
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:146)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431) at
> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413) at
> org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) at
> org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1521) at
> org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1528) at
> org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:747) at
> org.apache.spark.deploy.master.Master.removeApplication(Master.scala:710)
> at
> org.apache.spark.deploy.master.Master.finishApplication(Master.scala:688)
> at
> org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:432)
> at
> org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:432)
> at scala.Option.foreach(Option.scala:236) at
> org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1.applyOrElse(Master.scala:432)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
> at
> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at
> org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at
> org.apache.spark.deploy.master.Master.aroundReceive(Master.scala:52) at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at
> akka.actor.ActorCell.invoke(ActorCell.scala:487) at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at
> akka.dispatch.Mailbox.run(Mailbox.scala:220) at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.net.UnknownHostException: ABC ... 38 more
>
>
> Regards
>
> Vinoth Sankar
>
>
>


Decision Tree Model

2015-10-01 Thread hishamm
Hi,

I am using SPARK 1.4.0, Python and Decision Trees to perform machine
learning classification. 

I test it by creating the predictions and zip it to the test data, as
following: 


*predictions = tree_model.predict(test_data.map(lambda a: a.features))
labels = test_data.map(lambda a: a.label).zip(predictions)
correct = 100 * (labels.filter(lambda (v, p): v == p).count() /
float(test_data.count()))*

I always get this error in the zipping phase:

*Can not deserialize RDD with different number of items in pair: (3, 2)*


To avoid zipping, I tried to do it in a different way, as follows:

*labels = test_data.map(lambda a: (a.label, tree_model.predict(a.features)))
correct = 100 * (labels.filter(lambda (v, p): v == p).count() /
float(test_data.count()))*

However, I always get this error:

*in __getnewargs__(self)
250 # This method is called when attempting to pickle
SparkContext, which is always an error:
251 raise Exception(
--> 252 "It appears that you are attempting to reference
SparkContext from a broadcast "
253 "variable, action, or transforamtion. SparkContext can
only be used on the driver, "
254 "not in code that it run on workers. For more
information, see SPARK-5063."

Exception: It appears that you are attempting to reference SparkContext from
a broadcast variable, action, or transforamtion. SparkContext can only be
used on the driver, not in code that it run on workers. For more
information, see SPARK-5063.*


Is the DecisionTreeModel part of the SparkContext ?!  
I found that using Scala, we can apply the second approach with no problem. 


So, how can I solve the two problems ?

Thanks and Regards,
Hisham












--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Decision-Tree-Model-tp24899.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: automatic start of streaming job on failure on YARN

2015-10-01 Thread Adrian Tanase
This happens automatically as long as you submit with cluster mode instead of 
client mode. (e.g. ./spark-submit —master yarn-cluster …)

The property you mention would help right after that, although you will need to 
set it to a large value (e.g. 1000?) - as there is no “infinite” support.

-adrian

From: Jeetendra Gangele
Date: Thursday, October 1, 2015 at 4:30 PM
To: user
Subject: automatic start of streaming job on failure on YARN


We've a streaming application running on yarn and we would like to ensure that 
is up running 24/7.

Is there a way to tell yarn to automatically restart a specific application on 
failure?

There is property yarn.resourcemanager.am.max-attempts which is default set to 
2 setting it to bigger value is the solution? Also I did observed this does not 
seems to work my application is failing and not starting automatically.

Mesos has this build in support wondering why yarn is lacking here?



Regards

jeetendra


Re: Pyspark: "Error: No main class set in JAR; please specify one with --class"

2015-10-01 Thread Marcelo Vanzin
How are you running the actual application?

I find it slightly odd that you're setting PYSPARK_SUBMIT_ARGS
directly; that's supposed to be an internal env variable used by
Spark. You'd normally pass those parameters in the spark-submit (or
pyspark) command line.

On Thu, Oct 1, 2015 at 8:56 AM, YaoPau  wrote:
> I'm trying to add multiple SerDe jars to my pyspark session.
>
> I got the first one working by changing my PYSPARK_SUBMIT_ARGS to:
>
> "--master yarn-client --executor-cores 5 --num-executors 5 --driver-memory
> 3g --executor-memory 5g --jars /home/me/jars/csv-serde-1.1.2.jar"
>
> But when I tried to add a second, using:
>
> "--master yarn-client --executor-cores 5 --num-executors 5 --driver-memory
> 3g --executor-memory 5g --jars /home/me/jars/csv-serde-1.1.2.jar,
> /home/me/jars/json-serde-1.3-jar-with-dependencies.jar"
>
> I got the error "Error: No main class set in JAR; please specify one with
> --class".
>
> How do I specify the class for just the second JAR?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-Error-No-main-class-set-in-JAR-please-specify-one-with-class-tp24900.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Pyspark: "Error: No main class set in JAR; please specify one with --class"

2015-10-01 Thread Ted Yu
In your second command, have you tried changing the comma to colon ?

Cheers

On Thu, Oct 1, 2015 at 8:56 AM, YaoPau  wrote:

> I'm trying to add multiple SerDe jars to my pyspark session.
>
> I got the first one working by changing my PYSPARK_SUBMIT_ARGS to:
>
> "--master yarn-client --executor-cores 5 --num-executors 5 --driver-memory
> 3g --executor-memory 5g --jars /home/me/jars/csv-serde-1.1.2.jar"
>
> But when I tried to add a second, using:
>
> "--master yarn-client --executor-cores 5 --num-executors 5 --driver-memory
> 3g --executor-memory 5g --jars /home/me/jars/csv-serde-1.1.2.jar,
> /home/me/jars/json-serde-1.3-jar-with-dependencies.jar"
>
> I got the error "Error: No main class set in JAR; please specify one with
> --class".
>
> How do I specify the class for just the second JAR?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-Error-No-main-class-set-in-JAR-please-specify-one-with-class-tp24900.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Pyspark: "Error: No main class set in JAR; please specify one with --class"

2015-10-01 Thread YaoPau
I'm trying to add multiple SerDe jars to my pyspark session.

I got the first one working by changing my PYSPARK_SUBMIT_ARGS to:

"--master yarn-client --executor-cores 5 --num-executors 5 --driver-memory
3g --executor-memory 5g --jars /home/me/jars/csv-serde-1.1.2.jar"

But when I tried to add a second, using: 

"--master yarn-client --executor-cores 5 --num-executors 5 --driver-memory
3g --executor-memory 5g --jars /home/me/jars/csv-serde-1.1.2.jar,
/home/me/jars/json-serde-1.3-jar-with-dependencies.jar"

I got the error "Error: No main class set in JAR; please specify one with
--class".

How do I specify the class for just the second JAR?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-Error-No-main-class-set-in-JAR-please-specify-one-with-class-tp24900.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Getting spark application driver ID programmatically

2015-10-01 Thread Snehal Nagmote
Hi ,

I have use case where we need to automate start/stop of spark streaming
application.

To stop spark job, we need driver/application id of the job .

For example :

/app/spark-master/bin/spark-class org.apache.spark.deploy.Client kill
spark://10.65.169.242:7077 $driver_id

I am thinking to get the driver id when we submit the job in verbose mode ,
by parsing the output .

Does spark provide any api where it provides driver id of application .

Is there any better or cleaner way to get driver ID ?


Any suggestions would be helpful  ,

Thanks,
Snehal


OOM error in Spark worker

2015-10-01 Thread varun sharma
My workers are going OOM over time. I am running a streaming job in spark
1.4.0.
Here is the heap dump of workers.







*16,802 instances of "org.apache.spark.deploy.worker.ExecutorRunner",
loaded by "sun.misc.Launcher$AppClassLoader @ 0xdff94088" occupy
488,249,688 (95.80%) bytes. These instances are referenced from one
instance of "java.lang.Object[]", loaded by "" Keywords org.apache.spark.deploy.worker.ExecutorRunner
java.lang.Object[] sun.misc.Launcher$AppClassLoader
@ 0xdff94088 *
is this because of this bug:
http://apache-spark-developers-list.1001551.n3.nabble.com/Worker-memory-leaks-td13341.html
https://issues.apache.org/jira/browse/SPARK-9202

Also,
I am getting below error continuously if one of the worker/executor dies on
any node in my spark cluster.
If I start the worker also, error doesn't go. I have to force_kill my
streaming job and restart to fix the issue. Is it some bug?
I am using Spark 1.4.0.


*MY_IP in logs is IP of worker node which failed. *
























*15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD 194218 -
Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms]} akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms] at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
  at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
  at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
  at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
  at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
  at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
  at
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745) 15/09/03 11:29:11 WARN
BlockManagerMaster: Failed to remove RDD 194217 - Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms]} akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms] at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
  at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
  at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
  at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
  at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
  at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
  at
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745) 15/09/03 11:29:11 ERROR
SparkDeploySchedulerBackend: Asked to remove non-existent executor
16723 15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD
194216 - Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms]} *

*It is easily reproducible if I manually stop a worker on one of my node. *


*15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove
non-existent executor 329 15/09/03 23:52:18 ERROR
SparkDeploySchedulerBackend: Asked to remove non-existent executor
333 15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove
non-existent executor 334 *


*It doesn't go even if I start the worker again. Follow up question: If my
streaming job has consumed some events from Kafka topic and are pending to
be scheduled because of delay in processing... Will my force killing the
streaming job lose that data which is not yet scheduled? *


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


How to access lost executor log file

2015-10-01 Thread Lan Jiang
Hi, there

When running a Spark job on YARN, 2 executors somehow got lost during the 
execution. The message on the history server GUI is “CANNOT find address”.  Two 
extra executors were launched by YARN and eventually finished the job. Usually 
I go to the “Executors” tab on the UI to check the executor stdout/stderr for 
troubleshoot. Now if I go to the “Executors” tab,  I do not see the 2 executors 
that were lost. I can only see the rest executors and the 2 new executors. Thus 
I cannot check the stdout/stderr of the lost executors. How can I access the 
log files of these lost executors to find out why they were lost?

Thanks

Lan







Accumulator of rows?

2015-10-01 Thread Saif.A.Ellafi
Hi all,

I need to repeat a couple rows from a dataframe by n times each. To do so, I 
plan to create a new Data Frame, but I am being unable to find a way to 
accumulate "Rows" somewhere, as this might get huge, I can't accumulate into a 
mutable Array, I think?.

Thanks,
Saif



Re: How to access lost executor log file

2015-10-01 Thread Lan Jiang
Ted,

Thanks for your reply.

First of all, after sending email to the mailing list,  I use yarn logs
applicationId  to retrieve the aggregated log
successfully.  I found the exceptions I am looking for.

Now as to your suggestion, when I go to the YARN RM UI, I can only see the
"Tracking URL" in the application overview section. When I click it, it
brings me to the spark history server UI, where I cannot find the lost
exectuors. The only logs link I can find one the YARN RM site is the
ApplicationMaster log, which is not what I need. Did I miss something?

Lan

On Thu, Oct 1, 2015 at 1:30 PM, Ted Yu  wrote:

> Can you go to YARN RM UI to find all the attempts for this Spark Job ?
>
> The two lost executors should be found there.
>
> On Thu, Oct 1, 2015 at 10:30 AM, Lan Jiang  wrote:
>
>> Hi, there
>>
>> When running a Spark job on YARN, 2 executors somehow got lost during the
>> execution. The message on the history server GUI is “CANNOT find address”.
>> Two extra executors were launched by YARN and eventually finished the job.
>> Usually I go to the “Executors” tab on the UI to check the executor
>> stdout/stderr for troubleshoot. Now if I go to the “Executors” tab,  I do
>> not see the 2 executors that were lost. I can only see the rest executors
>> and the 2 new executors. Thus I cannot check the stdout/stderr of the lost
>> executors. How can I access the log files of these lost executors to find
>> out why they were lost?
>>
>> Thanks
>>
>> Lan
>>
>>
>>
>>
>>
>>
>


Re: How to access lost executor log file

2015-10-01 Thread Ted Yu
Can you go to YARN RM UI to find all the attempts for this Spark Job ?

The two lost executors should be found there.

On Thu, Oct 1, 2015 at 10:30 AM, Lan Jiang  wrote:

> Hi, there
>
> When running a Spark job on YARN, 2 executors somehow got lost during the
> execution. The message on the history server GUI is “CANNOT find address”.
> Two extra executors were launched by YARN and eventually finished the job.
> Usually I go to the “Executors” tab on the UI to check the executor
> stdout/stderr for troubleshoot. Now if I go to the “Executors” tab,  I do
> not see the 2 executors that were lost. I can only see the rest executors
> and the 2 new executors. Thus I cannot check the stdout/stderr of the lost
> executors. How can I access the log files of these lost executors to find
> out why they were lost?
>
> Thanks
>
> Lan
>
>
>
>
>
>


Re: Problem understanding spark word count execution

2015-10-01 Thread Kartik Mathur
Thanks Nicolae ,
So In my case all executers are sending results back to the driver and and "
*shuffle* *is just sending out the textFile to distribute the
partitions", *could
you please elaborate on this  ? what exactly is in this file ?

On Wed, Sep 30, 2015 at 9:57 PM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

>
> Hi,
>
> 2- the end results are sent back to the driver; the shuffles are
> transmission of intermediate results between nodes such as the -> which are
> all intermediate transformations.
>
> More precisely, since flatMap and map are narrow dependencies, meaning
> they can usually happen on the local node, I bet shuffle is just sending
> out the textFile to a few nodes to distribute the partitions.
>
>
> --
> *From:* Kartik Mathur 
> *Sent:* Thursday, October 1, 2015 12:42 AM
> *To:* user
> *Subject:* Problem understanding spark word count execution
>
> Hi All,
>
> I tried running spark word count and I have couple of questions -
>
> I am analyzing stage 0 , i.e
>  *sc.textFile -> flatMap -> Map (Word count example)*
>
> 1) In the *Stage logs* under Application UI details for every task I am
> seeing Shuffle write as 2.7 KB, *question - how can I know where all did
> this task write ? like how many bytes to which executer ?*
>
> 2) In the executer's log when I look for same task it says 2000 bytes of
> result is sent to driver , my question is , *if the results were directly
> sent to driver what is this shuffle write ? *
>
> Thanks,
> Kartik
>


"java.io.IOException: Filesystem closed" on executors

2015-10-01 Thread Lan Jiang
Hi, there

Here is the problem I ran into when executing a Spark Job (Spark 1.3). The
spark job is loading a bunch of avro files using Spark SQL spark-avro 1.0.0
library. Then it does some filter/map transformation, repartition to 1
partition and then write to HDFS. It creates 2 stages. The total HDFS block
number is around 12000, thus it creates 12000 partitions, thus 12000 tasks
for the first stage. I have total 9 executors launched with 5 thread for
each. The job has run fine until the very end.  When it reaches 19980/2
tasks succeeded, it suddenly failed the last 20 tasks and I lost 2
executors. The spark did launched 2 new executors and finishes the job
eventually by reprocessing the 20 tasks.

I only ran into this issue when I run the spark application on the full
dataset. When I run the 1/3 of the dataset, everything finishes fine
without error.

Question 1: What is the root cause of this issue? It is simiar to
http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed
and https://issues.apache.org/jira/browse/SPARK-3052, but it says the issue
has been fixed since 1.2
Quesiton 2: I am a little surprised that after the 2 new executors were
launched,  replacing the two failed executors, they simply reprocessed the
failed 20 tasks/partitions.  What about the results for other parititons
processed by the 2 failed executors before? I assumed the results of these
parititons are stored to the local disk and thus do not need to be computed
by the new exectuors?  When are the data stored locally? Is it
configuration? This question is for my own understanding about the spark
framework.

The exception causing the exectuor failure is below

org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem closed
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.avro.mapred.FsInput.read(FsInput.java:54)
at
org.apache.avro.file.DataFileReader$SeekableInputStream.read(DataFileReader.java:210)
at
org.apache.avro.io.BinaryDecoder$InputStreamByteSource.tryReadRaw(BinaryDecoder.java:839)
at org.apache.avro.io.BinaryDecoder.isEnd(BinaryDecoder.java:444)
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:264)


Re: Kafka Direct Stream

2015-10-01 Thread Nicolae Marasoiu
Hi,


If you just need processing per topic, why not generate N different kafka 
direct streams ? when creating a kafka direct stream you have list of topics - 
just give one.


Then the reusable part of your computations should be extractable as 
transformations/functions and reused between the streams.


Nicu



From: Adrian Tanase 
Sent: Thursday, October 1, 2015 5:47 PM
To: Cody Koeninger; Udit Mehta
Cc: user
Subject: Re: Kafka Direct Stream

On top of that you could make the topic part of the key (e.g. keyBy in 
.transform or manually emitting a tuple) and use one of the .xxxByKey operators 
for the processing.

If you have a stable, domain specific list of topics (e.g. 3-5 named topics) 
and the processing is really different, I would also look at filtering by topic 
and saving as different Dstreams in your code.

Either way you need to start with Cody's tip in order to extract the topic name.

-adrian

From: Cody Koeninger
Date: Thursday, October 1, 2015 at 5:06 PM
To: Udit Mehta
Cc: user
Subject: Re: Kafka Direct Stream

You can get the topic for a given partition from the offset range.  You can 
either filter using that; or just have a single rdd and match on topic when 
doing mapPartitions or foreachPartition (which I think is a better idea)

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
[http://spark.apache.org/docs/latest/img/spark-logo-hd.png]

Spark Streaming + Kafka Integration Guide - Spark 1.5.0 ...
Spark Streaming + Kafka Integration Guide. Apache Kafka is publish-subscribe 
messaging rethought as a distributed, partitioned, replicated commit log 
service.
Read 
more...




On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta 
> wrote:
Hi,

I am using spark direct stream to consume from multiple topics in Kafka. I am 
able to consume fine but I am stuck at how to separate the data for each topic 
since I need to process data differently depending on the topic.
I basically want to split the RDD consisting on N topics into N RDD's each 
having 1 topic.

Any help would be appreciated.

Thanks in advance,
Udit



spark.streaming.kafka.maxRatePerPartition for direct stream

2015-10-01 Thread Sourabh Chandak
Hi,

I am writing a spark streaming job using the direct stream method for kafka
and wanted to handle the case of checkpoint failure when we'll have to
reprocess the entire data from starting. By default for every new
checkpoint it tries to load everything from each partition and that takes a
lot of time for processing. After some searching found out that there
exists a config spark.streaming.kafka.maxRatePerPartition which can be used
to tackle this. My question is what will be a suitable range for this
config if we have ~12 million messages in kafka with maximum message size
~10 MB.

Thanks,
Sourabh


Re: spark.streaming.kafka.maxRatePerPartition for direct stream

2015-10-01 Thread Cody Koeninger
That depends on your job, your cluster resources, the number of seconds per
batch...

You'll need to do some empirical work to figure out how many messages per
batch a given executor can handle.  Divide that by the number of seconds
per batch.



On Thu, Oct 1, 2015 at 3:39 PM, Sourabh Chandak 
wrote:

> Hi,
>
> I am writing a spark streaming job using the direct stream method for
> kafka and wanted to handle the case of checkpoint failure when we'll have
> to reprocess the entire data from starting. By default for every new
> checkpoint it tries to load everything from each partition and that takes a
> lot of time for processing. After some searching found out that there
> exists a config spark.streaming.kafka.maxRatePerPartition which can be used
> to tackle this. My question is what will be a suitable range for this
> config if we have ~12 million messages in kafka with maximum message size
> ~10 MB.
>
> Thanks,
> Sourabh
>


python version in spark-submit

2015-10-01 Thread roy
Hi,

 We have python2.6 (default) on cluster and also we have installed
python2.7.

I was looking a way to set python version in spark-submit.

anyone know how to do this ?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/python-version-in-spark-submit-tp24902.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: python version in spark-submit

2015-10-01 Thread Ted Yu
PYSPARK_PYTHON determines what the worker uses.

PYSPARK_DRIVER_PYTHON is for driver.

See the comment at the beginning of bin/pyspark

FYI

On Thu, Oct 1, 2015 at 1:56 PM, roy  wrote:

> Hi,
>
>  We have python2.6 (default) on cluster and also we have installed
> python2.7.
>
> I was looking a way to set python version in spark-submit.
>
> anyone know how to do this ?
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/python-version-in-spark-submit-tp24902.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Hive permanent functions are not available in Spark SQL

2015-10-01 Thread Yin Huai
Hi Pala,

Can you add the full stacktrace of the exception? For now, can you use
create temporary function to workaround the issue?

Thanks,

Yin

On Wed, Sep 30, 2015 at 11:01 AM, Pala M Muthaia <
mchett...@rocketfuelinc.com.invalid> wrote:

> +user list
>
> On Tue, Sep 29, 2015 at 3:43 PM, Pala M Muthaia <
> mchett...@rocketfuelinc.com> wrote:
>
>> Hi,
>>
>> I am trying to use internal UDFs that we have added as permanent
>> functions to Hive, from within Spark SQL query (using HiveContext), but i
>> encounter NoSuchObjectException, i.e. the function could not be found.
>>
>> However, if i execute 'show functions' command in spark SQL, the
>> permanent functions appear in the list.
>>
>> I am using Spark 1.4.1 with Hive 0.13.1. I tried to debug this by looking
>> at the log and code, but it seems both the show functions command as well
>> as udf query both go through essentially the same code path, but the former
>> can see the UDF but the latter can't.
>>
>> Any ideas on how to debug/fix this?
>>
>>
>> Thanks,
>> pala
>>
>
>


Shuffle Write v/s Shuffle Read

2015-10-01 Thread Kartik Mathur
Hi

I am trying to better understand shuffle in spark .

Based on my understanding thus far ,

*Shuffle Write* : writes stage output for intermediate stage on local disk
if memory is not sufficient.,
Example , if each worker has 200 MB memory for intermediate results and the
results are 300MB then , each executer* will keep 200 MB in memory and will
write remaining 100 MB on local disk .  *

*Shuffle Read : *Each executer will read from other executer's *memory +
disk , so total read in above case will be 300(200 from memory and 100 from
disk)*num of executers ? *

Is my understanding correct ?

Thanks,
Kartik


Java REST custom receiver

2015-10-01 Thread Pavol Loffay
Hello,

is it possible to implement custom receiver [1] which will receive messages
from REST calls?

As REST classes in Java(jax-rs) are defined declarative and instantiated by
application server I'm not use if it is possible.

I have tried to implement custom receiver which is inject to REST class via
CDI and then is passed  to JavaStreamingContext. But there is problem
receiver instance in REST class is not the same as in SparkContext
(supervisor is null).


Could anyone help me with this? I'm also using JMS in my app so data from
REST can be sent to JMS and then received by spark JMS receiver. But I
think there should be more straight forward solution.




[1]: https://spark.apache.org/docs/latest/api/java/


Re: Problem understanding spark word count execution

2015-10-01 Thread Kartik Mathur
Hi Nicolae,
Thanks for the reply. To further clarify things -

sc.textFile is reading from HDFS, now shouldn't the file be read in a way
such that EACH executer works on only the local copy of file part available
, in this case its a ~ 4.64 GB file and block size is 256MB, so approx 19
partitions will be created and each task will run on  1 partition (which is
what I am seeing in the stages logs) , also i assume it will read the file
in a way that each executer will have exactly same amount of data. so there
shouldn't be any shuffling in reading atleast.

During the stage 0 (sc.textFile -> flatMap -> Map) for every task this is
the output I am seeing

IndexIDAttemptStatusLocality LevelExecutor ID / HostLaunch TimeDurationGC
TimeInput Size / RecordsWrite TimeShuffle Write Size / RecordsErrors0440
SUCCESSNODE_LOCAL1 / 10.35.244.102015/09/29 13:57:2414 s0.2 s256.0 MB
(hadoop) / 25951612.7 KB / 2731450SUCCESSNODE_LOCAL2 / 10.35.244.112015/09/29
13:57:2413 s0.2 s256.0 MB (hadoop) / 25951762.7 KB / 273
I have following questions -

1) What exactly is 2.7KB of shuffle write  ?
2) is this 2.7 KB of shuffle write is local to that executer ?
3) In the executers log I am seeing 2000 bytes results sent to the driver ,
if instead this number is much much greater than 2000 byes such that it
does not fit in executer's memory , will shuffle write increase ?
4)For word count 256 MB data is substantial amount text , how come the
result for this stage is only 2000 bytes !! it should send everyword with
respective count , for a 256 MB input this result should be much bigger ?

I hope I am clear this time.

Hope to get a reply,

Thanks
Kartik



On Thu, Oct 1, 2015 at 12:38 PM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

> Hi,
>
> So you say " *sc.textFile -> flatMap -> Map".*
>
> *My understanding is like this:*
> *First step is a number of partitions are determined, p of them. You can
> give hint on this.*
> *Then the nodes which will load partitions p, that is n nodes (where
> n<=p).*
>
> *Relatively at the same time or not, the n nodes start opening different
> sections of the file - the physical equivalent of the partitions: for
> instance in HDFS they would do an open and a seek I guess and just read
> from the stream there, convert to whatever the InputFormat dictates.*
>
> The shuffle can only be the part when a node opens an HDFS file for
> instance but the node does not have a local replica of the blocks which it
> needs to read (those pertaining to his assigned partitions). So he needs to
> pick them up from remote nodes which do have replicas of that data.
>
> After blocks are read into memory, flatMap and Map are local computations
> generating new RDDs and in the end the result is sent to the driver
> (whatever termination computation does on the RDD like the result of
> reduce, or side effects of rdd.foreach, etc).
>
> Maybe you can share more of your context if still unclear.
> I just made assumptions to give clarity on a similar thing.
>
> Nicu
> --
> *From:* Kartik Mathur 
> *Sent:* Thursday, October 1, 2015 10:25 PM
> *To:* Nicolae Marasoiu
> *Cc:* user
> *Subject:* Re: Problem understanding spark word count execution
>
> Thanks Nicolae ,
> So In my case all executers are sending results back to the driver and and
> "*shuffle* *is just sending out the textFile to distribute the
> partitions", *could you please elaborate on this  ? what exactly is in
> this file ?
>
> On Wed, Sep 30, 2015 at 9:57 PM, Nicolae Marasoiu <
> nicolae.maras...@adswizz.com> wrote:
>
>>
>> Hi,
>>
>> 2- the end results are sent back to the driver; the shuffles are
>> transmission of intermediate results between nodes such as the -> which are
>> all intermediate transformations.
>>
>> More precisely, since flatMap and map are narrow dependencies, meaning
>> they can usually happen on the local node, I bet shuffle is just sending
>> out the textFile to a few nodes to distribute the partitions.
>>
>>
>> --
>> *From:* Kartik Mathur 
>> *Sent:* Thursday, October 1, 2015 12:42 AM
>> *To:* user
>> *Subject:* Problem understanding spark word count execution
>>
>> Hi All,
>>
>> I tried running spark word count and I have couple of questions -
>>
>> I am analyzing stage 0 , i.e
>>  *sc.textFile -> flatMap -> Map (Word count example)*
>>
>> 1) In the *Stage logs* under Application UI details for every task I am
>> seeing Shuffle write as 2.7 KB, *question - how can I know where all did
>> this task write ? like how many bytes to which executer ?*
>>
>> 2) In the executer's log when I look for same task it says 2000 bytes of
>> result is sent to driver , my question is , *if the results were
>> directly sent to driver what is this shuffle write ? *
>>
>> Thanks,
>> Kartik
>>
>
>


Re: Hive permanent functions are not available in Spark SQL

2015-10-01 Thread Pala M Muthaia
Thanks for getting back Yin. I have copied the stack below. The associated
query is just this: "hc.sql("select murmurhash3('abc') from dual")". The
UDF murmurhash3 is already available in our hive metastore.

Regarding temporary function, can i create a temp function with existing
Hive UDF code, not arbitrary scala code?

Thanks.

-
15/10/01 16:59:59 ERROR metastore.RetryingHMSHandler:
MetaException(message:NoSuchObjectException(message:Function
default.murmurhash3 does not exist))
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newMetaException(HiveMetaStore.java:4613)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_function(HiveMetaStore.java:4740)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
at com.sun.proxy.$Proxy30.get_function(Unknown Source)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getFunction(HiveMetaStoreClient.java:1721)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
at com.sun.proxy.$Proxy31.getFunction(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.getFunction(Hive.java:2662)
at
org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfoFromMetastore(FunctionRegistry.java:546)
at
org.apache.hadoop.hive.ql.exec.FunctionRegistry.getQualifiedFunctionInfo(FunctionRegistry.java:579)
at
org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:645)
at
org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
at
org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
at org.apache.spark.sql.hive.HiveContext$$anon$3.org
$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
at
org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
at
org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
at
org.apache.spark.sql.hive.HiveContext$$anon$3.lookupFunction(HiveContext.scala:376)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:465)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:463)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:221)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:242)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:272)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:227)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org
$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionDown$1(QueryPlan.scala:75)
at
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1$$anonfun$apply$1.apply(QueryPlan.scala:90)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at

SparkSQL: Reading data from hdfs and storing into multiple paths

2015-10-01 Thread haridass saisriram
Hi,

  I am trying to find a simple example to read a data file on HDFS. The
file has the following format
a , b  , c ,,mm
a1,b1,c1,2015,09
a2,b2,c2,2014,08


I would like to read this file and store it in HDFS partitioned by year and
month. Something like this
/path/to/hdfs//mm

I want to specify the "/path/to/hdfs/" and /mm should be populated
automatically based on those columns. Could some one point me in the right
direction

Thank you,
Sri Ram


Re: Problem understanding spark word count execution

2015-10-01 Thread Nicolae Marasoiu
Hi,

So you say " sc.textFile -> flatMap -> Map".

My understanding is like this:
First step is a number of partitions are determined, p of them. You can give 
hint on this.
Then the nodes which will load partitions p, that is n nodes (where n<=p).

Relatively at the same time or not, the n nodes start opening different 
sections of the file - the physical equivalent of the partitions: for instance 
in HDFS they would do an open and a seek I guess and just read from the stream 
there, convert to whatever the InputFormat dictates.

The shuffle can only be the part when a node opens an HDFS file for instance 
but the node does not have a local replica of the blocks which it needs to read 
(those pertaining to his assigned partitions). So he needs to pick them up from 
remote nodes which do have replicas of that data.

After blocks are read into memory, flatMap and Map are local computations 
generating new RDDs and in the end the result is sent to the driver (whatever 
termination computation does on the RDD like the result of reduce, or side 
effects of rdd.foreach, etc).

Maybe you can share more of your context if still unclear.
I just made assumptions to give clarity on a similar thing.

Nicu

From: Kartik Mathur 
Sent: Thursday, October 1, 2015 10:25 PM
To: Nicolae Marasoiu
Cc: user
Subject: Re: Problem understanding spark word count execution

Thanks Nicolae ,
So In my case all executers are sending results back to the driver and and 
"shuffle is just sending out the textFile to distribute the partitions", could 
you please elaborate on this  ? what exactly is in this file ?


On Wed, Sep 30, 2015 at 9:57 PM, Nicolae Marasoiu 
> wrote:


Hi,

2- the end results are sent back to the driver; the shuffles are transmission 
of intermediate results between nodes such as the -> which are all intermediate 
transformations.

More precisely, since flatMap and map are narrow dependencies, meaning they can 
usually happen on the local node, I bet shuffle is just sending out the 
textFile to a few nodes to distribute the partitions.



From: Kartik Mathur >
Sent: Thursday, October 1, 2015 12:42 AM
To: user
Subject: Problem understanding spark word count execution

Hi All,

I tried running spark word count and I have couple of questions -

I am analyzing stage 0 , i.e
 sc.textFile -> flatMap -> Map (Word count example)

1) In the Stage logs under Application UI details for every task I am seeing 
Shuffle write as 2.7 KB, question - how can I know where all did this task 
write ? like how many bytes to which executer ?

2) In the executer's log when I look for same task it says 2000 bytes of result 
is sent to driver , my question is , if the results were directly sent to 
driver what is this shuffle write ?

Thanks,
Kartik



Re: Call Site - Spark Context

2015-10-01 Thread Sandip Mehta
Thanks Robin. 

Regards
SM
> On 01-Oct-2015, at 3:15 pm, Robin East  wrote:
> 
> From the comments in the code:
> 
> When called inside a class in the spark package, returns the name of the user 
> code class (outside the spark package) that called into Spark, as well as 
> which Spark method they called. This is used, for example, to tell users 
> where in their code each RDD got created.
> Keep crawling up the stack trace until we find the first function not inside 
> of the spark package. We track the last (shallowest) contiguous Spark method. 
> This might be an RDD transformation, a SparkContext function (such as 
> parallelize), or anything else that leads to instantiation of an RDD. We also 
> track the first (deepest) user method, file, and line.
> So basically it’s a mechanism to report where in the user’s code an RDD is 
> created.
> ---
> Robin East
> Spark GraphX in Action Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action 
> 
> 
> 
> 
> 
> 
>> On 1 Oct 2015, at 23:06, Sandip Mehta > > wrote:
>> 
>> Hi,
>> 
>> I wanted to understand what is the purpose of Call Site in Spark Context?
>> 
>> Regards
>> SM
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> 
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> 
>> 
> 



Spark cluster - use machine name in WorkerID, not IP address

2015-10-01 Thread markluk
I'm running a standalone Spark cluster of 1 master and 2 slaves.

My slaves file under /conf list the fully qualified domain name of the 2
slave machines

When I look on the Spark webpage ( on :8080), I see my 2 workers, but the
worker ID uses the IP address , like
worker-20151001153012-172.31.51.158-44699


 

That worker ID is not very human friendly. Is there a way to use the machine
name in the ID instead? like 
worker-20151001153012-node1-44699



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-cluster-use-machine-name-in-WorkerID-not-IP-address-tp24905.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to Set Retry Policy in Spark

2015-10-01 Thread Renxia Wang
Additional Info: I am running Spark on YARN.

2015-10-01 15:42 GMT-07:00 Renxia Wang :

> Hi guys,
>
> I know there is a way to set the number of retry of failed tasks, using
> spark.task.maxFailures. what is the default policy for the failed tasks
> retry? Is it exponential backoff? My tasks sometimes failed because of
> Socket connection timeout/reset, even with retry, some of the tasks will
> fail for more than spark.task.maxFailures times.
>
> Thanks,
>
> Zhique
>


Re: spark.mesos.coarse impacts memory performance on mesos

2015-10-01 Thread Utkarsh Sengar
Not sure what you mean by that, I shared the data which I see in spark UI.
Can you point me to a location where I can precisely get the data you need?

When I run the job in fine grained mode, I see tons are tasks created and
destroyed under a mesos "framework". I have about 80k spark tasks which I
think translates directly to independent mesos tasks.
https://dl.dropboxusercontent.com/u/2432670/Screen%20Shot%202015-10-01%20at%204.14.34%20PM.png

When i run the job in coarse grained mode, I just see 1-4 tasks with 1-4
executors (it varies from what mesos allocates). And these mesos tasks try
to complete the 80k spark tasks and runs out of memory eventually (see the
stack track above) in the gist shared above.


On Thu, Oct 1, 2015 at 4:07 PM, Tim Chen  wrote:

> Hi Utkarsh,
>
> I replied earlier asking what is your task assignment like with fine vs
> coarse grain mode look like?
>
> Tim
>
> On Thu, Oct 1, 2015 at 4:05 PM, Utkarsh Sengar 
> wrote:
>
>> Bumping it up, its not really a blocking issue.
>> But fine grain mode eats up uncertain number of resources in mesos and
>> launches tons of tasks, so I would prefer using the coarse grained mode if
>> only it didn't run out of memory.
>>
>> Thanks,
>> -Utkarsh
>>
>> On Mon, Sep 28, 2015 at 2:24 PM, Utkarsh Sengar 
>> wrote:
>>
>>> Hi Tim,
>>>
>>> 1. spark.mesos.coarse:false (fine grain mode)
>>> This is the data dump for config and executors assigned:
>>> https://gist.github.com/utkarsh2012/6401d5526feccab14687
>>>
>>> 2. spark.mesos.coarse:true (coarse grain mode)
>>> Dump for coarse mode:
>>> https://gist.github.com/utkarsh2012/918cf6f8ed5945627188
>>>
>>> As you can see, exactly the same code works fine in fine grained, goes
>>> out of memory in coarse grained mode. First an executor was lost and then
>>> the driver went out of memory.
>>> So I am trying to understand what is different in fine grained vs coarse
>>> mode other than allocation of multiple mesos tasks vs 1 mesos task. Clearly
>>> spark is not managing memory in the same way.
>>>
>>> Thanks,
>>> -Utkarsh
>>>
>>>
>>> On Fri, Sep 25, 2015 at 9:17 AM, Tim Chen  wrote:
>>>
 Hi Utkarsh,

 What is your job placement like when you run fine grain mode? You said
 coarse grain mode only ran with one node right?

 And when the job is running could you open the Spark webui and get
 stats about the heap size and other java settings?

 Tim

 On Thu, Sep 24, 2015 at 10:56 PM, Utkarsh Sengar  wrote:

> Bumping this one up, any suggestions on the stacktrace?
> spark.mesos.coarse=true is not working and the driver crashed with the
> error.
>
> On Wed, Sep 23, 2015 at 3:29 PM, Utkarsh Sengar  > wrote:
>
>> Missed to do a reply-all.
>>
>> Tim,
>>
>> spark.mesos.coarse = true doesn't work and spark.mesos.coarse = false
>> works (sorry there was a typo in my last email, I meant "when I do
>> "spark.mesos.coarse=false", the job works like a charm. ").
>>
>> I get this exception with spark.mesos.coarse = true:
>>
>> 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={
>> "_id" : "55af4bf26750ad38a444d7cf"}, max= { "_id" :
>> "55af5a61e8a42806f47546c1"}
>>
>> 15/09/22
>> 20:18:05 INFO MongoCollectionSplitter: Created split: min={ "_id" :
>> "55af5a61e8a42806f47546c1"}, max= null
>>
>> Exception
>> in thread "main" java.lang.OutOfMemoryError: Java heap space
>>
>> 
>> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>>
>> 
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>
>> 
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>
>> 

Re: Java REST custom receiver

2015-10-01 Thread Silvio Fiorito
When you say “receive messages” you mean acting as a REST endpoint, right? If 
so, it might be better to use JMS (or Kafka) option for a few reasons:

The receiver will be deployed to any of the available executors, so your REST 
clients will need to be made aware of the IP where the receiver is running (or 
you have some other proxy doing that). In the event the app restarts or the 
receiver dies and is restarted the executor the receiver runs on may change. 
Now there’s an option to specify a preference for what hosts to run the 
receiver on by overriding “preferredLocation” if you still decide to go that 
route.

There’s a potential loss of messages when you are deploying or if your receiver 
or streaming app dies. You would now have to worry about managing those 
messages on the client’s side during a deployment or outage.

There’s also a security issue (may not be relevant in your case) in that you 
would be exposing your executors and cluster in order to receive these 
messages. Worst case would be if the clients are outside your enterprise 
network.

My preference would be to use JMS or Kafka or some other messaging systems as a 
buffer between the two systems.

Thanks,
Silvio

From: Pavol Loffay
Date: Thursday, October 1, 2015 at 3:58 PM
To: "user@spark.apache.org"
Subject: Java REST custom receiver

Hello,

is it possible to implement custom receiver [1] which will receive messages 
from REST calls?

As REST classes in Java(jax-rs) are defined declarative and instantiated by 
application server I'm not use if it is possible.

I have tried to implement custom receiver which is inject to REST class via CDI 
and then is passed  to JavaStreamingContext. But there is problem receiver 
instance in REST class is not the same as in SparkContext (supervisor is null).


Could anyone help me with this? I'm also using JMS in my app so data from REST 
can be sent to JMS and then received by spark JMS receiver. But I think there 
should be more straight forward solution.




[1]: https://spark.apache.org/docs/latest/api/java/


Re: spark.mesos.coarse impacts memory performance on mesos

2015-10-01 Thread Utkarsh Sengar
Bumping it up, its not really a blocking issue.
But fine grain mode eats up uncertain number of resources in mesos and
launches tons of tasks, so I would prefer using the coarse grained mode if
only it didn't run out of memory.

Thanks,
-Utkarsh

On Mon, Sep 28, 2015 at 2:24 PM, Utkarsh Sengar 
wrote:

> Hi Tim,
>
> 1. spark.mesos.coarse:false (fine grain mode)
> This is the data dump for config and executors assigned:
> https://gist.github.com/utkarsh2012/6401d5526feccab14687
>
> 2. spark.mesos.coarse:true (coarse grain mode)
> Dump for coarse mode:
> https://gist.github.com/utkarsh2012/918cf6f8ed5945627188
>
> As you can see, exactly the same code works fine in fine grained, goes out
> of memory in coarse grained mode. First an executor was lost and then the
> driver went out of memory.
> So I am trying to understand what is different in fine grained vs coarse
> mode other than allocation of multiple mesos tasks vs 1 mesos task. Clearly
> spark is not managing memory in the same way.
>
> Thanks,
> -Utkarsh
>
>
> On Fri, Sep 25, 2015 at 9:17 AM, Tim Chen  wrote:
>
>> Hi Utkarsh,
>>
>> What is your job placement like when you run fine grain mode? You said
>> coarse grain mode only ran with one node right?
>>
>> And when the job is running could you open the Spark webui and get stats
>> about the heap size and other java settings?
>>
>> Tim
>>
>> On Thu, Sep 24, 2015 at 10:56 PM, Utkarsh Sengar 
>> wrote:
>>
>>> Bumping this one up, any suggestions on the stacktrace?
>>> spark.mesos.coarse=true is not working and the driver crashed with the
>>> error.
>>>
>>> On Wed, Sep 23, 2015 at 3:29 PM, Utkarsh Sengar 
>>> wrote:
>>>
 Missed to do a reply-all.

 Tim,

 spark.mesos.coarse = true doesn't work and spark.mesos.coarse = false
 works (sorry there was a typo in my last email, I meant "when I do
 "spark.mesos.coarse=false", the job works like a charm. ").

 I get this exception with spark.mesos.coarse = true:

 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={
 "_id" : "55af4bf26750ad38a444d7cf"}, max= { "_id" :
 "55af5a61e8a42806f47546c1"}

 15/09/22
 20:18:05 INFO MongoCollectionSplitter: Created split: min={ "_id" :
 "55af5a61e8a42806f47546c1"}, max= null

 Exception
 in thread "main" java.lang.OutOfMemoryError: Java heap space

 
 at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)

 
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

 
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

 
 at scala.Option.getOrElse(Option.scala:120)

 
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

 
 at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)

 
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)

 
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

 

Re: Standalone Scala Project

2015-10-01 Thread Robineast
I've eyeballed the sbt file and it look ok to me

Try 

sbt clean package

that should sort it out. If not please supply the full code you are running



-
Robin East 
Spark GraphX in Action Michael Malak and Robin East 
Manning Publications Co. 
http://www.manning.com/books/spark-graphx-in-action

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Standalone-Scala-Project-tp24892p24904.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Call Site - Spark Context

2015-10-01 Thread Sandip Mehta
Thanks Robin. 

Regards
SM
> On 01-Oct-2015, at 3:15 pm, Robin East  wrote:
> 
> From the comments in the code:
> 
> When called inside a class in the spark package, returns the name of the user 
> code class (outside the spark package) that called into Spark, as well as 
> which Spark method they called. This is used, for example, to tell users 
> where in their code each RDD got created.
> Keep crawling up the stack trace until we find the first function not inside 
> of the spark package. We track the last (shallowest) contiguous Spark method. 
> This might be an RDD transformation, a SparkContext function (such as 
> parallelize), or anything else that leads to instantiation of an RDD. We also 
> track the first (deepest) user method, file, and line.
> So basically it’s a mechanism to report where in the user’s code an RDD is 
> created.
> ---
> Robin East
> Spark GraphX in Action Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action 
> 
> 
> 
> 
> 
> 
>> On 1 Oct 2015, at 23:06, Sandip Mehta > > wrote:
>> 
>> Hi,
>> 
>> I wanted to understand what is the purpose of Call Site in Spark Context?
>> 
>> Regards
>> SM
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> 
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> 
>> 
> 



Re: spark.mesos.coarse impacts memory performance on mesos

2015-10-01 Thread Tim Chen
Hi Utkarsh,

I replied earlier asking what is your task assignment like with fine vs
coarse grain mode look like?

Tim

On Thu, Oct 1, 2015 at 4:05 PM, Utkarsh Sengar 
wrote:

> Bumping it up, its not really a blocking issue.
> But fine grain mode eats up uncertain number of resources in mesos and
> launches tons of tasks, so I would prefer using the coarse grained mode if
> only it didn't run out of memory.
>
> Thanks,
> -Utkarsh
>
> On Mon, Sep 28, 2015 at 2:24 PM, Utkarsh Sengar 
> wrote:
>
>> Hi Tim,
>>
>> 1. spark.mesos.coarse:false (fine grain mode)
>> This is the data dump for config and executors assigned:
>> https://gist.github.com/utkarsh2012/6401d5526feccab14687
>>
>> 2. spark.mesos.coarse:true (coarse grain mode)
>> Dump for coarse mode:
>> https://gist.github.com/utkarsh2012/918cf6f8ed5945627188
>>
>> As you can see, exactly the same code works fine in fine grained, goes
>> out of memory in coarse grained mode. First an executor was lost and then
>> the driver went out of memory.
>> So I am trying to understand what is different in fine grained vs coarse
>> mode other than allocation of multiple mesos tasks vs 1 mesos task. Clearly
>> spark is not managing memory in the same way.
>>
>> Thanks,
>> -Utkarsh
>>
>>
>> On Fri, Sep 25, 2015 at 9:17 AM, Tim Chen  wrote:
>>
>>> Hi Utkarsh,
>>>
>>> What is your job placement like when you run fine grain mode? You said
>>> coarse grain mode only ran with one node right?
>>>
>>> And when the job is running could you open the Spark webui and get stats
>>> about the heap size and other java settings?
>>>
>>> Tim
>>>
>>> On Thu, Sep 24, 2015 at 10:56 PM, Utkarsh Sengar 
>>> wrote:
>>>
 Bumping this one up, any suggestions on the stacktrace?
 spark.mesos.coarse=true is not working and the driver crashed with the
 error.

 On Wed, Sep 23, 2015 at 3:29 PM, Utkarsh Sengar 
 wrote:

> Missed to do a reply-all.
>
> Tim,
>
> spark.mesos.coarse = true doesn't work and spark.mesos.coarse = false
> works (sorry there was a typo in my last email, I meant "when I do
> "spark.mesos.coarse=false", the job works like a charm. ").
>
> I get this exception with spark.mesos.coarse = true:
>
> 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={
> "_id" : "55af4bf26750ad38a444d7cf"}, max= { "_id" :
> "55af5a61e8a42806f47546c1"}
>
> 15/09/22
> 20:18:05 INFO MongoCollectionSplitter: Created split: min={ "_id" :
> "55af5a61e8a42806f47546c1"}, max= null
>
> Exception
> in thread "main" java.lang.OutOfMemoryError: Java heap space
>
> 
> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>
> 
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> 
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>
> 
> at scala.Option.getOrElse(Option.scala:120)
>
> 
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>
> 
> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>
> 
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>
> 

Call Site - Spark Context

2015-10-01 Thread Sandip Mehta
Hi,

I wanted to understand what is the purpose of Call Site in Spark Context?

Regards
SM

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to save DataFrame as a Table in Hbase?

2015-10-01 Thread unk1102
Hi anybody tried to save DataFrame in HBase? I have processed data in
DataFrame which I need to store in HBase so that my web ui can access it
from Hbase? Please guide. Thanks in advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-save-DataFrame-as-a-Table-in-Hbase-tp24903.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Hive permanent functions are not available in Spark SQL

2015-10-01 Thread Yin Huai
Yes. You can use create temporary function to create a function based on a
Hive UDF (
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/ReloadFunction
).

Regarding the error, I think the problem is that starting from Spark 1.4,
we have two separate Hive client, one for metastore side and one for
execution side. For your case, the function's metadata is stored in
metastore. But, when we do function look up, we are looking at the Hive in
execution side.  Can you file a jira?

On Thu, Oct 1, 2015 at 2:04 PM, Pala M Muthaia 
wrote:

> Thanks for getting back Yin. I have copied the stack below. The associated
> query is just this: "hc.sql("select murmurhash3('abc') from dual")". The
> UDF murmurhash3 is already available in our hive metastore.
>
> Regarding temporary function, can i create a temp function with existing
> Hive UDF code, not arbitrary scala code?
>
> Thanks.
>
> -
> 15/10/01 16:59:59 ERROR metastore.RetryingHMSHandler:
> MetaException(message:NoSuchObjectException(message:Function
> default.murmurhash3 does not exist))
> at
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newMetaException(HiveMetaStore.java:4613)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_function(HiveMetaStore.java:4740)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
> at com.sun.proxy.$Proxy30.get_function(Unknown Source)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getFunction(HiveMetaStoreClient.java:1721)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
> at com.sun.proxy.$Proxy31.getFunction(Unknown Source)
> at org.apache.hadoop.hive.ql.metadata.Hive.getFunction(Hive.java:2662)
> at
> org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfoFromMetastore(FunctionRegistry.java:546)
> at
> org.apache.hadoop.hive.ql.exec.FunctionRegistry.getQualifiedFunctionInfo(FunctionRegistry.java:579)
> at
> org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:645)
> at
> org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
> at
> org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
> at org.apache.spark.sql.hive.HiveContext$$anon$3.org
> $apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
> at
> org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
> at
> org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
> at
> org.apache.spark.sql.hive.HiveContext$$anon$3.lookupFunction(HiveContext.scala:376)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:465)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$13$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:463)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:221)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:242)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at 

How to Set Retry Policy in Spark

2015-10-01 Thread Renxia Wang
Hi guys,

I know there is a way to set the number of retry of failed tasks, using
spark.task.maxFailures. what is the default policy for the failed tasks
retry? Is it exponential backoff? My tasks sometimes failed because of
Socket connection timeout/reset, even with retry, some of the tasks will
fail for more than spark.task.maxFailures times.

Thanks,

Zhique


Re: Worker node timeout exception

2015-10-01 Thread Mark Luk
Here is the log file from the worker node

15/09/30 23:49:37 INFO Worker: Executor app-20150930233113-/8 finished
with state EXITED message Command exited with code 1 exitStatus \
1
15/09/30 23:49:37 INFO Worker: Asked to launch executor
app-20150930233113-/9 for PythonPi
15/09/30 23:49:37 INFO SecurityManager: Changing view acls to: juser
15/09/30 23:49:37 INFO SecurityManager: Changing modify acls to: juser
15/09/30 23:49:37 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(juser)\
; users with modify permissions: Set(juser)
15/09/30 23:49:37 INFO ExecutorRunner: Launch command:
"/usr/lib/jvm/java-8-oracle/jre/bin/java" "-cp"
"/juser/press-mgmt/spark-1.5.0-bin-h\
adoop2.6/sbin/../conf/:/juser/press-mgmt/spark-1.5.0-bin-hadoop2.6/lib/spark-assembly-1.5.0-hadoop2.6.0.jar:/juser/press-mgmt/spark-1.5.0-b\
in-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/juser/press-mgmt/spark-1.5.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/juser/press-mgm\
t/spark-1.5.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar" "-Xms100M"
"-Xmx100M" "-Dspark.driver.port=36363" "org.apache.spark.executor.C\
oarseGrainedExecutorBackend" "--driver-url" "akka.tcp://
sparkDriver@172.31.61.43:36363/user/CoarseGrainedScheduler" "--executor-id"
"9" "--\
hostname" "172.31.51.246" "--cores" "2" "--app-id"
"app-20150930233113-" "--worker-url" "akka.tcp://
sparkWorker@172.31.51.246:41893/use\
r/Worker"
15/09/30 23:51:40 INFO Worker: Executor app-20150930233113-/9 finished
with state EXITED message Command exited with code 1 exitStatus \
1
15/09/30 23:51:40 INFO Worker: Asked to launch executor
app-20150930233113-/10 for PythonPi
15/09/30 23:51:40 INFO SecurityManager: Changing view acls to: juser
15/09/30 23:51:40 INFO SecurityManager: Changing modify acls to: juser
15/09/30 23:51:40 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(juser)\
; users with modify permissions: Set(juser)
15/09/30 23:51:40 INFO ExecutorRunner: Launch command:
"/usr/lib/jvm/java-8-oracle/jre/bin/java" "-cp"
"/juser/press-mgmt/spark-1.5.0-bin-h\
adoop2.6/sbin/../conf/:/juser/press-mgmt/spark-1.5.0-bin-hadoop2.6/lib/spark-assembly-1.5.0-hadoop2.6.0.jar:/juser/press-mgmt/spark-1.5.0-b\
in-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/juser/press-mgmt/spark-1.5.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/juser/press-mgm\
t/spark-1.5.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar" "-Xms100M"
"-Xmx100M" "-Dspark.driver.port=36363" "org.apache.spark.executor.C\
oarseGrainedExecutorBackend" "--driver-url" "akka.tcp://
sparkDriver@172.31.61.43:36363/user/CoarseGrainedScheduler" "--executor-id"
"10" "-\
-hostname" "172.31.51.246" "--cores" "2" "--app-id"
"app-20150930233113-" "--worker-url" "akka.tcp://
sparkWorker@172.31.51.246:41893/us\
er/Worker"


nothing stands out to me.

also, one thing i learned from reading other posts is that we need the
fully qualified hostname, and not identify machines simply by their
hostname of IP address.

i have not been using fully qualified hostname. could that cause this
problem?

On Wed, Sep 30, 2015 at 11:23 PM, Shixiong Zhu  wrote:

> Do you have the log file? It may be because of wrong settings.
>
> Best Regards,
> Shixiong Zhu
>
> 2015-10-01 7:32 GMT+08:00 markluk :
>
>> I setup a new Spark cluster. My worker node is dying with the following
>> exception.
>>
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [120 seconds]
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> at
>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>> at
>>
>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>> at scala.concurrent.Await$.result(package.scala:107)
>> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:241)
>> ... 11 more
>>
>>
>> Any ideas what's wrong? This is happening both for a spark program and
>> spark
>> shell.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Worker-node-timeout-exception-tp24893.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: What is the best way to submit multiple tasks?

2015-10-01 Thread Shixiong Zhu
Right, you can use SparkContext and SQLContext in multiple threads. They
are thread safe.

Best Regards,
Shixiong Zhu

2015-10-01 4:57 GMT+08:00 :

> Hi all,
>
> I have a process where I do some calculations on each one of the columns
> of a dataframe.
> Intrinsecally, I run across each column with a for loop. On the other
> hand, each process itself is non-entirely-distributable.
>
> To speed up the process, I would like to submit a spark program for each
> column, any suggestions? I was thinking on primitive threads sharing a
> spark context.
>
> Thank you,
> Saif
>
>


Re: Spark Streaming Standalone 1.5 - Stage cancelled because SparkContext was shut down

2015-10-01 Thread Shixiong Zhu
Do you have the log? Looks like some exceptions in your codes make
SparkContext stopped.

Best Regards,
Shixiong Zhu

2015-09-30 17:30 GMT+08:00 tranan :

> Hello All,
>
> I have several Spark Streaming applications running on Standalone mode in
> Spark 1.5.  Spark is currently set up for dynamic resource allocation.  The
> issue I am seeing is that I can have about 12 Spark Streaming Jobs running
> concurrently.  Occasionally I would see more than half where to fail due to
> Stage cancelled because SparkContext was shut down.  It would automatically
> restart as it runs on supervised mode.  Attached is the screenshot of one
> of
> the jobs that failed.  Anyone have any insight as to what is going on?
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24885/Screen_Shot_2015-09-29_at_8.png
> >
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Standalone-1-5-Stage-cancelled-because-SparkContext-was-shut-down-tp24885.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: [cache eviction] partition recomputation in big lineage RDDs

2015-10-01 Thread Hemant Bhanawat
As I understand, you don't need merge of  your historical data RDD with
your RDD_inc, what you need is merge of the computation results of the your
historical RDD with RDD_inc and so on.

IMO, you should consider having an external row store to hold your
computations. I say this because you need to update the rows of prior
computation based on the new data. Spark cached batches are column oriented
and any update to a spark cached batch is a costly op.


On Wed, Sep 30, 2015 at 10:59 PM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

> Hi,
>
>
> An equivalent question would be: can the memory cache be selectively
> evicted from within a component run in the driver? I know it is breaking
> some abstraction/encapsulation, but clearly I need to evict part of the
> cache so that it is reloaded with newer values from DB.
>
>
> Because what I basically need is invalidating some portions of the data
> which have newer values. The "compute" method should be the same (read with
> TableInputFormat).
>
> Thanks
> Nicu
> --
> *From:* Nicolae Marasoiu 
> *Sent:* Wednesday, September 30, 2015 4:07 PM
> *To:* user@spark.apache.org
> *Subject:* Re: partition recomputation in big lineage RDDs
>
>
> Hi,
>
> In fact, my RDD will get a new version (a new RDD assigned to the same
> var) quite frequently, by merging bulks of 1000 events of events of last
> 10s.
>
> But recomputation would be more efficient to do not by reading initial RDD
> partition(s) and reapplying deltas, but by reading from HBase the latest
> data, and just compute on top of that if anything.
>
> Basically I guess I need to write my own RDD and implement compute method
> by sliding on hbase.
>
> Thanks,
> Nicu
> --
> *From:* Nicolae Marasoiu 
> *Sent:* Wednesday, September 30, 2015 3:05 PM
> *To:* user@spark.apache.org
> *Subject:* partition recomputation in big lineage RDDs
>
>
> Hi,
>
>
> If I implement a manner to have an up-to-date version of my RDD by
> ingesting some new events, called RDD_inc (from increment), and I provide a
> "merge" function m(RDD, RDD_inc), which returns the RDD_new, it looks like
> I can evolve the state of my RDD by constructing new RDDs all the time, and
> doing it in a manner that hopes to reuse as much data from the past RDD and
> make the rest garbage collectable. An example merge function would be a
> join on some ids, and creating a merged state for each element. The type of
> the result of m(RDD, RDD_inc) is the same type as that of RDD.
>
>
> My question on this is how does the recomputation work for such an RDD,
> which is not the direct result of hdfs load, but is the result of a long
> lineage of such functions/transformations:
>
>
> Lets say my RDD is now after 2 merge iterations like this:
>
> RDD_new = merge(merge(RDD, RDD_inc1), RDD_inc2)
>
>
> When recomputing a part of RDD_new here are my assumptions:
>
> - only full partitions are recomputed, nothing more granular?
>
> - the corresponding partitions of RDD, RDD_inc1 and RDD_inc2 are recomputed
>
> - the function are applied
>
>
> And this seems more simplistic, since the partitions do not fully align in
> the general case between all these RDDs. The other aspect is the
> potentially redundant load of data which is in fact not required anymore
> (the data ruled out in the merge).
>
>
> A more detailed version of this question is at
> https://www.quora.com/How-does-Spark-RDD-recomputation-avoids-duplicate-loading-or-computation/
>
>
> Thanks,
>
> Nicu
>


Re: Worker node timeout exception

2015-10-01 Thread Shixiong Zhu
Do you have the log file? It may be because of wrong settings.

Best Regards,
Shixiong Zhu

2015-10-01 7:32 GMT+08:00 markluk :

> I setup a new Spark cluster. My worker node is dying with the following
> exception.
>
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [120 seconds]
> at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
>
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:241)
> ... 11 more
>
>
> Any ideas what's wrong? This is happening both for a spark program and
> spark
> shell.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Worker-node-timeout-exception-tp24893.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: Problem understanding spark word count execution

2015-10-01 Thread java8964
I am not sure about originally explain of shuffle write. 
In the word count example, the shuffle is needed, as Spark has to group by the 
word (ReduceBy is more accurate here). Image that you have 2 mappers to read 
the data, then each mapper will generate the (word, count) tuple output in 
segments. Spark always output that in local file. (In fact, one file with 
different segments to represent different partitions).
As you can image, the output of these segments will be small, as it only 
contains (word, count of word) tuples. After each mapper generates this 
segmented file for different partitions, then the reduce will fetch the 
partitions belonging to itself.
In your job summery, if your source is text file, so your data corresponds to 2 
HDFS block, or 2x256M. There are 2 tasks concurrent read these 2 partitions, 
about 2.5M lines of data of each partition being processed.
The output of each partition is shuffle-writing 2.7K data, which is the size of 
the segment file generated, corresponding to all the unique words and their 
count of this partition. So the size is reasonable, at least for me.
The interested number is 273 as shuffle write records. I am not 100% sure its 
meaning. Does it mean that this partition have 273 unique words from these 2.5M 
lines of data? That is kind of low, but I really don't have other explaining of 
its meaning.
If you finally output shows hundreds of unique words, then it is.
The 2000 bytes sent to driver is the final output aggregated on the reducers 
end, and merged back to the driver.
Yong

Date: Thu, 1 Oct 2015 13:33:59 -0700
Subject: Re: Problem understanding spark word count execution
From: kar...@bluedata.com
To: nicolae.maras...@adswizz.com
CC: user@spark.apache.org

Hi Nicolae,Thanks for the reply. To further clarify things -
sc.textFile is reading from HDFS, now shouldn't the file be read in a way such 
that EACH executer works on only the local copy of file part available , in 
this case its a ~ 4.64 GB file and block size is 256MB, so approx 19 partitions 
will be created and each task will run on  1 partition (which is what I am 
seeing in the stages logs) , also i assume it will read the file in a way that 
each executer will have exactly same amount of data. so there shouldn't be any 
shuffling in reading atleast.
During the stage 0 (sc.textFile -> flatMap -> Map) for every task this is the 
output I am seeing
IndexIDAttemptStatusLocality LevelExecutor ID / HostLaunch TimeDurationGC 
TimeInput Size / RecordsWrite TimeShuffle Write Size / 
RecordsErrors0440SUCCESSNODE_LOCAL1 / 10.35.244.102015/09/29 13:57:2414 s0.2 
s256.0 MB (hadoop) / 25951612.7 KB / 2731450SUCCESSNODE_LOCAL2 / 
10.35.244.112015/09/29 13:57:2413 s0.2 s256.0 MB (hadoop) / 25951762.7 KB / 
273I have following questions -
1) What exactly is 2.7KB of shuffle write  ?2) is this 2.7 KB of shuffle write 
is local to that executer ?3) In the executers log I am seeing 2000 bytes 
results sent to the driver , if instead this number is much much greater than 
2000 byes such that it does not fit in executer's memory , will shuffle write 
increase ?4)For word count 256 MB data is substantial amount text , how come 
the result for this stage is only 2000 bytes !! it should send everyword with 
respective count , for a 256 MB input this result should be much bigger ? 
I hope I am clear this time.
Hope to get a reply,
ThanksKartik


On Thu, Oct 1, 2015 at 12:38 PM, Nicolae Marasoiu 
 wrote:







Hi,




So you say " sc.textFile
-> flatMap -> Map".



My understanding is like this:
First step is a number of partitions are determined, p of them. You can give 
hint on this.
Then the nodes which will load partitions p, that is n nodes (where n<=p).



Relatively at the same time or not, the n nodes start opening different 
sections of the file - the physical equivalent of the
partitions: for instance in HDFS they would do an open and a seek I guess and 
just read from the stream there, convert to whatever the InputFormat dictates.


The shuffle can only be the part when a node opens an HDFS file for instance 
but the node does not have a local replica of the blocks which it needs to read 
(those pertaining to his assigned partitions). So he needs to pick them up from 
remote
nodes which do have replicas of that data.



After blocks are read into memory, flatMap and Map are local computations 
generating new RDDs and in the end the result is sent to the driver (whatever 
termination computation does on the RDD like the result of reduce, or side 
effects of rdd.foreach, etc).



Maybe you can share more of your context if still unclear.
I just made assumptions to give clarity on a similar thing.



Nicu



From: Kartik Mathur 

Sent: Thursday, October 1, 2015 10:25 PM

To: Nicolae Marasoiu

Cc: user

Subject: Re: Problem understanding spark word count execution
 


Thanks Nicolae , 
So In my case all executers are sending results back to the driver 

Re: Spark streaming job filling a lot of data in local spark nodes

2015-10-01 Thread swetha kasireddy
We have limited disk space. So, can we have spark.cleaner.ttl to clean up
the files? Or is there any setting that can cleanup old temp files?

On Mon, Sep 28, 2015 at 7:02 PM, Shixiong Zhu  wrote:

> These files are created by shuffle and just some temp files. They are not
> necessary for checkpointing and only stored in your local temp directory.
> They will be stored in "/tmp" by default. You can use `spark.local.dir` to
> set the path if you find your "/tmp" doesn't have enough space.
>
> Best Regards,
> Shixiong Zhu
>
> 2015-09-29 1:04 GMT+08:00 swetha :
>
>>
>> Hi,
>>
>> I see a lot of data getting filled locally as shown below from my
>> streaming
>> job. I have my checkpoint set to hdfs. But, I still see the following data
>> filling my local nodes. Any idea if I can make this stored in hdfs instead
>> of storing the data locally?
>>
>> -rw-r--r--  1520 Sep 17 18:43 shuffle_23119_5_0.index
>> -rw-r--r--  1 180564255 Sep 17 18:43 shuffle_23129_2_0.data
>> -rw-r--r--  1 364850277 Sep 17 18:45 shuffle_23145_8_0.data
>> -rw-r--r--  1  267583750 Sep 17 18:46 shuffle_23105_4_0.data
>> -rw-r--r--  1  136178819 Sep 17 18:48 shuffle_23123_8_0.data
>> -rw-r--r--  1  159931184 Sep 17 18:48 shuffle_23167_8_0.data
>> -rw-r--r--  1520 Sep 17 18:49 shuffle_23315_7_0.index
>> -rw-r--r--  1520 Sep 17 18:50 shuffle_23319_3_0.index
>> -rw-r--r--  1   92240350 Sep 17 18:51 shuffle_23305_2_0.data
>> -rw-r--r--  1   40380158 Sep 17 18:51 shuffle_23323_6_0.data
>> -rw-r--r--  1  369653284 Sep 17 18:52 shuffle_23103_6_0.data
>> -rw-r--r--  1  371932812 Sep 17 18:52 shuffle_23125_6_0.data
>> -rw-r--r--  1   19857974 Sep 17 18:53 shuffle_23291_19_0.data
>> -rw-r--r--  1  55342005 Sep 17 18:53 shuffle_23305_8_0.data
>> -rw-r--r--  1   92920590 Sep 17 18:53 shuffle_23303_4_0.data
>>
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-job-filling-a-lot-of-data-in-local-spark-nodes-tp24846.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


[ANNOUNCE] Announcing Spark 1.5.1

2015-10-01 Thread Reynold Xin
Hi All,

Spark 1.5.1 is a maintenance release containing stability fixes. This
release is based on the branch-1.5 maintenance branch of Spark. We
*strongly recommend* all 1.5.0 users to upgrade to this release.

The full list of bug fixes is here: http://s.apache.org/spark-1.5.1

http://spark.apache.org/releases/spark-release-1-5-1.html


(note: it can take a few hours for everything to be propagated, so you
might get 404 on some download links, but everything should be in maven
central already)


Re: How to access lost executor log file

2015-10-01 Thread Ted Yu
Looks like the spark history server should take the lost exectuors into
account by analyzing the output from 'yarn logs applicationId' command.

Cheers

On Thu, Oct 1, 2015 at 11:46 AM, Lan Jiang  wrote:

> Ted,
>
> Thanks for your reply.
>
> First of all, after sending email to the mailing list,  I use yarn logs
> applicationId  to retrieve the aggregated log
> successfully.  I found the exceptions I am looking for.
>
> Now as to your suggestion, when I go to the YARN RM UI, I can only see the
> "Tracking URL" in the application overview section. When I click it, it
> brings me to the spark history server UI, where I cannot find the lost
> exectuors. The only logs link I can find one the YARN RM site is the
> ApplicationMaster log, which is not what I need. Did I miss something?
>
> Lan
>
> On Thu, Oct 1, 2015 at 1:30 PM, Ted Yu  wrote:
>
>> Can you go to YARN RM UI to find all the attempts for this Spark Job ?
>>
>> The two lost executors should be found there.
>>
>> On Thu, Oct 1, 2015 at 10:30 AM, Lan Jiang  wrote:
>>
>>> Hi, there
>>>
>>> When running a Spark job on YARN, 2 executors somehow got lost during
>>> the execution. The message on the history server GUI is “CANNOT find
>>> address”.  Two extra executors were launched by YARN and eventually
>>> finished the job. Usually I go to the “Executors” tab on the UI to check
>>> the executor stdout/stderr for troubleshoot. Now if I go to the “Executors”
>>> tab,  I do not see the 2 executors that were lost. I can only see the rest
>>> executors and the 2 new executors. Thus I cannot check the stdout/stderr of
>>> the lost executors. How can I access the log files of these lost executors
>>> to find out why they were lost?
>>>
>>> Thanks
>>>
>>> Lan
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


spark-submit --packages using different resolver

2015-10-01 Thread Jerry Lam
Hi spark users and developers,

I'm trying to use spark-submit --packages against private s3 repository.
With sbt, I'm using fm-sbt-s3-resolver with proper aws s3 credentials. I
wonder how can I add this resolver into spark-submit such that --packages
can resolve dependencies from private repo?

Thank you!

Jerry


calling persist would cause java.util.NoSuchElementException: key not found:

2015-10-01 Thread Eyad Sibai
Hi

I am trying to call .persist() on a dataframe but once I execute the next line 
I am getting
java.util.NoSuchElementException: key not found: ….


I tried to do persist on disk also the same thing.


I am using:
pyspark with python3
spark 1.5




Thanks!



EYAD SIBAI
Risk Engineer

iZettle ®
––


Mobile: +46 72 911 60 54
Web: www.izettle.com