FW: Spark streaming - failed recovery from checkpoint

2015-11-02 Thread Adrian Tanase
Re-posting here, didn’t get any feedback on the dev list.

Has anyone experienced corrupted checkpoints recently?

Thanks!
-adrian

From: Adrian Tanase
Date: Thursday, October 29, 2015 at 1:38 PM
To: "d...@spark.apache.org"
Subject: Spark streaming - failed recovery from checkpoint

Hi guys,

I’ve encountered some problems with a crashed Spark Streaming job, when 
restoring from checkpoint.
I’m running Spark 1.5.1 on YARN (Hadoop 2.6) in cluster mode, reading from 
Kafka with the direct consumer and a few updateStateByKey stateful 
transformations.

After investigating, I think the following happened:

  *   Active ResourceManager crashed (AWS machine crashed)
  *   10 minutes later — default YARN settings :( — the standby took over and 
redeployed the job, sending a SIGTERM to the running driver
  *   Recovery from checkpoint failed because of a missing RDD in the checkpoint 
folder

One complication - UNCONFIRMED because of missing logs – I believe that the new 
driver was started ~5 minutes before the old one stopped.

With your help, I’m trying to zero in on a root cause or a combination of:

  *   bad YARN/Spark configuration (10 minutes to react to a missing node, 
already fixed through more aggressive liveness settings)
  *   YARN fact of life – why is a running job redeployed when the standby RM takes 
over?
  *   Bug/race condition in Spark checkpoint cleanup/recovery? (why is the RDD 
cleaned up by the old app, and then recovery fails when it looks for it?)
  *   Bugs in the YARN-Spark integration (missing heartbeats? Why is the new 
app started 5 minutes before the old one dies?)
  *   Application code – should we add graceful shutdown? Should I add a 
ZooKeeper lock that prevents two instances of the driver from starting at the same 
time?

Sorry if the questions are a little all over the place, getting to the root 
cause of this was a pain and I can’t even log an issue in Jira without your 
help.

Attaching some logs that showcase the checkpoint recovery failure (I’ve grepped 
for “checkpoint” to highlight the core issue):

  *   Driver logs prior to shutdown: http://pastebin.com/eKqw27nT
  *   Driver logs, failed recovery: http://pastebin.com/pqACKK7W
  *   Other info:
 *   spark.streaming.unpersist = true
 *   spark.cleaner.ttl = 259200 (3 days)

Last question – in the checkpoint recovery process I notice that it’s going 
back ~6 minutes on the persisted RDDs and ~10 minutes to replay from Kafka.
I’m running with 20-second batches and a 100-second checkpoint interval (small 
issue – one of the RDDs was using the default interval of 20 seconds). Shouldn’t 
the lineage be a lot smaller?
Based on the documentation I would have expected the recovery to go back at 
most 100 seconds, as I’m not doing any windowed operations…
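For reference, a minimal sketch of pinning the data-checkpoint interval per 
stateful DStream (the input stream is a placeholder); any stateful DStream left at 
the default falls back to roughly max(batch interval, 10 seconds):

import org.apache.spark.streaming.Seconds

// parsed is assumed to be a DStream[(String, Long)] built from the Kafka direct stream
val state = parsed.updateStateByKey[Long] { (newValues: Seq[Long], current: Option[Long]) =>
  Some(current.getOrElse(0L) + newValues.sum)
}
state.checkpoint(Seconds(100))  // align every stateful DStream with the intended 100-second interval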

Thanks in advance!
-adrian


Why does sortByKey() transformation trigger a job in spark-shell?

2015-11-02 Thread Jacek Laskowski
Hi Sparkians,

I use the latest Spark 1.6.0-SNAPSHOT in spark-shell with the default
local[*] master.

I created an RDD of pairs using the following snippet:

val rdd = sc.parallelize(0 to 5).map(n => (n, util.Random.nextBoolean))

It's all fine so far. The map transformation causes no computation.

I thought all transformations are lazy and trigger no job until an
action's called. It seems I was wrong with sortByKey()! When I called
`rdd.sortByKey()`, it started a job: sortByKey at <console>:27 (!)

Can anyone explain what makes for the different behaviour of sortByKey
since it is a transformation and hence should be lazy? Is this a
special transformation?

Regards,
Jacek

--
Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski




Re: Why does sortByKey() transformation trigger a job in spark-shell?

2015-11-02 Thread Jacek Laskowski
Hi,

Answering my own question after...searching sortByKey in the mailing
list archives and later in JIRA.

It turns out it's a known issue and filed under
https://issues.apache.org/jira/browse/SPARK-1021 "sortByKey() launches
a cluster job when it shouldn't".
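For context, a short sketch of the mechanism behind the eager job (not Spark's 
literal source, but the same idea):

import org.apache.spark.RangePartitioner

val rdd = sc.parallelize(0 to 5).map(n => (n, util.Random.nextBoolean))
// Computing the partitioner's range bounds samples the parent RDD, and that sampling
// runs a small job -- the one visible in the UI. sortByKey() does the same thing
// internally before returning its otherwise lazy sorted RDD.
val partitioner = new RangePartitioner(rdd.partitions.length, rdd)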

It's labelled "starter" that should not be that hard to fix. Does this
still hold? I'd like to work on it if it's "simple" and doesn't get me
swamped. Thanks!

Regards,
Jacek

--
Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski


On Mon, Nov 2, 2015 at 2:34 PM, Jacek Laskowski  wrote:
> Hi Sparkians,
>
> I use the latest Spark 1.6.0-SNAPSHOT in spark-shell with the default
> local[*] master.
>
> I created an RDD of pairs using the following snippet:
>
> val rdd = sc.parallelize(0 to 5).map(n => (n, util.Random.nextBoolean))
>
> It's all fine so far. The map transformation causes no computation.
>
> I thought all transformations are lazy and trigger no job until an
> action's called. It seems I was wrong with sortByKey()! When I called
> `rdd.sortByKey()`, it started a job: sortByKey at <console>:27 (!)
>
> Can anyone explain what makes for the different behaviour of sortByKey
> since it is a transformation and hence should be lazy? Is this a
> special transformation?
>
> Regards,
> Jacek
>
> --
> Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
> Follow me at https://twitter.com/jaceklaskowski
> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski




Re: execute native system commands in Spark

2015-11-02 Thread Adrian Tanase
Have you seen .pipe()?
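For reference, a minimal sketch of what .pipe() does (the command is illustrative 
and must be available on the worker nodes):

val lines = sc.parallelize(Seq("alpha", "beta", "gamma"))
// Each partition's elements are written to the external process's stdin, one per line,
// and its stdout lines come back as an RDD[String], so the native command runs in
// parallel across the executors.
val piped = lines.pipe("tr a-z A-Z")
piped.collect().foreach(println)  // ALPHA, BETA, GAMMA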




On 11/2/15, 5:36 PM, "patcharee"  wrote:

>Hi,
>
>Is it possible to execute native system commands (in parallel) in Spark, 
>like with scala.sys.process?
>
>Best,
>Patcharee
>




Spark Streaming and periodic broadcast

2015-11-02 Thread Serafín Sedano Arenas
Hi all,

I have a long lived Spark Streaming cluster feeding from several Kinesis
streams. There is some data that must be obtained from another data source
to be used in one of the steps on the workers.

My understanding is that the best way to achieve this is by broadcasting
that data. Is that correct? How can I periodically broadcast this info? Is
it a good idea to use StreamingListener.onBatchStarted to achieve this?

Best regards,
Serafin.


Re: Sort Merge Join

2015-11-02 Thread Alex Nastetsky
Thanks for the response.

Treating a file-system-based data source as “UnknownPartitioning” is a
simple and SAFE choice for JOIN, as it’s hard to guarantee that records from
different data sets with identical join keys will be loaded by the same
node/task, since lots of factors need to be considered, like task pool
size, cluster size, source format, storage, data locality, etc.

I agree it’s worth optimizing this for performance; in Hive it is called a
bucket join. I am not sure whether that will happen soon in Spark SQL.


Yes, this is supported in

   - Hive with bucket join
   - Pig with USING "merge"
   - MR with CompositeInputFormat

But I guess it's not supported in Spark?
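For reference, a small sketch (df1/df2 are placeholder DataFrames) of checking which 
join strategy the planner picked once sort-merge join is enabled:

sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")
val joined = df1.join(df2, df1("key") === df2("key"))
// Look for SortMergeJoin vs. ShuffledHashJoin, and for TungstenExchange /
// hashpartitioning nodes, in the printed physical plan.
joined.explain()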

On Mon, Nov 2, 2015 at 12:32 AM, Cheng, Hao  wrote:

> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
> example, in the code below, the two datasets have different number of
> partitions, but it still does a SortMerge join after a "hashpartitioning".
>
>
>
> [Hao:] A distributed JOIN operation (either HashBased or SortBased Join)
> requires the records with the identical join keys MUST BE shuffled to the
> same “reducer” node / task, hashpartitioning is just a strategy to tell
> spark shuffle service how to achieve that, in theory, we even can use the
> `RangePartitioning` instead (but it’s less efficient, that’s why we don’t
> choose it for JOIN). So conceptually the JOIN operator doesn’t care so much
> about the shuffle strategy so much if it satisfies the demand on data
> distribution.
>
>
>
> 2) If both datasets have already been previously partitioned/sorted the
> same and stored on the file system (e.g. in a previous job), is there a way
> to tell Spark this so that it won't want to do a "hashpartitioning" on
> them? It looks like Spark just considers datasets that have been just read
> from the the file system to have UnknownPartitioning. In the example below,
> I try to join a dataframe to itself, and it still wants to hash repartition.
>
>
>
> [Hao:] Take this as example:
>
>
>
> EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON
> a.key=b.key JOIN src c ON b.key=c.key
>
>
>
> == Physical Plan ==
>
> TungstenProject [value#20,value#22,value#24]
>
> SortMergeJoin [key#21], [key#23]
>
>   TungstenSort [key#21 ASC], false, 0
>
>TungstenProject [key#21,value#22,value#20]
>
> SortMergeJoin [key#19], [key#21]
>
>  TungstenSort [key#19 ASC], false, 0
>
>   TungstenExchange hashpartitioning(key#19,200)
>
>ConvertToUnsafe
>
> HiveTableScan [key#19,value#20], (MetastoreRelation default, src,
> Some(a))
>
>  TungstenSort [key#21 ASC], false, 0
>
>   TungstenExchange hashpartitioning(key#21,200)
>
>ConvertToUnsafe
>
> HiveTableScan [key#21,value#22], (MetastoreRelation default, src,
> Some(b))
>
>   TungstenSort [key#23 ASC], false, 0
>
>TungstenExchange hashpartitioning(key#23,200)
>
> ConvertToUnsafe
>
>  HiveTableScan [key#23,value#24], (MetastoreRelation default, src,
> Some(c))
>
>
>
> There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN
> src b ON a.key=b.key”, as we didn’t change the data distribution after
> it, so we can join another table “JOIN src c ON b.key=c.key” directly,
> which only require the table “c” for repartitioning on “key”.
>
>
>
> Taking the file system based data source as “UnknownPartitioning”, will
> be a simple and SAFE way for JOIN, as it’s hard to guarantee the records
> from different data sets with the identical join keys will be loaded by the
> same node/task , since lots of factors need to be considered, like task
> pool size, cluster size, source format, storage, data locality etc.,.
>
> I’ll agree it’s worth to optimize it for performance concerns, and
> actually in Hive, it is called bucket join. I am not sure will that happens
> soon in Spark SQL.
>
>
>
> Hao
>
>
>
> *From:* Alex Nastetsky [mailto:alex.nastet...@vervemobile.com]
> *Sent:* Monday, November 2, 2015 11:29 AM
> *To:* user
> *Subject:* Sort Merge Join
>
>
>
> Hi,
>
>
>
> I'm trying to understand SortMergeJoin (SPARK-2213).
>
>
>
> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
> example, in the code below, the two datasets have different number of
> partitions, but it still does a SortMerge join after a "hashpartitioning".
>
>
>
> CODE:
>
>val sparkConf = new SparkConf()
>
>   .setAppName("SortMergeJoinTest")
>
>   .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>
>   .set("spark.eventLog.enabled", "true")
>
>   .set("spark.sql.planner.sortMergeJoin","true")
>
>
>
> sparkConf.setMaster("local-cluster[3,1,1024]")
>
>
>
> val sc = new SparkContext(sparkConf)
>
> val sqlContext = new SQLContext(sc)
>
> import sqlContext.implicits._
>
>
>
> val inputpath = input.gz.parquet
>
>
>
> val df1 = 

Re: Best practises

2015-11-02 Thread Denny Lee
In addition, you may want to check out Tuning and Debugging in Apache Spark
(https://sparkhub.databricks.com/video/tuning-and-debugging-apache-spark/)

On Mon, Nov 2, 2015 at 05:27 Stefano Baghino 
wrote:

> There is this interesting book from Databricks:
> https://www.gitbook.com/book/databricks/databricks-spark-knowledge-base/details
>
> What do you think? Does it contain the info you're looking for? :)
>
> On Mon, Nov 2, 2015 at 2:18 PM, satish chandra j  > wrote:
>
>> HI All,
>> Yes, any such doc will be a great help!!!
>>
>>
>>
>> On Fri, Oct 30, 2015 at 4:35 PM, huangzheng <1106944...@qq.com> wrote:
>>
>>> I have the same question. Can anyone help us?
>>>
>>>
>>> -- Original Message --
>>> *From:* "Deepak Sharma";
>>> *Sent:* Friday, October 30, 2015, 7:23 PM
>>> *To:* "user";
>>> *Subject:* Best practises
>>>
>>> Hi
>>> I am looking for any blog / doc on the developer's best practices if
>>> using Spark. I have already looked at the tuning guide on
>>> spark.apache.org.
>>> Please do let me know if any one is aware of any such resource.
>>>
>>> Thanks
>>> Deepak
>>>
>>
>>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>


Re: Best practises

2015-11-02 Thread satish chandra j
HI All,
Yes, any such doc will be a great help!!!



On Fri, Oct 30, 2015 at 4:35 PM, huangzheng <1106944...@qq.com> wrote:

> I have the same question. Can anyone help us?
>
>
> -- Original Message --
> *From:* "Deepak Sharma";
> *Sent:* Friday, October 30, 2015, 7:23 PM
> *To:* "user";
> *Subject:* Best practises
>
> Hi
> I am looking for any blog / doc on the developer's best practices if using
> Spark. I have already looked at the tuning guide on spark.apache.org.
> Please do let me know if any one is aware of any such resource.
>
> Thanks
> Deepak
>


Re: Spark, Mesos problems with remote connections

2015-11-02 Thread Tamas Szuromi
Hello Sebastian,

Did you set the MESOS_NATIVE_JAVA_LIBRARY variable before you started
pyspark?

cheers,

Tamas




On 2 November 2015 at 15:24, Sebastian Kuepers <
sebastian.kuep...@publicispixelpark.de> wrote:

> Hey,
>
>
> I have a Mesos cluster with a single Master. If I run the following
> directly on the master machine:
>
>
> pyspark --master mesos://host:5050
>
>
> everything works just fine. If I try to connect to the master by
> starting a driver from my laptop, everything stops after the following
> log output from the Spark driver:
>
>
> I1102 15:10:57.848831 64856064 sched.cpp:164] Version: 0.24.0
> I1102 15:10:57.852708 19017728 sched.cpp:262] New master detected at
> master@217.66.55.19:5050
> I1102 15:10:57.852934 19017728 sched.cpp:272] No credentials provided.
> Attempting to register without authentication
>
>
> The Mesos master logs show the following:
>
>
> I1102 15:09:50.676004 21563 master.cpp:2250] Subscribing framework Talos
> with checkpointing disabled and capabilities [  ]
> I1102 15:09:50.676686 21566 hierarchical.hpp:515] Added framework
> b995b024-6ec6-4bd9-b251-e4e6b1828eda-0004
> I1102 15:09:51.671437 21562 http.cpp:336] HTTP GET for /master/state.json
> from 217.66.51.150:51588 with User-Agent='Mozilla/5.0 (X11; Linux x86_64)
> AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.80 Safari/537.36'
> I1102 15:09:52.357230 21567 master.cpp:2179] Received SUBSCRIBE call for
> framework 'Talos' at
> scheduler-fee376b3-ea04-4866-8412-957721da5a0b@10.200.120.148:51085
> I1102 15:09:52.357416 21567 master.cpp:2250] Subscribing framework Talos
> with checkpointing disabled and capabilities [  ]
> I1102 15:09:52.357520 21567 master.cpp:2260] Framework
> b995b024-6ec6-4bd9-b251-e4e6b1828eda-0004 (Talos) at
> scheduler-fee376b3-ea04-4866-8412-957721da5a0b@10.200.120.148:51085
> already subscribed, resending acknowledgement
> I1102 15:09:53.938227 21563 master.cpp:2179] Received SUBSCRIBE call for
> framework 'Talos' at
> scheduler-fee376b3-ea04-4866-8412-957721da5a0b@10.200.120.148:51085
> I1102 15:09:53.938426 21563 master.cpp:2250] Subscribing framework Talos
> with checkpointing disabled and capabilities [  ]
> I1102 15:09:53.938534 21563 master.cpp:2260] Framework
> b995b024-6ec6-4bd9-b251-e4e6b1828eda-0004 (Talos) at
> scheduler-fee376b3-ea04-4866-8412-957721da5a0b@10.200.120.148:51085
> already subscribed, resending acknowledgement
> I1102 15:10:00.207372 21567 master.cpp:2179] Received SUBSCRIBE call for
> framework 'Talos' at
> scheduler-fee376b3-ea04-4866-8412-957721da5a0b@10.200.120.148:51085
>
>
> The framework also shows up as active in the UI and blocks all resources
> of the cluster, while the SUBSCRIBE calls keep coming in.
>
> Mesos authentication is completely disabled.
>
>
> What could be possible causes for this problem?
>
>
> Thanks,
>
> Sebastian
>
>
>
>


execute native system commands in Spark

2015-11-02 Thread patcharee

Hi,

Is it possible to execute native system commands (in parallel) in Spark, 
like with scala.sys.process?


Best,
Patcharee




Re: Required file not found: sbt-interface.jar

2015-11-02 Thread Ted Yu
sbt-interface.jar is under build/zinc-0.3.5.3/lib/sbt-interface.jar

You can run build/mvn first to download it.

Cheers

On Mon, Nov 2, 2015 at 1:51 AM, Todd  wrote:

> Hi,
> I am trying to build spark 1.5.1 in my environment, but encounter the
> following error complaining Required file not found: sbt-interface.jar:
> The error message is below and I am building with:
>
> ./make-distribution.sh --name spark-1.5.1-bin-2.6.0 --tgz --with-tachyon
> -Phadoop-2.6 -Dhadoop.version=2.6.0 -Pyarn -Phive -Dhive-version=1.2.1
> -Phive-thriftserver -Pspark-ganglia-lgpl -Dscala-2.10.4
>
>
>
> [INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @
> spark-launcher_2.10 ---
> [INFO] Using zinc server for incremental compilation
> [error] Required file not found: sbt-interface.jar
> [error] See zinc -help for information about locating necessary files
> [INFO]
> 
> [INFO] Reactor Summary:
> [INFO]
> [INFO] Spark Project Parent POM ... SUCCESS [
> 3.192 s]
> [INFO] Spark Project Launcher . FAILURE [
> 2.950 s]
>
>


Re: Exception while reading from kafka stream

2015-11-02 Thread Cody Koeninger
Combine topicsSet_1 and topicsSet_2 in a single createDirectStream call.
Then you can use HasOffsetRanges to see what the topic for a given
partition is.
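A hedged Scala sketch of that approach (ssc, kafkaParams and the two topic sets are 
assumed to be the Scala equivalents of the Java snippets below):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val allTopics = topicsSet_1 ++ topicsSet_2
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, allTopics)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, iter) =>
    val topic = offsetRanges(i).topic        // partition i corresponds to offsetRanges(i)
    iter.map(record => (topic, record))      // branch the processing on the topic here
  }.count()                                  // any action; count() is only a placeholder
}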

On Mon, Nov 2, 2015 at 7:26 AM, Ramkumar V  wrote:

> If I try the code snippet below, it throws an exception. How do I avoid this
> exception? How do I switch processing based on topic?
>
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.seconds(30));
> HashSet<String> topicsSet_1 = new
> HashSet<String>(Arrays.asList(topics.split(",")));
> HashSet<String> topicsSet_2 = new
> HashSet<String>(Arrays.asList(topics.split(",")));
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", brokers);
> JavaPairInputDStream<String, String> messages_1 =
> KafkaUtils.createDirectStream(
>jssc,
>String.class,
>String.class,
>StringDecoder.class,
>StringDecoder.class,
>kafkaParams,
>topicsSet_1
>);
>
> JavaPairInputDStream<String, String> messages_2 =
> KafkaUtils.createDirectStream(
>jssc,
>String.class,
>String.class,
>StringDecoder.class,
>StringDecoder.class,
>kafkaParams,
> topicsSet_2
>);
>
> * Log Trace* :
>
> [ERROR] [11/02/2015 12:59:08.107] [Executor task launch worker-0]
> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
> swallowing exception during message send
> (akka.remote.RemoteTransportExceptionNoStackTrace)
> [ERROR] [11/02/2015 12:59:08.104] [Executor task launch worker-0]
> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
> swallowing exception during message send
> (akka.remote.RemoteTransportExceptionNoStackTrace)
> [ERROR] [11/02/2015 13:01:13.812] [Executor task launch worker-0]
> [akka.tcp://sparkDriver@10.125.4.200:41039/user/CoarseGrainedScheduler]
> swallowing exception during message send
> (akka.remote.RemoteTransportExceptionNoStackTrace)
> 15/11/02 12:59:05 ERROR yarn.ApplicationMaster: User class threw
> exception: java.io.IOException: Failed to delete
> somedomain/user/hdfs/spark_output/kyt_req/part-00055
> 15/11/02 12:59:05 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception: java.io.IOException:
> Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055)
> java.io.IOException: Failed on local exception:
> java.io.InterruptedIOException: Interruped while waiting for IO on channel
> java.nio.channels.SocketChannel[connected local=/10.125.4.200:40770
> remote=somedomain]. 59994 millis timeout left.; Host Details : local host
> is: "somedomain"; destination host is: "somedomain":8020;
> java.io.IOException: Failed on local exception:
> java.io.InterruptedIOException: Interruped while waiting for IO on channel
> java.nio.channels.SocketChannel[connected local=/10.125.4.200:41898
> remote=somedomain]. 59998 millis timeout left.; Host Details : local host
> is: "somedomain"; destination host is: "somedomain;
> 15/11/02 13:01:11 ERROR yarn.ApplicationMaster: User class threw
> exception: java.lang.NullPointerException
> 15/11/02 13:01:11 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception:
> java.lang.NullPointerException)
> 15/11/02 13:01:13 INFO yarn.ApplicationMaster: Unregistering
> ApplicationMaster with FAILED (diag message: User class threw exception:
> java.lang.NullPointerException)
> java.io.IOException: Failed on local exception:
> java.io.InterruptedIOException: Interruped while waiting for IO on channel
> java.nio.channels.SocketChannel[connected local=/10.125.4.224:40482
> remote=somedomain]. 59991 millis timeout left.; Host Details : local host
> is: "somedomain"; destination host is: "somedomain":8020;
> [ERROR] [11/02/2015 12:59:08.102] [Executor task launch worker-0]
> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
> swallowing exception during message send
> (akka.remote.RemoteTransportExceptionNoStackTrace)
>
>
>
> *Thanks*,
> 
>
>
> On Fri, Oct 30, 2015 at 7:34 PM, Cody Koeninger 
> wrote:
>
>> Just put them all in one stream and switch processing based on the topic
>>
>> On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V 
>> wrote:
>>
>>> i want to join all those logs in some manner. That's what i'm trying to
>>> do.
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao 
>>> wrote:
>>>
 I don't think Spark Streaming supports multiple streaming context in
 one jvm, you cannot use in such way. Instead you could run multiple
 streaming applications, since you're using Yarn.

 2015年10月30日星期五,Ramkumar V  写道:

> I found NPE is mainly because of im using the

Re: Best practises

2015-11-02 Thread Sushrut Ikhar
This presentation may clarify many of your doubts.
https://www.youtube.com/watch?v=7ooZ4S7Ay6Y

Regards,

Sushrut Ikhar
[image: https://]about.me/sushrutikhar



On Mon, Nov 2, 2015 at 7:15 PM, Denny Lee  wrote:

> In addition, you may want to check out Tuning and Debugging in Apache
> Spark (
> https://sparkhub.databricks.com/video/tuning-and-debugging-apache-spark/)
>
> On Mon, Nov 2, 2015 at 05:27 Stefano Baghino <
> stefano.bagh...@radicalbit.io> wrote:
>
>> There is this interesting book from Databricks:
>> https://www.gitbook.com/book/databricks/databricks-spark-knowledge-base/details
>>
>> What do you think? Does it contain the info you're looking for? :)
>>
>> On Mon, Nov 2, 2015 at 2:18 PM, satish chandra j <
>> jsatishchan...@gmail.com> wrote:
>>
>>> HI All,
>>> Yes, any such doc will be a great help!!!
>>>
>>>
>>>
>>> On Fri, Oct 30, 2015 at 4:35 PM, huangzheng <1106944...@qq.com> wrote:
>>>
 I have the same question. Can anyone help us?


 -- Original Message --
 *From:* "Deepak Sharma";
 *Sent:* Friday, October 30, 2015, 7:23 PM
 *To:* "user";
 *Subject:* Best practises

 Hi
 I am looking for any blog / doc on the developer's best practices if
 using Spark. I have already looked at the tuning guide on
 spark.apache.org.
 Please do let me know if any one is aware of any such resource.

 Thanks
 Deepak

>>>
>>>
>>
>>
>> --
>> BR,
>> Stefano Baghino
>>
>> Software Engineer @ Radicalbit
>>
>


Re: Spark Streaming data checkpoint performance

2015-11-02 Thread Adrian Tanase
You are correct, the default checkpointing interval is 10 seconds or your batch 
size, whichever is bigger. You can change it by calling .checkpoint(x) on your 
resulting DStream.

For the rest, you are probably keeping an “all time” word count that grows 
unbounded if you never remove words from the map. Keep in mind that 
updateStateByKey is called for every key in the state RDD, regardless of whether 
you have new occurrences or not.

You should consider at least one of these strategies (see the sketch after this list):

  *   Run your word count on a windowed DStream (e.g. unique counts over the 
last 15 minutes)
      *   Your best bet here is reduceByKeyAndWindow with an inverse function
  *   Make your state object more complicated and try to prune out words with 
very few occurrences or that haven’t been updated for a long time
      *   You can do this by emitting None from updateStateByKey
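A minimal Scala sketch of these two strategies (`words` stands in for a 
DStream[String]; thresholds and intervals are illustrative):

import org.apache.spark.streaming.{Minutes, Seconds}

// 1) Prune keys by returning None from updateStateByKey, and set the data-checkpoint
//    interval explicitly on the stateful stream.
val counts = words.map(w => (w, 1L)).updateStateByKey[Long] { (newValues: Seq[Long], state: Option[Long]) =>
  val updated = state.getOrElse(0L) + newValues.sum
  if (newValues.isEmpty && updated < 5L) None  // drop rare words that received no new occurrences
  else Some(updated)
}
counts.checkpoint(Seconds(100))

// 2) Windowed alternative: counts over the last 15 minutes, using an inverse reduce
//    function (this variant also requires checkpointing to be enabled on the context).
val windowed = words.map(w => (w, 1L))
  .reduceByKeyAndWindow((a: Long, b: Long) => a + b, (a: Long, b: Long) => a - b, Minutes(15), Seconds(20))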

Hope this helps,
-adrian

From: Thúy Hằng Lê
Date: Monday, November 2, 2015 at 7:20 AM
To: "user@spark.apache.org"
Subject: Spark Streaming data checkpoint performance

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(2));


Re: apply simplex method to fix linear programming in spark

2015-11-02 Thread Sean Owen
I might be steering this a bit off topic: does this need the simplex
method? This is just an instance of nonnegative least squares. I don't
think it relates to LDA either.

Spark doesn't have any particular support for NNLS (right?) or simplex though.

On Mon, Nov 2, 2015 at 6:03 PM, Debasish Das  wrote:
> Use the Breeze simplex solver, which in turn uses the Apache Commons Math simplex. If you
> want to use an interior point method you can use ECOS
> (https://github.com/embotech/ecos-java-scala). The Spark Summit 2014 talk on the
> quadratic solver in matrix factorization will show you an example integration
> with Spark. ECOS runs as a JNI process in every executor.
>
> On Nov 1, 2015 9:52 AM, "Zhiliang Zhu"  wrote:
>>
>> Hi Ted Yu,
>>
>> Thanks very much for your kind reply.
>> Do you just mean that in spark there is no specific package for simplex
>> method?
>>
>> Then I may try to fix it by myself, do not decide whether it is convenient
>> to finish by spark, before finally fix it.
>>
>> Thank you,
>> Zhiliang
>>
>>
>>
>>
>> On Monday, November 2, 2015 1:43 AM, Ted Yu  wrote:
>>
>>
>> A brief search in code base shows the following:
>>
>> TODO: Add simplex constraints to allow alpha in (0,1).
>> ./mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
>>
>> I guess the answer to your question is no.
>>
>> FYI
>>
>> On Sun, Nov 1, 2015 at 9:37 AM, Zhiliang Zhu 
>> wrote:
>>
>> Dear All,
>>
>> As I am facing some typical linear programming issue, and I know simplex
>> method is specific in solving LP question,
>> I am very sorry that whether there is already some mature package in spark
>> about simplex method...
>>
>> Thank you very much~
>> Best Wishes!
>> Zhiliang
>>
>>
>>
>>
>>
>




Re: apply simplex method to fix linear programming in spark

2015-11-02 Thread Debasish Das
Use the Breeze simplex solver, which in turn uses the Apache Commons Math simplex.
If you want to use an interior point method you can use ECOS
(https://github.com/embotech/ecos-java-scala). The Spark Summit 2014 talk on the
quadratic solver in matrix factorization will show you an example integration
with Spark. ECOS runs as a JNI process in every executor.
On Nov 1, 2015 9:52 AM, "Zhiliang Zhu"  wrote:

> Hi Ted Yu,
>
> Thanks very much for your kind reply.
> Do you just mean that in spark there is no specific package for simplex
> method?
>
> Then I may try to fix it by myself, do not decide whether it is convenient
> to finish by spark, before finally fix it.
>
> Thank you,
> Zhiliang
>
>
>
>
> On Monday, November 2, 2015 1:43 AM, Ted Yu  wrote:
>
>
> A brief search in code base shows the following:
>
> TODO: Add simplex constraints to allow alpha in (0,1).
> ./mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
>
> I guess the answer to your question is no.
>
> FYI
>
> On Sun, Nov 1, 2015 at 9:37 AM, Zhiliang Zhu 
> wrote:
>
> Dear All,
>
> As I am facing some typical linear programming issue, and I know simplex
> method is specific in solving LP question,
> I am very sorry that whether there is already some mature package in spark
> about simplex method...
>
> Thank you very much~
> Best Wishes!
> Zhiliang
>
>
>
>
>
>


Re: [Yarn] How to set user in ContainerLaunchContext?

2015-11-02 Thread Marcelo Vanzin
You can try the "--proxy-user" command line argument for spark-submit.
That requires that your RM configuration allows the user running your
AM to "proxy" other users. And I'm not completely sure it works
without Kerberos.

See: 
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html

On Mon, Nov 2, 2015 at 8:02 AM, Peter Rudenko  wrote:
> Hi, I have an ApplicationMaster which accepts requests and launches a
> container on which it runs spark-submit --master yarn. In the request I have
> a field "username" - the user I want to launch the job as. How can I set the
> user that the command will run as on a container? Currently they are all running
> as the yarn user even though the AM is running as the root user.
>
> Here's my code:
>
>
> private def setupTokens(username: String): ByteBuffer = {
>   val credentials = UserGroupInformation.createProxyUser(username,
> UserGroupInformation.getCurrentUser).getCredentials
>   val dob = new DataOutputBuffer();
>   credentials.writeTokenStorageToStream(dob);
>   ByteBuffer.wrap(dob.getData(), 0, dob.getLength()).duplicate();
> }
>
> val cCLC = Records.newRecord(classOf[ContainerLaunchContext])
>
> cCLC.setCommands(List("spark-submit --master yarn ..."))
>
> cCLC.setTokens(setupTokens(user))
>
> Thanks, Peter Rudenko



-- 
Marcelo




Re: callUdf("percentile_approx",col("mycol"),lit(0.25)) does not compile spark 1.5.1 source but it does work in spark 1.5.1 bin

2015-11-02 Thread Umesh Kacha
Hi Ted, I checked that hive-exec-1.2.1.spark.jar contains the following required
classes, but it still doesn't compile. I don't understand why this jar is
getting overwritten in scope:

org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFMultiplePercentileApproxEvaluator.class

Please guide.

On Mon, Oct 19, 2015 at 4:30 PM, Umesh Kacha  wrote:

> Hi Ted thanks much for your help really appreciate it. I tried to use
> maven dependencies you mentioned but still callUdf is not compiling please
> find snap shot of my intellij editor. I am sorry you may have to zoom
> pictures as I can't share code. Thanks again.
> On Oct 19, 2015 8:32 AM, "Ted Yu"  wrote:
>
>> Umesh:
>>
>> $ jar tvf
>> /home/hbase/.m2/repository/org/spark-project/hive/hive-exec/1.2.1.spark/hive-exec-1.2.1.spark.jar
>> | grep GenericUDAFPercentile
>>   2143 Fri Jul 31 23:51:48 PDT 2015
>> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$1.class
>>   4602 Fri Jul 31 23:51:48 PDT 2015
>> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFMultiplePercentileApproxEvaluator.class
>>
>> As long as the following dependency is in your pom.xml:
>> [INFO] +- org.spark-project.hive:hive-exec:jar:1.2.1.spark:compile
>>
>> You should be able to invoke percentile_approx
>>
>> Cheers
>>
>> On Sun, Oct 18, 2015 at 8:58 AM, Umesh Kacha 
>> wrote:
>>
>>> Thanks much Ted so when do we get to use this sparkUdf in Java code
>>> using maven code dependencies?? You said JIRA 10671 is not pushed as
>>> part of 1.5.1 so it should be released in 1.6.0 as mentioned in the JIRA
>>> right?
>>>
>>> On Sun, Oct 18, 2015 at 9:20 PM, Ted Yu  wrote:
>>>
 The udf is defined in GenericUDAFPercentileApprox of hive.

 When spark-shell runs, it has access to the above class which is
 packaged
 in assembly/target/scala-2.10/spark-assembly-1.6.0-SNAPSHOT-hadoop2.7.0.jar
 :

   2143 Fri Oct 16 15:02:26 PDT 2015
 org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$1.class
   4602 Fri Oct 16 15:02:26 PDT 2015
 org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFMultiplePercentileApproxEvaluator.class
   1697 Fri Oct 16 15:02:26 PDT 2015
 org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFPercentileApproxEvaluator$PercentileAggBuf.class
   6570 Fri Oct 16 15:02:26 PDT 2015
 org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFPercentileApproxEvaluator.class
   4334 Fri Oct 16 15:02:26 PDT 2015
 org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFSinglePercentileApproxEvaluator.class
   6293 Fri Oct 16 15:02:26 PDT 2015
 org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox.class

 That was the cause for different behavior.

 FYI

 On Sun, Oct 18, 2015 at 12:10 AM, unk1102 
 wrote:

> Hi starting new thread following old thread looks like code for
> compiling
> callUdf("percentile_approx",col("mycol"),lit(0.25)) is not merged in
> spark
> 1.5.1 source but I dont understand why this function call works in
> Spark
> 1.5.1 spark-shell/bin. Please guide.
>
> -- Forwarded message --
> From: "Ted Yu" 
> Date: Oct 14, 2015 3:26 AM
> Subject: Re: How to calculate percentile of a column of DataFrame?
> To: "Umesh Kacha" 
> Cc: "Michael Armbrust" ,
> "saif.a.ell...@wellsfargo.com" ,
> "user" 
>
> I modified DataFrameSuite, in master branch, to call percentile_approx
> instead of simpleUDF :
>
> - deprecated callUdf in SQLContext
> - callUDF in SQLContext *** FAILED ***
>   org.apache.spark.sql.AnalysisException: undefined function
> percentile_approx;
>   at
>
> org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry$$anonfun$2.apply(FunctionRegistry.scala:64)
>   at
>
> org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry$$anonfun$2.apply(FunctionRegistry.scala:64)
>   at scala.Option.getOrElse(Option.scala:120)
>   at
>
> org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:63)
>   at
>
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:506)
>   at
>
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:506)
>   at
>
> org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
>   at
>
> 

Re: ipython notebook NameError: name 'sc' is not defined

2015-11-02 Thread Jörn Franke
You can check a script that I created for the Amazon cloud:
https://snippetessay.wordpress.com/2015/04/18/big-data-lab-in-the-cloud-with-hadoopsparkrpython/

If I remember correctly, you need to add something to the startup .py for 
IPython.
> On 03 Nov 2015, at 01:04, Andy Davidson  wrote:
> 
> Hi
> 
> I recently installed a new cluster using the 
> spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2. SparkPi sample app works correctly. 
> 
> I am trying to run an IPython notebook on my cluster master and use an ssh 
> tunnel so that I can work with the notebook in a browser running on my Mac. 
> Below is how I set up the ssh tunnel
> 
>   $ ssh -i $KEY_FILE -N -f -L localhost::localhost:7000 
> ec2-user@$SPARK_MASTER
> 
>   $ ssh -i $KEY_FILE ec2-user@$SPARK_MASTER
>   $ cd top level notebook dir
>   $ IPYTHON_OPTS="notebook --no-browser --port=7000" 
> /root/spark/bin/pyspark
> 
> I am able to access my notebooks in the browser by opening 
> http://localhost:
> 
> When I run the following python code I get an error NameError: name 'sc' is 
> not defined? Any idea what the problem might be? 
> 
> I looked through pyspark and tried various combinations of the following but 
> still get the same error
> 
> $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook 
> --no-browser --port=7000" /root/spark/bin/pyspark --master=local[2]
> 
> Kind regards
> 
> Andy
> 
> 
> 
> 
> 
> In [1]:
> 
> import sys
> print (sys.version)
>  
> import os
> print(os.getcwd() + "\n")
> 2.6.9 (unknown, Apr  1 2015, 18:16:00) 
> [GCC 4.8.2 20140120 (Red Hat 4.8.2-16)]
> /home/ec2-user/dataScience
> 
> In [2]:
> 
> from pyspark import SparkContext
> textFile = sc.textFile("readme.txt")
> textFile.take(1)
> ---
> NameError Traceback (most recent call last)
>  in ()
>   1 from pyspark import SparkContext
> > 2 textFile = sc.textFile("readme.txt")
>   3 textFile.take(1)
> 
> NameError: name 'sc' is not defined
> 
> In [ ]:
> 
>  


Re: How do I get the executor ID from running Java code

2015-11-02 Thread Gideon
Looking at the post date I can only assume you've got your answer.
Since I just encountered your post while trying to do the same thing, I
decided it's worth answering for other people. In order to get the executor
ID you can use SparkEnv.get().executorId().
I hope this helps someone.
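A tiny sketch of using it from inside a task (Scala; the RDD is a placeholder):

import org.apache.spark.SparkEnv

rdd.foreachPartition { _ =>
  // "driver" when this runs on the driver, the executor's ID string inside a task
  val executorId = SparkEnv.get.executorId
  println(s"partition processed on executor $executorId")
}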



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-get-the-executor-ID-from-running-Java-code-tp19092p25258.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Sort Merge Join

2015-11-02 Thread Jonathan Coveney
Additionally, I'm curious whether there are any JIRAs around making DataFrames
support ordering better. There are a lot of operations that can be
optimized if you know that you have a total ordering on your data... are
there any plans, or at least JIRAs, around having the Catalyst optimizer
handle this case?

2015-11-02 9:39 GMT-05:00 Alex Nastetsky :

> Thanks for the response.
>
> Taking the file system based data source as “UnknownPartitioning”, will
> be a simple and SAFE way for JOIN, as it’s hard to guarantee the records
> from different data sets with the identical join keys will be loaded by the
> same node/task , since lots of factors need to be considered, like task
> pool size, cluster size, source format, storage, data locality etc.,.
>
> I’ll agree it’s worth to optimize it for performance concerns, and
> actually in Hive, it is called bucket join. I am not sure will that happens
> soon in Spark SQL.
>
>
> Yes, this is supported in
>
>- Hive with bucket join
>- Pig with USING "merge"
>
>- MR with CompositeInputFormat
>
> But I guess it's not supported in Spark?
>
> On Mon, Nov 2, 2015 at 12:32 AM, Cheng, Hao  wrote:
>
>> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
>> example, in the code below, the two datasets have different number of
>> partitions, but it still does a SortMerge join after a "hashpartitioning".
>>
>>
>>
>> [Hao:] A distributed JOIN operation (either HashBased or SortBased Join)
>> requires the records with the identical join keys MUST BE shuffled to the
>> same “reducer” node / task, hashpartitioning is just a strategy to tell
>> spark shuffle service how to achieve that, in theory, we even can use the
>> `RangePartitioning` instead (but it’s less efficient, that’s why we don’t
>> choose it for JOIN). So conceptually the JOIN operator doesn’t care so much
>> about the shuffle strategy so much if it satisfies the demand on data
>> distribution.
>>
>>
>>
>> 2) If both datasets have already been previously partitioned/sorted the
>> same and stored on the file system (e.g. in a previous job), is there a way
>> to tell Spark this so that it won't want to do a "hashpartitioning" on
>> them? It looks like Spark just considers datasets that have been just read
>> from the the file system to have UnknownPartitioning. In the example below,
>> I try to join a dataframe to itself, and it still wants to hash repartition.
>>
>>
>>
>> [Hao:] Take this as example:
>>
>>
>>
>> EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON
>> a.key=b.key JOIN src c ON b.key=c.key
>>
>>
>>
>> == Physical Plan ==
>>
>> TungstenProject [value#20,value#22,value#24]
>>
>> SortMergeJoin [key#21], [key#23]
>>
>>   TungstenSort [key#21 ASC], false, 0
>>
>>TungstenProject [key#21,value#22,value#20]
>>
>> SortMergeJoin [key#19], [key#21]
>>
>>  TungstenSort [key#19 ASC], false, 0
>>
>>   TungstenExchange hashpartitioning(key#19,200)
>>
>>ConvertToUnsafe
>>
>> HiveTableScan [key#19,value#20], (MetastoreRelation default, src,
>> Some(a))
>>
>>  TungstenSort [key#21 ASC], false, 0
>>
>>   TungstenExchange hashpartitioning(key#21,200)
>>
>>ConvertToUnsafe
>>
>> HiveTableScan [key#21,value#22], (MetastoreRelation default, src,
>> Some(b))
>>
>>   TungstenSort [key#23 ASC], false, 0
>>
>>TungstenExchange hashpartitioning(key#23,200)
>>
>> ConvertToUnsafe
>>
>>  HiveTableScan [key#23,value#24], (MetastoreRelation default, src,
>> Some(c))
>>
>>
>>
>> There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN
>> src b ON a.key=b.key”, as we didn’t change the data distribution after
>> it, so we can join another table “JOIN src c ON b.key=c.key” directly,
>> which only require the table “c” for repartitioning on “key”.
>>
>>
>>
>> Taking the file system based data source as “UnknownPartitioning”, will
>> be a simple and SAFE way for JOIN, as it’s hard to guarantee the records
>> from different data sets with the identical join keys will be loaded by the
>> same node/task , since lots of factors need to be considered, like task
>> pool size, cluster size, source format, storage, data locality etc.,.
>>
>> I’ll agree it’s worth to optimize it for performance concerns, and
>> actually in Hive, it is called bucket join. I am not sure will that happens
>> soon in Spark SQL.
>>
>>
>>
>> Hao
>>
>>
>>
>> *From:* Alex Nastetsky [mailto:alex.nastet...@vervemobile.com]
>> *Sent:* Monday, November 2, 2015 11:29 AM
>> *To:* user
>> *Subject:* Sort Merge Join
>>
>>
>>
>> Hi,
>>
>>
>>
>> I'm trying to understand SortMergeJoin (SPARK-2213).
>>
>>
>>
>> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
>> example, in the code below, the two datasets have different number of
>> partitions, but it still does a SortMerge join after a "hashpartitioning".
>>
>>
>>
>> CODE:
>>
>>  

Time-series prediction using spark

2015-11-02 Thread Cui Lin
Hello, all,

I am wondering if anyone has tried time-series prediction using Spark?
Any good practices to suggest? Thanks a lot!



-- 
Best regards!

Lin,Cui


Re: PySpark + Streaming + DataFrames

2015-11-02 Thread Jason White
This should be resolved with
https://github.com/apache/spark/commit/f92f334ca47c03b980b06cf300aa652d0ffa1880.
The conversion no longer does a `.take` when converting from RDD -> DF.


On Mon, Oct 19, 2015 at 6:30 PM, Tathagata Das  wrote:

> Yes, precisely! Also, for other folks who may read this, could reply back
> with the trusted conversion that worked for you (for a clear solution)?
>
> TD
>
>
> On Mon, Oct 19, 2015 at 3:08 PM, Jason White 
> wrote:
>
>> Ah, that makes sense then, thanks TD.
>>
>> The conversion from RDD -> DF involves a `.take(10)` in PySpark, even if
>> you provide the schema, so I was avoiding back-and-forth conversions. I’ll
>> see if I can create a ‘trusted’ conversion that doesn’t involve the `take`.
>>
>> --
>> Jason
>>
>> On October 19, 2015 at 5:23:59 PM, Tathagata Das (t...@databricks.com)
>> wrote:
>>
>> RDD and DF are not compatible data types. So you cannot return a DF when
>> you have to return an RDD. What rather you can do is return the underlying
>> RDD of the dataframe by dataframe.rdd().
>>
>>
>> On Fri, Oct 16, 2015 at 12:07 PM, Jason White 
>> wrote:
>>
>>> Hi Ken, thanks for replying.
>>>
>>> Unless I'm misunderstanding something, I don't believe that's correct.
>>> Dstream.transform() accepts a single argument, func. func should be a
>>> function that accepts a single RDD, and returns a single RDD. That's what
>>> transform_to_df does, except the RDD it returns is a DF.
>>>
>>> I've used Dstream.transform() successfully in the past when transforming
>>> RDDs, so I don't think my problem is there.
>>>
>>> I haven't tried this in Scala yet, and all of the examples I've seen on
>>> the
>>> website seem to use foreach instead of transform. Does this approach
>>> work in
>>> Scala?
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Streaming-DataFrames-tp25095p25099.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Where does mllib's .save method save a model to?

2015-11-02 Thread apu mishra . rr
I want to save an mllib model to disk, and am trying the model.save
operation as described in
http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html#examples:

model.save(sc, "myModelPath")

But after running it, I am unable to find any newly created file or
dir by the name "myModelPath" in any obvious places. Any ideas where
it might lie?

Thanks, -Apu

To reproduce:

# In PySpark, create ALS or other mllib model, then
model.save(sc, "myModelPath")
# In Unix environment, try to find "myModelPath"




Standalone cluster not using multiple workers for single application

2015-11-02 Thread Jeff Jones
I’ve got a series of applications using a single standalone Spark cluster 
(v1.4.1). The cluster has 1 master and 4 workers (4 CPUs per worker node). I 
am using the start-slave.sh script to launch the worker process on each node, 
and I can see the nodes were successfully registered using the Spark UI. When I 
launch one of my applications, regardless of what I set spark.cores.max to when 
instantiating the SparkContext in the driver app, I seem to get a single worker 
assigned to the application and all jobs that get run. For example, if I set 
spark.cores.max to 16 the Spark UI will show a single worker take the load with 
4 (16 Used) in the Cores column. How do I get my jobs run across multiple 
nodes in the cluster?
Here’s a snippet from the SparkUI (IP addresses removed for privacy)

Workers
Worker Id                         Address     State   Cores        Memory
worker-20150920064814-***-33659   ***:33659   ALIVE   4 (0 Used)   28.0 GB (0.0 B Used)
worker-20151012175609-***-37399   ***:37399   ALIVE   4 (16 Used)  28.0 GB (28.0 GB Used)
worker-20151012181934-***-36573   ***:36573   ALIVE   4 (4 Used)   28.0 GB (28.0 GB Used)
worker-20151030170514-***-45368   ***:45368   ALIVE   4 (0 Used)   28.0 GB (0.0 B Used)

Running Applications
Application ID            Name   Cores   Memory per Node   Submitted Time        User   State     Duration
app-20151102194733-0278   App1   16      28.0 GB           2015/11/02 19:47:33   ***    RUNNING   2 s
app-20151102164156-0274   App2   4       28.0 GB           2015/11/02 16:41:56   ***    RUNNING   3.1 h
Jeff




Re: Spark SQL lag() window function, strange behavior

2015-11-02 Thread Yin Huai
Hi Ross,

What version of Spark are you using? There were two issues that affected
the results of window functions in the Spark 1.5 branch. Both issues have
been fixed and will be released with Spark 1.5.2 (this release will happen
soon). For more details of these two issues, you can take a look at
https://issues.apache.org/jira/browse/SPARK-11135 and
https://issues.apache.org/jira/browse/SPARK-11009.

Thanks,

Yin

On Mon, Nov 2, 2015 at 12:07 PM,  wrote:

> Hello Spark community -
> I am running a Spark SQL query to calculate the difference in time between
> consecutive events, using lag(event_time) over window -
>
> SELECT device_id,
>unix_time,
>event_id,
>unix_time - lag(unix_time)
>   OVER
> (PARTITION BY device_id ORDER BY unix_time,event_id)
>  AS seconds_since_last_event
> FROM ios_d_events;
>
> This is giving me some strange results in the case where the first two
> events for a particular device_id have the same timestamp.
> I used the following query to take a look at what value was being returned
> by lag():
>
> SELECT device_id,
>event_time,
>unix_time,
>event_id,
>lag(event_time) OVER (PARTITION BY device_id ORDER BY 
> unix_time,event_id) AS lag_time
> FROM ios_d_events;
>
> I’m seeing that in these cases, I am getting something like 1970-01-03 …
> instead of a null value, and the following lag times are all following the
> same format.
>
> I posted a section of this output in this SO question:
> http://stackoverflow.com/questions/33482167/spark-sql-window-function-lag-giving-unexpected-resutls
>
> The errant results are labeled with device_id 999.
>
> Any idea why this is occurring?
>
> - Ross
>


Dump table into file

2015-11-02 Thread Shepherd
Hi all, I have one table called "result" in the database, for example:
/user/hive/warehouse/data_result.db/result

How do I export the table "result" into a local csv file?
Thanks a lot.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dump-table-into-file-tp25252.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




kinesis batches hang after YARN automatic driver restart

2015-11-02 Thread Hster Geguri
Hello Wonderful Sparks Peoples,

We are testing AWS Kinesis/Spark Streaming (1.5) failover behavior with
Hadoop/YARN 2.6 and 2.7.1 and want to understand the expected behavior.

When I manually kill a yarn application master/driver with a linux kill -9,
YARN will automatically relaunch another master that successfully reads in
the previous checkpoint.

However, more than half the time, the Kinesis executors (5-second batches)
don't continue processing immediately, i.e. batches of 0 events are queued
for 5-9 minutes before it starts reprocessing the stream again. When I
drill down to the current job which is hanging- it shows all stages/tasks
are complete. I would expect the automatically relaunched behavior to be
similar to as if I had manually done a resubmit with spark-submit where the
stream processing continues within a minute of launch.

Any input is highly appreciated.

Thanks much,
Heji


Fwd: Getting ClassNotFoundException: scala.Some on Spark 1.5.x

2015-11-02 Thread Babar Tareen
Resending, haven't found a workaround. Any help is highly appreciated.

-- Forwarded message --
From: Babar Tareen 
Date: Thu, Oct 22, 2015 at 2:47 PM
Subject: Getting ClassNotFoundException: scala.Some on Spark 1.5.x
To: user@spark.apache.org


Hi,

I am getting the following exception when submitting a job to Spark 1.5.x from
Scala. The same code works with Spark 1.4.1. Any clues as to what might be
causing the exception?



*Code: App.scala*
import org.apache.spark.SparkContext

object App {
  def main(args: Array[String]) = {
val l = List(1,2,3,4,5,6,7,8,9,0)
val sc = new SparkContext("local[4]", "soark-test")
val rdd = sc.parallelize(l)
rdd.foreach(println)
println(rdd.collect())
  }
}

*build.sbt*
lazy val sparkjob = (project in file("."))
  .settings(
name := "SparkJob",
version := "1.0",
scalaVersion := "2.11.6",
libraryDependencies := libs
)

lazy val libs = Seq(
  "org.apache.spark" %% "spark-core" % "1.5.1"
)


*Exception:*
15/10/22 14:32:42 INFO DAGScheduler: Job 0 failed: foreach at
app.scala:9, took 0.689832 s
[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to
stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure:
Lost task 2.0 in stage 0.0 (TID 2, localhost): java.io.IOException:
java.lang.ClassNotFoundException: scala.Some
[error] at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
[error] at
org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:497)
[error] at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
[error] at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
[error] at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
[error] at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
[error] at
java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
[error] at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
[error] at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
[error] at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
[error] at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
[error] at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
[error] at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
[error] at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
[error] at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
[error] at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
[error] at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
[error] at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
[error] at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[error] at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[error] at java.lang.Thread.run(Thread.java:745)
[error] Caused by: java.lang.ClassNotFoundException: scala.Some
[error] at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
[error] at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
[error] at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
[error] at java.lang.Class.forName0(Native Method)
[error] at java.lang.Class.forName(Class.java:348)
[error] at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
[error] at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
[error] at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
[error] at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
[error] at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
[error] at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
[error] at
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
[error] at
org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152)
[error] at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
[error] ... 24 more
[error]
[error] Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage
0.0 (TID 2, localhost): java.io.IOException:
java.lang.ClassNotFoundException: scala.Some

How to handle Option[Int] in dataframe

2015-11-02 Thread manas kar
Hi,
 I have a case class with many columns that are Option[Int] or
Option[Array[Byte]] and such.
 I would like to save it to a Parquet file and later read it back into my case
class.
 I found that when an Option[Int] field is null, reading it back gives me 0.
 My question:
 Is there a way to get an Option[Int] from a row of a DataFrame, instead of an
Int?

...Manas

Some more description

/*My case class*/
case class Student(name: String, age: Option[Int])

val s = new Student("Manas",Some(35))
val s1 = new Student("Manas1",None)
val student =sc.makeRDD(List(s,s1)).toDF

/*Now writing the dataframe*/
student.write.parquet("/tmp/t1")

/*Lets read it back*/
val st1 = sqlContext.read.parquet("/tmp/t1")
st1.show

+------+----+
|  name| age|
+------+----+
| Manas|  35|
|Manas1|null|
+------+----+

But now I want to convert my DataFrame back into Student objects. What is the
easiest way to do it?

..Manas
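
A minimal sketch of one way to get Option values back out of the rows by hand (an
assumption, not a confirmed answer from this thread; the column positions follow the
schema shown above and the Spark 1.5-era Row API):

/*Rebuild the case class from the rows, wrapping the nullable column*/
import org.apache.spark.sql.Row

val students = st1.map { row: Row =>
  val age = if (row.isNullAt(1)) None else Some(row.getInt(1))
  Student(row.getString(0), age)
}
students.collect().foreach(println)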


SparkSQL implicit conversion on insert

2015-11-02 Thread Bryan Jeffrey
All,

I have an object with Joda DateTime fields. I would prefer to continue to use
the DateTime in my application. When I am inserting into Hive I need to
cast to a Timestamp field (DateTime is not supported).  I added an implicit
conversion from DateTime to Timestamp, but it does not appear to be called
when inserting into the temp table. I am seeing the following error:

java.lang.UnsupportedOperationException: Schema for type
org.joda.time.DateTime is not supported

Is there some magic I need to use to get the implicit conversion when
inserting into Hive, or am I required to do an explicit conversion prior to
insertion?

Regards,

Bryan Jeffrey
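
A sketch of the explicit-conversion route, in case the implicit cannot be made to
work (an assumption, not a confirmed answer; Event, EventRow and the temp table name
are illustrative): map the Joda field to java.sql.Timestamp before the DataFrame is
created, so schema inference never sees the unsupported type.

import java.sql.Timestamp
import org.joda.time.DateTime

case class Event(id: String, created: DateTime)       // domain object with the Joda field
case class EventRow(id: String, created: Timestamp)   // Catalyst/Hive-friendly mirror

val events = sc.parallelize(Seq(Event("a", DateTime.now)))
val df = sqlContext.createDataFrame(
  events.map(e => EventRow(e.id, new Timestamp(e.created.getMillis))))
df.registerTempTable("events_tmp")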


Spark SQL lag() window function, strange behavior

2015-11-02 Thread Ross.Cramblit
Hello Spark community -
I am running a Spark SQL query to calculate the difference in time between 
consecutive events, using lag(event_time) over window -


SELECT device_id,
   unix_time,
   event_id,
   unix_time - lag(unix_time)
  OVER
(PARTITION BY device_id ORDER BY unix_time,event_id)
 AS seconds_since_last_event
FROM ios_d_events;

This is giving me some strange results in the case where the first two events 
for a particular device_id have the same timestamp.
I used the following query to take a look at what value was being returned by 
lag():

SELECT device_id,
   event_time,
   unix_time,
   event_id,
   lag(event_time) OVER (PARTITION BY device_id ORDER BY 
unix_time,event_id) AS lag_time
FROM ios_d_events;

I’m seeing that in these cases I am getting something like 1970-01-03 … 
instead of a null value, and the subsequent lag values all follow the same 
pattern.

I posted a section of this output in this SO question: 
http://stackoverflow.com/questions/33482167/spark-sql-window-function-lag-giving-unexpected-resutls

The errant results are labeled with device_id 999.

Any idea why this is occurring?

- Ross
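
For reference, the same window expressed through the DataFrame API (a sketch; it
assumes the table has been loaded as a DataFrame named events and uses the
Window/functions API available since Spark 1.4):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("device_id").orderBy("unix_time", "event_id")
val withLag = events.withColumn(
  "seconds_since_last_event", col("unix_time") - lag("unix_time", 1).over(w))
withLag.show()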


Re: Spark SQL lag() window function, strange behavior

2015-11-02 Thread Ross.Cramblit
I am using Spark 1.5.0 on Yarn

On Nov 2, 2015, at 3:16 PM, Yin Huai wrote:

Hi Ross,

What version of Spark are you using? There were two issues that affected the 
results of window functions in the Spark 1.5 branch. Both issues have been fixed 
and will be released with Spark 1.5.2 (this release will happen soon). For more 
details of these two issues, you can take a look at 
https://issues.apache.org/jira/browse/SPARK-11135 and 
https://issues.apache.org/jira/browse/SPARK-11009.

Thanks,

Yin

On Mon, Nov 2, 2015 at 12:07 PM, Ross.Cramblit wrote:
Hello Spark community -
I am running a Spark SQL query to calculate the difference in time between 
consecutive events, using lag(event_time) over window -


SELECT device_id,
   unix_time,
   event_id,
   unix_time - lag(unix_time)
  OVER
(PARTITION BY device_id ORDER BY unix_time,event_id)
 AS seconds_since_last_event
FROM ios_d_events;

This is giving me some strange results in the case where the first two events 
for a particular device_id have the same timestamp.
I used to following query to take a look at what value was being returned by 
lag():

SELECT device_id,
   event_time,
   unix_time,
   event_id,
   lag(event_time) OVER (PARTITION BY device_id ORDER BY 
unix_time,event_id) AS lag_time
FROM ios_d_events;

I’m seeing that in these cases, I am getting something like 1970-01-03 … 
instead of a null value, and the following lag times are all following the same 
format.

I posted a section of this output in this SO question: 
http://stackoverflow.com/questions/33482167/spark-sql-window-function-lag-giving-unexpected-resutls

The errant results are labeled with device_id 999.

Any idea why this is occurring?

- Ross




Re: Standalone cluster not using multiple workers for single application

2015-11-02 Thread Jean-Baptiste Onofré

Hi Jeff,

it may depend on your application code.

To verify your setup and whether you are able to scale to multiple workers, 
you can try the SparkTC example, for instance (it should use all 
workers).


Regards
JB

On 11/02/2015 08:56 PM, Jeff Jones wrote:

I’ve got a series of applications using a single standalone Spark
cluster (v1.4.1).  The cluster has 1 master and 4 workers (4 CPUs per
worker node). I am using the start-slave.sh script to launch the worker
process on each node and I can see the nodes were successfully
registered using the SparkUI.  When I launch one of my applications,
regardless of what I set spark.cores.max to when instantiating the
SparkContext in the driver app, I seem to get a single worker assigned to
the application and all jobs that get run.  For example, if I set
spark.cores.max to 16, the SparkUI will show a single worker take the
load, with 4 (16 Used) in the Cores column.  How do I get my jobs to run
across multiple nodes in the cluster?

Here’s a snippet from the SparkUI (IP addresses removed for privacy)


Workers

Worker Id                        Address    State  Cores        Memory
worker-20150920064814-***-33659  ***:33659  ALIVE  4 (0 Used)   28.0 GB (0.0 B Used)
worker-20151012175609-***-37399  ***:37399  ALIVE  4 (16 Used)  28.0 GB (28.0 GB Used)
worker-20151012181934-***-36573  ***:36573  ALIVE  4 (4 Used)   28.0 GB (28.0 GB Used)
worker-20151030170514-***-45368  ***:45368  ALIVE  4 (0 Used)   28.0 GB (0.0 B Used)


Running Applications

Application ID           Name  Cores  Memory per Node  Submitted Time       User  State    Duration
app-20151102194733-0278  App1  16     28.0 GB          2015/11/02 19:47:33  ***   RUNNING  2 s
app-20151102164156-0274  App2  4      28.0 GB          2015/11/02 16:41:56  ***   RUNNING  3.1 h

Jeff


This message (and any attachments) is intended only for the designated
recipient(s). It
may contain confidential or proprietary information, or have other
limitations on use as
indicated by the sender. If you are not a designated recipient, you may
not review, use,
copy or distribute this message. If you received this in error, please
notify the sender by
reply e-mail and delete this message.


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
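
For what it's worth, a sketch of the driver-side configuration that usually matters in
this situation (the values are illustrative; whether spark.executor.cores is honoured
by your standalone version should be checked against its docs, and the standalone
master's spark.deploy.spreadOut setting, true by default, controls whether cores are
spread across workers or packed onto as few as possible):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("App1")
  .setMaster("spark://<master-host>:7077")
  .set("spark.cores.max", "16")        // total cores across the whole cluster
  .set("spark.executor.cores", "4")    // cap per executor so the load can spread over workers
val sc = new SparkContext(conf)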



Re: How to lookup by a key in an RDD

2015-11-02 Thread Deenar Toraskar
Swetha

Currently IndexedRDD is an external library and not part of Spark Core. You
can use it by adding a dependency and pulling it in. There are plans to move
it to Spark core, tracked in https://issues.apache.org/jira/browse/SPARK-2365.
See
https://spark-summit.org/2015/events/indexedrdd-efficient-fine-grained-updates-for-rdds/
and https://github.com/amplab/spark-indexedrdd

*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812



On 2 November 2015 at 23:29, Ted Yu  wrote:

> Please take a look at SPARK-2365
>
> On Mon, Nov 2, 2015 at 3:25 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> Is Indexed RDDs released yet?
>>
>> Thanks,
>> Swetha
>>
>> On Sun, Nov 1, 2015 at 1:21 AM, Gylfi  wrote:
>>
>>> Hi.
>>>
>>> You may want to look into Indexed RDDs
>>> https://github.com/amplab/spark-indexedrdd
>>>
>>> Regards,
>>> Gylfi.
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-lookup-by-a-key-in-an-RDD-tp25243p25247.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
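
While IndexedRDD stays an external package, a minimal sketch of the built-in
alternative, PairRDDFunctions.lookup, on a pre-partitioned pair RDD (the key and
value types here are illustrative):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
  .partitionBy(new HashPartitioner(8))   // with a known partitioner, lookup only scans one partition
  .cache()

val values: Seq[Int] = pairs.lookup("b")  // Seq(2)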


Re: Submitting Spark Applications - Do I need to leave ports open?

2015-11-02 Thread Akhil Das
Yes, you need to open up a few ports for that to happen. Have a look at
http://spark.apache.org/docs/latest/configuration.html#networking, where you can
see the *.port configurations, which are bound to random ports by default. Fix those
ports to specific numbers and open them in your firewall, and it should work.

Thanks
Best Regards

On Tue, Oct 27, 2015 at 2:05 AM, markluk  wrote:

> I want to submit interactive applications to a remote Spark cluster running
> in standalone mode.
>
> I understand I need to connect to the master's 7077 port. It also seems like
> the master node needs to open connections to my local machine, and the ports
> that it needs to open are different every time.
>
> If I have the firewall enabled on my local machine, spark-submit fails since
> the ports it needs to open on my local machine are unreachable, so spark-submit
> fails to connect to the master.
>
> I was able to get it to work if i disable firewall on my local machine. But
> that's not a real solution.
>
> Is there some config that I'm not aware of that solves this problem?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-Do-I-need-to-leave-ports-open-tp25207.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
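
A sketch of what "fix those ports" can look like on the driver side (the property
names are the *.port settings I believe apply from the 1.x networking configuration
page; the port numbers themselves are arbitrary examples that must match your
firewall rules):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("interactive-app")
  .setMaster("spark://<master-host>:7077")
  .set("spark.driver.port", "51000")        // instead of a random port
  .set("spark.fileserver.port", "51100")
  .set("spark.broadcast.port", "51200")
  .set("spark.blockManager.port", "51400")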


Re: execute native system commands in Spark

2015-11-02 Thread Deenar Toraskar
You can do the following; make sure the number of executors requested equals
the number of executors on your cluster.

import scala.sys.process._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil
sc.parallelize(0 to 10).map { _ =>(("hostname".!!).trim,
UserGroupInformation.getCurrentUser.toString)}.collect.distinct

Regards
Deenar
*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812




On 2 November 2015 at 15:38, Adrian Tanase  wrote:

> Have you seen .pipe()?
>
>
>
>
> On 11/2/15, 5:36 PM, "patcharee"  wrote:
>
> >Hi,
> >
> >Is it possible to execute native system commands (in parallel) Spark,
> >like scala.sys.process ?
> >
> >Best,
> >Patcharee
> >
> >-
> >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
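
For completeness, RDD.pipe (the method Adrian points to above) forks the external
command once per partition, feeds the partition's elements to its stdin, and returns
the command's stdout lines as a new RDD; a minimal sketch:

val hosts = sc.parallelize(1 to 100, numSlices = 10)
  .pipe("hostname")    // one `hostname` process per partition
  .distinct()
  .collect()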


[Yarn] How to set user in ContainerLaunchContext?

2015-11-02 Thread Peter Rudenko
Hi, I have an ApplicationMaster which accepts requests and launches a 
container on which it runs spark-submit --master yarn. In the request I 
have a field "username" - the user I want to launch the job as. How can 
I set the user that the command will run as on the container? Currently they 
are all running as the yarn user, even though the AM is running as the root user.


Here's my code:


private def setupTokens(username: String): ByteBuffer = {
  val credentials = UserGroupInformation.createProxyUser(username,
    UserGroupInformation.getCurrentUser).getCredentials
  val dob = new DataOutputBuffer()
  credentials.writeTokenStorageToStream(dob)
  ByteBuffer.wrap(dob.getData(), 0, dob.getLength()).duplicate()
}

val cCLC = Records.newRecord(classOf[ContainerLaunchContext])

cCLC.setCommands(List("spark-submit --master yarn ..."))

cCLC.setTokens(setupTokens(user))

Thanks, Peter Rudenko
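
A hedged sketch, not a confirmed answer: the Unix user of a YARN container is decided
by the NodeManager's container executor (with the DefaultContainerExecutor everything
runs as the yarn user, so OS-level impersonation needs the LinuxContainerExecutor),
while the Hadoop-level identity can be switched by running the submission inside a
proxy UGI. The launchAs helper below is illustrative, and proxy-user impersonation for
the AM's principal must be allowed in core-site.xml (hadoop.proxyuser.*):

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

def launchAs[T](username: String)(submit: => T): T = {
  val proxy = UserGroupInformation.createProxyUser(username, UserGroupInformation.getLoginUser)
  proxy.doAs(new PrivilegedExceptionAction[T] {
    override def run(): T = submit   // e.g. build cCLC here and start the container
  })
}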


Re: Why does sortByKey() transformation trigger a job in spark-shell?

2015-11-02 Thread Mark Hamstra
Hah!  No, that is not a "starter" issue.  It touches on some fairly deep
Spark architecture, and there have already been a few attempts to resolve
the issue -- none entirely satisfactory, but you should definitely search
out the work that has already been done.

On Mon, Nov 2, 2015 at 5:51 AM, Jacek Laskowski  wrote:

> Hi,
>
> Answering my own question after...searching sortByKey in the mailing
> list archives and later in JIRA.
>
> It turns out it's a known issue and filed under
> https://issues.apache.org/jira/browse/SPARK-1021 "sortByKey() launches
> a cluster job when it shouldn't".
>
> It's labelled "starter" that should not be that hard to fix. Does this
> still hold? I'd like to work on it if it's "simple" and doesn't get me
> swamped. Thanks!
>
> Pozdrawiam,
> Jacek
>
> --
> Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
> Follow me at https://twitter.com/jaceklaskowski
> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>
>
> On Mon, Nov 2, 2015 at 2:34 PM, Jacek Laskowski  wrote:
> > Hi Sparkians,
> >
> > I use the latest Spark 1.6.0-SNAPSHOT in spark-shell with the default
> > local[*] master.
> >
> > I created an RDD of pairs using the following snippet:
> >
> > val rdd = sc.parallelize(0 to 5).map(n => (n, util.Random.nextBoolean))
> >
> > It's all fine so far. The map transformation causes no computation.
> >
> > I thought all transformations are lazy and trigger no job until an
> > action's called. It seems I was wrong with sortByKey()! When I called
> > `rdd.sortByKey()`, it started a job: sortByKey at :27 (!)
> >
> > Can anyone explain what makes for the different behaviour of sortByKey
> > since it is a transformation and hence should be lazy? Is this a
> > special transformation?
> >
> > Pozdrawiam,
> > Jacek
> >
> > --
> > Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
> > Follow me at https://twitter.com/jaceklaskowski
> > Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
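
For readers wondering what the eager work actually is: sortByKey builds a
RangePartitioner, and computing the range boundaries samples the RDD's keys up front,
which is the job that appears before any action is called (this is what SPARK-1021
tracks). A small sketch of where it shows up:

val rdd = sc.parallelize(0 to 5).map(n => (n, util.Random.nextBoolean))

// Constructing the sorted RDD already runs a sampling job: the RangePartitioner
// needs to see a sample of the keys to pick partition boundaries.
val sorted = rdd.sortByKey()

// The sort itself still only happens when an action runs.
sorted.collect()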


Re: Spark Streaming data checkpoint performance

2015-11-02 Thread Thúy Hằng Lê
Hi Adrian,

Thanks for the information.

However, your two suggestions don't really work for me.

Accuracy is the most important aspect of my application, so keeping only
15-minute window stats or pruning out some of the keys is impossible for my
application.

I can change the checkpoint interval as you suggest; however, is there any
other Spark configuration for tuning the data checkpoint performance?

And just curious: technically, why does updateStateByKey need to be called for
every key (regardless of whether there are new occurrences)? Does Spark have any
plan to fix it?
I have 4M keys whose statistics need to be maintained, but only a few of them
change in each batch interval.

2015-11-02 22:37 GMT+07:00 Adrian Tanase :

> You are correct, the default checkpointing interval is 10 seconds or your
> batch size, whichever is bigger. You can change it by calling
> .checkpoint(x) on your resulting Dstream.
>
> For the rest, you are probably keeping an “all time” word count that grows
> unbounded if you never remove words from the map. Keep in mind that
> updateStateByKey is called for every key in the state RDD, regardless if
> you have new occurrences or not.
>
> You should consider at least one of these strategies:
>
>- run your word count on a windowed Dstream (e.g. Unique counts over
>the last 15 minutes)
>   - Your best bet here is reduceByKeyAndWindow with an inverse
>   function
>- Make your state object more complicated and try to prune out words
>with very few occurrences or that haven’t been updated for a long time
>   - You can do this by emitting None from updateStateByKey
>
> Hope this helps,
> -adrian
>
> From: Thúy Hằng Lê
> Date: Monday, November 2, 2015 at 7:20 AM
> To: "user@spark.apache.org"
> Subject: Spark Streaming data checkpoint performance
>
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.seconds(2));
>
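
A sketch of the pruning strategy Adrian describes, plus an explicit data-checkpoint
interval (the DStream name counts, the (count, lastSeen) state shape and the one-hour
idle cut-off are all illustrative assumptions):

import org.apache.spark.streaming.Seconds

val maxIdleMs = 60 * 60 * 1000L   // drop keys not updated for an hour

val state = counts.updateStateByKey[(Long, Long)] { (newValues: Seq[Long], old: Option[(Long, Long)]) =>
  val now = System.currentTimeMillis()
  val (total, lastSeen) = old.getOrElse((0L, now))
  if (newValues.nonEmpty) Some((total + newValues.sum, now))
  else if (now - lastSeen > maxIdleMs) None   // returning None removes the key from the state RDD
  else Some((total, lastSeen))
}

state.checkpoint(Seconds(100))    // checkpoint the state stream less often than every batch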


Re: Getting ClassNotFoundException: scala.Some on Spark 1.5.x

2015-11-02 Thread Jonathan Coveney
Caused by: java.lang.ClassNotFoundException: scala.Some

indicates that you don't have the scala libs present. How are you executing
this? My guess is the issue is a conflict between scala 2.11.6 in your
build and 2.11.7? Not sure...try setting your scala to 2.11.7?

But really, first it'd be good to see what command you're using to invoke
this.

2015-11-02 14:48 GMT-05:00 Babar Tareen :

> Resending, haven't found a workaround. Any help is highly appreciated.
>
> -- Forwarded message --
> From: Babar Tareen 
> Date: Thu, Oct 22, 2015 at 2:47 PM
> Subject: Getting ClassNotFoundException: scala.Some on Spark 1.5.x
> To: user@spark.apache.org
>
>
> Hi,
>
> I am getting the following exception when submitting a job to Spark 1.5.x from
> Scala. The same code works with Spark 1.4.1. Any clues as to what might be
> causing the exception?
>
>
>
> *Code:App.scala*import org.apache.spark.SparkContext
>
> object App {
>   def main(args: Array[String]) = {
> val l = List(1,2,3,4,5,6,7,8,9,0)
> val sc = new SparkContext("local[4]", "soark-test")
> val rdd = sc.parallelize(l)
> rdd.foreach(println)
> println(rdd.collect())
>   }
> }
>
> *build.sbt*
> lazy val sparkjob = (project in file("."))
>   .settings(
> name := "SparkJob",
> version := "1.0",
> scalaVersion := "2.11.6",
> libraryDependencies := libs
> )
>
> lazy val libs = Seq(
>   "org.apache.spark" %% "spark-core" % "1.5.1"
> )
>
>
> *Exception:*15/10/22 14:32:42 INFO DAGScheduler: Job 0 failed: foreach at
> app.scala:9, took 0.689832 s
> [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to
> stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure:
> Lost task 2.0 in stage 0.0 (TID 2, localhost): java.io.IOException:
> java.lang.ClassNotFoundException: scala.Some
> [error] at
> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
> [error] at
> org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
> [error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [error] at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> [error] at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> [error] at java.lang.reflect.Method.invoke(Method.java:497)
> [error] at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> [error] at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
> [error] at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> [error] at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> [error] at
> java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
> [error] at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> [error] at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> [error] at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> [error] at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> [error] at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> [error] at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> [error] at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> [error] at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> [error] at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
> [error] at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
> [error] at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
> [error] at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [error] at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [error] at java.lang.Thread.run(Thread.java:745)
> [error] Caused by: java.lang.ClassNotFoundException: scala.Some
> [error] at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> [error] at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> [error] at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> [error] at java.lang.Class.forName0(Native Method)
> [error] at java.lang.Class.forName(Class.java:348)
> [error] at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> [error] at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> [error] at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> [error] at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> [error] at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> [error] at
> 
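
A hedged build.sbt sketch of the usual way out of this kind of mismatch when running
through sbt: pin scalaVersion to the release that matches the published
spark-core_2.11 binaries (2.11.7 is the change that, per the follow-up in this thread,
made the error go away), and fork the run so Spark gets its own JVM and classpath (the
fork setting is a common extra precaution, not something confirmed here):

lazy val sparkjob = (project in file("."))
  .settings(
    name := "SparkJob",
    version := "1.0",
    scalaVersion := "2.11.7",
    fork in run := true,
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1"
  )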

Re: --jars option using hdfs jars cannot effect when spark standlone deploymode with cluster

2015-11-02 Thread Akhil Das
Can you try putting the jar locally, without HDFS?

Thanks
Best Regards

On Wed, Oct 28, 2015 at 8:40 AM, our...@cnsuning.com 
wrote:

> hi all,
>when using the command:
> spark-submit *--deploy-mode cluster --jars
> hdfs:///user/spark/cypher.jar* --class
> com.suning.spark.jdbc.MysqlJdbcTest hdfs:///user/spark/MysqlJdbcTest.jar
> the program throws an exception that it cannot find a class in *cypher.jar*; the
> driver log shows that no --jars are downloaded in cluster mode. Is the only
> option to use a fat jar?
>
> --
> Ricky  Ou(欧   锐)
>
>
>


Split RDD into multiple RDDs using filter-transformation

2015-11-02 Thread Sushrut Ikhar
Hi,
I need to split an RDD into 3 different RDDs using filter transformations.
I have cached the original RDD before using filter.
The input is lopsided, leaving some executors with a heavy load while others
have less, so I have repartitioned it.

*DAG-lineage I expected:*

I/P RDD  -->  MAP RDD --> SHUFFLE RDD (repartition) -->

*MAP RDD (cache)* --> FILTER RDD1 --> MAP1 --> UNION RDD --> O/P RDD
   --> FILTER RDD2 --> MAP2
   --> FILTER RDD3 --> MAP3

*DAG-lineage I observed:*

I/P RDD  -->  MAP RDD -->

SHUFFLE RDD (repartition) --> *MAP RDD (cache)* --> FILTER RDD1 --> MAP1
SHUFFLE RDD (repartition) --> *MAP RDD (cache)* --> FILTER RDD2 --> MAP2
SHUFFLE RDD (repartition) --> *MAP RDD (cache)* --> FILTER RDD3 --> MAP3 -->

UNION RDD --> O/P RDD

Also, the Spark UI shows that no RDD partitions are actually being cached.

How do I split, then, without shuffling three times?
Regards,

Sushrut Ikhar
[image: https://]about.me/sushrutikhar



Re: Programmatically create RDDs based on input

2015-11-02 Thread amit tewari
Thanks Natu, Ayan.

I was able to create an array of Dataframes (Spark 1.3+).

DataFrame[] dfs = new DataFrame[uniqueFileIds.length];

Thanks
Amit

On Sun, Nov 1, 2015 at 10:58 AM, Natu Lauchande 
wrote:

> Hi Amit,
>
> I don't see any default constructor in the JavaRDD docs
> https://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaRDD.html
> .
>
> Have you tried the following ?
>
> JavaRDD jRDD[] ;
>
> jRDD.add( jsc.textFile("/file1.txt") )
> jRDD.add( jsc.textFile("/file2.txt") )
> ..
> ;
>
> Natu
>
>
> On Sat, Oct 31, 2015 at 11:18 PM, ayan guha  wrote:
>
>> My java knowledge is limited, but you may try with a hashmap and put RDDs
>> in it?
>>
>> On Sun, Nov 1, 2015 at 4:34 AM, amit tewari 
>> wrote:
>>
>>> Thanks Ayan thats something similar to what I am looking at but trying
>>> the same in Java is giving compile error:
>>>
>>> JavaRDD jRDD[] = new JavaRDD[3];
>>>
>>> //Error: Cannot create a generic array of JavaRDD
>>>
>>> Thanks
>>> Amit
>>>
>>>
>>>
>>> On Sat, Oct 31, 2015 at 5:46 PM, ayan guha  wrote:
>>>
 Corrected a typo...

 # In Driver
 fileList=["/file1.txt","/file2.txt"]
 rdds = []
 for f in fileList:
  rdd = jsc.textFile(f)
  rdds.append(rdd)


 On Sat, Oct 31, 2015 at 11:14 PM, ayan guha 
 wrote:

> Yes, this can be done. quick python equivalent:
>
> # In Driver
> fileList=["/file1.txt","/file2.txt"]
> rdd = []
> for f in fileList:
>  rdd = jsc.textFile(f)
>  rdds.append(rdd)
>
>
>
> On Sat, Oct 31, 2015 at 11:09 PM, amit tewari 
> wrote:
>
>> Hi
>>
>> I need the ability to be able to create RDDs programatically inside
>> my program (e.g. based on varaible number of input files).
>>
>> Can this be done?
>>
>> I need this as I want to run the following statement inside an
>> iteration:
>>
>> JavaRDD rdd1 = jsc.textFile("/file1.txt");
>>
>> Thanks
>> Amit
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>



 --
 Best Regards,
 Ayan Guha

>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


Re: Error : - No filesystem for scheme: spark

2015-11-02 Thread Balachandar R.A.
-- Forwarded message --
From: "Balachandar R.A." 
Date: 02-Nov-2015 12:53 pm
Subject: Re: Error : - No filesystem for scheme: spark
To: "Jean-Baptiste Onofré" 
Cc:

> HI JB,
> Thanks for the response,
> Here is the content of my spark-defaults.conf
>
>
> # Default system properties included when running spark-submit.
> # This is useful for setting default environmental settings.
>
> # Example:
>  spark.master spark://fdoat:7077
> # spark.eventLog.enabled   true
>  spark.eventLog.dir/home/bala/spark-logs
> # spark.eventLog.dir   hdfs://namenode:8021/directory
> # spark.serializer
org.apache.spark.serializer.KryoSerializer
> # spark.driver.memory  5g
> # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value
-Dnumbers="one two three"
>
>
> regards
> Bala
>
> On 2 November 2015 at 12:21, Jean-Baptiste Onofré  wrote:
>>
>> Hi,
>>
>> do you have something special in conf/spark-defaults.conf (especially on
the eventLog directory) ?
>>
>> Regards
>> JB
>>
>>
>> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
>>>
>>> Can someone tell me at what point this error could come?
>>>
>>> In one of my use cases, I am trying to use hadoop custom input format.
>>> Here is my code.
>>>
>>> val hConf: Configuration = sc.hadoopConfiguration
>>> hConf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
>>> hConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
>>> var job = new Job(hConf)
>>> FileInputFormat.setInputPaths(job, new Path("hdfs:///user/bala/MyBinaryFile"));
>>> var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
>>>   classOf[IntWritable], classOf[BytesWritable], job.getConfiguration())
>>> val count = hRDD.mapPartitionsWithInputSplit{(split, iter) => myfuncPart(split, iter)}
>>>
>>> The moment I invoke the mapPartitionsWithInputSplit() method, I get the
>>> below error in my spark-submit launch:
>>>
>>> 15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
>>> 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
>>> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
>>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
>>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>>>
>>> Any help here to move towards fixing this will be of great help
>>>
>>>
>>>
>>> Thanks
>>>
>>> Bala
>>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>


Re: Error : - No filesystem for scheme: spark

2015-11-02 Thread Jean-Baptiste Onofré

Just to be sure: you use yarn cluster (not standalone), right ?

Regards
JB

On 11/02/2015 10:37 AM, Balachandar R.A. wrote:

Yes. In two different places I use spark://

1. In my code, while creating spark configuration, I use the code below

val sConf = new
SparkConf().setAppName("Dummy").setMaster("spark://:7077")
val sConf = val sc = new SparkContext(sConf)


2. I run the job using the command below

spark-submit  --class org.myjob  --jars myjob.jar spark://:7077
myjob.jar

regards
Bala


On 2 November 2015 at 14:59, Romi Kuntsman > wrote:

except "spark.master", do you have "spark://" anywhere in your code
or config files?

*Romi Kuntsman*, /Big Data Engineer/_
_
http://www.totango.com 

On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A.
> wrote:


-- Forwarded message --
From: "Balachandar R.A." >
Date: 02-Nov-2015 12:53 pm
Subject: Re: Error : - No filesystem for scheme: spark
To: "Jean-Baptiste Onofré" >
Cc:

 > HI JB,
 > Thanks for the response,
 > Here is the content of my spark-defaults.conf
 >
 >
 > # Default system properties included when running spark-submit.
 > # This is useful for setting default environmental settings.
 >
 > # Example:
 >  spark.master spark://fdoat:7077
 > # spark.eventLog.enabled   true
 >  spark.eventLog.dir/home/bala/spark-logs
 > # spark.eventLog.dir   hdfs://namenode:8021/directory
 > # spark.serializer
org.apache.spark.serializer.KryoSerializer
 > # spark.driver.memory  5g
 > # spark.executor.extraJavaOptions  -XX:+PrintGCDetails
-Dkey=value -Dnumbers="one two three"
 >
 >
 > regards
 > Bala


 >
 > On 2 November 2015 at 12:21, Jean-Baptiste Onofré
> wrote:
 >>
 >> Hi,
 >>
 >> do you have something special in conf/spark-defaults.conf
(especially on the eventLog directory) ?
 >>
 >> Regards
 >> JB
 >>
 >>
 >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
 >>>
 >>> Can someone tell me at what point this error could come?
 >>>
 >>> In one of my use cases, I am trying to use hadoop custom
input format.
 >>> Here is my code.
 >>>
 >>> |valhConf:Configuration=sc.hadoopConfiguration
 >>>

hConf.set("fs.hdfs.impl",classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)hConf.set("fs.file.impl",classOf[org.apache.hadoop.fs.LocalFileSystem].getName)varjob
 >>>

=newJob(hConf)FileInputFormat.setInputPaths(job,newPath("hdfs:///user/bala/MyBinaryFile"));varhRDD
 >>>

=newNewHadoopRDD(sc,classOf[RandomAccessInputFormat],classOf[IntWritable],classOf[BytesWritable],job.getConfiguration())valcount
 >>>
=hRDD.mapPartitionsWithInputSplit{(split,iter)=>myfuncPart(split,iter)}|
 >>>
 >>> |The moment I invoke mapPartitionsWithInputSplit() method,
I get the
 >>> below error in my spark-submit launch|
 >>>
 >>> |
 >>> |
 >>>
 >>> |15/10/3011:11:39WARN scheduler.TaskSetManager:Losttask
0.0in stage
 >>> 0.0(TID
0,40.221.94.235):java.io.IOException:NoFileSystemforscheme:spark
 >>> at
 >>>

org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)at
 >>>
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)at
 >>> org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)|
 >>>
 >>> Any help here to move towards fixing this will be of great help
 >>>
 >>>
 >>>
 >>> Thanks
 >>>
 >>> Bala
 >>>
 >>
 >> --
 >> Jean-Baptiste Onofré
 >> jbono...@apache.org 
 >> http://blog.nanthrax.net
 >> Talend - http://www.talend.com
 >>
 >>
-
 >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

 >> For additional commands, e-mail: user-h...@spark.apache.org

 >>
 >





--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Error : - No filesystem for scheme: spark

2015-11-02 Thread Balachandar R.A.
No, I am not using YARN. YARN is not running in my cluster, so it is a
standalone one.

Regards
Bala
On 02-Nov-2015 3:11 pm, "Jean-Baptiste Onofré"  wrote:

> Just to be sure: you use yarn cluster (not standalone), right ?
>
> Regards
> JB
>
> On 11/02/2015 10:37 AM, Balachandar R.A. wrote:
>
>> Yes. In two different places I use spark://
>>
>> 1. In my code, while creating spark configuration, I use the code below
>>
>> val sConf = new
>> SparkConf().setAppName("Dummy").setMaster("spark://:7077")
>> val sConf = val sc = new SparkContext(sConf)
>>
>>
>> 2. I run the job using the command below
>>
>> spark-submit  --class org.myjob  --jars myjob.jar spark://:7077
>> myjob.jar
>>
>> regards
>> Bala
>>
>>
>> On 2 November 2015 at 14:59, Romi Kuntsman > > wrote:
>>
>> except "spark.master", do you have "spark://" anywhere in your code
>> or config files?
>>
>> *Romi Kuntsman*, /Big Data Engineer/_
>> _
>> http://www.totango.com 
>>
>> On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A.
>> > wrote:
>>
>>
>> -- Forwarded message --
>> From: "Balachandar R.A." > >
>> Date: 02-Nov-2015 12:53 pm
>> Subject: Re: Error : - No filesystem for scheme: spark
>> To: "Jean-Baptiste Onofré" > >
>> Cc:
>>
>>  > HI JB,
>>  > Thanks for the response,
>>  > Here is the content of my spark-defaults.conf
>>  >
>>  >
>>  > # Default system properties included when running spark-submit.
>>  > # This is useful for setting default environmental settings.
>>  >
>>  > # Example:
>>  >  spark.master spark://fdoat:7077
>>  > # spark.eventLog.enabled   true
>>  >  spark.eventLog.dir/home/bala/spark-logs
>>  > # spark.eventLog.dir
>>  hdfs://namenode:8021/directory
>>  > # spark.serializer
>> org.apache.spark.serializer.KryoSerializer
>>  > # spark.driver.memory  5g
>>  > # spark.executor.extraJavaOptions  -XX:+PrintGCDetails
>> -Dkey=value -Dnumbers="one two three"
>>  >
>>  >
>>  > regards
>>  > Bala
>>
>>
>>  >
>>  > On 2 November 2015 at 12:21, Jean-Baptiste Onofré
>> > wrote:
>>  >>
>>  >> Hi,
>>  >>
>>  >> do you have something special in conf/spark-defaults.conf
>> (especially on the eventLog directory) ?
>>  >>
>>  >> Regards
>>  >> JB
>>  >>
>>  >>
>>  >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
>>  >>>
>>  >>> Can someone tell me at what point this error could come?
>>  >>>
>>  >>> In one of my use cases, I am trying to use hadoop custom
>> input format.
>>  >>> Here is my code.
>>  >>>
>>  >>> |valhConf:Configuration=sc.hadoopConfiguration
>>  >>>
>>
>> hConf.set("fs.hdfs.impl",classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)hConf.set("fs.file.impl",classOf[org.apache.hadoop.fs.LocalFileSystem].getName)varjob
>>  >>>
>>
>> =newJob(hConf)FileInputFormat.setInputPaths(job,newPath("hdfs:///user/bala/MyBinaryFile"));varhRDD
>>  >>>
>>
>> =newNewHadoopRDD(sc,classOf[RandomAccessInputFormat],classOf[IntWritable],classOf[BytesWritable],job.getConfiguration())valcount
>>  >>>
>>
>> =hRDD.mapPartitionsWithInputSplit{(split,iter)=>myfuncPart(split,iter)}|
>>  >>>
>>  >>> |The moment I invoke mapPartitionsWithInputSplit() method,
>> I get the
>>  >>> below error in my spark-submit launch|
>>  >>>
>>  >>> |
>>  >>> |
>>  >>>
>>  >>> |15/10/3011:11:39WARN scheduler.TaskSetManager:Losttask
>> 0.0in stage
>>  >>> 0.0(TID
>> 0,40.221.94.235):java.io.IOException:NoFileSystemforscheme:spark
>>  >>> at
>>  >>>
>>
>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)at
>>  >>>
>>
>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)at
>>  >>>
>> org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)|
>>  >>>
>>  >>> Any help here to move towards fixing this will be of great
>> help
>>  >>>
>>  >>>
>>  >>>
>>  >>> Thanks
>>  >>>
>>  >>> Bala
>>  >>>
>>  >>
>>  >> --
>>  >> Jean-Baptiste Onofré
>>  >> jbono...@apache.org 
>>  >> http://blog.nanthrax.net
>>  >> Talend - 

Re: How to catch error during Spark job?

2015-11-02 Thread Akhil Das
Usually you add exception handling within the transformations; in your case
you have it added in the driver code. That approach won't be able to catch
exceptions happening inside the executors.

eg:

try {
  val rdd = sc.parallelize(1 to 100)

  rdd.foreach(x => throw new Exception("Real failure!")) // This could be rdd.map, etc.

  val count = rdd.count

  println(s"Count: $count")

  *throw new Exception("Fail!")*

} finally {
  sc.stop
}

Thanks
Best Regards

On Wed, Oct 28, 2015 at 7:10 AM, Isabelle Phan  wrote:

> Hello,
>
> I had a question about error handling in Spark job: if an exception occurs
> during the job, what is the best way to get notification of the failure?
> Can Spark jobs return with different exit codes?
>
> For example, I wrote a dummy Spark job just throwing out an Exception, as
> follows:
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
>
> object ExampleJob {
>   def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setAppName("Test Job")
> val sc = new SparkContext(conf)
> try {
>   val count = sc.parallelize(1 to 100).count
>   println(s"Count: $count")
>
>   *throw new Exception("Fail!")*
>
> } finally {
>   sc.stop
> }
>   }
>
> }
>
> The spark-submit execution trace shows the error:
> spark-submit --class com.test.ExampleJob test.jar
> 15/10/03 03:13:16 INFO SparkContext: Running Spark version 1.4.0
> 15/10/03 03:13:19 WARN SparkConf: In Spark 1.0 and later spark.local.dir
> will be overridden by the value set by the cluster manager (via
> SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
> 15/10/03 03:13:19 WARN SparkConf:
> ...
> 15/10/03 03:13:59 INFO DAGScheduler: Job 0 finished: count at
> ExampleJob.scala:12, took 18.879104 s
> Count: 100
> 15/10/03 03:13:59 INFO SparkUI: Stopped Spark web UI at []
> 15/10/03 03:13:59 INFO DAGScheduler: Stopping DAGScheduler
> 15/10/03 03:13:59 INFO SparkDeploySchedulerBackend: Shutting down all
> executors
> 15/10/03 03:13:59 INFO SparkDeploySchedulerBackend: Asking each executor
> to shut down
> 15/10/03 03:13:59 INFO MapOutputTrackerMasterEndpoint:
> MapOutputTrackerMasterEndpoint stopped!
> 15/10/03 03:13:59 INFO Utils: path =
> /data1/spark/tmp/spark-d8c0a18f-6e45-46c8-a208-1e1ad36ae596/blockmgr-d8e40805-3b8c-45f4-97b3-b89874158796,
> already present as root for deletion.
> 15/10/03 03:13:59 INFO MemoryStore: MemoryStore cleared
> 15/10/03 03:13:59 INFO BlockManager: BlockManager stopped
> 15/10/03 03:13:59 INFO BlockManagerMaster: BlockManagerMaster stopped
> 15/10/03 03:13:59 INFO
> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
> OutputCommitCoordinator stopped!
> 15/10/03 03:13:59 INFO SparkContext: Successfully stopped SparkContext
> Exception in thread "main" java.lang.Exception: Fail!
> at com.test.ExampleJob$.main(ExampleJob.scala:14)
> at com.test.ExampleJob.main(ExampleJob.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 15/10/03 03:13:59 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
> down remote daemon.
> 15/10/03 03:13:59 INFO RemoteActorRefProvider$RemotingTerminator: Remote
> daemon shut down; proceeding with flushing remote transports.
> 15/10/03 03:13:59 INFO Utils: Shutdown hook called
> 15/10/03 03:13:59 INFO Utils: Deleting directory
> /data1/spark/tmp/spark-d8c0a18f-6e45-46c8-a208-1e1ad36ae596
> 15/10/03 03:14:00 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
> shut down.
>
>
> However, the Spark UI just shows the status as "FINISHED". Is this a
> configuration error on my side?
> [image: Inline image 1]
>
>
> Thanks,
>
> Isabelle
>
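
A sketch of one way to surface the failure to whatever launched the job: catch the
exception in the driver, log it, and exit with a non-zero status code so the caller
(or a workflow scheduler) can react. riskyTransform is an illustrative placeholder
for work that might throw on the executors:

import org.apache.spark.{SparkConf, SparkContext}

object ExampleJob {
  def riskyTransform(i: Int): Int = if (i == 42) throw new Exception("boom") else i

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Test Job"))
    try {
      val count = sc.parallelize(1 to 100).map(riskyTransform).count()
      println(s"Count: $count")
    } catch {
      case e: Exception =>
        e.printStackTrace()
        sc.stop()
        sys.exit(1)   // a non-zero exit code signals the failure to the submitter
    }
    sc.stop()
  }
}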


Re: Error : - No filesystem for scheme: spark

2015-11-02 Thread Romi Kuntsman
except "spark.master", do you have "spark://" anywhere in your code or
config files?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A. 
wrote:

>
> -- Forwarded message --
> From: "Balachandar R.A." 
> Date: 02-Nov-2015 12:53 pm
> Subject: Re: Error : - No filesystem for scheme: spark
> To: "Jean-Baptiste Onofré" 
> Cc:
>
> > HI JB,
> > Thanks for the response,
> > Here is the content of my spark-defaults.conf
> >
> >
> > # Default system properties included when running spark-submit.
> > # This is useful for setting default environmental settings.
> >
> > # Example:
> >  spark.master spark://fdoat:7077
> > # spark.eventLog.enabled   true
> >  spark.eventLog.dir/home/bala/spark-logs
> > # spark.eventLog.dir   hdfs://namenode:8021/directory
> > # spark.serializer
> org.apache.spark.serializer.KryoSerializer
> > # spark.driver.memory  5g
> > # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value
> -Dnumbers="one two three"
> >
> >
> > regards
> > Bala
>
> >
> > On 2 November 2015 at 12:21, Jean-Baptiste Onofré 
> wrote:
> >>
> >> Hi,
> >>
> >> do you have something special in conf/spark-defaults.conf (especially
> on the eventLog directory) ?
> >>
> >> Regards
> >> JB
> >>
> >>
> >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
> >>>
> >>> Can someone tell me at what point this error could come?
> >>>
> >>> In one of my use cases, I am trying to use hadoop custom input format.
> >>> Here is my code.
> >>>
> >>> |valhConf:Configuration=sc.hadoopConfiguration
> >>>
> hConf.set("fs.hdfs.impl",classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)hConf.set("fs.file.impl",classOf[org.apache.hadoop.fs.LocalFileSystem].getName)varjob
> >>>
> =newJob(hConf)FileInputFormat.setInputPaths(job,newPath("hdfs:///user/bala/MyBinaryFile"));varhRDD
> >>>
> =newNewHadoopRDD(sc,classOf[RandomAccessInputFormat],classOf[IntWritable],classOf[BytesWritable],job.getConfiguration())valcount
> >>>
> =hRDD.mapPartitionsWithInputSplit{(split,iter)=>myfuncPart(split,iter)}|
> >>>
> >>> |The moment I invoke mapPartitionsWithInputSplit() method, I get the
> >>> below error in my spark-submit launch|
> >>>
> >>> |
> >>> |
> >>>
> >>> |15/10/3011:11:39WARN scheduler.TaskSetManager:Losttask 0.0in stage
> >>> 0.0(TID
> 0,40.221.94.235):java.io.IOException:NoFileSystemforscheme:spark
> >>> at
> >>>
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)at
> >>>
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)at
> >>> org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)|
> >>>
> >>> Any help here to move towards fixing this will be of great help
> >>>
> >>>
> >>>
> >>> Thanks
> >>>
> >>> Bala
> >>>
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> jbono...@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
>
>


Re: Error : - No filesystem for scheme: spark

2015-11-02 Thread Balachandar R.A.
Yes. In two different places I use spark://

1. In my code, while creating spark configuration, I use the code below

val sConf = new SparkConf().setAppName("Dummy").setMaster("spark://:7077")
val sc = new SparkContext(sConf)


2. I run the job using the command below

spark-submit  --class org.myjob  --jars myjob.jar spark://:7077
myjob.jar

regards
Bala


On 2 November 2015 at 14:59, Romi Kuntsman  wrote:

> except "spark.master", do you have "spark://" anywhere in your code or
> config files?
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A. <
> balachandar...@gmail.com> wrote:
>
>>
>> -- Forwarded message --
>> From: "Balachandar R.A." 
>> Date: 02-Nov-2015 12:53 pm
>> Subject: Re: Error : - No filesystem for scheme: spark
>> To: "Jean-Baptiste Onofré" 
>> Cc:
>>
>> > HI JB,
>> > Thanks for the response,
>> > Here is the content of my spark-defaults.conf
>> >
>> >
>> > # Default system properties included when running spark-submit.
>> > # This is useful for setting default environmental settings.
>> >
>> > # Example:
>> >  spark.master spark://fdoat:7077
>> > # spark.eventLog.enabled   true
>> >  spark.eventLog.dir/home/bala/spark-logs
>> > # spark.eventLog.dir   hdfs://namenode:8021/directory
>> > # spark.serializer
>> org.apache.spark.serializer.KryoSerializer
>> > # spark.driver.memory  5g
>> > # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value
>> -Dnumbers="one two three"
>> >
>> >
>> > regards
>> > Bala
>>
>> >
>> > On 2 November 2015 at 12:21, Jean-Baptiste Onofré 
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> do you have something special in conf/spark-defaults.conf (especially
>> on the eventLog directory) ?
>> >>
>> >> Regards
>> >> JB
>> >>
>> >>
>> >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
>> >>>
>> >>> Can someone tell me at what point this error could come?
>> >>>
>> >>> In one of my use cases, I am trying to use hadoop custom input format.
>> >>> Here is my code.
>> >>>
>> >>> |valhConf:Configuration=sc.hadoopConfiguration
>> >>>
>> hConf.set("fs.hdfs.impl",classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)hConf.set("fs.file.impl",classOf[org.apache.hadoop.fs.LocalFileSystem].getName)varjob
>> >>>
>> =newJob(hConf)FileInputFormat.setInputPaths(job,newPath("hdfs:///user/bala/MyBinaryFile"));varhRDD
>> >>>
>> =newNewHadoopRDD(sc,classOf[RandomAccessInputFormat],classOf[IntWritable],classOf[BytesWritable],job.getConfiguration())valcount
>> >>>
>> =hRDD.mapPartitionsWithInputSplit{(split,iter)=>myfuncPart(split,iter)}|
>> >>>
>> >>> |The moment I invoke mapPartitionsWithInputSplit() method, I get the
>> >>> below error in my spark-submit launch|
>> >>>
>> >>> |
>> >>> |
>> >>>
>> >>> |15/10/3011:11:39WARN scheduler.TaskSetManager:Losttask 0.0in stage
>> >>> 0.0(TID
>> 0,40.221.94.235):java.io.IOException:NoFileSystemforscheme:spark
>> >>> at
>> >>>
>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)at
>> >>>
>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)at
>> >>> org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)|
>> >>>
>> >>> Any help here to move towards fixing this will be of great help
>> >>>
>> >>>
>> >>>
>> >>> Thanks
>> >>>
>> >>> Bala
>> >>>
>> >>
>> >> --
>> >> Jean-Baptiste Onofré
>> >> jbono...@apache.org
>> >> http://blog.nanthrax.net
>> >> Talend - http://www.talend.com
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >
>>
>>
>


Required file not found: sbt-interface.jar

2015-11-02 Thread Todd
Hi,
I am trying to build Spark 1.5.1 in my environment, but I encounter the following 
error complaining "Required file not found: sbt-interface.jar".
The error message is below, and I am building with:

./make-distribution.sh --name spark-1.5.1-bin-2.6.0 --tgz --with-tachyon 
-Phadoop-2.6 -Dhadoop.version=2.6.0 -Pyarn -Phive -Dhive-version=1.2.1 
-Phive-thriftserver -Pspark-ganglia-lgpl -Dscala-2.10.4



[INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @ 
spark-launcher_2.10 ---
[INFO] Using zinc server for incremental compilation
[error] Required file not found: sbt-interface.jar
[error] See zinc -help for information about locating necessary files
[INFO] 
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ... SUCCESS [  3.192 s]
[INFO] Spark Project Launcher . FAILURE [  2.950 s]



Re: Split RDD into multiple RDDs using filter-transformation

2015-11-02 Thread Deng Ching-Mallete
Hi,

You should perform an action (e.g. count, take, saveAs*, etc. ) in order
for your RDDs to be cached since cache/persist are lazy functions. You
might also want to do coalesce instead of repartition to avoid shuffling.

Thanks,
Deng

On Mon, Nov 2, 2015 at 5:53 PM, Sushrut Ikhar 
wrote:

> Hi,
> I need to split a RDD into 3 different RDD using filter-transformation.
> I have cached the original RDD before using filter.
> The input is lopsided leaving some executors with heavy load while others
> with less; so I have repartitioned it.
>
> *DAG-lineage I expected:*
>
> I/P RDD  -->  MAP RDD --> SHUFFLE RDD (repartition) -->
>
> *MAP RDD (cache)* --> FILTER RDD1 --> MAP1 --> UNION RDD --> O/P RDD
>--> FILTER RDD2 --> MAP2
>--> FILTER RDD3 --> MAP3
>
> *DAG-lineage I observed:*
>
> I/P RDD  -->  MAP RDD -->
>
> SHUFFLE RDD (repartition) --> *MAP RDD (cache)* --> FILTER RDD1 --> MAP1
> SHUFFLE RDD (repartition) --> *MAP RDD (cache)* --> FILTER RDD2 --> MAP2
> SHUFFLE RDD (repartition) --> *MAP RDD (cache)* --> FILTER RDD3 --> MAP3
> -->
>
> UNION RDD --> O/P RDD
>
> Also I Spark-UI shows that no RDD partitioned are actually being cached.
>
> How do I split then without shuffling thrice?
> Regards,
>
> Sushrut Ikhar
> [image: https://]about.me/sushrutikhar
> 
>
>
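
A sketch of the suggestion above: materialize the cached RDD with one action before
the three filters, so each filter branch reads the cached partitions instead of
re-running the shuffle (input, parse, p1..p3, m1..m3 and numPartitions are
illustrative placeholders):

val base = input.map(parse).repartition(numPartitions).cache()
base.count()                        // forces the cache to be populated once

val out1 = base.filter(p1).map(m1)  // these three now read the cached MAP RDD
val out2 = base.filter(p2).map(m2)
val out3 = base.filter(p3).map(m3)

val result = out1.union(out2).union(out3)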


Re: How to lookup by a key in an RDD

2015-11-02 Thread swetha kasireddy
Hi,

Is Indexed RDDs released yet?

Thanks,
Swetha

On Sun, Nov 1, 2015 at 1:21 AM, Gylfi  wrote:

> Hi.
>
> You may want to look into Indexed RDDs
> https://github.com/amplab/spark-indexedrdd
>
> Regards,
> Gylfi.
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-lookup-by-a-key-in-an-RDD-tp25243p25247.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to lookup by a key in an RDD

2015-11-02 Thread Ted Yu
Please take a look at SPARK-2365

On Mon, Nov 2, 2015 at 3:25 PM, swetha kasireddy 
wrote:

> Hi,
>
> Is Indexed RDDs released yet?
>
> Thanks,
> Swetha
>
> On Sun, Nov 1, 2015 at 1:21 AM, Gylfi  wrote:
>
>> Hi.
>>
>> You may want to look into Indexed RDDs
>> https://github.com/amplab/spark-indexedrdd
>>
>> Regards,
>> Gylfi.
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-lookup-by-a-key-in-an-RDD-tp25243p25247.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Getting ClassNotFoundException: scala.Some on Spark 1.5.x

2015-11-02 Thread Babar Tareen
I am using *'sbt run'* to execute the code. Detailed sbt output is here (
https://drive.google.com/open?id=0B2dlA_DzEohVakpValRjRS1zVG8).

I had scala 2.11.7 installed on my machine. But even after uninstalling it,
I am still getting the exception with 2.11.6.

Changing the scala version to 2.11.7 in build.sbt fixes the exception as
you suggested. I am unclear as to why it works with 2.11.7 and not 2.11.6.

Thanks,
Babar

On Mon, Nov 2, 2015 at 2:10 PM Jonathan Coveney  wrote:

> Caused by: java.lang.ClassNotFoundException: scala.Some
>
> indicates that you don't have the scala libs present. How are you
> executing this? My guess is the issue is a conflict between scala 2.11.6 in
> your build and 2.11.7? Not sure...try setting your scala to 2.11.7?
>
> But really, first it'd be good to see what command you're using to invoke
> this.
>
> 2015-11-02 14:48 GMT-05:00 Babar Tareen :
>
>> Resending, haven't found a workaround. Any help is highly appreciated.
>>
>> -- Forwarded message --
>> From: Babar Tareen 
>> Date: Thu, Oct 22, 2015 at 2:47 PM
>> Subject: Getting ClassNotFoundException: scala.Some on Spark 1.5.x
>> To: user@spark.apache.org
>>
>>
>> Hi,
>>
>> I am getting following exception when submitting a job to Spark 1.5.x
>> from Scala. The same code works with Spark 1.4.1. Any clues as to what
>> might causing the exception.
>>
>>
>>
>> *Code:App.scala*import org.apache.spark.SparkContext
>>
>> object App {
>>   def main(args: Array[String]) = {
>> val l = List(1,2,3,4,5,6,7,8,9,0)
>> val sc = new SparkContext("local[4]", "soark-test")
>> val rdd = sc.parallelize(l)
>> rdd.foreach(println)
>> println(rdd.collect())
>>   }
>> }
>>
>> *build.sbt*
>> lazy val sparkjob = (project in file("."))
>>   .settings(
>> name := "SparkJob",
>> version := "1.0",
>> scalaVersion := "2.11.6",
>> libraryDependencies := libs
>> )
>>
>> lazy val libs = Seq(
>>   "org.apache.spark" %% "spark-core" % "1.5.1"
>> )
>>
>>
>> *Exception:*15/10/22 14:32:42 INFO DAGScheduler: Job 0 failed: foreach
>> at app.scala:9, took 0.689832 s
>> [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to
>> stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure:
>> Lost task 2.0 in stage 0.0 (TID 2, localhost): java.io.IOException:
>> java.lang.ClassNotFoundException: scala.Some
>> [error] at
>> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
>> [error] at
>> org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
>> [error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> [error] at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> [error] at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> [error] at java.lang.reflect.Method.invoke(Method.java:497)
>> [error] at
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> [error] at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>> [error] at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> [error] at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> [error] at
>> java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
>> [error] at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> [error] at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> [error] at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> [error] at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>> [error] at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>> [error] at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> [error] at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> [error] at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>> [error] at
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
>> [error] at
>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
>> [error] at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
>> [error] at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> [error] at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> [error] at java.lang.Thread.run(Thread.java:745)
>> [error] Caused by: java.lang.ClassNotFoundException: scala.Some
>> [error] at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> [error] at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> [error] at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

ipython notebook NameError: name 'sc' is not defined

2015-11-02 Thread Andy Davidson
Hi

I recently installed a new cluster using the
spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2. SparkPi sample app works correctly.

I am trying to run iPython notebook on my cluster master and use an ssh
tunnel so that I can work with the notebook in a browser running on my mac.
Below is how I set up the ssh tunnel:

$ ssh -i $KEY_FILE -N -f -L localhost::localhost:7000
ec2-user@$SPARK_MASTER

$ ssh -i $KEY_FILE ec2-user@$SPARK_MASTER
$ cd top level notebook dir
$ IPYTHON_OPTS="notebook --no-browser --port=7000" /root/spark/bin/pyspark

I am able to access my notebooks in the browser by opening
http://localhost:

When I run the following Python code I get the error NameError: name 'sc' is
not defined. Any idea what the problem might be?

I looked through pyspark and tried various combinations of the following but
still get the same error

$ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
--no-browser --port=7000" /root/spark/bin/pyspark --master=local[2]

Kind regards

Andy





In [1]:
import sys
print (sys.version)
 
import os
print(os.getcwd() + "\n")
2.6.9 (unknown, Apr  1 2015, 18:16:00)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)]
/home/ec2-user/dataScience

In [2]:
from pyspark import SparkContext
textFile = sc.textFile("readme.txt")
textFile.take(1)
---
NameError Traceback (most recent call last)
 in ()
  1 from pyspark import SparkContext
> 2 textFile = sc.textFile("readme.txt")
  3 textFile.take(1)

NameError: name 'sc' is not defined

In [ ]:
 




Re: Getting ClassNotFoundException: scala.Some on Spark 1.5.x

2015-11-02 Thread Jonathan Coveney
My guess, and it's just a guess, is that there is some change between
versions which you got bit by, as it changed the classpath.

On Monday, 2 November 2015, Babar Tareen 
wrote:

> I am using *'sbt run'* to execute the code. Detailed sbt output is here (
> https://drive.google.com/open?id=0B2dlA_DzEohVakpValRjRS1zVG8).
>
> I had scala 2.11.7 installed on my machine. But even after uninstalling
> it, I am still getting the exception with 2.11.6.
>
> Changing the scala version to 2.11.7 in build.sbt fixes the exception as
> you suggested. I am unclear as to why it works with 2.11.7 and not 2.11.6.
>
> Thanks,
> Babar
>
> On Mon, Nov 2, 2015 at 2:10 PM Jonathan Coveney  > wrote:
>
>> Caused by: java.lang.ClassNotFoundException: scala.Some
>>
>> indicates that you don't have the scala libs present. How are you
>> executing this? My guess is the issue is a conflict between scala 2.11.6 in
>> your build and 2.11.7? Not sure...try setting your scala to 2.11.7?
>>
>> But really, first it'd be good to see what command you're using to invoke
>> this.
>>
>> 2015-11-02 14:48 GMT-05:00 Babar Tareen > >:
>>
>>> Resending, haven't found a workaround. Any help is highly appreciated.
>>>
>>> -- Forwarded message --
>>> From: Babar Tareen >> >
>>> Date: Thu, Oct 22, 2015 at 2:47 PM
>>> Subject: Getting ClassNotFoundException: scala.Some on Spark 1.5.x
>>> To: user@spark.apache.org
>>> 
>>>
>>>
>>> Hi,
>>>
>>> I am getting the following exception when submitting a job to Spark 1.5.x
>>> from Scala. The same code works with Spark 1.4.1. Any clues as to what
>>> might be causing the exception?
>>>
>>>
>>>
>>> *Code: App.scala*
>>> import org.apache.spark.SparkContext
>>>
>>> object App {
>>>   def main(args: Array[String]) = {
>>> val l = List(1,2,3,4,5,6,7,8,9,0)
>>> val sc = new SparkContext("local[4]", "soark-test")
>>> val rdd = sc.parallelize(l)
>>> rdd.foreach(println)
>>> println(rdd.collect())
>>>   }
>>> }
>>>
>>> *build.sbt*
>>> lazy val sparkjob = (project in file("."))
>>>   .settings(
>>> name := "SparkJob",
>>> version := "1.0",
>>> scalaVersion := "2.11.6",
>>> libraryDependencies := libs
>>> )
>>>
>>> lazy val libs = Seq(
>>>   "org.apache.spark" %% "spark-core" % "1.5.1"
>>> )
>>>
>>>
>>> *Exception:*
>>> 15/10/22 14:32:42 INFO DAGScheduler: Job 0 failed: foreach at app.scala:9,
>>> took 0.689832 s
>>> [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to
>>> stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure:
>>> Lost task 2.0 in stage 0.0 (TID 2, localhost): java.io.IOException:
>>> java.lang.ClassNotFoundException: scala.Some
>>> [error] at
>>> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
>>> [error] at
>>> org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
>>> [error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>> Method)
>>> [error] at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> [error] at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> [error] at java.lang.reflect.Method.invoke(Method.java:497)
>>> [error] at
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>> [error] at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>>> [error] at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>> [error] at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> [error] at
>>> java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
>>> [error] at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> [error] at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>> [error] at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> [error] at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>> [error] at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>> [error] at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>> [error] at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> [error] at
>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>> [error] at
>>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
>>> [error] at
>>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
>>> [error] at
>>> 

Re: Error : - No filesystem for scheme: spark

2015-11-02 Thread Jean-Baptiste Onofré

Ah ok. Good catch ;)

Regards
JB

On 11/02/2015 11:51 AM, Balachandar R.A. wrote:

I made a stupid mistake, it seems. I supplied the --master option with the
spark:// URL in my launch command, and this error is gone.

Thanks for pointing out possible places for troubleshooting

Regards
Bala

On 02-Nov-2015 3:15 pm, "Balachandar R.A." > wrote:

No.. I am not using yarn. Yarn is not running in my cluster. So, it
is standalone one.

Regards
Bala

On 02-Nov-2015 3:11 pm, "Jean-Baptiste Onofré" > wrote:

Just to be sure: you use yarn cluster (not standalone), right ?

Regards
JB

On 11/02/2015 10:37 AM, Balachandar R.A. wrote:

Yes. In two different places I use spark://

1. In my code, while creating spark configuration, I use the
code below

val sConf = new SparkConf().setAppName("Dummy").setMaster("spark://:7077")
val sc = new SparkContext(sConf)


2. I run the job using the command below

spark-submit  --class org.myjob  --jars myjob.jar
spark://:7077
myjob.jar

regards
Bala


On 2 November 2015 at 14:59, Romi Kuntsman 
>> wrote:

 except "spark.master", do you have "spark://" anywhere
in your code
 or config files?

 *Romi Kuntsman*, /Big Data Engineer/_
 _
http://www.totango.com 

 On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A.
 
>> wrote:


 -- Forwarded message --
 From: "Balachandar R.A." 
 >>
 Date: 02-Nov-2015 12:53 pm
 Subject: Re: Error : - No filesystem for scheme: spark
 To: "Jean-Baptiste Onofré" 
 >>
 Cc:

  > HI JB,
  > Thanks for the response,
  > Here is the content of my spark-defaults.conf
  >
  >
  > # Default system properties included when
running spark-submit.
  > # This is useful for setting default
environmental settings.
  >
  > # Example:
  >  spark.master spark://fdoat:7077
  > # spark.eventLog.enabled   true
  >  spark.eventLog.dir
/home/bala/spark-logs
  > # spark.eventLog.dir
  hdfs://namenode:8021/directory
  > # spark.serializer
 org.apache.spark.serializer.KryoSerializer
  > # spark.driver.memory  5g
  > # spark.executor.extraJavaOptions
-XX:+PrintGCDetails
 -Dkey=value -Dnumbers="one two three"
  >
  >
  > regards
  > Bala


  >
  > On 2 November 2015 at 12:21, Jean-Baptiste Onofré
 
>> wrote:
  >>
  >> Hi,
  >>
  >> do you have something special in
conf/spark-defaults.conf
 (especially on the eventLog directory) ?
  >>
  >> Regards
  >> JB
  >>
  >>
  >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
  >>>
  >>> Can someone tell me at what point this error
could come?
  >>>
  >>> In one of my use cases, I am trying to use
hadoop custom
 input format.
  >>> Here is my code.
  >>>
  >>> val hConf: Configuration = sc.hadoopConfiguration
  >>>



Re: Error : - No filesystem for scheme: spark

2015-11-02 Thread Balachandar R.A.
I made a stupid mistake, it seems. I supplied the --master option with the
spark:// URL in my launch command, and this error is gone.

Thanks for pointing out possible places for troubleshooting

Regards
Bala
On 02-Nov-2015 3:15 pm, "Balachandar R.A."  wrote:

> No.. I am not using yarn. Yarn is not running in my cluster. So, it is
> standalone one.
>
> Regards
> Bala
> On 02-Nov-2015 3:11 pm, "Jean-Baptiste Onofré"  wrote:
>
>> Just to be sure: you use yarn cluster (not standalone), right ?
>>
>> Regards
>> JB
>>
>> On 11/02/2015 10:37 AM, Balachandar R.A. wrote:
>>
>>> Yes. In two different places I use spark://
>>>
>>> 1. In my code, while creating spark configuration, I use the code below
>>>
>>> val sConf = new SparkConf().setAppName("Dummy").setMaster("spark://:7077")
>>> val sc = new SparkContext(sConf)
>>>
>>>
>>> 2. I run the job using the command below
>>>
>>> spark-submit  --class org.myjob  --jars myjob.jar spark://:7077
>>> myjob.jar
>>>
>>> regards
>>> Bala
>>>
>>>
>>> On 2 November 2015 at 14:59, Romi Kuntsman >> > wrote:
>>>
>>> except "spark.master", do you have "spark://" anywhere in your code
>>> or config files?
>>>
>>> *Romi Kuntsman*, /Big Data Engineer/_
>>> _
>>> http://www.totango.com 
>>>
>>> On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A.
>>> > wrote:
>>>
>>>
>>> -- Forwarded message --
>>> From: "Balachandar R.A." >> >
>>> Date: 02-Nov-2015 12:53 pm
>>> Subject: Re: Error : - No filesystem for scheme: spark
>>> To: "Jean-Baptiste Onofré" >> >
>>> Cc:
>>>
>>>  > HI JB,
>>>  > Thanks for the response,
>>>  > Here is the content of my spark-defaults.conf
>>>  >
>>>  >
>>>  > # Default system properties included when running
>>> spark-submit.
>>>  > # This is useful for setting default environmental settings.
>>>  >
>>>  > # Example:
>>>  >  spark.master spark://fdoat:7077
>>>  > # spark.eventLog.enabled   true
>>>  >  spark.eventLog.dir/home/bala/spark-logs
>>>  > # spark.eventLog.dir
>>>  hdfs://namenode:8021/directory
>>>  > # spark.serializer
>>> org.apache.spark.serializer.KryoSerializer
>>>  > # spark.driver.memory  5g
>>>  > # spark.executor.extraJavaOptions  -XX:+PrintGCDetails
>>> -Dkey=value -Dnumbers="one two three"
>>>  >
>>>  >
>>>  > regards
>>>  > Bala
>>>
>>>
>>>  >
>>>  > On 2 November 2015 at 12:21, Jean-Baptiste Onofré
>>> > wrote:
>>>  >>
>>>  >> Hi,
>>>  >>
>>>  >> do you have something special in conf/spark-defaults.conf
>>> (especially on the eventLog directory) ?
>>>  >>
>>>  >> Regards
>>>  >> JB
>>>  >>
>>>  >>
>>>  >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
>>>  >>>
>>>  >>> Can someone tell me at what point this error could come?
>>>  >>>
>>>  >>> In one of my use cases, I am trying to use hadoop custom
>>> input format.
>>>  >>> Here is my code.
>>>  >>>
>>>  >>> val hConf: Configuration = sc.hadoopConfiguration
>>>  >>> hConf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
>>>  >>> hConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
>>>  >>> var job = new Job(hConf)
>>>  >>> FileInputFormat.setInputPaths(job, new Path("hdfs:///user/bala/MyBinaryFile"));
>>>  >>> var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
>>>  >>>   classOf[IntWritable], classOf[BytesWritable], job.getConfiguration())
>>>  >>> val count = hRDD.mapPartitionsWithInputSplit{ (split, iter) => myfuncPart(split, iter) }
>>>  >>>
>>>  >>> The moment I invoke the mapPartitionsWithInputSplit() method, I get the
>>>  >>> below error in my spark-submit launch:
>>>  >>>
>>>  >>> 15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
>>>  >>> 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
>>>  >>> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
>>>  >>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
>>>  >>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>>> 

ClassNotFoundException even if class is present in Jarfile

2015-11-02 Thread hveiga
Hello,

I am facing an issue where I cannot run my Spark job in a cluster
environment (standalone or EMR) but it works successfully if I run it
locally using local[*] as master.

I am getting ClassNotFoundException: com.mycompany.folder.MyObject on the
slave executors. I don't really understand why this is happening since I
have uncompressed the Jarfile to make sure that the class is present inside
(both .java and .class) and all the rest of the classes are being loaded
fine.

Also, I would like to mention something weird that might be related, but I'm
not sure. There are two packages inside my jarfile that are named the same but
with different casing:

- com.mycompany.folder.MyObject
- com.myCompany.something.Else

Could that be the reason?

Also, I have tried adding my jarfiles in all the ways I could find
(sparkConf.setJars(...), sparkContext.addJar(...), the spark-submit --jars
option, ...), but none of them actually worked.
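One programmatic variant worth noting (a minimal Scala sketch, not something reported as tried in this thread) is to let the driver locate the jar that contains one of your own classes and ship exactly that jar. This assumes MyObject is a class; for a Scala object you would use MyObject.getClass instead of classOf:

import org.apache.spark.{SparkConf, SparkContext}

// ask Spark which jar the class was loaded from on the driver
val appJar = SparkContext.jarOfClass(classOf[com.mycompany.folder.MyObject])

val conf = new SparkConf()
  .setAppName("my-job")      // hypothetical app name
  .setJars(appJar.toSeq)     // ship that jar to the executors
val sc = new SparkContext(conf)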

I am using Apache Spark 1.5.0, Java 7, sbt 0.13.7, scala 2.10.5.

Thanks a lot,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFoundException-even-if-class-is-present-in-Jarfile-tp25254.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Sort Merge Join

2015-11-02 Thread Cheng, Hao
Not as far as I can tell. @Michael @YinHuai @Reynold, any comments on this
optimization?

From: Jonathan Coveney [mailto:jcove...@gmail.com]
Sent: Tuesday, November 3, 2015 4:17 AM
To: Alex Nastetsky
Cc: Cheng, Hao; user
Subject: Re: Sort Merge Join

Additionally, I'm curious whether there are any JIRAs around making DataFrames
support ordering better. There are a lot of operations that can be optimized if
you know that you have a total ordering on your data... are there any plans, or
at least JIRAs, around having the Catalyst optimizer handle this case?

2015-11-02 9:39 GMT-05:00 Alex Nastetsky 
>:
Thanks for the response.

Taking a file-system-based data source as “UnknownPartitioning” is a simple and
SAFE way to handle the JOIN, as it’s hard to guarantee that records from
different data sets with identical join keys will be loaded by the same
node/task; lots of factors need to be considered, like task pool size,
cluster size, source format, storage, data locality, etc.
I agree it’s worth optimizing for performance reasons; in Hive this is actually
called a bucket join. I am not sure whether that will happen soon in Spark SQL.

Yes, this is supported in

  *   Hive with bucket join
  *   Pig with USING "merge"
  *   MR with CompositeInputFormat
But I guess it's not supported in Spark?

On Mon, Nov 2, 2015 at 12:32 AM, Cheng, Hao 
> wrote:
1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For 
example, in the code below, the two datasets have different number of 
partitions, but it still does a SortMerge join after a "hashpartitioning".

[Hao:] A distributed JOIN operation (either hash-based or sort-based) requires
that records with identical join keys be shuffled to the same “reducer”
node/task; hashpartitioning is just a strategy that tells the Spark shuffle
service how to achieve that. In theory we could even use `RangePartitioning`
instead (but it’s less efficient, which is why we don’t choose it for JOIN). So
conceptually the JOIN operator doesn’t care much about the shuffle strategy, as
long as it satisfies the required data distribution.

2) If both datasets have already been partitioned/sorted the same way and
stored on the file system (e.g. in a previous job), is there a way to tell
Spark this so that it won't want to do a "hashpartitioning" on them? It looks
like Spark just considers datasets that have just been read from the file
system to have UnknownPartitioning. In the example below, I try to join a
dataframe to itself, and it still wants to hash repartition.

[Hao:] Take this as an example:

EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON a.key=b.key 
JOIN src c ON b.key=c.key

== Physical Plan ==
TungstenProject [value#20,value#22,value#24]
 SortMergeJoin [key#21], [key#23]
  TungstenSort [key#21 ASC], false, 0
   TungstenProject [key#21,value#22,value#20]
    SortMergeJoin [key#19], [key#21]
     TungstenSort [key#19 ASC], false, 0
      TungstenExchange hashpartitioning(key#19,200)
       ConvertToUnsafe
        HiveTableScan [key#19,value#20], (MetastoreRelation default, src, Some(a))
     TungstenSort [key#21 ASC], false, 0
      TungstenExchange hashpartitioning(key#21,200)
       ConvertToUnsafe
        HiveTableScan [key#21,value#22], (MetastoreRelation default, src, Some(b))
  TungstenSort [key#23 ASC], false, 0
   TungstenExchange hashpartitioning(key#23,200)
    ConvertToUnsafe
     HiveTableScan [key#23,value#24], (MetastoreRelation default, src, Some(c))

There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN src b
ON a.key=b.key”, as we didn’t change the data distribution after it, so we can
join another table (“JOIN src c ON b.key=c.key”) directly; this only requires
table “c” to be repartitioned on “key”.
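A minimal Scala sketch of how to inspect the chosen plan yourself (it assumes a HiveContext/SQLContext named sqlContext and the "src" table from the example above; spark.sql.planner.sortMergeJoin is the 1.5 planner switch, on by default):

// print the physical plan and look for SortMergeJoin vs ShuffledHashJoin and
// for TungstenExchange hashpartitioning nodes
sqlContext.sql(
  "SELECT a.value, b.value FROM src a JOIN src b ON a.key = b.key").explain()

// flip the planner preference and re-plan the same query to compare
sqlContext.setConf("spark.sql.planner.sortMergeJoin", "false")
sqlContext.sql(
  "SELECT a.value, b.value FROM src a JOIN src b ON a.key = b.key").explain()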

Taking a file-system-based data source as “UnknownPartitioning” is a simple and
SAFE way to handle the JOIN, as it’s hard to guarantee that records from
different data sets with identical join keys will be loaded by the same
node/task; lots of factors need to be considered, like task pool size,
cluster size, source format, storage, data locality, etc.
I agree it’s worth optimizing for performance reasons; in Hive this is actually
called a bucket join. I am not sure whether that will happen soon in Spark SQL.

Hao

From: Alex Nastetsky 
[mailto:alex.nastet...@vervemobile.com]
Sent: Monday, November 2, 2015 11:29 AM
To: user
Subject: Sort Merge Join

Hi,

I'm trying to understand SortMergeJoin (SPARK-2213).

1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For 
example, in the code below, the two datasets have different number of 
partitions, but it still does a SortMerge join after a "hashpartitioning".

CODE:
   val 

Spark executor jvm classloader not able to load nested jars

2015-11-02 Thread Nirav Patel
Hi,

I have a Maven-based mixed Scala/Java application that submits Spark jobs.
My application jar "myapp.jar" has some nested jars inside a lib folder. It's
a fat jar created using the spring-boot-maven plugin, which nests other jars
inside the lib folder of the parent jar. I prefer not to create a shaded flat
jar, as that would create other issues.

One of the nested jars is "common.jar". I have defined the Class-Path
attribute in the manifest file, like Class-Path: lib/common.jar. The Spark
executor throws a java.lang.NoClassDefFoundError: com/myapp/common/myclass
error when I submit the application in yarn-client mode. The class
(com/myapp/common/myclass.class) and the jar (common.jar) are there, nested
inside my main myapp.jar.

Can the Spark executor JVM load nested jars here?

The Spark JVM classloader can find all the classes that are flat inside
myapp.jar itself, i.e. com/myapp/abc.class, com/myapp/xyz.class, etc.

The executor classloader can also find some classes from the nested jar, but
it throws NoClassDefFoundError for some other classes in the same nested jar!
Here's the error:

Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost
task 0.3 in stage 0.0 (TID 3, host4.local):
java.lang.NoClassDefFoundError: com/myapp/common/myclass
at com.myapp.UserProfileRDD$.parse(UserProfileRDDInit.scala:111)
at 
com.myapp.UserProfileRDDInit$$anonfun$generateUserProfileRDD$1.apply(UserProfileRDDInit.scala:87)
at 
com.myapp.UserProfileRDDInit$$anonfun$generateUserProfileRDD$1.apply(UserProfileRDDInit.scala:87)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)Caused by:
java.lang.ClassNotFoundException:
com.myapp.common.myclass
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 14 more

I do submit myapp.jar with sparkConf.setJars(new String[] {"myapp.jar"}) and
have also tried setting it on spark.yarn.executor.extraClassPath.

As a workaround, I extracted myapp.jar and set sparkConf.setJars(new String[]
{"myapp.jar", "lib/common.jar"}) manually, and the error went away, but
obviously I would have to do that for every nested jar, which is not desirable.
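For reference, a minimal Scala sketch of that workaround; the jar paths are hypothetical and relative to where the driver is launched, and every nested jar the executors need has to be listed explicitly, since a plain URLClassLoader on the executor does not look inside nested jars:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("myapp")
  // list the outer jar plus each extracted nested jar the executors need
  .setJars(Seq("myapp.jar", "lib/common.jar"))
val sc = new SparkContext(conf)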


Thanks

-- 





Re: Spark Streaming data checkpoint performance

2015-11-02 Thread Shixiong Zhu
"trackStateByKey" is about to be added in 1.6 to resolve the performance
issue of "updateStateByKey". You can take a look at
https://issues.apache.org/jira/browse/SPARK-2629 and
https://github.com/apache/spark/pull/9256
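For context, a minimal Scala sketch of the updateStateByKey pattern whose cost that work targets; the StreamingContext `ssc`, the DStream `events` of (String, Long) pairs, and the checkpoint directory are all hypothetical:

// stateful transformations require a checkpoint directory
ssc.checkpoint("hdfs:///tmp/checkpoints")

val runningCounts = events.updateStateByKey[Long] { (newValues: Seq[Long], state: Option[Long]) =>
  // the update function is applied over the full state on every batch, even for
  // keys with no new data; that per-batch pass is the overhead being addressed
  Some(state.getOrElse(0L) + newValues.sum)
}
runningCounts.print()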


Re: Exception while reading from kafka stream

2015-11-02 Thread Ramkumar V
If I try the code snippet below, it throws an exception. How do I avoid this
exception? How do I switch processing based on topic?

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
HashSet<String> topicsSet_1 = new HashSet<String>(Arrays.asList(topics.split(",")));
HashSet<String> topicsSet_2 = new HashSet<String>(Arrays.asList(topics.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);

JavaPairInputDStream<String, String> messages_1 = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet_1);

JavaPairInputDStream<String, String> messages_2 = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet_2);
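Cody's suggestion further down in this thread (one direct stream over all topics, then switching on the topic) could be sketched roughly like this in Scala. It is an untested sketch: the broker list, topic names, and per-topic handling are hypothetical, and the Java API is analogous. The log trace from the failing two-stream attempt follows below.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object SingleStreamSwitch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("single-stream-switch")
    val ssc = new StreamingContext(conf, Seconds(30))

    // hypothetical broker list and topic names; one stream covers every topic
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val topics = Set("topic_1", "topic_2")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // each partition of a direct-stream RDD maps 1:1 to an OffsetRange, which carries the topic
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.mapPartitionsWithIndex { (i, iter) =>
        offsetRanges(i).topic match {
          case "topic_1" => iter.map { case (_, v) => s"topic_1: $v" }  // hypothetical handling
          case _         => iter.map { case (_, v) => s"other: $v" }
        }
      }.foreach(println)  // prints on the executors; replace with real output logic
    }

    ssc.start()
    ssc.awaitTermination()
  }
}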

*Log Trace*:

[ERROR] [11/02/2015 12:59:08.107] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
[ERROR] [11/02/2015 12:59:08.104] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
[ERROR] [11/02/2015 13:01:13.812] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:41039/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
15/11/02 12:59:05 ERROR yarn.ApplicationMaster: User class threw exception:
java.io.IOException: Failed to delete
somedomain/user/hdfs/spark_output/kyt_req/part-00055
15/11/02 12:59:05 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception: java.io.IOException:
Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055)
java.io.IOException: Failed on local exception:
java.io.InterruptedIOException: Interruped while waiting for IO on channel
java.nio.channels.SocketChannel[connected local=/10.125.4.200:40770
remote=somedomain]. 59994 millis timeout left.; Host Details : local host
is: "somedomain"; destination host is: "somedomain":8020;
java.io.IOException: Failed on local exception:
java.io.InterruptedIOException: Interruped while waiting for IO on channel
java.nio.channels.SocketChannel[connected local=/10.125.4.200:41898
remote=somedomain]. 59998 millis timeout left.; Host Details : local host
is: "somedomain"; destination host is: "somedomain;
15/11/02 13:01:11 ERROR yarn.ApplicationMaster: User class threw exception:
java.lang.NullPointerException
15/11/02 13:01:11 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception:
java.lang.NullPointerException)
15/11/02 13:01:13 INFO yarn.ApplicationMaster: Unregistering
ApplicationMaster with FAILED (diag message: User class threw exception:
java.lang.NullPointerException)
java.io.IOException: Failed on local exception:
java.io.InterruptedIOException: Interruped while waiting for IO on channel
java.nio.channels.SocketChannel[connected local=/10.125.4.224:40482
remote=somedomain]. 59991 millis timeout left.; Host Details : local host
is: "somedomain"; destination host is: "somedomain":8020;
[ERROR] [11/02/2015 12:59:08.102] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)



*Thanks*,



On Fri, Oct 30, 2015 at 7:34 PM, Cody Koeninger  wrote:

> Just put them all in one stream and switch processing based on the topic
>
> On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V 
> wrote:
>
>> i want to join all those logs in some manner. That's what i'm trying to
>> do.
>>
>> *Thanks*,
>> 
>>
>>
>> On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao 
>> wrote:
>>
>>> I don't think Spark Streaming supports multiple streaming context in one
>>> jvm, you cannot use in such way. Instead you could run multiple streaming
>>> applications, since you're using Yarn.
>>>
>>> On Friday, October 30, 2015, Ramkumar V wrote:
>>>
 I found the NPE is mainly because I'm using the same JavaStreamingContext
 for some other Kafka stream. If I change the name, it runs
 successfully. How do I run multiple JavaStreamingContexts in a program? I'm
 getting the following exception if I run multiple JavaStreamingContexts in a
 single file.

 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status:
 FAILED, exitCode: 15, (reason: User class threw exception:
 

Re: Best practises

2015-11-02 Thread Stefano Baghino
There is this interesting book from Databricks:
https://www.gitbook.com/book/databricks/databricks-spark-knowledge-base/details

What do you think? Does it contain the info you're looking for? :)

On Mon, Nov 2, 2015 at 2:18 PM, satish chandra j 
wrote:

> HI All,
> Yes, any such doc will be a great help!!!
>
>
>
> On Fri, Oct 30, 2015 at 4:35 PM, huangzheng <1106944...@qq.com> wrote:
>
>> I have the same question. Can anyone help us?
>>
>>
>> -- Original Message --
>> *From:* "Deepak Sharma";
>> *Sent:* Friday, October 30, 2015, 7:23 PM
>> *To:* "user";
>> *Subject:* Best practises
>>
>> Hi
>> I am looking for any blog / doc on developer best practices when using
>> Spark. I have already looked at the tuning guide on
>> spark.apache.org.
>> Please do let me know if any one is aware of any such resource.
>>
>> Thanks
>> Deepak
>>
>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Spark, Mesos problems with remote connections

2015-11-02 Thread Sebastian Kuepers
Hey,


I have a Mesos cluster with a single Master. If I run the following directly on 
the master machine:


pyspark --master mesos://host:5050


everything works just fine. If I try to connect to the master by starting a
driver from my laptop, everything stops after the following log output from the
Spark driver:


I1102 15:10:57.848831 64856064 sched.cpp:164] Version: 0.24.0
I1102 15:10:57.852708 19017728 sched.cpp:262] New master detected at 
master@217.66.55.19:5050
I1102 15:10:57.852934 19017728 sched.cpp:272] No credentials provided. 
Attempting to register without authentication


The Mesos master logs show the following:


I1102 15:09:50.676004 21563 master.cpp:2250] Subscribing framework Talos with 
checkpointing disabled and capabilities [  ]
I1102 15:09:50.676686 21566 hierarchical.hpp:515] Added framework 
b995b024-6ec6-4bd9-b251-e4e6b1828eda-0004
I1102 15:09:51.671437 21562 http.cpp:336] HTTP GET for /master/state.json from 
217.66.51.150:51588 with User-Agent='Mozilla/5.0 (X11; Linux x86_64) 
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.80 Safari/537.36'
I1102 15:09:52.357230 21567 master.cpp:2179] Received SUBSCRIBE call for 
framework 'Talos' at 
scheduler-fee376b3-ea04-4866-8412-957721da5a0b@10.200.120.148:51085
I1102 15:09:52.357416 21567 master.cpp:2250] Subscribing framework Talos with 
checkpointing disabled and capabilities [  ]
I1102 15:09:52.357520 21567 master.cpp:2260] Framework 
b995b024-6ec6-4bd9-b251-e4e6b1828eda-0004 (Talos) at 
scheduler-fee376b3-ea04-4866-8412-957721da5a0b@10.200.120.148:51085 already 
subscribed, resending acknowledgement
I1102 15:09:53.938227 21563 master.cpp:2179] Received SUBSCRIBE call for 
framework 'Talos' at 
scheduler-fee376b3-ea04-4866-8412-957721da5a0b@10.200.120.148:51085
I1102 15:09:53.938426 21563 master.cpp:2250] Subscribing framework Talos with 
checkpointing disabled and capabilities [  ]
I1102 15:09:53.938534 21563 master.cpp:2260] Framework 
b995b024-6ec6-4bd9-b251-e4e6b1828eda-0004 (Talos) at 
scheduler-fee376b3-ea04-4866-8412-957721da5a0b@10.200.120.148:51085 already 
subscribed, resending acknowledgement
I1102 15:10:00.207372 21567 master.cpp:2179] Received SUBSCRIBE call for 
framework 'Talos' at 
scheduler-fee376b3-ea04-4866-8412-957721da5a0b@10.200.120.148:51085


The framework also shows up as active in the UI and blocks all resources of the 
cluster, while the SUBSCRIBE calls keep coming in.

Mesos authentication is completely disabled.


What could be possible causes for this problem?


Thanks,

Sebastian







Does the Standalone cluster and Applications need to be same Spark version?

2015-11-02 Thread pnpritchard
The title gives the gist of it: do the Standalone cluster and the applications
need to be the same Spark version?

For example, say I have a Standalone cluster running version 1.5.0. Can I run
an application that was built with the Spark 1.5.1 library and submitted using
the spark-submit script from the 1.5.1 release?

Thanks,
Nick



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-the-Standalone-cluster-and-Applications-need-to-be-same-Spark-version-tp25255.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org