Re: Spark Ingestion into Relational DB

2015-09-21 Thread Jörn Franke
In these cases you may want to have a separate Oracle instance for the
batch process and another one for serving, to avoid SLA surprises.
Nevertheless, if data processing becomes more strategic across projects, you
may want to think about job management and HDFS, using Hadoop with Spark.

On Tue, Sep 22, 2015 at 8:02 AM, Sri Eswari Devi Subbiah <
sriesh.subb...@gmail.com> wrote:

> Hi,
>
> Thanks for the reply. Let me explain our scenario a little bit more.
> Currently we have multiple data feeds arriving as files from different systems.
> We run batch jobs to extract the data from the files, normalize it,
> match it against the Oracle database, and finally consolidate the cleaned
> data in Oracle.
>
> I am evaluating whether, rather than running batch jobs, I can run Spark
> Streaming over the data files and write the cleansed data into the Oracle
> database. Once the data is consolidated in Oracle, it serves as the source
> of truth for external users.
>
> Regards,
> Sri Eswari.
>
> On Mon, Sep 21, 2015 at 10:55 PM, Jörn Franke 
> wrote:
>
>> You do not need Hadoop. However, you should think about using it. If you
>> use Spark to load data directly from Oracle, then your database might see
>> unexpected load if a Spark node fails. Additionally, the
>> Oracle database, if it is not based on local disk, may have a storage
>> bottleneck. Furthermore, Spark standalone has no resource management
>> mechanism for supporting different SLAs; you may need YARN (Hadoop) for
>> that. Finally, using the Oracle database for storing all the data may be an
>> expensive exercise. What I have often seen is that Hadoop is used for
>> storing all the data and managing the resources, Spark is used for
>> machine learning over this data, and the Oracle database (or any relational
>> datastore, NoSQL database, or in-memory DB) is used to serve the data to a
>> lot of users. This is also the basic idea behind the lambda architecture.
>>
>> On Tue, Sep 22, 2015 at 7:13 AM, Sri wrote:
>>
>>> Hi,
>>>
>>> We have a use case where we get data from different systems, and
>>> finally the data will be consolidated into an Oracle database. Is Spark a
>>> valid choice for this scenario? Currently we also don't have any big data
>>> component. If we go with Spark to ingest data, does it require
>>> Hadoop?
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Ingestion-into-Relational-DB-tp24761.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


Re: Mesos Tasks only run on one node

2015-09-21 Thread Tim Chen
What configuration have you used, and what is the slaves' configuration?

Possibly all the other nodes either don't have enough resources or are using
another role that prevents the executor from being launched.

Tim

On Mon, Sep 21, 2015 at 1:58 PM, John Omernik  wrote:

> I have a happy, healthy Mesos cluster (0.24) running in my lab.  I've
> compiled spark-1.5.0 and it seems to be working fine, except for one small
> issue: my tasks all seem to run on one node. (I have 6 in the cluster.)
>
> Basically, I have a directory of compressed text files.  Compressed, these
> 25 files add up to 1.2 GB of data. In bin/pyspark I do:
>
> txtfiles = sc.textFile("/path/to/my/data/*")
> txtfiles.count()
>
> This goes through and gives me the correct count, but all my tasks (25 of
> them) run on one node; let's call it node4.
>
> Interesting.
>
> So I was running Spark from node4, but I would have thought it would have
> hit up more nodes.
>
> So I ran it on node5.  In the executors tab of the Spark UI, there is only
> one executor registered, and it's node4, and once again all tasks ran on node4.
>
> I am running in fine-grained mode... is there a setting somewhere to allow
> for more executors? This seems weird. I've been away from Spark since 1.2.x
> but I don't seem to remember this...
>
>
>


Re: Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-21 Thread Tim Chen
Got it, you're right: we shouldn't crash when something goes wrong while
creating the job.

This should be fixed soon.

Thanks!

Tim

On Mon, Sep 21, 2015 at 11:24 AM, Alan Braithwaite 
wrote:

> That could be the behavior, but spark.mesos.executor.home being unset still
> raises an exception inside the dispatcher, preventing a Docker container
> from even being started.  I can see if other properties are inherited from
> the default environment when that's set, if you'd like.
>
> I think the main problem is just that premature validation is being done
> on the dispatcher, and that the dispatcher crashes in the event of bad config.
>
> - Alan
>
> On Sat, Sep 19, 2015 at 11:03 AM, Timothy Chen  wrote:
>
>> You can still provide properties through the Docker container by putting
>> configuration in the conf directory, but we try to pass through all
>> properties submitted from the driver via spark-submit, which I believe will
>> override the defaults.
>>
>> This is not what you are seeing?
>>
>> Tim
>>
>>
>> On Sep 19, 2015, at 9:01 AM, Alan Braithwaite 
>> wrote:
>>
>> The assumption that the executor has no default properties set in its
>> environment through the Docker container.  Correct me if I'm wrong, but any
>> properties which are unset in the SparkContext will come from the
>> environment of the executor, will they not?
>>
>> Thanks,
>> - Alan
>>
>> On Sat, Sep 19, 2015 at 1:09 AM, Tim Chen  wrote:
>>
>>> I guess I need a bit more clarification, what kind of assumptions was
>>> the dispatcher making?
>>>
>>> Tim
>>>
>>>
>>> On Thu, Sep 17, 2015 at 10:18 PM, Alan Braithwaite 
>>> wrote:
>>>
>>>> Hi Tim,
>>>>
>>>> Thanks for the follow up.  It's not so much that I expect the executor
>>>> to inherit the configuration of the dispatcher as I *don't* expect the
>>>> dispatcher to make assumptions about the system environment of the
>>>> executor (since it lives in a Docker container).  I could potentially see
>>>> a case where you might want to explicitly forbid the defaults, but I
>>>> can't think of any right now.
>>>>
>>>> Otherwise, I'm confused as to why the defaults in the Docker image for
>>>> the executor are just ignored.  I suppose that it's the dispatcher's job to
>>>> ensure the *exact* configuration of the executor, regardless of the
>>>> defaults set on the executor's machine?  Is that the assumption being made?
>>>> I can understand that in contexts which aren't Docker-driven, since jobs
>>>> could be rolling out in the middle of a config update.  Trying to think of
>>>> this outside the terms of just Mesos/Docker (since I'm fully aware that
>>>> Docker doesn't rule the world yet).
>>>>
>>>> So I can see this from both perspectives now, and passing in the
>>>> properties file will probably work just fine for me, but for my better
>>>> understanding: when the executor starts, will it read any of the
>>>> environment that it's executing in, or will it take only the properties
>>>> given to it by the dispatcher and nothing more?
>>>>
>>>> Lemme know if anything needs more clarification, and thanks for your
>>>> Mesos contribution to Spark!
>>>>
>>>> - Alan
>>>>
>>>> On Thu, Sep 17, 2015 at 5:03 PM, Timothy Chen
>>>> wrote:

> Hi Alan,
>
> If I understand correctly, you are setting the executor home when you
> launch the dispatcher, and not in the configuration when you submit the job,
> and expect it to inherit that configuration?
>
> When I worked on the dispatcher I was assuming all configuration is
> passed to the dispatcher to launch the job exactly how you would need to
> launch it in client mode.
>
> But indeed it shouldn't crash the dispatcher. I'll take a closer look when
> I get a chance.
>
> Can you recommend changes to the documentation, either in email or a
> PR?
>
> Thanks!
>
> Tim
>
> Sent from my iPhone
>
> On Sep 17, 2015, at 12:29 PM, Alan Braithwaite 
> wrote:
>
> Hey All,
>
> To bump this thread once again, I'm having some trouble using the
> dispatcher as well.
>
> I'm using Mesos Cluster Manager with Docker Executors.  I've deployed
> the dispatcher as a Marathon job.  When I submit a job using spark-submit,
> the dispatcher writes back that the submission was successful and then
> promptly dies in Marathon.  Looking at the logs reveals it was hitting the
> following line:
>
> 398:  throw new SparkException("Executor Spark home
> `spark.mesos.executor.home` is not set!")
>
> Which is odd because it's set in multiple places (SPARK_HOME,
> spark.mesos.executor.home, spark.home, etc).  Reading the code, it
> appears that the driver desc pulls only from the request and disregards
> any other properties that may be configured.  Testing by passing --conf
> spark.mesos.executor.home=/usr/local/spark on the command line to
> spark-submit confirms this.  We're trying to isolate the number of places
> where we have to set p

Re: Spark Ingestion into Relational DB

2015-09-21 Thread Sri Eswari Devi Subbiah
Hi,

Thanks for the reply. Let me explain our scenario a little bit more.
Currently we have multiple data feeds arriving as files from different systems.
We run batch jobs to extract the data from the files, normalize it,
match it against the Oracle database, and finally consolidate the cleaned
data in Oracle.

I am evaluating whether, rather than running batch jobs, I can run Spark
Streaming over the data files and write the cleansed data into the Oracle
database. Once the data is consolidated in Oracle, it serves as the source
of truth for external users.

Regards,
Sri Eswari.

On Mon, Sep 21, 2015 at 10:55 PM, Jörn Franke  wrote:

> You do not need Hadoop. However, you should think about using it. If you
> use Spark to load data directly from Oracle, then your database might see
> unexpected load if a Spark node fails. Additionally, the
> Oracle database, if it is not based on local disk, may have a storage
> bottleneck. Furthermore, Spark standalone has no resource management
> mechanism for supporting different SLAs; you may need YARN (Hadoop) for
> that. Finally, using the Oracle database for storing all the data may be an
> expensive exercise. What I have often seen is that Hadoop is used for
> storing all the data and managing the resources, Spark is used for
> machine learning over this data, and the Oracle database (or any relational
> datastore, NoSQL database, or in-memory DB) is used to serve the data to a
> lot of users. This is also the basic idea behind the lambda architecture.
>
> On Tue, Sep 22, 2015 at 7:13 AM, Sri wrote:
>
>> Hi,
>>
>> We have a use case where we get data from different systems, and
>> finally the data will be consolidated into an Oracle database. Is Spark a
>> valid choice for this scenario? Currently we also don't have any big data
>> component. If we go with Spark to ingest data, does it require
>> Hadoop?
>>
>>
>>
>>
>>


Re: Spark Ingestion into Relational DB

2015-09-21 Thread Jörn Franke
You do not need Hadoop. However, you should think about using it. If you
use Spark to load data directly from Oracle, then your database might see
unexpected load if a Spark node fails. Additionally, the
Oracle database, if it is not based on local disk, may have a storage
bottleneck. Furthermore, Spark standalone has no resource management
mechanism for supporting different SLAs; you may need YARN (Hadoop) for
that. Finally, using the Oracle database for storing all the data may be an
expensive exercise. What I have often seen is that Hadoop is used for
storing all the data and managing the resources, Spark is used for
machine learning over this data, and the Oracle database (or any relational
datastore, NoSQL database, or in-memory DB) is used to serve the data to a
lot of users. This is also the basic idea behind the lambda architecture.

On Tue, Sep 22, 2015 at 7:13 AM, Sri wrote:

> Hi,
>
> We have a use case where we get data from different systems, and
> finally the data will be consolidated into an Oracle database. Is Spark a
> valid choice for this scenario? Currently we also don't have any big data
> component. If we go with Spark to ingest data, does it require
> Hadoop?
>
>
>
>
>


Re: Spark Ingestion into Relational DB

2015-09-21 Thread ayan guha
No, it does not require Hadoop.

However, I doubt this is a good use case for Spark. You would probably
be better off, and get better performance, with SQL*Loader.


On Tue, Sep 22, 2015 at 3:13 PM, Sri  wrote:

> Hi,
>
> We have a use case where we get data from different systems, and
> finally the data will be consolidated into an Oracle database. Is Spark a
> valid choice for this scenario? Currently we also don't have any big data
> component. If we go with Spark to ingest data, does it require
> Hadoop?
>
>
>
>
>


-- 
Best Regards,
Ayan Guha


Spark Ingestion into Relational DB

2015-09-21 Thread Sri
Hi,

We have a use case where we get data from different systems, and finally
the data will be consolidated into an Oracle database. Is Spark a valid
choice for this scenario? Currently we also don't have any big data
component. If we go with Spark to ingest data, does it require
Hadoop?






Invalid checkpoint url

2015-09-21 Thread srungarapu vamsi
I am using reduceByKeyAndWindow (with an inverse reduce function) in my code.
To use this, it seems the checkpoint directory I use must be on a
Hadoop-compatible file system.
Does that mean that I should set up Hadoop on my system?
I googled this and found in a Stack Overflow answer that I need not set up
HDFS, but the checkpoint directory should be HDFS-compatible.

I am a beginner in this area. I am running my Spark Streaming application
on Ubuntu 14.04 with Spark 1.3.1.
If I need not set up HDFS and ext4 is HDFS-compatible, then what should
my checkpoint directory look like?

I tried all of these:
ssc.checkpoint("/tmp/checkpoint")
ssc.checkpoint("hdfs:///tmp/checkpoint")
ssc.checkpoint("file:///tmp/checkpoint")

But none of them worked for me.

-- 
/Vamsi


Re: word count (group by users) in spark

2015-09-21 Thread Huy Banh
Hi Sri,

yield comes from Scala. In a for comprehension, it builds the returned
collection: each yielded element is added to that collection.

To output strings instead of tuples, you could use aggregateByKey
instead of groupByKey:

val output = wordCounts.
  map({case ((user, word), count) => (user, (word, count))}).
  aggregateByKey("")(
    seqOp = (s: String, t: (String, Int)) => s + t._1 + " [" + t._2 + "] ",
    combOp = _ + _ ).
  map({case (user, result) => user + " " + result})

Hope it helps,
Huy.
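
[Editor's sketch] The pipeline above can be modelled without a cluster. The following is a plain-Python illustration, not Spark code: `word_counts_by_user` is a hypothetical helper name, `Counter` stands in for flatMap plus reduceByKey, and the string concatenation stands in for the aggregateByKey step. In real Spark the order of words within a user's line is not guaranteed; here it simply follows input order.

```python
from collections import Counter, defaultdict


def word_counts_by_user(comments):
    """Render one 'user word [count] ...' line per user.

    Plain-Python model of flatMap -> reduceByKey -> aggregateByKey;
    it does not use Spark.
    """
    # flatMap + reduceByKey: count every (user, word) pair.
    pair_counts = Counter(
        (user, word)
        for user, comment in comments
        for word in comment.split(" ")
    )
    # aggregateByKey(""): append "word [count] " to each user's string.
    per_user = defaultdict(str)
    for (user, word), count in pair_counts.items():
        per_user[user] += f"{word} [{count}] "
    # Final map: prefix each aggregated string with the user name.
    return [f"{user} {result}" for user, result in per_user.items()]


lines = word_counts_by_user(
    [("u1", "one two one"), ("u2", "three four three")]
)
for line in lines:
    print(line)
```

As in the Scala version, each line keeps a trailing space because every word appends "word [count] " to the accumulator.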

On Tue, Sep 22, 2015 at 2:35 AM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Hi Huy,
>
> That worked like a charm. Can we remove CompactBuffer from the output?
>
> (u2,CompactBuffer((three,2), (four,1)))
> (u1,CompactBuffer((one,2), (two,1)))
>
> What does yield do in Spark? Can you please explain?
>
> Thanks
> Sri
>
>
>
> On Mon, Sep 21, 2015 at 6:13 AM, Huy Banh  wrote:
>
>> Hi,
>>
>> If your input format is user -> comment, then you could:
>>
>> val comments = sc.parallelize(List(
>>   ("u1", "one two one"), ("u2", "three four three")))
>> val wordCounts = comments.
>>   flatMap({case (user, comment) =>
>>     for (word <- comment.split(" ")) yield(((user, word), 1)) }).
>>   reduceByKey(_ + _)
>>
>> val output = wordCounts.
>>   map({case ((user, word), count) => (user, (word, count))}).
>>   groupByKey()
>>
>> Regarding Aniket's suggestion: if we group by user first, couldn't it run
>> out of memory when Spark tries to put all of a user's words in a single
>> sequence?
>>
>> On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>> Using the Scala API, you can first group by user and then use combineByKey.
>>>
>>> Thanks,
>>> Aniket
>>>
>>> On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com <
>>> kali.tumm...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>> I would like to achieve the output below using Spark. I managed to write
>>>> it in Hive and call that from Spark, but not in plain Spark (Scala). How
>>>> can I group word counts by a particular user (column)? For example,
>>>> imagine users and their tweets: I want to do a word count per user
>>>> name.
>>>>
>>>> Input:
>>>> kali A,B,A,B,B
>>>> james B,A,A,A,B
>>>>
>>>> Output:
>>>> kali A [Count] B [Count]
>>>> james A [Count] B [Count]

>>>> My Hive answer:
>>>> CREATE EXTERNAL TABLE test
>>>> (
>>>>   user_name STRING,
>>>>   comments  STRING
>>>> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' STORED AS TEXTFILE
>>>> LOCATION '/data/kali/test';
>>>> -- HDFS folder: create the HDFS folder and create a text file in it with
>>>> -- the data mentioned in this email
>>>>
>>>> use default;
>>>> select user_name, COLLECT_SET(text) from (
>>>>   select user_name, concat(sub, ' ', count(comments)) as text
>>>>   from test LATERAL VIEW explode(split(comments, ',')) subView AS sub
>>>>   group by user_name, sub) w
>>>> group by user_name;

>>>> Spark with Hive:
>>>> package com.examples
>>>>
>>>> /**
>>>>  * Created by kalit_000 on 17/09/2015.
>>>>  */
>>>> import org.apache.log4j.Logger
>>>> import org.apache.log4j.Level
>>>> import org.apache.spark.sql.SQLContext
>>>> import org.apache.spark.sql.hive.HiveContext
>>>> import org.apache.spark.{SparkContext, SparkConf}
>>>> import org.apache.spark.SparkContext._
>>>>
>>>> object HiveWordCount {
>>>>
>>>>   def main(args: Array[String]): Unit = {
>>>>     // Reduce Spark and Akka log noise.
>>>>     Logger.getLogger("org").setLevel(Level.WARN)
>>>>     Logger.getLogger("akka").setLevel(Level.WARN)
>>>>
>>>>     val conf = new SparkConf()
>>>>       .setMaster("local")
>>>>       .setAppName("HiveWordCount")
>>>>       .set("spark.executor.memory", "1g")
>>>>     val sc = new SparkContext(conf)
>>>>     val sqlContext = new SQLContext(sc)
>>>>
>>>>     val hc = new HiveContext(sc)
>>>>
>>>>     // Triple-quoted strings pass '\001' to Hive unchanged; the delimiter
>>>>     // is assumed to match the Hive DDL above.
>>>>     hc.sql("""CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST
>>>>       (user_name STRING, comments STRING)
>>>>       ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
>>>>       STORED AS TEXTFILE LOCATION '/data/kali/test'""")
>>>>
>>>>     val op = hc.sql("""select user_name, COLLECT_SET(text) from (
>>>>       select user_name, concat(sub, ' ', count(comments)) as text
>>>>       from default.test
>>>>       LATERAL VIEW explode(split(comments, ',')) subView AS sub
>>>>       group by user_name, sub) w
>>>>       group by user_name""")
>>>>
>>>>     op.collect.foreach(println)
>>>>   }
>>>> }




>>>> Thanks
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.



>
>
> --
> Thanks & Regards
> Sri Tummala
>
>


Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Sean Owen
Yes, that's right, though "in order" depends on the RDD having an
ordering, but so does the zip-based solution.

Actually, I'm going to walk that back a bit, since I don't see a
guarantee that foldByKey behaves like foldLeft. The implementation
underneath, in combineByKey, appears that it will act this way in
practice though.
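
[Editor's sketch] Since the fold ordering is not guaranteed, the zip-based approach from the original question can be made independent of encounter order by reducing with a minimum over the (partitionIndex, localIndex) position. Below is a plain-Python model of that idea, not Spark code; `tag_rows` and `keep_first` are hypothetical names, and it still assumes partition indexes increase with file order, which is the open question in this thread.

```python
def tag_rows(partitions, key_of):
    """Model of mapPartitionsWithIndex + zipWithIndex: tag rows with positions."""
    return [
        (key_of(row), ((partition_index, local_index), row))
        for partition_index, rows in enumerate(partitions)
        for local_index, row in enumerate(rows)
    ]


def keep_first(tagged):
    """Model of reduceByKey with a min on the position tuple."""
    best = {}
    for key, (position, row) in tagged:
        # Keep the row with the smallest (partitionIndex, localIndex).
        if key not in best or position < best[key][0]:
            best[key] = (position, row)
    return {key: row for key, (_, row) in best.items()}


# Two "partitions" of a file whose rows look like "key,value".
partitions = [["a,1", "b,2"], ["a,3", "c,4", "b,5"]]
tagged = tag_rows(partitions, key_of=lambda row: row.split(",")[0])

result = keep_first(tagged)
# Feeding the records in reverse order gives the same answer, because the
# min over positions does not depend on the order of combination.
result_reversed = keep_first(list(reversed(tagged)))
```

The design point is that min is commutative and associative, so the result does not rely on how the values for a key happen to be visited.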

On Tue, Sep 22, 2015 at 4:45 AM, Philip Weaver  wrote:
> Hmm, ok, but I'm not seeing why foldByKey is more appropriate than
> reduceByKey? Specifically, is foldByKey guaranteed to walk the RDD in order,
> but reduceByKey is not?
>
> On Mon, Sep 21, 2015 at 8:41 PM, Sean Owen  wrote:
>>
>> The zero value here is None. Combining None with any row should yield
>> Some(row). After that, combining is a no-op for other rows.
>>
>> On Tue, Sep 22, 2015 at 4:27 AM, Philip Weaver 
>> wrote:
>> > Hmm, I don't think that's what I want. There's no "zero value" in my use
>> > case.
>> >
>> > On Mon, Sep 21, 2015 at 8:20 PM, Sean Owen  wrote:
>> >>
>> >> I think foldByKey is much more what you want, as it has more a notion
>> >> of building up some result per key by encountering values serially.
>> >> You would take the first and ignore the rest. Note that "first"
>> >> depends on your RDD having an ordering to begin with, or else you rely
>> >> on however it happens to be ordered after whatever operations give you
>> >> a key-value RDD.
>> >>
>> >> On Tue, Sep 22, 2015 at 1:26 AM, Philip Weaver
>> >> 
>> >> wrote:
>> >> > I am processing a single file and want to remove duplicate rows by
>> >> > some
>> >> > key
>> >> > by always choosing the first row in the file for that key.
>> >> >
>> >> > The best solution I could come up with is to zip each row with the
>> >> > partition
>> >> > index and local index, like this:
>> >> >
>> >> > rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>> >> >   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
>> >> > ((partitionIndex, localIndex), row)) }
>> >> > }
>> >> >
>> >> >
>> >> > And then using reduceByKey with a min ordering on the
>> >> > (partitionIndex,
>> >> > localIndex) pair.
>> >> >
>> >> > First, can I count on SparkContext.textFile to read the lines in such
>> >> > a way that the partition indexes are always increasing, so that the
>> >> > above works?
>> >> >
>> >> > And, is there a better way to accomplish the same effect?
>> >> >
>> >> > Thanks!
>> >> >
>> >> > - Philip
>> >> >
>> >
>> >
>
>




Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
Hmm, ok, but I'm not seeing why foldByKey is more appropriate than
reduceByKey? Specifically, is foldByKey guaranteed to walk the RDD in
order, but reduceByKey is not?

On Mon, Sep 21, 2015 at 8:41 PM, Sean Owen  wrote:

> The zero value here is None. Combining None with any row should yield
> Some(row). After that, combining is a no-op for other rows.
>
> On Tue, Sep 22, 2015 at 4:27 AM, Philip Weaver 
> wrote:
> > Hmm, I don't think that's what I want. There's no "zero value" in my use
> > case.
> >
> > On Mon, Sep 21, 2015 at 8:20 PM, Sean Owen  wrote:
> >>
> >> I think foldByKey is much more what you want, as it has more a notion
> >> of building up some result per key by encountering values serially.
> >> You would take the first and ignore the rest. Note that "first"
> >> depends on your RDD having an ordering to begin with, or else you rely
> >> on however it happens to be ordered after whatever operations give you
> >> a key-value RDD.
> >>
> >> On Tue, Sep 22, 2015 at 1:26 AM, Philip Weaver  >
> >> wrote:
> >> > I am processing a single file and want to remove duplicate rows by
> some
> >> > key
> >> > by always choosing the first row in the file for that key.
> >> >
> >> > The best solution I could come up with is to zip each row with the
> >> > partition
> >> > index and local index, like this:
> >> >
> >> > rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
> >> >   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
> >> > ((partitionIndex, localIndex), row)) }
> >> > }
> >> >
> >> >
> >> > And then using reduceByKey with a min ordering on the (partitionIndex,
> >> > localIndex) pair.
> >> >
> >> > First, can I count on SparkContext.textFile to read the lines in such
> >> > a way that the partition indexes are always increasing, so that the
> >> > above works?
> >> >
> >> > And, is there a better way to accomplish the same effect?
> >> >
> >> > Thanks!
> >> >
> >> > - Philip
> >> >
> >
> >
>


Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Sean Owen
The zero value here is None. Combining None with any row should yield
Some(row). After that, combining is a no-op for other rows.
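
[Editor's sketch] The "zero value is None" idea can be modelled in plain Python, with `None` standing in for Scala's `None` and a stored row standing in for `Some(row)`. This is an illustration rather than Spark code: `fold_first_by_key` is a hypothetical name, rows are assumed never to be `None` themselves, and the result depends on the values being visited in file order.

```python
def fold_first_by_key(pairs):
    """Fold values per key, keeping only the first value seen for each key."""
    first = {}
    for key, row in pairs:
        acc = first.get(key)  # None plays the role of the zero value
        # Combining None with a row yields that row; afterwards combining
        # is a no-op and the accumulator is kept unchanged.
        first[key] = row if acc is None else acc
    return first


first_rows = fold_first_by_key([("a", "a1"), ("b", "b1"), ("a", "a2")])
```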

On Tue, Sep 22, 2015 at 4:27 AM, Philip Weaver  wrote:
> Hmm, I don't think that's what I want. There's no "zero value" in my use
> case.
>
> On Mon, Sep 21, 2015 at 8:20 PM, Sean Owen  wrote:
>>
>> I think foldByKey is much more what you want, as it has more a notion
>> of building up some result per key by encountering values serially.
>> You would take the first and ignore the rest. Note that "first"
>> depends on your RDD having an ordering to begin with, or else you rely
>> on however it happens to be ordered after whatever operations give you
>> a key-value RDD.
>>
>> On Tue, Sep 22, 2015 at 1:26 AM, Philip Weaver 
>> wrote:
>> > I am processing a single file and want to remove duplicate rows by some
>> > key
>> > by always choosing the first row in the file for that key.
>> >
>> > The best solution I could come up with is to zip each row with the
>> > partition
>> > index and local index, like this:
>> >
>> > rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>> >   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
>> > ((partitionIndex, localIndex), row)) }
>> > }
>> >
>> >
>> > And then using reduceByKey with a min ordering on the (partitionIndex,
>> > localIndex) pair.
>> >
>> > First, can I count on SparkContext.textFile to read the lines in such
>> > a way that the partition indexes are always increasing, so that the
>> > above works?
>> >
>> > And, is there a better way to accomplish the same effect?
>> >
>> > Thanks!
>> >
>> > - Philip
>> >
>
>




Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
Hmm, I don't think that's what I want. There's no "zero value" in my use
case.

On Mon, Sep 21, 2015 at 8:20 PM, Sean Owen  wrote:

> I think foldByKey is much more what you want, as it has more a notion
> of building up some result per key by encountering values serially.
> You would take the first and ignore the rest. Note that "first"
> depends on your RDD having an ordering to begin with, or else you rely
> on however it happens to be ordered after whatever operations give you
> a key-value RDD.
>
> On Tue, Sep 22, 2015 at 1:26 AM, Philip Weaver 
> wrote:
> > I am processing a single file and want to remove duplicate rows by some
> key
> > by always choosing the first row in the file for that key.
> >
> > The best solution I could come up with is to zip each row with the
> partition
> > index and local index, like this:
> >
> > rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
> >   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
> > ((partitionIndex, localIndex), row)) }
> > }
> >
> >
> > And then using reduceByKey with a min ordering on the (partitionIndex,
> > localIndex) pair.
> >
> > First, can I count on SparkContext.textFile to read the lines in such
> > a way that the partition indexes are always increasing, so that the
> > above works?
> >
> > And, is there a better way to accomplish the same effect?
> >
> > Thanks!
> >
> > - Philip
> >
>


Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Sean Owen
I think foldByKey is much more what you want, as it has more a notion
of building up some result per key by encountering values serially.
You would take the first and ignore the rest. Note that "first"
depends on your RDD having an ordering to begin with, or else you rely
on however it happens to be ordered after whatever operations give you
a key-value RDD.

On Tue, Sep 22, 2015 at 1:26 AM, Philip Weaver  wrote:
> I am processing a single file and want to remove duplicate rows by some key
> by always choosing the first row in the file for that key.
>
> The best solution I could come up with is to zip each row with the partition
> index and local index, like this:
>
> rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
> ((partitionIndex, localIndex), row)) }
> }
>
>
> And then using reduceByKey with a min ordering on the (partitionIndex,
> localIndex) pair.
>
> First, can I count on SparkContext.textFile to read the lines in such a way
> that the partition indexes are always increasing, so that the above works?
>
> And, is there a better way to accomplish the same effect?
>
> Thanks!
>
> - Philip
>




Re: How does one use s3 for checkpointing?

2015-09-21 Thread Utkarsh Sengar
We are using "spark-1.4.1-bin-hadoop2.4" on Mesos (not EMR) with S3 to read
and write data and haven't noticed any inconsistencies with it, so 1
(mostly) and 2 (definitely) should not be a problem.
Regarding 3, are you setting the file system implementation in the Spark config?

sparkContext.hadoopConfiguration().set("fs.s3.impl",
"org.apache.hadoop.fs.s3native.NativeS3FileSystem");

And I have these dependencies if that helps.


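<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>2.4.1</version>
</dependency>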


-Utkarsh

On Mon, Sep 21, 2015 at 7:13 PM, Jerry Lam  wrote:

> Hi Amit,
>
> Have you looked at Amazon EMR? Most people using EMR use s3 for
> persistency (both as input and output of spark jobs).
>
> Best Regards,
>
> Jerry
>
> Sent from my iPhone
>
> On 21 Sep, 2015, at 9:24 pm, Amit Ramesh  wrote:
>
>
> A lot of places in the documentation mention using S3 for checkpointing;
> however, I haven't found any examples or concrete evidence of anyone having
> done this.
>
>    1. Is this a safe/reliable option given the read-after-write
>    consistency for PUTS in S3?
>    2. Is S3 access broken for Hadoop 2.6 (SPARK-7442)? If so, is it
>    viable in 2.4?
>    3. Related to #2: I did try providing hadoop-aws-2.6.0.jar while
>    submitting the job and got the following stack trace. Is there a fix?
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> None.org.apache.spark.api.java.JavaSparkContext.
> : java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem:
> Provider org.apache.hadoop.fs.s3a.S3AFileSystem could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:224)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
> at
> java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
> at
> org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2563)
> at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1354)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1332)
> at
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:475)
> at
> org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:214)
> at
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
> at
> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NoClassDefFoundError:
> com/amazonaws/AmazonServiceException
> at java.lang.Class.getDeclaredConstructors0(Native Method)
> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
> at java.lang.Class.getConstructor0(Class.java:2885)
> at java.lang.Class.newInstance(Class.java:350)
> at
> java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
> ... 27 more
> Caused by: java.lang.ClassNotFoundException:
> com.amazonaws.AmazonServiceException
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   

Re: Spark on Yarn vs Standalone

2015-09-21 Thread Alexander Pivovarov
I repartitioned the input RDD from 4,800 to 24,000 partitions.
After that, the stage (24,000 tasks) finished in 22 min on 100 boxes.
Shuffle read/write: 905 GB / 710 GB

Task Metrics (Dur/GC/Read/Write)
Min: 7s/1s/38MB/30MB
Med: 22s/9s/38MB/30MB
Max: 1.8min/1.6min/38MB/30MB
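
As a rough cross-check of the metrics above, here is a sketch in plain Scala. It assumes the reported totals are binary gigabytes and are spread evenly over the 24,000 tasks; the object and method names are made up for illustration. Dividing the shuffle totals by the task count gives about 38.6 MB read and 30.3 MB written per task, close to the per-task figures listed.

```scala
object ShuffleMath {
  // Totals and task count taken from the message above.
  val tasks = 24000
  val shuffleReadGb = 905.0
  val shuffleWriteGb = 710.0

  // Average megabytes per task, assuming 1 GB = 1024 MB and an even spread.
  def perTaskMb(totalGb: Double, numTasks: Int): Double =
    totalGb * 1024.0 / numTasks

  def main(args: Array[String]): Unit = {
    println(f"read per task:  ${perTaskMb(shuffleReadGb, tasks)}%.1f MB")
    println(f"write per task: ${perTaskMb(shuffleWriteGb, tasks)}%.1f MB")
  }
}
```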

On Mon, Sep 21, 2015 at 5:55 PM, Sandy Ryza  wrote:

> The warning you're seeing in Spark is no issue.  The scratch space lives
> inside the heap, so it'll never result in YARN killing the container by
> itself.  The issue is that Spark is using some off-heap space on top of
> that.
>
> You'll need to bump the spark.yarn.executor.memoryOverhead property to
> give the executors some additional headroom above the heap space.
>
> -Sandy
>
> On Mon, Sep 21, 2015 at 5:43 PM, Saisai Shao 
> wrote:
>
>> I think you need to increase the executor memory size, either through the
>> command-line argument "--executor-memory" or the configuration property
>> "spark.executor.memory".
>>
>> Also raise yarn.scheduler.maximum-allocation-mb on the YARN side if necessary.
>>
>> Thanks
>> Saisai
>>
>>
>> On Mon, Sep 21, 2015 at 5:13 PM, Alexander Pivovarov <
>> apivova...@gmail.com> wrote:
>>
>>> I noticed that some executors have an issue with scratch space.
>>> I see the following in yarn app container stderr around the time when
>>> yarn killed the executor because it uses too much memory.
>>>
>>> -- App container stderr --
>>> 15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
>>> rdd_6_346 in memory! (computed 3.0 GB so far)
>>> 15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB
>>> (blocks) + 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB.
>>> Storage limit = 25.2 GB.
>>> 15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition
>>> rdd_6_346 to disk instead.
>>> 15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
>>> rdd_6_49 in memory! (computed 3.1 GB so far)
>>> 15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB
>>> (blocks) + 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB.
>>> Storage limit = 25.2 GB.
>>> 15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_49
>>> to disk instead.
>>>
>>> -- Yarn Nodemanager log --
>>> 2015-09-21 21:44:05,716 WARN
>>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
>>> (Container Monitor): Container
>>> [pid=5114,containerID=container_1442869100946_0001_01_0
>>> 00056] is running beyond physical memory limits. Current usage: 52.2 GB
>>> of 52 GB physical memory used; 53.0 GB of 260 GB virtual memory used.
>>> Killing container.
>>> Dump of the process-tree for container_1442869100946_0001_01_56 :
>>> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>>> |- 5117 5114 5114 5114 (java) 1322810 27563 56916316160 13682772
>>> /usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError=kill %p
>>> -Xms47924m -Xmx47924m -verbose:gc -XX:+PrintGCDetails
>>> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
>>> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
>>> -XX:+CMSClassUnloadingEnabled
>>> -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
>>> -Dspark.akka.failure-detector.threshold=3000.0
>>> -Dspark.akka.heartbeat.interval=1s -Dspark.akka.threads=4
>>> -Dspark.history.ui.port=18080 -Dspark.akka.heartbeat.pauses=6s
>>> -Dspark.akka.timeout=1000s -Dspark.akka.frameSize=50
>>> -Dspark.driver.port=52690
>>> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
>>> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
>>> akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
>>> --executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
>>> application_1442869100946_0001 --user-class-path
>>> file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.jar
>>> |- 5114 5112 5114 5114 (bash) 0 0 9658368 291 /bin/bash -c
>>> /usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError='kill %p'
>>> -Xms47924m -Xmx47924m '-verbose:gc' '-XX:+PrintGCDetails'
>>> '-XX:+PrintGCDateStamps' '-XX:+UseConcMarkSweepGC'
>>> '-XX:CMSInitiatingOccupancyFraction=70' '-XX:MaxHeapFreeRatio=70'
>>> '-XX:+CMSClassUnloadingEnabled'
>>> -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
>>> '-Dspark.akka.failure-detector.threshold=3000.0'
>>> '-Dspark.akka.heartbeat.interval=1s' '-Dspark.akka.threads=4'
>>> '-Dspark.history.ui.port=18080' '-Dspark.akka.heartbeat.pauses=6s'
>>> '-Dspark.akka.timeout=1000s' '-Dspark.akka.frameSize=50'
>>> '-Dspark.driver.port=52690'
>>> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_

Re: How does one use s3 for checkpointing?

2015-09-21 Thread Jerry Lam
Hi Amit,

Have you looked at Amazon EMR? Most people using EMR use s3 for persistence
(both as input and output of spark jobs).
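
The trace quoted below bottoms out in a ClassNotFoundException for com.amazonaws.AmazonServiceException, which suggests the AWS SDK jar that hadoop-aws relies on was not on the classpath. A minimal sketch for checking that from inside the JVM follows; the helper name `isOnClasspath` is made up for illustration, and the two class names are copied from the trace.

```scala
object ClasspathCheck {
  // True if the named class can be loaded (without initialising it).
  def isOnClasspath(className: String): Boolean =
    try {
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException | _: LinkageError => false
    }

  def main(args: Array[String]): Unit = {
    // Class names copied from the stack trace.
    val needed = Seq(
      "org.apache.hadoop.fs.s3a.S3AFileSystem",
      "com.amazonaws.AmazonServiceException")
    needed.foreach(n => println(s"$n -> ${isOnClasspath(n)}"))
  }
}
```

If the second class is reported missing, supplying an AWS SDK jar next to hadoop-aws is the likely next step; which SDK version pairs with hadoop-aws 2.6.0 is best confirmed from that artifact's POM rather than guessed.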

Best Regards,

Jerry

Sent from my iPhone

> On 21 Sep, 2015, at 9:24 pm, Amit Ramesh  wrote:
> 
> 
> A lot of places in the documentation mention using s3 for checkpointing, 
> however I haven't found any examples or concrete evidence of anyone having 
> done this.
> 1. Is this a safe/reliable option given the read-after-write consistency for 
>    PUTS in s3?
> 2. Is s3 access broken for hadoop 2.6 (SPARK-7442)? If so, is it viable in 2.4?
> 3. Related to #2. I did try providing hadoop-aws-2.6.0.jar while submitting the 
>    job and got the following stack trace. Is there a fix?
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> None.org.apache.spark.api.java.JavaSparkContext.
> : java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: 
> Provider org.apache.hadoop.fs.s3a.S3AFileSystem could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:224)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
> at 
> org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2563)
> at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1354)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1332)
> at 
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at 
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.SparkContext.(SparkContext.scala:475)
> at 
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:214)
> at 
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
> at 
> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NoClassDefFoundError: 
> com/amazonaws/AmazonServiceException
> at java.lang.Class.getDeclaredConstructors0(Native Method)
> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
> at java.lang.Class.getConstructor0(Class.java:2885)
> at java.lang.Class.newInstance(Class.java:350)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
> ... 27 more
> Caused by: java.lang.ClassNotFoundException: 
> com.amazonaws.AmazonServiceException
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 32 more
> 
> Thanks!
> Amit
> 


Re: Troubleshooting "Task not serializable" in Spark/Scala environments

2015-09-21 Thread Alexis Gillain
As Igor said, the header must be available on each partition, so the solution
is to broadcast it.

As for the difference between the REPL and Scala IDE, it may come from the
SparkContext setup, since the REPL defines one by default.
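
The serialization stack quoted below shows the closure reaching back through `$outer` references to `cmd2$$user`, whose `conf` field is a SparkConf. Here is a Spark-free sketch of that capture effect using plain Java serialization, assuming Scala 2.12 or later; `FakeConf` and `Cell` are made-up stand-ins for SparkConf and the notebook's cell wrapper.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {
  // Stand-in for a non-serializable object such as SparkConf.
  class FakeConf

  // Stand-in for a notebook cell wrapper holding both the conf and the header.
  class Cell extends Serializable {
    val conf = new FakeConf
    val header = "id,name"

    // Reads the field through `this`, so the closure captures the whole Cell.
    def capturesOuter: String => Boolean = line => !line.contains(header)

    // Copies the field to a local first, so only the String is captured.
    def capturesLocal: String => Boolean = {
      val h = header
      line => !line.contains(h)
    }
  }

  // True if Java serialization accepts the object.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val cell = new Cell
    println(s"captures outer: ${serializes(cell.capturesOuter)}")
    println(s"captures local: ${serializes(cell.capturesLocal)}")
  }
}
```

The first closure drags the non-serializable field along with its enclosing object, while the second carries only the string it needs. Whether a given notebook kernel wraps cells this way is an assumption here, not something the trace proves.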

2015-09-22 8:41 GMT+08:00 Igor Berman :

> Try to broadcast the header
> On Sep 22, 2015 08:07, "Balaji Vijayan" 
> wrote:
>
>> Howdy,
>>
>> I'm a relative novice at Spark/Scala and I'm puzzled by some behavior
>> that I'm seeing in 2 of my local Spark/Scala environments (Scala for
>> Jupyter and Scala IDE) but not the 3rd (Spark Shell). The following code
>> throws the following stack trace error in the former 2 environments but
>> executes successfully in the 3rd. I'm not sure how to go about
>> troubleshooting my former 2 environments so any assistance is greatly
>> appreciated.
>>
>> Code:
>>
>> //get file
>> val logFile = "s3n://file"
>> val logData  = sc.textFile(logFile)
>> // header
>> val header =  logData.first
>> // filter out header
>> val sample = logData.filter(!_.contains(header)).map {
>>  line => line.replaceAll("['\"]","").substring(0,line.length()-1)
>> }.takeSample(false,100,12L)
>>
>> Stack Trace:
>>
>> org.apache.spark.SparkException: Task not serializable
>>  
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>>  
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>>  org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>>  org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>>  org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>>  org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>>  
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>  
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>  org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>>  org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>>  cmd6$$user$$anonfun$3.apply(Main.scala:134)
>>  cmd6$$user$$anonfun$3.apply(Main.scala:133)
>> java.io.NotSerializableException: org.apache.spark.SparkConf
>> Serialization stack:
>>  - object not serializable (class: org.apache.spark.SparkConf, value: 
>> org.apache.spark.SparkConf@309ed441)
>>  - field (class: cmd2$$user, name: conf, type: class 
>> org.apache.spark.SparkConf)
>>  - object (class cmd2$$user, cmd2$$user@75a88665)
>>  - field (class: cmd6, name: $ref$cmd2, type: class cmd2$$user)
>>  - object (class cmd6, cmd6@5e9e8f0b)
>>  - field (class: cmd6$$user, name: $outer, type: class cmd6)
>>  - object (class cmd6$$user, cmd6$$user@692f81c)
>>  - field (class: cmd6$$user$$anonfun$3, name: $outer, type: class 
>> cmd6$$user)
>>  - object (class cmd6$$user$$anonfun$3, )
>>  - field (class: cmd6$$user$$anonfun$3$$anonfun$apply$1, name: $outer, 
>> type: class cmd6$$user$$anonfun$3)
>>  - object (class cmd6$$user$$anonfun$3$$anonfun$apply$1, )
>>  
>> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>>  
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>>  
>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>>  
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>>  
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>>  org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>>  org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>>  org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>>  org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>>  
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>  
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>  org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>>  org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>>  cmd6$$user$$anonfun$3.apply(Main.scala:134)
>>  cmd6$$user$$anonfun$3.apply(Main.scala:133)
>>
>> Thanks,
>> Balaji
>>
>


-- 
Alexis GILLAIN


How does one use s3 for checkpointing?

2015-09-21 Thread Amit Ramesh
A lot of places in the documentation mention using s3 for checkpointing,
however I haven't found any examples or concrete evidence of anyone having
done this.

   1. Is this a safe/reliable option given the read-after-write consistency
   for PUTS in s3?
   2. Is s3 access broken for hadoop 2.6 (SPARK-7442)? If so, is it viable
   in 2.4?
   3. Related to #2. I did try providing hadoop-aws-2.6.0.jar while
   submitting the job and got the following stack trace. Is there a fix?

py4j.protocol.Py4JJavaError: An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.
: java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem:
Provider org.apache.hadoop.fs.s3a.S3AFileSystem could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:224)
at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
at
org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2563)
at
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.spark.SparkContext.addFile(SparkContext.scala:1354)
at org.apache.spark.SparkContext.addFile(SparkContext.scala:1332)
at
org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
at
org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.SparkContext.(SparkContext.scala:475)
at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoClassDefFoundError:
com/amazonaws/AmazonServiceException
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
at java.lang.Class.getConstructor0(Class.java:2885)
at java.lang.Class.newInstance(Class.java:350)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
... 27 more
Caused by: java.lang.ClassNotFoundException:
com.amazonaws.AmazonServiceException
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 32 more

Thanks!
Amit


Re: Spark on Yarn vs Standalone

2015-09-21 Thread Sandy Ryza
The warning you're seeing in Spark is no issue.  The scratch space lives
inside the heap, so it'll never result in YARN killing the container by
itself.  The issue is that Spark is using some off-heap space on top of
that.

You'll need to bump the spark.yarn.executor.memoryOverhead property to give
the executors some additional headroom above the heap space.
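
One way to apply this when building the context in code is sketched below, assuming a Spark 1.x YARN deployment where the overhead value is given in megabytes. The object name, app name, and numbers are placeholders for illustration, not recommendations.

```scala
import org.apache.spark.SparkConf

object OverheadConf {
  // Illustrative values only: heap per executor plus extra off-heap headroom.
  val conf: SparkConf = new SparkConf()
    .setAppName("memory-overhead-example")              // hypothetical app name
    .set("spark.executor.memory", "44g")                // executor JVM heap
    .set("spark.yarn.executor.memoryOverhead", "6144")  // MB requested on top of the heap
}
```

The same property can also be passed at submit time with `--conf spark.yarn.executor.memoryOverhead=6144`.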

-Sandy

On Mon, Sep 21, 2015 at 5:43 PM, Saisai Shao  wrote:

> I think you need to increase the executor memory size, either through the
> command-line argument "--executor-memory" or the configuration property
> "spark.executor.memory".
>
> Also raise yarn.scheduler.maximum-allocation-mb on the YARN side if necessary.
>
> Thanks
> Saisai
>
>
> On Mon, Sep 21, 2015 at 5:13 PM, Alexander Pivovarov wrote:
>
>> I noticed that some executors have an issue with scratch space.
>> I see the following in yarn app container stderr around the time when
>> yarn killed the executor because it uses too much memory.
>>
>> -- App container stderr --
>> 15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
>> rdd_6_346 in memory! (computed 3.0 GB so far)
>> 15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB
>> (blocks) + 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB.
>> Storage limit = 25.2 GB.
>> 15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_346
>> to disk instead.
>> 15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
>> rdd_6_49 in memory! (computed 3.1 GB so far)
>> 15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB
>> (blocks) + 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB.
>> Storage limit = 25.2 GB.
>> 15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_49
>> to disk instead.
>>
>> -- Yarn Nodemanager log --
>> 2015-09-21 21:44:05,716 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
>> (Container Monitor): Container
>> [pid=5114,containerID=container_1442869100946_0001_01_0
>> 00056] is running beyond physical memory limits. Current usage: 52.2 GB
>> of 52 GB physical memory used; 53.0 GB of 260 GB virtual memory used.
>> Killing container.
>> Dump of the process-tree for container_1442869100946_0001_01_56 :
>> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>> |- 5117 5114 5114 5114 (java) 1322810 27563 56916316160 13682772
>> /usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError=kill %p
>> -Xms47924m -Xmx47924m -verbose:gc -XX:+PrintGCDetails
>> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
>> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
>> -XX:+CMSClassUnloadingEnabled
>> -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
>> -Dspark.akka.failure-detector.threshold=3000.0
>> -Dspark.akka.heartbeat.interval=1s -Dspark.akka.threads=4
>> -Dspark.history.ui.port=18080 -Dspark.akka.heartbeat.pauses=6s
>> -Dspark.akka.timeout=1000s -Dspark.akka.frameSize=50
>> -Dspark.driver.port=52690
>> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
>> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
>> akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
>> --executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
>> application_1442869100946_0001 --user-class-path
>> file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.jar
>> |- 5114 5112 5114 5114 (bash) 0 0 9658368 291 /bin/bash -c
>> /usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError='kill %p'
>> -Xms47924m -Xmx47924m '-verbose:gc' '-XX:+PrintGCDetails'
>> '-XX:+PrintGCDateStamps' '-XX:+UseConcMarkSweepGC'
>> '-XX:CMSInitiatingOccupancyFraction=70' '-XX:MaxHeapFreeRatio=70'
>> '-XX:+CMSClassUnloadingEnabled'
>> -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
>> '-Dspark.akka.failure-detector.threshold=3000.0'
>> '-Dspark.akka.heartbeat.interval=1s' '-Dspark.akka.threads=4'
>> '-Dspark.history.ui.port=18080' '-Dspark.akka.heartbeat.pauses=6s'
>> '-Dspark.akka.timeout=1000s' '-Dspark.akka.frameSize=50'
>> '-Dspark.driver.port=52690'
>> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
>> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
>> akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
>> --executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
>> application_1442869100946_0001 --user-class-path
>> file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.j

Re: Spark on Yarn vs Standalone

2015-09-21 Thread Saisai Shao
I think you need to increase the executor memory size, either through the
command-line argument "--executor-memory" or the configuration property
"spark.executor.memory".

Also raise yarn.scheduler.maximum-allocation-mb on the YARN side if necessary.
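
To see how these settings interact, here is a small sketch in plain Scala using the numbers from the NodeManager log quoted below: a 47924 MB heap (-Xmx47924m) inside a container limited to 52 GB. It assumes the container request is simply heap plus overhead; the object and helper names are made up for illustration.

```scala
object ContainerMath {
  // Figures read from the NodeManager log: -Xmx47924m and a 52 GB container limit.
  val heapMb = 47924
  val containerLimitMb = 52 * 1024

  // Memory left for everything outside the heap (thread stacks, direct buffers, ...).
  def headroomMb(limitMb: Int, heap: Int): Int = limitMb - heap

  // A request fits only if heap plus overhead stays within YARN's per-container maximum.
  def fits(heap: Int, overheadMb: Int, maxAllocationMb: Int): Boolean =
    heap + overheadMb <= maxAllocationMb

  def main(args: Array[String]): Unit = {
    println(s"headroom: ${headroomMb(containerLimitMb, heapMb)} MB")
    println(s"fits with 8 GB overhead: ${fits(heapMb, 8192, containerLimitMb)}")
  }
}
```

With these figures the heap leaves 5324 MB for everything else, so asking for more headroom means either shrinking the heap or raising the YARN maximum allocation.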

Thanks
Saisai


On Mon, Sep 21, 2015 at 5:13 PM, Alexander Pivovarov 
wrote:

> I noticed that some executors have an issue with scratch space.
> I see the following in yarn app container stderr around the time when yarn
> killed the executor because it uses too much memory.
>
> -- App container stderr --
> 15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
> rdd_6_346 in memory! (computed 3.0 GB so far)
> 15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB (blocks)
> + 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB. Storage
> limit = 25.2 GB.
> 15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_346
> to disk instead.
> 15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
> rdd_6_49 in memory! (computed 3.1 GB so far)
> 15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB (blocks)
> + 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB. Storage
> limit = 25.2 GB.
> 15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_49
> to disk instead.
>
> -- Yarn Nodemanager log --
> 2015-09-21 21:44:05,716 WARN
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
> (Container Monitor): Container
> [pid=5114,containerID=container_1442869100946_0001_01_0
> 00056] is running beyond physical memory limits. Current usage: 52.2 GB of
> 52 GB physical memory used; 53.0 GB of 260 GB virtual memory used. Killing
> container.
> Dump of the process-tree for container_1442869100946_0001_01_56 :
> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> |- 5117 5114 5114 5114 (java) 1322810 27563 56916316160 13682772
> /usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError=kill %p
> -Xms47924m -Xmx47924m -verbose:gc -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
> -XX:+CMSClassUnloadingEnabled
> -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
> -Dspark.akka.failure-detector.threshold=3000.0
> -Dspark.akka.heartbeat.interval=1s -Dspark.akka.threads=4
> -Dspark.history.ui.port=18080 -Dspark.akka.heartbeat.pauses=6s
> -Dspark.akka.timeout=1000s -Dspark.akka.frameSize=50
> -Dspark.driver.port=52690
> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
> akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
> --executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
> application_1442869100946_0001 --user-class-path
> file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.jar
> |- 5114 5112 5114 5114 (bash) 0 0 9658368 291 /bin/bash -c
> /usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError='kill %p'
> -Xms47924m -Xmx47924m '-verbose:gc' '-XX:+PrintGCDetails'
> '-XX:+PrintGCDateStamps' '-XX:+UseConcMarkSweepGC'
> '-XX:CMSInitiatingOccupancyFraction=70' '-XX:MaxHeapFreeRatio=70'
> '-XX:+CMSClassUnloadingEnabled'
> -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
> '-Dspark.akka.failure-detector.threshold=3000.0'
> '-Dspark.akka.heartbeat.interval=1s' '-Dspark.akka.threads=4'
> '-Dspark.history.ui.port=18080' '-Dspark.akka.heartbeat.pauses=6s'
> '-Dspark.akka.timeout=1000s' '-Dspark.akka.frameSize=50'
> '-Dspark.driver.port=52690'
> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
> akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
> --executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
> application_1442869100946_0001 --user-class-path
> file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.jar
> 1>
> /var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56/stdout
> 2>
> /var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56/stderr
>
>
>
> Is it possible to find out what the scratch space is used for?
>
> Which Spark setting should I adjust to solve the issue?
>
> On Thu, Sep 10, 2015 at 2:52 PM, Sandy Ryza 
> wrote:
>
>> YARN will never kill processes for being unresponsive.
>>
>> It may kill processes

Re: word count (group by users) in spark

2015-09-21 Thread Zhang, Jingyu
Spark spills data to disk when there is more data shuffled onto a single
executor machine than can fit in memory. However, it flushes out the data
to disk one key at a time - so if a single key has more key-value pairs
than can fit in memory, an out of memory exception occurs.
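
Keying on (user, word), as in Huy's reply quoted below, sidesteps that failure mode because each key only ever holds one running count. Here is a Spark-free sketch of that counting shape on the sample input from the original question; in Spark the fold would be a `reduceByKey(_ + _)`, and the object and method names here are made up for illustration.

```scala
object UserWordCount {
  // Count words per (user, word) key; each key keeps one running Int,
  // never a list of all of a user's words.
  def countByUserAndWord(comments: Seq[(String, String)]): Map[(String, String), Int] =
    comments
      .flatMap { case (user, comment) => comment.split(",").map(word => (user, word)) }
      .foldLeft(Map.empty[(String, String), Int]) { (acc, key) =>
        acc.updated(key, acc.getOrElse(key, 0) + 1)
      }

  def main(args: Array[String]): Unit = {
    // Sample input from the original question.
    val input = Seq(("kali", "A,B,A,B,B"), ("james", "B,A,A,A,B"))
    countByUserAndWord(input).toSeq.sorted.foreach(println)
  }
}
```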

Cheers,

Jingyu

On 21 September 2015 at 16:39, Aniket Bhatnagar 
wrote:

> Unless I am mistaken, in a group by operation, it spills to disk in case
> values for a key don't fit in memory.
>
> Thanks,
> Aniket
>
> On Mon, Sep 21, 2015 at 10:43 AM Huy Banh  wrote:
>
>> Hi,
>>
>> If your input format is user -> comment, then you could:
>>
>> val comments = sc.parallelize(List(("u1", "one two one"), ("u2", "three four three")))
>> val wordCounts = comments.
>>   flatMap({ case (user, comment) =>
>>     for (word <- comment.split(" ")) yield ((user, word), 1) }).
>>   reduceByKey(_ + _)
>>
>> val output = wordCounts.
>>   map({ case ((user, word), count) => (user, (word, count)) }).
>>   groupByKey()
>>
>> With Aniket's approach, if we group by user first, couldn't it run out of
>> memory when Spark tries to put all of a user's words in a single sequence?
>>
>> On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>> Using scala API, you can first group by user and then use combineByKey.
>>>
>>> Thanks,
>>> Aniket
>>>
>>> On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com <
>>> kali.tumm...@gmail.com> wrote:
>>>
 Hi All,
 I would like to produce the output below using Spark. I managed to write it
 in Hive and call that from Spark, but not in plain Spark (Scala). How do I
 group word counts by a particular user (column)? For example, given users
 and their tweets, I want a word count per user name.

 Input:-
 kali  A,B,A,B,B
 james B,A,A,A,B

 Output:-
 kali  A [Count] B [Count]
 james A [Count] B [Count]

 My Hive Answer:-
 CREATE EXTERNAL TABLE  TEST
 (
  user_name string   ,
  COMMENTS  STRING

 )  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'  STORED AS TEXTFILE
 LOCATION '/data/kali/test';
 -- HDFS folder: create it and add a text file with the data mentioned in the email

 use default;
 select user_name, COLLECT_SET(text)
 from (select user_name, concat(sub, ' ', count(comments)) as text
       from test LATERAL VIEW explode(split(comments, ',')) subView AS sub
       group by user_name, sub) w
 group by user_name;

 Spark With Hive:-
 package com.examples

 /**
  * Created by kalit_000 on 17/09/2015.
  */
 import org.apache.log4j.Logger
 import org.apache.log4j.Level
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.SparkContext._


 object HiveWordCount {

   def main(args: Array[String]): Unit = {
     Logger.getLogger("org").setLevel(Level.WARN)
     Logger.getLogger("akka").setLevel(Level.WARN)

     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("HiveWordCount")
       .set("spark.executor.memory", "1g")
     val sc = new SparkContext(conf)
     val sqlContext = new SQLContext(sc)

     val hc = new HiveContext(sc)

     // Triple-quoted strings let the SQL span several lines.
     // '\001' matches the field delimiter used in the Hive DDL above.
     hc.sql("""CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST (user_name string, COMMENTS STRING)
       ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
       STORED AS TEXTFILE LOCATION '/data/kali/test'""")

     val op = hc.sql("""select user_name, COLLECT_SET(text) from (
       select user_name, concat(sub, ' ', count(comments)) as text
       from default.test LATERAL VIEW explode(split(comments, ',')) subView AS sub
       group by user_name, sub) w
       group by user_name""")

     op.collect.foreach(println)
   }
 }




 Thanks
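
One way to picture the combineByKey suggestion without Hive is the plain-Python sketch below. It is not Spark code: `combine_by_key` and `word_counts_by_user` are hypothetical helpers that only model the three functions combineByKey takes (create a combiner, merge a value into it, merge two combiners). It also assumes each input line is a user name, a space, then comma-separated words.

```python
from collections import Counter


def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Plain-Python model of the combineByKey contract (not Spark itself)."""
    per_partition = []
    for part in partitions:
        acc = {}
        for key, value in part:
            if key in acc:
                acc[key] = merge_value(acc[key], value)
            else:
                acc[key] = create_combiner(value)
        per_partition.append(acc)
    # Merge the per-partition results, as happens after a shuffle.
    merged = {}
    for acc in per_partition:
        for key, comb in acc.items():
            merged[key] = merge_combiners(merged[key], comb) if key in merged else comb
    return merged


def word_counts_by_user(lines, partitions=2):
    """Split each line into (user, word) pairs and count words per user."""
    pairs = []
    for line in lines:
        user, comments = line.split(" ", 1)
        pairs.extend((user, word) for word in comments.split(","))
    chunks = [pairs[i::partitions] for i in range(partitions)]
    return combine_by_key(
        chunks,
        lambda word: Counter([word]),           # first word seen for a user
        lambda acc, word: acc + Counter([word]),  # another word, same partition
        lambda a, b: a + b,                     # combine partitions
    )


counts = word_counts_by_user(["kali A,B,A,B,B", "james B,A,A,A,B"])
```

With the sample input from the message, `counts["kali"]` holds A=2, B=3 and `counts["james"]` holds A=3, B=2.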




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Troubleshooting "Task not serializable" in Spark/Scala environments

2015-09-21 Thread Igor Berman
Try to broadcast the header.
On Sep 22, 2015 08:07, "Balaji Vijayan"  wrote:

> Howdy,
>
> I'm a relative novice at Spark/Scala and I'm puzzled by some behavior that
> I'm seeing in 2 of my local Spark/Scala environments (Scala for Jupyter and
> Scala IDE) but not the 3rd (Spark Shell). The following code throws the
> following stack trace error in the former 2 environments but executes
> successfully in the 3rd. I'm not sure how to go about troubleshooting my
> former 2 environments so any assistance is greatly appreciated.
>
> Code:
>
> //get file
> val logFile = "s3n://file"
> val logData  = sc.textFile(logFile)
> // header
> val header =  logData.first
> // filter out header
> val sample = logData.filter(!_.contains(header)).map {
>  line => line.replaceAll("['\"]","").substring(0,line.length()-1)
> }.takeSample(false,100,12L)
>
> Stack Trace:
>
> org.apache.spark.SparkException: Task not serializable
>   
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>   
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>   org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>   org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>   org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>   cmd6$$user$$anonfun$3.apply(Main.scala:134)
>   cmd6$$user$$anonfun$3.apply(Main.scala:133)
> java.io.NotSerializableException: org.apache.spark.SparkConf
> Serialization stack:
>   - object not serializable (class: org.apache.spark.SparkConf, value: 
> org.apache.spark.SparkConf@309ed441)
>   - field (class: cmd2$$user, name: conf, type: class 
> org.apache.spark.SparkConf)
>   - object (class cmd2$$user, cmd2$$user@75a88665)
>   - field (class: cmd6, name: $ref$cmd2, type: class cmd2$$user)
>   - object (class cmd6, cmd6@5e9e8f0b)
>   - field (class: cmd6$$user, name: $outer, type: class cmd6)
>   - object (class cmd6$$user, cmd6$$user@692f81c)
>   - field (class: cmd6$$user$$anonfun$3, name: $outer, type: class 
> cmd6$$user)
>   - object (class cmd6$$user$$anonfun$3, )
>   - field (class: cmd6$$user$$anonfun$3$$anonfun$apply$1, name: $outer, 
> type: class cmd6$$user$$anonfun$3)
>   - object (class cmd6$$user$$anonfun$3$$anonfun$apply$1, )
>   
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>   
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>   
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>   
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>   org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>   org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>   org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>   cmd6$$user$$anonfun$3.apply(Main.scala:134)
>   cmd6$$user$$anonfun$3.apply(Main.scala:133)
>
> Thanks,
> Balaji
>


Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
I am processing a single file and want to remove duplicate rows by some key
by always choosing the first row in the file for that key.

The best solution I could come up with is to zip each row with the
partition index and local index, like this:

rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
  rows.zipWithIndex.map { case (row, localIndex) => (row.key,
((partitionIndex, localIndex), row)) }
}


And then using reduceByKey with a min ordering on the (partitionIndex,
localIndex) pair.

First, can i count on SparkContext.textFile to read the lines in such that
the partition indexes are always increasing so that the above works?

And, is there a better way to accomplish the same effect?

Thanks!

- Philip
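
The approach described above can be modelled in plain Python as a sketch. This is not Spark code: `tag_rows` and `first_row_per_key` are hypothetical helpers, the nested lists stand in for partitions, and the whole thing assumes that partition indexes follow file order, which is exactly the open question in this message.

```python
import random


def tag_rows(partitions, key_of):
    """Model of the mapPartitionsWithIndex step: key -> ((partition, local), row)."""
    tagged = []
    for partition_index, rows in enumerate(partitions):
        for local_index, row in enumerate(rows):
            tagged.append((key_of(row), ((partition_index, local_index), row)))
    return tagged


def first_row_per_key(tagged):
    """Model of reduceByKey with a min ordering on the position pair."""
    best = {}
    for key, candidate in tagged:
        if key not in best or candidate[0] < best[key][0]:
            best[key] = candidate
    # Return the surviving rows in file order.
    return [row for _, row in sorted(best.values(), key=lambda c: c[0])]


partitions = [["a,1", "b,2"], ["a,3", "c,4"], ["b,5"]]
tagged = tag_rows(partitions, key_of=lambda row: row.split(",")[0])
random.Random(0).shuffle(tagged)  # the reduce order must not matter
deduped = first_row_per_key(tagged)
```

Because the minimum is taken over the (partition, local) pair, the result does not depend on the order in which the reduce sees the rows.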


spark.mesos.coarse impacts memory performance on mesos

2015-09-21 Thread Utkarsh Sengar
I am running Spark 1.4.1 on mesos.

The Spark job does a "cartesian" of 4 RDDs (aRdd, bRdd, cRdd, dRdd) of size
100, 100, 7 and 1 respectively. Let's call it productRDD.

Creating "aRdd" requires pulling data from multiple data sources, merging it
and creating a tuple of JavaRdd. In the end aRdd looks something like this:
JavaRDD>
bRdd, cRdd and dRdd are just List<> of values.

Then I apply a transformation on productRDD and finally call "saveAsTextFile"
to save the result of my transformation.

Problem:
With "spark.mesos.coarse=true", creation of "aRdd" works fine but the
driver crashes while doing the cartesian. With "spark.mesos.coarse=false",
the job works like a charm. I am running Spark on Mesos.

Comments:
So I wanted to understand what role "spark.mesos.coarse=true" plays in
terms of memory and compute performance. My findings look counterintuitive
since:

   1. "spark.mesos.coarse=true" runs on just 1 Mesos task, so without it
   there should be an overhead of spinning up Mesos tasks, which should
   hurt performance.
   2. What setting of "spark.mesos.coarse" is recommended for running Spark
   on Mesos? Or is there no best answer and it depends on the use case?
   3. Also, with "spark.mesos.coarse=true" I notice huge GC pauses even
   with a small dataset on a long-running job (but this can be a separate
   discussion).

Let me know if I am missing something obvious, we are learning spark tuning
as we move forward :)

-- 
Thanks,
-Utkarsh


Re: Spark on Yarn vs Standalone

2015-09-21 Thread Alexander Pivovarov
I noticed that some executors have an issue with scratch space.
I see the following in the YARN app container stderr around the time when
YARN killed the executor for using too much memory.

-- App container stderr --
15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
rdd_6_346 in memory! (computed 3.0 GB so far)
15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB (blocks)
+ 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB. Storage
limit = 25.2 GB.
15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_346
to disk instead.
15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
rdd_6_49 in memory! (computed 3.1 GB so far)
15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB (blocks)
+ 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB. Storage
limit = 25.2 GB.
15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_49 to
disk instead.

-- Yarn Nodemanager log --
2015-09-21 21:44:05,716 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
(Container Monitor): Container
[pid=5114,containerID=container_1442869100946_0001_01_0
00056] is running beyond physical memory limits. Current usage: 52.2 GB of
52 GB physical memory used; 53.0 GB of 260 GB virtual memory used. Killing
container.
Dump of the process-tree for container_1442869100946_0001_01_56 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 5117 5114 5114 5114 (java) 1322810 27563 56916316160 13682772
/usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError=kill %p
-Xms47924m -Xmx47924m -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
-XX:+CMSClassUnloadingEnabled
-Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
-Dspark.akka.failure-detector.threshold=3000.0
-Dspark.akka.heartbeat.interval=1s -Dspark.akka.threads=4
-Dspark.history.ui.port=18080 -Dspark.akka.heartbeat.pauses=6s
-Dspark.akka.timeout=1000s -Dspark.akka.frameSize=50
-Dspark.driver.port=52690
-Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
--executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
application_1442869100946_0001 --user-class-path
file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.jar
|- 5114 5112 5114 5114 (bash) 0 0 9658368 291 /bin/bash -c
/usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError='kill %p'
-Xms47924m -Xmx47924m '-verbose:gc' '-XX:+PrintGCDetails'
'-XX:+PrintGCDateStamps' '-XX:+UseConcMarkSweepGC'
'-XX:CMSInitiatingOccupancyFraction=70' '-XX:MaxHeapFreeRatio=70'
'-XX:+CMSClassUnloadingEnabled'
-Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
'-Dspark.akka.failure-detector.threshold=3000.0'
'-Dspark.akka.heartbeat.interval=1s' '-Dspark.akka.threads=4'
'-Dspark.history.ui.port=18080' '-Dspark.akka.heartbeat.pauses=6s'
'-Dspark.akka.timeout=1000s' '-Dspark.akka.frameSize=50'
'-Dspark.driver.port=52690'
-Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
--executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
application_1442869100946_0001 --user-class-path
file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.jar
1>
/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56/stdout
2>
/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56/stderr



Is it possible to find out what the scratch space is used for?

What spark setting should I try to adjust to solve the issue?
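
As a rough sketch of the arithmetic behind the container limit in the log above: the helper below is hypothetical and assumes the overhead defaults to max(384 MB, 10% of the executor heap) and that YARN rounds requests up to a multiple of 1024 MB. Both values are assumptions about the configuration, not something the log states.

```python
import math


def yarn_container_mb(executor_memory_mb, overhead_mb=None,
                      overhead_factor=0.10, min_overhead_mb=384,
                      allocation_increment_mb=1024):
    """Estimate the container size YARN grants for one executor.

    overhead_factor, min_overhead_mb and allocation_increment_mb are
    assumed defaults; check them against the actual cluster settings.
    """
    if overhead_mb is None:
        overhead_mb = max(min_overhead_mb, int(executor_memory_mb * overhead_factor))
    requested = executor_memory_mb + overhead_mb
    # Round up to the next multiple of the allocation increment.
    return math.ceil(requested / allocation_increment_mb) * allocation_increment_mb


default_container = yarn_container_mb(47924)                    # heap from -Xmx47924m
bigger_container = yarn_container_mb(47924, overhead_mb=8192)   # explicit overhead
```

Under these assumptions a 47924 MB heap maps to a 53248 MB (52 GB) container, which matches the limit in the NodeManager log, so only about 5 GB is left for everything outside the heap. Raising spark.yarn.executor.memoryOverhead, as suggested in the quoted reply below, widens that margin.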

On Thu, Sep 10, 2015 at 2:52 PM, Sandy Ryza  wrote:

> YARN will never kill processes for being unresponsive.
>
> It may kill processes for occupying more memory than it allows.  To get
> around this, you can either bump spark.yarn.executor.memoryOverhead or turn
> off the memory checks entirely with yarn.nodemanager.pmem-check-enabled.
>
> -Sandy
>
> On Tue, Sep 8, 2015 at 10:48 PM, Alexander Pivovarov  > wrote:
>
>> The problem which we have now is skewed data (2360 tasks done in 5 min, 3
>> tasks in 40 min and 1 task in 2 hours)
>>
>> Some people from the team worry that the exec

Re: how to get RDD from two different RDDs with cross column

2015-09-21 Thread Zhiliang Zhu
Hi Romi,
Yes, you understand it correctly. The rdd1 keys overlap with the rdd2 keys:
many keys appear in both rdd1 and rdd2, some keys are in rdd1 but not in
rdd2, and some keys are in rdd2 but not in rdd1. The rdd3 keys would be the
same as the rdd1 keys; rdd3 will not include keys that are in rdd2 but not
in rdd1. The values of rdd3 come from rdd2; if a key in rdd3 is not in rdd2,
its value would NOT exist.

You always have excellent answers to Spark questions, and I really
appreciate it.
Thank you very much~
Zhiliang  
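
The behaviour described here (keep every rdd1 key, take the value from rdd2 when it exists, otherwise no value) is what a left outer join does. The plain-Python sketch below only models that semantics; `left_outer_join` is a hypothetical helper and the dates and floats are made-up sample data. On real pair RDDs the corresponding call would be leftOuterJoin.

```python
def left_outer_join(left_keys, right_pairs):
    """Keep every key from the left side; take the right-side value if present."""
    lookup = dict(right_pairs)
    return [(key, lookup.get(key)) for key in left_keys]


# Made-up sample data: rdd1 holds StringDate keys, rdd2 holds (StringDate, float).
rdd1 = ["2015-09-01", "2015-09-02", "2015-09-03"]
rdd2 = [("2015-09-02", 1.5), ("2015-09-03", 2.5), ("2015-09-04", 9.0)]
rdd3 = left_outer_join(rdd1, rdd2)
```

The key that exists only in rdd2 ("2015-09-04") is dropped, and the key that exists only in rdd1 ("2015-09-01") is kept with no value.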


 On Tuesday, September 22, 2015 4:08 AM, Romi Kuntsman  
wrote:
   

 Hi,
If I understand correctly:
rdd1 contains keys (of type StringDate)
rdd2 contains keys and values
and rdd3 contains all the keys, and the values from rdd2?

I think you should make rdd1 and rdd2 PairRDD, and then use outer join.
Does that make sense?

On Mon, Sep 21, 2015 at 8:37 PM Zhiliang Zhu  wrote:

Dear Romi, Priya, Sujt and Shivaram and all,
I have spent many days thinking about this issue without finding a good
enough solution. I would appreciate all your kind help.
There is an RDD rdd1, and another RDD rdd2
(rdd2 can be a PairRDD, or a DataFrame with two columns as ). The StringDate
column values in rdd1 and rdd2 overlap but are not the same.

I would like to get a new RDD rdd3. The StringDate values in rdd3 would all
be the same as in rdd1, and the float in rdd3 would come from rdd2 if its
StringDate is in rdd2, or else NULL would be assigned.
Each row in rdd3[ i ] = ,
rdd2[i].StringDate would be the same as rdd1[ i ].StringDate,
then rdd2[ i ].float is assigned to rdd3[ i ] alongside its StringDate part.
What kind of API or function should I use?
Thanks very much!
Zhiliang




  

Re: Troubleshooting "Task not serializable" in Spark/Scala environments

2015-09-21 Thread Ted Yu
Which release are you using ?

From the line number in ClosureCleaner, it seems you're using 1.4.x

Cheers

On Mon, Sep 21, 2015 at 4:07 PM, Balaji Vijayan 
wrote:

> Howdy,
>
> I'm a relative novice at Spark/Scala and I'm puzzled by some behavior that
> I'm seeing in 2 of my local Spark/Scala environments (Scala for Jupyter and
> Scala IDE) but not the 3rd (Spark Shell). The following code throws the
> following stack trace error in the former 2 environments but executes
> successfully in the 3rd. I'm not sure how to go about troubleshooting my
> former 2 environments so any assistance is greatly appreciated.
>
> Code:
>
> //get file
> val logFile = "s3n://file"
> val logData  = sc.textFile(logFile)
> // header
> val header =  logData.first
> // filter out header
> val sample = logData.filter(!_.contains(header)).map {
>  line => line.replaceAll("['\"]","").substring(0,line.length()-1)
> }.takeSample(false,100,12L)
>
> Stack Trace:
>
> org.apache.spark.SparkException: Task not serializable
>   
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>   
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>   org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>   org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>   org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>   cmd6$$user$$anonfun$3.apply(Main.scala:134)
>   cmd6$$user$$anonfun$3.apply(Main.scala:133)
> java.io.NotSerializableException: org.apache.spark.SparkConf
> Serialization stack:
>   - object not serializable (class: org.apache.spark.SparkConf, value: 
> org.apache.spark.SparkConf@309ed441)
>   - field (class: cmd2$$user, name: conf, type: class 
> org.apache.spark.SparkConf)
>   - object (class cmd2$$user, cmd2$$user@75a88665)
>   - field (class: cmd6, name: $ref$cmd2, type: class cmd2$$user)
>   - object (class cmd6, cmd6@5e9e8f0b)
>   - field (class: cmd6$$user, name: $outer, type: class cmd6)
>   - object (class cmd6$$user, cmd6$$user@692f81c)
>   - field (class: cmd6$$user$$anonfun$3, name: $outer, type: class 
> cmd6$$user)
>   - object (class cmd6$$user$$anonfun$3, )
>   - field (class: cmd6$$user$$anonfun$3$$anonfun$apply$1, name: $outer, 
> type: class cmd6$$user$$anonfun$3)
>   - object (class cmd6$$user$$anonfun$3$$anonfun$apply$1, )
>   
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>   
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>   
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>   
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>   org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>   org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>   org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>   cmd6$$user$$anonfun$3.apply(Main.scala:134)
>   cmd6$$user$$anonfun$3.apply(Main.scala:133)
>
> Thanks,
> Balaji
>


Re: HiveQL Compatibility (0.12.0, 0.13.0???)

2015-09-21 Thread Michael Armbrust
In general we welcome pull requests for these kinds of updates.  In this
case it's already been fixed in master and branch-1.5 and will be updated
when we release 1.5.1 (hopefully soon).

On Mon, Sep 21, 2015 at 1:21 PM, Dominic Ricard <
dominic.ric...@tritondigital.com> wrote:

> Hi,
>here's a statement from the Spark 1.5.0  Spark SQL and DataFrame Guide
> <
> https://spark.apache.org/docs/latest/sql-programming-guide.html#compatibility-with-apache-hive
> >
> :
>
> *Compatibility with Apache Hive*
> Spark SQL is designed to be compatible with the Hive Metastore, SerDes and
> UDFs. Currently Spark SQL is based on Hive 0.12.0 and 0.13.1.
>
> After testing many functions available in 1.1.0 and 1.2.0, I tend to think
> that this is no longer true...
>
> Could someone update the documentation or tell me what these versions refer
> to as it appears that Spark SQL 1.5.0 support everything in Hive 1.2.0...
>
> Thank you.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/HiveQL-Compatibility-0-12-0-0-13-0-tp24757.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-09-21 Thread tridib
Did you find a solution to this? I am hitting the same issue.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Long-GC-pauses-with-Spark-SQL-1-3-0-and-billion-row-tables-tp22750p24759.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Troubleshooting "Task not serializable" in Spark/Scala environments

2015-09-21 Thread Balaji Vijayan
Howdy,

I'm a relative novice at Spark/Scala and I'm puzzled by some behavior that
I'm seeing in 2 of my local Spark/Scala environments (Scala for Jupyter and
Scala IDE) but not the 3rd (Spark Shell). The following code throws the
following stack trace error in the former 2 environments but executes
successfully in the 3rd. I'm not sure how to go about troubleshooting my
former 2 environments so any assistance is greatly appreciated.

Code:

//get file
val logFile = "s3n://file"
val logData  = sc.textFile(logFile)
// header
val header =  logData.first
// filter out header
val sample = logData.filter(!_.contains(header)).map {
 line => line.replaceAll("['\"]","").substring(0,line.length()-1)
}.takeSample(false,100,12L)

Stack Trace:

org.apache.spark.SparkException: Task not serializable

org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)

org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
org.apache.spark.rdd.RDD.filter(RDD.scala:310)
cmd6$$user$$anonfun$3.apply(Main.scala:134)
cmd6$$user$$anonfun$3.apply(Main.scala:133)
java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
- object not serializable (class: org.apache.spark.SparkConf, value:
org.apache.spark.SparkConf@309ed441)
- field (class: cmd2$$user, name: conf, type: class 
org.apache.spark.SparkConf)
- object (class cmd2$$user, cmd2$$user@75a88665)
- field (class: cmd6, name: $ref$cmd2, type: class cmd2$$user)
- object (class cmd6, cmd6@5e9e8f0b)
- field (class: cmd6$$user, name: $outer, type: class cmd6)
- object (class cmd6$$user, cmd6$$user@692f81c)
- field (class: cmd6$$user$$anonfun$3, name: $outer, type: class 
cmd6$$user)
- object (class cmd6$$user$$anonfun$3, )
- field (class: cmd6$$user$$anonfun$3$$anonfun$apply$1, name: $outer,
type: class cmd6$$user$$anonfun$3)
- object (class cmd6$$user$$anonfun$3$$anonfun$apply$1, )

org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)

org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)

org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)

org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
org.apache.spark.rdd.RDD.filter(RDD.scala:310)
cmd6$$user$$anonfun$3.apply(Main.scala:134)
cmd6$$user$$anonfun$3.apply(Main.scala:133)

Thanks,
Balaji
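
The serialization stack above shows the filter closure reaching SparkConf through a chain of $outer references that ends at the `conf` field of an earlier notebook cell. The plain-Python toy below is only a model of that situation, not Spark or Scala behaviour: every class in it is a hypothetical stand-in, and `unserializable_path` simply walks object fields the way the serialization debugger output reads.

```python
class FakeSparkConf:
    """Stand-in for an object that cannot be serialized (hypothetical)."""


class NotebookCell:
    """Stand-in for the wrapper object a notebook generates around a cell."""

    def __init__(self, header):
        self.conf = FakeSparkConf()
        self.header = header


class FilterViaOuter:
    """Closure that keeps a reference to the whole enclosing cell."""

    def __init__(self, outer):
        self.outer = outer


class FilterViaLocal:
    """Closure that keeps only the header string it needs."""

    def __init__(self, header):
        self.header = header


def unserializable_path(obj, path="closure"):
    """Return the field path to the first FakeSparkConf reachable from obj, else None."""
    if isinstance(obj, FakeSparkConf):
        return path
    if isinstance(obj, (str, int, float, bool, type(None))):
        return None
    for name, value in vars(obj).items():
        found = unserializable_path(value, path + "." + name)
        if found:
            return found
    return None


cell = NotebookCell("id,name")
bad = unserializable_path(FilterViaOuter(cell))
good = unserializable_path(FilterViaLocal(cell.header))
```

The first closure drags the non-serializable field along with it, while the second carries only a string. Commonly suggested workarounds follow the same idea: copy `header` into a local value (or broadcast it) so the closure no longer references the enclosing object, or mark the `conf` field as transient in the notebook. Whether either applies to these two environments is an assumption.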


Iterator-based streaming, how is it efficient ?

2015-09-21 Thread Samuel Hailu
Hi,

In Spark's in-memory logic, without cache, elements are accessed in an
iterator-based streaming style [
http://www.slideshare.net/liancheng/dtcc-14-spark-runtime-internals?next_slideshow=1
]

I have two questions:


   1. If elements are read one line at a time from HDFS (disk) and then
   transformed based on the RDD operations, how is this efficient?
   2. Which class in the Spark source does this? I'm expecting some kind of:

   for (partition_index <- iterator_over_a_partition)
   read_hdfs_line(partition_index).apply_transformation()


Thanks,
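
A small plain-Python sketch of what iterator-based streaming means in practice: chained generators pull one element at a time through every step, so no intermediate collection is materialized between the steps. This only models the idea with hypothetical helpers; in Spark itself the chaining is, as far as I know, expressed through RDD.compute and RDD.iterator, where each RDD wraps the iterator of its parent.

```python
def read_lines(source, log):
    """Yield lines one at a time, recording each read."""
    for line in source:
        log.append("read " + line)
        yield line


def run_pipeline(source, log):
    """Chain a map-like and a filter-like step over a lazy line reader."""
    lines = read_lines(source, log)
    upper = (line.upper() for line in lines)             # like map
    kept = (line for line in upper if line[0] == "A")    # like filter
    out = []
    for line in kept:
        log.append("emit " + line)
        out.append(line)
    return out


log = []
result = run_pipeline(["a1", "b2", "a3"], log)
```

The log interleaves reads and emits: the first line is read, transformed and emitted before the second line is read at all. The underlying reader can still fetch data from disk in large buffered blocks, so "one element at a time" describes the processing, not the I/O granularity.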


Spark Streaming distributed job

2015-09-21 Thread nibiau
Hello, 
Could you please explain what exactly is distributed when I launch a Spark
streaming job on a YARN cluster?
My code is something like:

JavaDStream<JMSEvent> customReceiverStream =
    ssc.receiverStream(streamConfig.getJmsReceiver());

JavaDStream<String> incoming_msg = customReceiverStream.map(
    new Function<JMSEvent, String>() {
        public String call(JMSEvent jmsEvent) {
            return jmsEvent.getText();
        }
    }
);

incoming_msg.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
            @Override
            public void call(Iterator<String> msg) throws Exception {
                while (msg.hasNext()) {
                    String message = msg.next();
                    // insert message in MongoDB
                }
            }
        });
        return null;
    }
});


So, in this code, at which step does the distribution over YARN happen:
- Is my receiver distributed (and so all the rest also)?
- Is the foreachRDD distributed (and so all the rest also)?
- Is foreachPartition distributed?

Thanks
Nicolas




Mesos Tasks only run on one node

2015-09-21 Thread John Omernik
I have a happy healthy Mesos cluster (0.24) running in my lab.  I've
compiled spark-1.5.0 and it seems to be working fine, except for one small
issue, my tasks all seem to run on one node. (I have 6 in the cluster).

Basically, I have a directory of compressed text files.  Compressed, these 25
files add up to 1.2 GB of data. In bin/pyspark I do:

txtfiles = sc.textFile("/path/to/my/data/*")
txtfiles.count()

This goes through and gives me the correct count, but all my tasks (25 of
them) run on one node, let's call it node4.

Interesting.

So I was running spark from node4, but I would have thought it would have
hit up more nodes.

So I ran it on node5.  In executors tab on the spark UI, there is only one
registered, and it's node4, and once again all tasks ran on node4.

I am running in fine-grained mode... is there a setting somewhere to allow
for more executors? This seems weird. I've been away from Spark since 1.2.x
but I don't seem to remember this...


Re: Spark data type guesser UDAF

2015-09-21 Thread Ruslan Dautkhanov
Does it deserve to be a JIRA in Spark / Spark MLLib?
How do you guys normally determine data types?

Frameworks like h2o automatically determine data types by scanning a sample
of the data, or the whole dataset.
One can then decide, e.g., whether a variable should be categorical or
numerical.

Another use case is when you get an arbitrary data set (we get them quite
often) and want to save it as a Parquet table.
Providing correct data types makes Parquet more space efficient (and
probably more performant at query time, e.g.
better Parquet bloom filters than just storing everything as
string/varchar).



-- 
Ruslan Dautkhanov
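
To make the idea concrete, here is a minimal plain-Python sketch of a type guesser with an aggregate-style merge step. It is not the linked Java UDAF and not a Spark API: `guess_type` and `merge_guesses` are hypothetical helpers, and the three type names and the regular expressions are assumptions chosen for illustration.

```python
import re

INT_RE = re.compile(r"[+-]?\d+")
FLOAT_RE = re.compile(r"[+-]?(\d+\.?\d*|\.\d+)([eE][+-]?\d+)?")

# Narrowest to widest; an assumed ordering for this sketch.
TYPE_ORDER = ["bigint", "double", "string"]


def guess_type(values):
    """Guess a column type from string samples; None if there is no data."""
    present = [v.strip() for v in values if v is not None and v.strip()]
    if not present:
        return None
    if all(INT_RE.fullmatch(v) for v in present):
        return "bigint"
    if all(FLOAT_RE.fullmatch(v) for v in present):
        return "double"
    return "string"


def merge_guesses(a, b):
    """Combine partial guesses (e.g. from two partitions) by keeping the wider type."""
    if a is None:
        return b
    if b is None:
        return a
    return TYPE_ORDER[max(TYPE_ORDER.index(a), TYPE_ORDER.index(b))]
```

The merge step is what would make this usable as an aggregate: each partition guesses from its own rows, and partial guesses widen toward "string" when they disagree.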

On Thu, Sep 17, 2015 at 12:32 PM, Ruslan Dautkhanov 
wrote:

> Wanted to take something like this
>
> https://github.com/fitzscott/AirQuality/blob/master/HiveDataTypeGuesser.java
> and create a Hive UDAF to create an aggregate function that returns a data
> type guess.
> Am I reinventing the wheel?
> Does Spark have something like this already built-in?
> Would be very useful for new wide datasets to explore data. Would be
> helpful for ML too,
> e.g. to decide categorical vs numerical variables.
>
>
> Ruslan
>
>


Re: Why are executors on slave never used?

2015-09-21 Thread Andrew Or
Hi Joshua,

What cluster manager are you using, standalone or YARN? (Note that
standalone here does not mean local mode).

If standalone, you need to do `setMaster("spark://[CLUSTER_URL]:7077")`,
where CLUSTER_URL is the machine that started the standalone Master. If
YARN, you need to do `setMaster("yarn")`, assuming that all the Hadoop
configurations files such as core-site.xml are already set up properly.

-Andrew


2015-09-21 8:53 GMT-07:00 Hemant Bhanawat :

> When you specify master as local[2], it starts the spark components in a
> single jvm. You need to specify the master correctly.
> I have a default AWS EMR cluster (1 master, 1 slave) with Spark. When I
> run a Spark process, it works fine -- but only on the master, as if it were
> standalone.
>
> The web-UI and logging code shows only 1 executor, the localhost.
>
> How can I diagnose this?
>
> (I create SparkConf, in Python, with setMaster('local[2]').)
>
> (Strangely, though I don't think that this causes the problem, there is
> almost nothing Spark-related on the slave machine: /usr/lib/spark has a
> few jars, but that's it: datanucleus-api-jdo.jar, datanucleus-core.jar,
> datanucleus-rdbms.jar, spark-yarn-shuffle.jar. But this is an AWS EMR
> cluster as created by create-cluster, so I would assume that the slave
> and master are configured OK out of the box.)
>
> Joshua
>


Slow Performance with Apache Spark Gradient Boosted Tree training runs

2015-09-21 Thread vkutsenko
I'm experimenting with the Gradient Boosted Trees learning algorithm from
the ML library of Spark 1.4. I'm solving a binary classification
problem where my input is ~50,000 samples and ~500,000 features. My goal is
to output the definition of the resulting GBT ensemble in human-readable
format. My experience so far is that, for my problem size, adding more
resources to the cluster does not seem to affect the length of the
run. A 10-iteration training run seems to take roughly 13 hours. This isn't
acceptable since I'm looking to do 100-300 iteration runs, and the execution
time seems to explode with the number of iterations.

*My Spark application*
This isn't the exact code, but it can be reduced to:

SparkConf sc = new SparkConf().setAppName("GBT Trainer")
        // unlimited max result size for intermediate Map-Reduce ops.
        // Having no limit is probably bad, but I've not had time to find
        // a tighter upper bound and the default value wasn't sufficient.
        .set("spark.driver.maxResultSize", "0");
JavaSparkContext jsc = new JavaSparkContext(sc);

// The input file is encoded in plain-text LIBSVM format ~59GB in size
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(),
        "s3://somebucket/somekey/plaintext_libsvm_file").toJavaRDD();

BoostingStrategy boostingStrategy =
        BoostingStrategy.defaultParams("Classification");
boostingStrategy.setNumIterations(10);
boostingStrategy.getTreeStrategy().setNumClasses(2);
boostingStrategy.getTreeStrategy().setMaxDepth(1);
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<Integer, Integer>();
boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);

GradientBoostedTreesModel model = GradientBoostedTrees.train(data,
        boostingStrategy);

// Somewhat-convoluted code below reads in Parquet-formatted output
// of the GBT model and writes it back out as json.
// There might be cleaner ways of achieving the same, but since output
// size is only a few KB I feel little guilt leaving it as is.

// serialize and output the GBT classifier model the only way that the
// library allows
String outputPath = "s3://somebucket/somekeyprefex";
model.save(jsc.sc(), outputPath + "/parquet");
// read in the parquet-formatted classifier output as a generic
// DataFrame object
SQLContext sqlContext = new SQLContext(jsc);
DataFrame outputDataFrame = sqlContext.read().parquet(outputPath +
        "/parquet");
// output DataFrame-formatted classifier model as json
outputDataFrame.write().format("json").save(outputPath + "/json");

*Question*
What is the performance bottleneck with my Spark application (or with GBT
learning algorithm itself) on input of that size and how can I achieve
greater execution parallelism?

I'm still a novice Spark dev, and I'd appreciate any tips on cluster
configuration and execution profiling. 


*More details on the cluster setup*

I'm running this app on an AWS EMR cluster (emr-4.0.0, YARN cluster mode) of
r3.8xlarge instances (32 cores, 244GB RAM each). I'm using such large
instances in order to maximize flexibility of resource allocation. So far
I've tried using 1-3 r3.8xlarge instances with a variety of resource
allocation schemes between the driver and workers. For example, for a
cluster of 1 r3.8xlarge instance I submit the app as follows:

aws emr add-steps --cluster-id $1 --steps Name=$2,\
Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,\
Args=[/usr/lib/spark/bin/spark-submit,--verbose,\
--deploy-mode,cluster,--master,yarn,\
--driver-memory,60G,\
--executor-memory,30G,\
--executor-cores,5,\
--num-executors,6,\
--class,GbtTrainer,\
"s3://somebucket/somekey/spark.jar"],\
ActionOnFailure=CONTINUE

For a cluster of 3 r3.8xlarge instances I tweak resource allocation:

--driver-memory,80G,\
--executor-memory,35G,\
--executor-cores,5,\
--num-executors,18,\

I don't have a clear idea of how much memory is useful to give to every
executor, but I feel that I'm being generous in either case. Looking through
the Spark UI, I'm not seeing tasks with an input size of more than a few GB.
I'm erring on the side of caution when giving the driver process so much
memory in order to ensure that it isn't memory starved for any intermediate
result-aggregation operations.

I'm trying to keep the number of cores per executor down to 5 as per
suggestions in Cloudera's How To Tune Your Spark Jobs series
(according to them, more than 5 cores tends to introduce an HDFS IO
bottleneck). I'm also making sure that there is enough spare RAM and CPU
left over for the host OS and Hadoop services.
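
A rough way to sanity-check the two allocations above is to add up what YARN
would be asked for. The sketch below is plain Java arithmetic, not a Spark
API. The class and method names are made up for illustration, and the
per-container overhead (the larger of 384 MB or 10% of the requested heap) is
an assumption about the YARN memory overhead default that should be checked
against the Spark version in use.

```java
/** Hypothetical helper: adds up the memory a spark-submit request asks YARN for. */
final class YarnRequestEstimate {

    /** Assumed per-container overhead: max(384 MB, overheadPercent% of the heap). */
    static long overheadMb(long heapMb, int overheadPercent) {
        return Math.max(384L, heapMb * overheadPercent / 100);
    }

    /** Total MB requested for one driver plus numExecutors executors (cluster mode). */
    static long totalRequestedMb(long driverMb, long executorMb, int numExecutors,
                                 int overheadPercent) {
        long driver = driverMb + overheadMb(driverMb, overheadPercent);
        long executor = executorMb + overheadMb(executorMb, overheadPercent);
        return driver + numExecutors * executor;
    }

    public static void main(String[] args) {
        long nodeMb = 244L * 1024; // one r3.8xlarge, before the OS and Hadoop take their share
        // Single-node submission from the post: 60G driver, 6 executors of 30G each.
        long oneNode = totalRequestedMb(60L * 1024, 30L * 1024, 6, 10);
        // Three-node submission: 80G driver, 18 executors of 35G each.
        long threeNodes = totalRequestedMb(80L * 1024, 35L * 1024, 18, 10);
        System.out.println("1 node:  requested " + oneNode + " MB of " + nodeMb + " MB");
        System.out.println("3 nodes: requested " + threeNodes + " MB of " + (3 * nodeMb) + " MB");
    }
}
```

If the requested total comes out above what the node managers actually offer,
YARN may not be able to place every container, so it is worth comparing the
number of executors shown in the Spark UI with --num-executors.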

*My findings thus far*
My only clue is Spark UI showing very long Scheduling Delay for a number of
tasks at the tail-end of execution. I 

HiveQL Compatibility (0.12.0, 0.13.0???)

2015-09-21 Thread Dominic Ricard
Hi,
   here's a statement from the Spark 1.5.0 Spark SQL and DataFrame Guide:

*Compatibility with Apache Hive*
Spark SQL is designed to be compatible with the Hive Metastore, SerDes and
UDFs. Currently Spark SQL is based on Hive 0.12.0 and 0.13.1.

After testing many functions available in Hive 1.1.0 and 1.2.0, I tend to
think that this is no longer true...

Could someone update the documentation or tell me what these versions refer
to, as it appears that Spark SQL 1.5.0 supports everything in Hive 1.2.0...

Thank you. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveQL-Compatibility-0-12-0-0-13-0-tp24757.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Null Value in DecimalType column of DataFrame

2015-09-21 Thread Reynold Xin
+dev list

Hi Dirceu,

The answer to whether throwing an exception is better or null is better
depends on your use case. If you are debugging and want to find bugs with
your program, you might prefer throwing an exception. However, if you are
running on a large real-world dataset (i.e. data is dirty) and your query
can take a while (e.g. 30 mins), you then might prefer the system to just
assign null values to the dirty data that could lead to runtime exceptions,
because otherwise you could be spending days just to clean your data.

Postgres throws exceptions here, but I think that's mainly because it is
used for OLTP, and in those cases queries are short-running. Most other
analytic databases I believe just return null. The best we can do is to
provide a config option to indicate behavior for exception handling.


On Fri, Sep 18, 2015 at 8:15 AM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hi Yin, I got that part.
> I just think that instead of returning null, throwing an exception would
> be better. In the exception message we can explain that the DecimalType
> used can't fit the number that is being converted due to the precision and
> scale values used to create it.
> It would be easier for the user to find the reason why that error is
> happening, instead of receiving a NullPointerException in another part of
> their code. We can also improve the documentation of the DecimalType classes
> to explain this behavior. What do you think?
>
>
>
>
> 2015-09-17 18:52 GMT-03:00 Yin Huai :
>
>> As I mentioned before, the range of values of DecimalType(10, 10) is [0,
>> 1). If you have a value 10.5 and you want to cast it to DecimalType(10,
>> 10), I do not think there is any better return value than null.
>> Looks like DecimalType(10, 10) is not the right type for your use case. You
>> need a decimal type that has precision - scale >= 2.
>>
>> On Tue, Sep 15, 2015 at 6:39 AM, Dirceu Semighini Filho <
>> dirceu.semigh...@gmail.com> wrote:
>>
>>>
>>> Hi Yin, posted here because I think it's a bug.
>>> So, it will return null and I can get a nullpointerexception, as I was
>>> getting. Is this really the expected behavior? Never seen something
>>> returning null in other Scala tools that I used.
>>>
>>> Regards,
>>>
>>>
>>> 2015-09-14 18:54 GMT-03:00 Yin Huai :
>>>
 btw, move it to user list.

 On Mon, Sep 14, 2015 at 2:54 PM, Yin Huai  wrote:

> A scale of 10 means that there are 10 digits to the right of the
> decimal point. If you also have precision 10, the range of your data will
> be [0, 1) and casting "10.5" to DecimalType(10, 10) will return null,
> which is expected.
>
> On Mon, Sep 14, 2015 at 1:42 PM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
>> Hi all,
>> I'm moving from spark 1.4 to 1.5, and one of my tests is failing.
>> It seems that there were some changes in
>> org.apache.spark.sql.types.DecimalType
>>
>> This ugly code is a little sample to reproduce the error, don't use
>> it in your project.
>>
>> test("spark test") {
>>   val file = context.sparkContext().textFile(s"${defaultFilePath}Test.csv").map(f =>
>>     Row.fromSeq({
>>       val values = f.split(",")
>>       Seq(values.head.toString.toInt, values.tail.head.toString.toInt,
>>         BigDecimal(values.tail.tail.head), values.tail.tail.tail.head)}))
>>
>>   val structType = StructType(Seq(StructField("id", IntegerType, false),
>>     StructField("int2", IntegerType, false),
>>     StructField("double", DecimalType(10,10), false),
>>     StructField("str2", StringType, false)))
>>
>>   val df = context.sqlContext.createDataFrame(file,structType)
>>   df.first
>> }
>>
>> The content of the file is:
>>
>> 1,5,10.5,va
>> 2,1,0.1,vb
>> 3,8,10.0,vc
>>
>> The problem resides in DecimalType: before 1.5 the scale wasn't
>> required. Now when using DecimalType(12,10) it works fine, but
>> using DecimalType(10,10) the Decimal value
>> 10.5 becomes null, while 0.1 works.
>>
>> Is there anybody working with DecimalType for 1.5.1?
>>
>> Regards,
>> Dirceu
>>
>>
>

>>>
>>>
>>
>
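
The precision and scale rule discussed in this thread can be checked outside
Spark. Below is a minimal sketch in plain Java using java.math.BigDecimal. It
is not Spark's own DecimalType code, and the class and method names are made
up for illustration. The idea: a value fits DecimalType(precision, scale)
when, after being rescaled to `scale` fractional digits, it needs no more
than `precision` digits in total. Rounding half-up during rescaling is an
assumption of this sketch.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Hypothetical helper mirroring the precision/scale rule described in the thread. */
final class DecimalFit {

    /** True if value can be represented with the given total digits and fractional digits. */
    static boolean fits(BigDecimal value, int precision, int scale) {
        // Force exactly `scale` digits after the decimal point, then count all digits.
        BigDecimal rescaled = value.setScale(scale, RoundingMode.HALF_UP);
        return rescaled.precision() <= precision;
    }

    public static void main(String[] args) {
        // Values from the sample file in the thread.
        for (String s : new String[] {"10.5", "0.1", "10.0"}) {
            BigDecimal v = new BigDecimal(s);
            System.out.println(s + " fits (10,10): " + fits(v, 10, 10)
                    + ", fits (12,10): " + fits(v, 12, 10));
        }
    }
}
```

Under this rule 10.5 and 10.0 need two integer digits on top of the ten
fractional digits, which matches the advice that precision - scale must be
at least 2 for this data.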


Re: how to get RDD from two different RDDs with cross column

2015-09-21 Thread Romi Kuntsman
Hi,
If I understand correctly:
rdd1 contains keys (of type StringDate)
rdd2 contains keys and values
and rdd3 contains all the keys, and the values from rdd2?

I think you should make rdd1 and rdd2 PairRDD, and then use outer join.
Does that make sense?
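
To make the intended result concrete, here is a minimal sketch of the join
semantics using plain Java collections rather than Spark (all names are made
up for illustration). It assumes what the question seems to ask for: every
key of rdd1 is kept, and the value comes from rdd2 when the key exists there,
otherwise null. On pair RDDs that would correspond to a left outer join with
rdd1 on the left.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of left-outer-join semantics on in-memory data. */
final class LeftOuterJoinSketch {

    /** Keeps every key from leftKeys; the value is looked up in right, or null if absent. */
    static Map<String, Float> leftOuterJoin(List<String> leftKeys, Map<String, Float> right) {
        Map<String, Float> joined = new LinkedHashMap<>(); // preserves the order of leftKeys
        for (String key : leftKeys) {
            joined.put(key, right.get(key)); // get() returns null for a missing key
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> rdd1 = Arrays.asList("2015-09-19", "2015-09-20", "2015-09-21");
        Map<String, Float> rdd2 = new HashMap<>();
        rdd2.put("2015-09-20", 1.5f);
        rdd2.put("2015-09-22", 9.0f); // not in rdd1, so it does not appear in the result
        System.out.println(leftOuterJoin(rdd1, rdd2));
    }
}
```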

On Mon, Sep 21, 2015 at 8:37 PM Zhiliang Zhu  wrote:

> Dear Romi, Priya, Sujt and Shivaram and all,
>
> I have spent many days thinking about this issue, but have not found a
> good enough solution...
> I would appreciate any help.
>
> There is an RDD rdd1, and another RDD rdd2
> (rdd2 can be a PairRDD, or a DataFrame with two columns, StringDate and float).
> The StringDate column values in rdd1 and rdd2 overlap but are not the same.
>
> I would like to get a new RDD rdd3. The StringDate values in rdd3
> would all be the same as in rdd1, and the float in rdd3 would come from rdd2 if
> its StringDate is in rdd2, or else NULL would be assigned.
> That is, for each row rdd3[ i ], rdd3[ i ].StringDate is the same as
> rdd1[ i ].StringDate, and if some rdd2 row has that StringDate, its float
> is assigned to rdd3[ i ].
> What kind of API or function should I use...
>
> Thanks very much!
> Zhiliang
>
>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Romi Kuntsman
Cody, that's a great reference!
As shown there - the best way to connect to an external database from the
workers is to create a connection pool on (each) worker.
The driver may pass, via broadcast, the connection string, but not the
connection object itself and not the spark context.
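
The pattern described above can be sketched without Spark. In the minimal
Java sketch below, only a connection string (a plain, serializable String)
would cross from the driver to the workers, and each worker JVM builds its
pool the first time a task asks for it. `FakePool` is a stand-in invented for
this example, not a real connection-pool library, and the connection string
is made up.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical per-JVM holder: the pool is created lazily on the worker, never serialized. */
final class WorkerPoolHolder {

    /** Stand-in for a real connection pool; it only remembers the connection string. */
    static final class FakePool {
        final String connectionString;
        FakePool(String connectionString) { this.connectionString = connectionString; }
    }

    static final AtomicInteger poolsCreated = new AtomicInteger();
    private static FakePool pool; // one per JVM, i.e. one per executor process

    /** Returns this JVM's pool, creating it on first use from the shipped connection string. */
    static synchronized FakePool get(String connectionString) {
        if (pool == null) {
            pool = new FakePool(connectionString);
            poolsCreated.incrementAndGet();
        }
        return pool;
    }

    public static void main(String[] args) {
        String shipped = "cassandra://example-host:9042"; // what the driver would broadcast
        // Simulate several tasks running in the same executor JVM.
        for (int task = 0; task < 3; task++) {
            FakePool p = get(shipped);
            System.out.println("task " + task + " uses pool for " + p.connectionString);
        }
        System.out.println("pools created: " + poolsCreated.get());
    }
}
```

In a Spark job the call to get(...) would sit inside rdd.foreachPartition so
that it runs on the executor. The static field is never part of a closure,
which is why nothing non-serializable has to leave the driver.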

On Mon, Sep 21, 2015 at 5:31 PM Cody Koeninger  wrote:

> That isn't accurate, I think you're confused about foreach.
>
> Look at
>
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
>
> On Mon, Sep 21, 2015 at 7:36 AM, Romi Kuntsman  wrote:
>
>> foreach is something that runs on the driver, not the workers.
>>
>> if you want to perform some function on each record from cassandra, you
>> need to do cassandraRdd.map(func), which will run distributed on the spark
>> workers
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>> On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch 
>> wrote:
>>
>>> Yes, but i need to read from cassandra db within a spark
>>> transformation..something like..
>>>
>>> dstream.foreachRDD{
>>>
>>> rdd=> rdd.foreach {
>>>  message =>
>>>  sc.cassandraTable()
>>>   .
>>>   .
>>>   .
>>> }
>>> }
>>>
>>> Since rdd.foreach gets executed on workers, how can i make sparkContext
>>> available on workers ???
>>>
>>> Regards,
>>> Padma Ch
>>>
>>> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu  wrote:
>>>
 You can use broadcast variable for passing connection information.

 Cheers

 On Sep 21, 2015, at 4:27 AM, Priya Ch 
 wrote:

 can i use this sparkContext on executors ??
 In my application, i have scenario of reading from db for certain
 records in rdd. Hence I need sparkContext to read from DB (cassandra in our
 case),

 If sparkContext couldn't be sent to executors , what is the workaround
 for this ??

 On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak 
 wrote:

> add @transient?
>
> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <
> learnings.chitt...@gmail.com> wrote:
>
>> Hello All,
>>
>> How can i pass sparkContext as a parameter to a method in an
>> object. Because passing sparkContext is giving me TaskNotSerializable
>> Exception.
>>
>> How can i achieve this ?
>>
>> Thanks,
>> Padma Ch
>>
>
>

>>>
>>
>


Re: Exception initializing JavaSparkContext

2015-09-21 Thread Ellen Kraffmiller
I found the problem - the pom.xml I was using also contained an old
dependency on a mahout library, which was including the old hadoop-core.
Removing that fixed the problem.
Thank you!

On Mon, Sep 21, 2015 at 2:54 PM, Ted Yu  wrote:

> bq. hadoop-core-0.20.204.0
>
> How come the above got into play - it was from hadoop-1
>
> On Mon, Sep 21, 2015 at 11:34 AM, Ellen Kraffmiller <
> ellen.kraffmil...@gmail.com> wrote:
>
>> I am including the Spark core dependency in my maven pom.xml:
>>
>> <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-core_2.10</artifactId>
>>   <version>1.5.0</version>
>> </dependency>
>>
>> This is bringing these hadoop versions:
>> hadoop-annotations-2.2.0
>> hadoop-auth-2.2.0
>> hadoop-client-2.2.0
>> hadoop-common-2.2.0
>> hadoop-core-0.20.204.0
>> hadoop-hdfs-2.2.0
>> followed by mapreduce and yarn dependencies... let me know if you need
>> the full list.
>> Thanks,
>> Ellen
>>
>>
>> On Mon, Sep 21, 2015 at 1:48 PM, Marcelo Vanzin 
>> wrote:
>>
>>> What Spark package are you using? In particular, which hadoop version?
>>>
>>> On Mon, Sep 21, 2015 at 9:14 AM, ekraffmiller
>>>  wrote:
>>> > Hi,
>>> > I’m trying to run a simple test program to access Spark through Java.
>>> > I’m using JDK 1.8, and Spark 1.5.  I’m getting an Exception from the
>>> > JavaSparkContext constructor.  My initialization code matches all the
>>> > sample code I’ve found online, so not sure what I’m doing wrong.
>>> >
>>> > Here is my code:
>>> >
>>> > SparkConf conf = new SparkConf().setAppName("Simple Application");
>>> > conf.setMaster("local");
>>> > conf.setAppName("my app");
>>> > JavaSparkContext sc = new JavaSparkContext(conf);
>>> >
>>> > The stack trace of the Exception:
>>> >
>>> > java.lang.ExceptionInInitializerError: null
>>> > at java.lang.Class.getField(Class.java:1690)
>>> > at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:220)
>>> > at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
>>> > at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
>>> > at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:189)
>>> > at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
>>> > at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
>>> > at org.apache.spark.storage.DiskBlockManager.addShutdownHook(DiskBlockManager.scala:147)
>>> > at org.apache.spark.storage.DiskBlockManager.<init>(DiskBlockManager.scala:54)
>>> > at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:75)
>>> > at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:173)
>>> > at org.apache.spark.SparkEnv$.create(SparkEnv.scala:345)
>>> > at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
>>> > at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:276)
>>> > at org.apache.spark.SparkContext.<init>(SparkContext.scala:441)
>>> > at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
>>> > at edu.harvard.iq.text.core.spark.SparkControllerTest.testMongoRDD(SparkControllerTest.java:63)
>>> >
>>> > Thanks,
>>> > Ellen
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Exception-initializing-JavaSparkContext-tp24755.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>>
>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>


Re: What's the best practice to parse JSON using spark

2015-09-21 Thread Adrian Tanase
I've been using spray-json for general 
JSON ser/deser in scala (spark app), mostly for config files and data exchange. 
Haven't used it in conjunction with jobs that process large JSON data sources, 
so can't speak for those use cases.


-adrian



From: Petr Novak 
Sent: Monday, September 21, 2015 12:11 PM
To: Cui Lin; user
Subject: Re: What's the best practice to parse JSON using spark

Surprisingly I had the same issue when including json4s dependency at the same 
version v3.2.10. I had to remove json4s deps from my code. I'm using Scala 
2.11, there might be some issue with mixing 2.10/2.11 and it could be just my 
environment. I haven't investigated much as depending on Spark provided version 
is fine for us for now.

Regards,
Petr

On Mon, Sep 21, 2015 at 11:06 AM, Petr Novak <oss.mli...@gmail.com> wrote:
Internally Spark is using json4s and jackson parser v3.2.10, AFAIK. So if you 
are using Scala they should be available without adding dependencies. There is 
v3.2.11 already available but adding to my app was causing NoSuchMethod 
exception so I would have to shade it. I'm simply staying on v3.2.10 for now.

Regards,
Petr

On Sat, Sep 19, 2015 at 2:45 AM, Ted Yu <yuzhih...@gmail.com> wrote:
For #1, see this thread: http://search-hadoop.com/m/q3RTti0Thneenne2

For #2, also see:
examples/src/main/python/hbase_inputformat.py
examples/src/main/python/hbase_outputformat.py

Cheers

On Fri, Sep 18, 2015 at 5:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:
For #2, please see:

examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala

In hbase, there is hbase-spark module which is being polished. Should be 
available in hbase 1.3.0 release.

Cheers

On Fri, Sep 18, 2015 at 5:09 PM, Cui Lin <icecreamlc...@gmail.com> wrote:
Hello, all,

Parsing JSON's nested structure is easy when using the Java or Python API. Where
can I find a similar way to parse a JSON file using Spark?

Another question: using SparkSQL, how can I easily save the results into a
NoSQL DB? Any examples? Thanks a lot!



--
Best regards!

Lin,Cui






Re: Spark Streaming and Kafka MultiNode Setup - Data Locality

2015-09-21 Thread Adrian Tanase
We do - using Spark streaming, Kafka, HDFS all collocated on the same nodes. 
Works great so far.


Spark picks up the location information and reads data from the partitions 
hosted by the local broker, showing up as NODE_LOCAL in the UI.

You also need to look at the locality options in the config 
(spark.locality.wait and friends) - just to make sure you're not wasting time if 
the kafka cluster becomes unbalanced and there are fewer cores than partitions 
on a particular node - you want to get to RACK_LOCAL as quickly as possible, 
we've set this to 500 ms instead of the default of 3 seconds.

-adrian


From: Cody Koeninger 
Sent: Monday, September 21, 2015 10:19 PM
To: Ashish Soni
Cc: user
Subject: Re: Spark Streaming and Kafka MultiNode Setup - Data Locality

The direct stream already uses the kafka leader for a given partition as the 
preferred location.

I don't run kafka on the same nodes as spark, and I don't know anyone who does, 
so that situation isn't particularly well tested.

On Mon, Sep 21, 2015 at 1:15 PM, Ashish Soni <asoni.le...@gmail.com> wrote:
Hi All,

Just wanted to find out if there are any benefits to installing kafka brokers 
and spark nodes on the same machine?

Is it possible for spark to pull data from kafka if it is local to the node, 
i.e. the broker or partition is on the same machine?

Thanks,
Ashish



Re: Using Spark for portfolio manager app

2015-09-21 Thread Adrian Tanase
  1.  reading from kafka has exactly once guarantees - we are using it in
      production today (with the direct receiver)
       *   you will probably have 2 topics, loading both into spark and joining
           / unioning as needed is not an issue
       *   tons of optimizations you can do there, assuming everything else works
  2.  for ad-hoc query I would say you absolutely need to look at external
      storage
       *   querying the DStream or spark's RDDs directly should be done mostly
           for aggregates/metrics, not by users
       *   if you look at HBase or Cassandra for storage then 50k writes/sec are
           not a problem at all, especially combined with a smart client that
           does batch puts (like async hbase)
       *   you could also consider writing the updates to another kafka topic and
           have a different component that updates the DB, if you think of other
           optimisations there
  3.  by stats I assume you mean metrics (operational or business)
       *   there are multiple ways to do this, however I would not encourage you
           to query spark directly, especially if you need an archive/history of
           your datapoints
       *   we are using OpenTSDB (we already have a HBase cluster) + Grafana for
           dashboarding
       *   collecting the metrics is a bit hairy in a streaming app - we have
           experimented with both accumulators and RDDs specific for metrics -
           chose the RDDs that write to OpenTSDB using foreachRDD

-adrian


From: Thúy Hằng Lê 
Sent: Sunday, September 20, 2015 7:26 AM
To: Jörn Franke
Cc: user@spark.apache.org
Subject: Re: Using Spark for portfolio manager app

Thanks Adrian and Jorn for the answers.

Yes, you're right, there are a lot of things I need to consider if I want to use 
Spark for my app.

I still have a few concerns/questions from your information:

1/ I need to combine trading stream with tick stream, I am planning to use 
Kafka for that
If I am using approach #2 (Direct Approach) in this tutorial 
https://spark.apache.org/docs/latest/streaming-kafka-integration.html

Will I receive exactly-once semantics? Or do I have to add some logic in my code to 
achieve that?
As per your suggestion of using delta updates, exactly-once semantics are required for 
this application.

2/ For ad-hoc queries, I must output from Spark to external storage and query on 
that, right?
Is there any way to do ad-hoc queries on Spark? My application could have 50k 
updates per second at peak time.
Persisting to external storage leads to high latency in my app.

3/ How to get real-time statistics from Spark?
In most of the Spark streaming examples, the statistics are echoed to stdout.
However, I want to display those statistics on a GUI. Is there any way to retrieve 
data from Spark directly without using external storage?


2015-09-19 16:23 GMT+07:00 Jörn Franke <jornfra...@gmail.com>:

If you want to be able to let your users query their portfolio then you may 
want to think about storing the current state of the portfolios in 
hbase/phoenix or alternatively a cluster of relational databases can make sense. 
For the rest you may use Spark.

On Sat, Sep 19, 2015 at 4:43, Thúy Hằng Lê <thuyhang...@gmail.com> wrote:
Hi all,

I am going to build a financial application for Portfolio Manager, where each 
portfolio contains a list of stocks, the number of shares purchased, and the 
purchase price.
Another source of information is stock prices from market data. The application 
needs to calculate the real-time gain or loss of each stock in each portfolio 
(compared to the purchase price).

I am new to Spark. I know that using Spark Streaming I can aggregate portfolio 
positions in real time, for example:
user A contains:
  - 100 IBM stock with transactionValue=$15000
  - 500 AAPL stock with transactionValue=$11400

Now given the stock prices change in real-time too, e.g. if the IBM price is at 151, I 
want to update its gain or loss: gainOrLost(IBM) = 151*100 - 15000 = $100
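
The arithmetic in the IBM example generalizes to: gain or loss = current
price * shares held - amount paid. A minimal Java sketch of that formula
follows. The class and method names are invented for illustration, and
whole-dollar long values are used only to keep the example short (real prices
would need a decimal type).

```java
/** Hypothetical illustration of the gain/loss formula from the post. */
final class PositionMath {

    /** Current market value of the position minus what was paid for it. */
    static long gainOrLoss(long currentPrice, long shares, long transactionValue) {
        return currentPrice * shares - transactionValue;
    }

    public static void main(String[] args) {
        // User A holds 100 IBM shares bought for $15000 in total; IBM now trades at $151.
        System.out.println("IBM gain or loss: $" + gainOrLoss(151, 100, 15000));
    }
}
```

In a streaming job this function would be applied each time a new price tick
arrives for a stock that appears in a portfolio, which is the join between
the two streams asked about below.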

My questions are:

 * What is the best method to combine 2 real-time streams (transactions 
made by users and market pricing data) in Spark?
 * How can I use real-time ad-hoc SQL against the portfolio's positions? Is 
there any way I can do SQL on the output of Spark Streaming?
 For example,
  select sum(gainOrLost) from portfolio where user='A';
 * What are the preferred external storages for Spark in this use case?
 * Is Spark the right choice for my use case?




Re: Spark Streaming and Kafka MultiNode Setup - Data Locality

2015-09-21 Thread Cody Koeninger
The direct stream already uses the kafka leader for a given partition as
the preferred location.

I don't run kafka on the same nodes as spark, and I don't know anyone who
does, so that situation isn't particularly well tested.

On Mon, Sep 21, 2015 at 1:15 PM, Ashish Soni  wrote:

> Hi All ,
>
> Just wanted to find out if there are any benefits to installing kafka
> brokers and spark nodes on the same machine?
>
> Is it possible for spark to pull data from kafka if it is local to the
> node, i.e. the broker or partition is on the same machine?
>
> Thanks,
> Ashish
>


Re: Exception initializing JavaSparkContext

2015-09-21 Thread Ted Yu
bq. hadoop-core-0.20.204.0

How come the above got into play - it was from hadoop-1

On Mon, Sep 21, 2015 at 11:34 AM, Ellen Kraffmiller <
ellen.kraffmil...@gmail.com> wrote:

> I am including the Spark core dependency in my maven pom.xml:
>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-core_2.10</artifactId>
>   <version>1.5.0</version>
> </dependency>
>
> This is bringing these hadoop versions:
> hadoop-annotations-2.2.0
> hadoop-auth-2.2.0
> hadoop-client-2.2.0
> hadoop-common-2.2.0
> hadoop-core-0.20.204.0
> hadoop-hdfs-2.2.0
> followed by mapreduce and yarn dependencies... let me know if you need the
> full list.
> Thanks,
> Ellen
>
>
> On Mon, Sep 21, 2015 at 1:48 PM, Marcelo Vanzin 
> wrote:
>
>> What Spark package are you using? In particular, which hadoop version?
>>
>> On Mon, Sep 21, 2015 at 9:14 AM, ekraffmiller
>>  wrote:
>> > Hi,
>> > I’m trying to run a simple test program to access Spark through Java.
>> > I’m using JDK 1.8, and Spark 1.5.  I’m getting an Exception from the
>> > JavaSparkContext constructor.  My initialization code matches all the
>> > sample code I’ve found online, so not sure what I’m doing wrong.
>> >
>> > Here is my code:
>> >
>> > SparkConf conf = new SparkConf().setAppName("Simple Application");
>> > conf.setMaster("local");
>> > conf.setAppName("my app");
>> > JavaSparkContext sc = new JavaSparkContext(conf);
>> >
>> > The stack trace of the Exception:
>> >
>> > java.lang.ExceptionInInitializerError: null
>> > at java.lang.Class.getField(Class.java:1690)
>> > at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:220)
>> > at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
>> > at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
>> > at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:189)
>> > at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
>> > at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
>> > at org.apache.spark.storage.DiskBlockManager.addShutdownHook(DiskBlockManager.scala:147)
>> > at org.apache.spark.storage.DiskBlockManager.<init>(DiskBlockManager.scala:54)
>> > at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:75)
>> > at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:173)
>> > at org.apache.spark.SparkEnv$.create(SparkEnv.scala:345)
>> > at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
>> > at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:276)
>> > at org.apache.spark.SparkContext.<init>(SparkContext.scala:441)
>> > at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
>> > at edu.harvard.iq.text.core.spark.SparkControllerTest.testMongoRDD(SparkControllerTest.java:63)
>> >
>> > Thanks,
>> > Ellen
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Exception-initializing-JavaSparkContext-tp24755.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>>
>>
>> --
>> Marcelo
>>
>
>


Re: Deploying spark-streaming application on production

2015-09-21 Thread Adrian Tanase
I'm wondering, isn't this the canonical use case for WAL + reliable receiver?

As far as I know you can tune the MQTT server to wait for acks on messages (QoS 
level 2?).
With some support from the client library you could achieve exactly-once 
semantics on the read side, if you ack a message only after writing it to the WAL, 
correct?

-adrian

Sent from my iPhone

On 21 Sep 2015, at 12:35, Petr Novak <oss.mli...@gmail.com> wrote:

In short there is no direct support for it in Spark AFAIK. You will either 
manage it in MQTT or have to add another layer of indirection - either 
in-memory based (observable streams, in-mem db) or disk based (Kafka, hdfs 
files, db) which will keep your unprocessed events.

Now realizing, there is support for backpressure in v1.5.0 but I don't know if 
it could be exploited, i.e. I don't know if it is possible to decouple event 
reading into memory and actual processing code in Spark which could be swapped 
on the fly. Probably not without some custom built facility for it.

Petr

On Mon, Sep 21, 2015 at 11:26 AM, Petr Novak <oss.mli...@gmail.com> wrote:
I should read my posts at least once to avoid so many typos. Hopefully you are 
brave enough to read through.

Petr

On Mon, Sep 21, 2015 at 11:23 AM, Petr Novak <oss.mli...@gmail.com> wrote:
I think you would have to persist events somehow if you don't want to miss 
them. I don't see any other option there. Either in MQTT if it is supported 
there or routing them through Kafka.

There is WriteAheadLog in Spark but you would have to decouple MQTT stream reading 
and processing into 2 separate jobs so that you could upgrade the processing one, 
assuming the reading one would be stable (without changes) across versions. But 
it is problematic because there is no easy way to share DStreams between 
jobs - you would have to develop your own facility for it.

Alternatively the reading job could save MQTT events in their most raw 
form into files - to limit the need to change code - and then the processing job 
would work on top of them using Spark streaming based on files. I think this is 
inefficient and can get quite complex if you would like to make it reliable.

Basically either MQTT supports persistence (which I don't know) or there is 
Kafka for this use case.

Another option would be, I think, to place observable streams in between MQTT and 
Spark streaming with backpressure, as long as you could perform the upgrade before 
the buffers fill up.

I'm sorry that it is not thought out well from my side, it is just a brainstorm 
but it might lead you somewhere.

Regards,
Petr

On Mon, Sep 21, 2015 at 10:09 AM, Jeetendra Gangele <gangele...@gmail.com> wrote:
Hi All,

I have a spark streaming application with a batch interval (10 ms) which reads the 
MQTT channel and dumps the data from MQTT to HDFS.

So suppose I have to deploy a new application jar (with changes in the spark 
streaming application), what is the best way to deploy? Currently I am doing as 
below:

1. killing the running streaming app using yarn application -kill ID
2. and then starting the application again

The problem with the above approach is that since we are not persisting the 
events in MQTT we will miss the events for the period of the deploy.

How to handle this case?

regards
jeeetndra





Serialization Error with PartialFunction / immutable sets

2015-09-21 Thread Chaney Courtney
Hi, I’m receiving a task not serializable exception using Spark GraphX (Scala 
2.11.6 / JDK 1.8 / Spark 1.5)

My vertex data is of type (VertexId, immutable Set), 
My edge data is of type PartialFunction[ISet[E], ISet[E]] where each ED has a 
precomputed function.

My vertex program:
val vertexProgram = (id: VertexId, currentSet: ISet[E], inSet: ISet[E]) =>
  inSet // (identity)
My send message:
val sendMessage: (EdgeTriplet[ISet[E], MonotonicTransferFunction]) =>
    Iterator[(VertexId, ISet[E])] =
  (edge) => {
    val f = edge.attr
    val currentSet = edge.srcAttr
    Iterator((edge.dstId, f(currentSet)))
  }
My message combiner:
val messageCombiner: (ISet[E], ISet[E]) => ISet[E] =
  (a, b) => a ++ b

g.pregel(bottom, Int.MaxValue, EdgeDirection.Out)(vp, send, combine)

I debugged the pregel execution and found that the exception happened when 
pregel calls mapReduceTriplets to aggregate the messages for the first time. 
This happens after the initial vertex program is run I believe (which does not 
cause an exception). I think the error lies within my send/combiner functions 
but I am not sure. I’ve also tried storing the PartialFunctions inside of the 
VD instead and still get the same error. At first I thought the error might 
have to do with Set and how it changes size throughout execution, but I have 
successfully run other Pregel projects using immutable sets without issue…

I have also tried enclosing each method within its own class that extends 
Serializable but this still gives me the same error.

Thank you for your time and information.



Re: Exception initializing JavaSparkContext

2015-09-21 Thread Ellen Kraffmiller
I am including the Spark core dependency in my maven pom.xml:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.5.0</version>
</dependency>

This is bringing these hadoop versions:
hadoop-annotations-2.2.0
hadoop-auth-2.2.0
hadoop-client-2.2.0
hadoop-common-2.2.0
hadoop-core-0.20.204.0
hadoop-hdfs-2.2.0
followed by mapreduce and yarn dependencies... let me know if you need the
full list.
Thanks,
Ellen


On Mon, Sep 21, 2015 at 1:48 PM, Marcelo Vanzin  wrote:

> What Spark package are you using? In particular, which hadoop version?
>
> On Mon, Sep 21, 2015 at 9:14 AM, ekraffmiller
>  wrote:
> > Hi,
> > I’m trying to run a simple test program to access Spark through Java.  I’m
> > using JDK 1.8, and Spark 1.5.  I’m getting an Exception from the
> > JavaSparkContext constructor.  My initialization code matches all the sample
> > code I’ve found online, so not sure what I’m doing wrong.
> >
> > Here is my code:
> >
> > SparkConf conf = new SparkConf().setAppName("Simple Application");
> > conf.setMaster("local");
> > conf.setAppName("my app");
> > JavaSparkContext sc = new JavaSparkContext(conf);
> >
> > The stack trace of the Exception:
> >
> > java.lang.ExceptionInInitializerError: null
> > at java.lang.Class.getField(Class.java:1690)
> > at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:220)
> > at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
> > at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
> > at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:189)
> > at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
> > at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
> > at org.apache.spark.storage.DiskBlockManager.addShutdownHook(DiskBlockManager.scala:147)
> > at org.apache.spark.storage.DiskBlockManager.<init>(DiskBlockManager.scala:54)
> > at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:75)
> > at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:173)
> > at org.apache.spark.SparkEnv$.create(SparkEnv.scala:345)
> > at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
> > at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:276)
> > at org.apache.spark.SparkContext.<init>(SparkContext.scala:441)
> > at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
> > at edu.harvard.iq.text.core.spark.SparkControllerTest.testMongoRDD(SparkControllerTest.java:63)
> >
> > Thanks,
> > Ellen
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Exception-initializing-JavaSparkContext-tp24755.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >
>
>
>
> --
> Marcelo
>


Re: Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-21 Thread Alan Braithwaite
That could be the behavior, but spark.mesos.executor.home being unset still
raises an exception inside the dispatcher, preventing a Docker container from
even being started.  I can see if other properties are inherited from the
default environment when that's set, if you'd like.

I think the main problem is just that premature validation is being done on
the dispatcher, and that the dispatcher crashes in the event of bad config.

- Alan

On Sat, Sep 19, 2015 at 11:03 AM, Timothy Chen  wrote:

> You can still provide properties through the docker container by putting
> configuration in the conf directory, but we try to pass all properties
> submitted from the driver spark-submit through which I believe will
> override the defaults.
>
> This is not what you are seeing?
>
> Tim
>
>
> On Sep 19, 2015, at 9:01 AM, Alan Braithwaite  wrote:
>
> The assumption that the executor has no default properties set in it's
> environment through the docker container.  Correct me if I'm wrong, but any
> properties which are unset in the SparkContext will come from the
> environment of the executor will it not?
>
> Thanks,
> - Alan
>
> On Sat, Sep 19, 2015 at 1:09 AM, Tim Chen  wrote:
>
>> I guess I need a bit more clarification, what kind of assumptions was the
>> dispatcher making?
>>
>> Tim
>>
>>
>> On Thu, Sep 17, 2015 at 10:18 PM, Alan Braithwaite 
>> wrote:
>>
>>> Hi Tim,
>>>
>>> Thanks for the follow up.  It's not so much that I expect the executor
>>> to inherit the configuration of the dispatcher as I* don't *expect the
>>> dispatcher to make assumptions about the system environment of the executor
>>> (since it lives in a docker).  I could potentially see a case where you
>>> might want to explicitly forbid the defaults, but I can't think of any
>>> right now.
>>>
>>> Otherwise, I'm confused as to why the defaults in the docker image for
>>> the executor are just ignored.  I suppose that it's the dispatchers job to
>>> ensure the *exact* configuration of the executor, regardless of the
>>> defaults set on the executors machine?  Is that the assumption being made?
>>> I can understand that in contexts which aren't docker driven since jobs
>>> could be rolling out in the middle of a config update.  Trying to think of
>>> this outside the terms of just mesos/docker (since I'm fully aware that
>>> docker doesn't rule the world yet).
>>>
>>> So I can see this from both perspectives now and passing in the
>>> properties file will probably work just fine for me, but for my better
>>> understanding: When the executor starts, will it read any of the
>>> environment that it's executing in or will it just take only the properties
>>> given to it by the dispatcher and nothing more?
>>>
>>> Lemme know if anything needs more clarification and thanks for your
>>> mesos contribution to spark!
>>>
>>> - Alan
>>>
>>> On Thu, Sep 17, 2015 at 5:03 PM, Timothy Chen  wrote:
>>>
>>>> Hi Alan,
>>>>
>>>> If I understand correctly, you are setting executor home when you
>>>> launch the dispatcher and not on the configuration when you submit job, and
>>>> expect it to inherit that configuration?
>>>>
>>>> When I worked on the dispatcher I was assuming all configuration is
>>>> passed to the dispatcher to launch the job exactly how you will need to
>>>> launch it with client mode.
>>>>
>>>> But indeed it shouldn't crash dispatcher, I'll take a closer look when
>>>> I get a chance.
>>>>
>>>> Can you recommend changes on the documentation, either in email or a PR?
>>>>
>>>> Thanks!
>>>>
>>>> Tim
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On Sep 17, 2015, at 12:29 PM, Alan Braithwaite
>>>> wrote:
>>>>
>>>> Hey All,
>>>>
>>>> To bump this thread once again, I'm having some trouble using the
>>>> dispatcher as well.
>>>>
>>>> I'm using Mesos Cluster Manager with Docker Executors.  I've deployed
>>>> the dispatcher as Marathon job.  When I submit a job using spark submit,
>>>> the dispatcher writes back that the submission was successful and then
>>>> promptly dies in marathon.  Looking at the logs reveals it was hitting the
>>>> following line:
>>>>
>>>> 398:  throw new SparkException("Executor Spark home
>>>> `spark.mesos.executor.home` is not set!")
>>>>
>>>> Which is odd because it's set in multiple places (SPARK_HOME,
>>>> spark.mesos.executor.home, spark.home, etc).  Reading the code, it
>>>> appears that the driver desc pulls only from the request and disregards any
>>>> other properties that may be configured.  Testing by passing --conf
>>>> spark.mesos.executor.home=/usr/local/spark on the command line to
>>>> spark-submit confirms this.  We're trying to isolate the number of places
>>>> where we have to set properties within spark and were hoping that it will
>>>> be possible to have this pull in the spark-defaults.conf from somewhere, or
>>>> at least allow the user to inform the dispatcher through spark-submit that
>>>> those properties will be available once the job starts.
>>>>
>>>> Finally, I don't think the dispatcher should crash in this even

Spark Streaming and Kafka MultiNode Setup - Data Locality

2015-09-21 Thread Ashish Soni
Hi All,

Just wanted to find out if there are any benefits to installing Kafka
brokers and Spark nodes on the same machine?

Is it possible for Spark to pull data from Kafka locally when the data is on
the same node, i.e. when the broker or partition is on the same machine?

Thanks,
Ashish


JDBCRdd issue

2015-09-21 Thread Saurabh Malviya (samalviy)
Hi,


When I use a reference to an outside value within JdbcRDD, it throws a
serialization exception. Does JdbcRDD not accept references from other parts
of the code?

confMap = ConfFactory.getConf(ParquetStreaming)

val jdbcRDD = new JdbcRDD(sc, () => {
    Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
    DriverManager.getConnection(confMap(PHOENIX_URL)) // throws the exception below
    // DriverManager.getConnection(ConfFactory.getConf(ParquetStreaming)(PHOENIX_URL)) // this works
  },
  s"SELECT tenant_id, data_source_id, mne_id, device_type1_key " +
  s" FROM XYZ_TYPE1_TEST WHERE DEVICE_TYPE1_KEY >= ? and DEVICE_TYPE1_KEY <= ? and TENANT_ID in ($tenantIds) " +
  s" AND DATA_SOURCE_ID in ($dataSourceIds) AND ISDELETED = false",
  minKey, maxKey, 10,
  row => DeviceDel(row.getString(1), row.getString(2), row.getLong(3), row.getLong(4))).cache()

It throws a runtime exception. However,
DriverManager.getConnection("jdbc:phoenix:10.20.87.1:2181") works fine.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Task not serializable: java.io.NotSerializableException: 
org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: 
org.apache.spark.SparkContext, value: 
org.apache.spark.SparkContext@5bb273b4)
- field (class: 
advance_reporting.transformations.DeviceDelETL$$anonfun$main$1, name: sc$1, 
type: class org.apache.spark.SparkContext)
- object (class 
advance_reporting.transformations.DeviceDelETL$$anonfun$main$1, )
- field (class: 
advance_reporting.transformations.DeviceDelETL$$anonfun$main$1$$anonfun$6, 
name: $outer, type: class $$anonfun$main$1)
- object (class 
advance_reporting.transformations.DeviceDelETL$$anonfun$main$1$$anonfun$6, 
)
- field (class: org.apache.spark.rdd.JdbcRDD, name: 
org$apache$spark$rdd$JdbcRDD$$getConnection, type: interface scala.Function0)
- object (class org.apache.spark.rdd.JdbcRDD, JdbcRDD[15] at 
JdbcRDD at DeviceDelETL.scala:91)
- field (class: scala.Tuple2, name: _1, type: class 
java.lang.Object)
- object (class scala.Tuple2, (JdbcRDD[15] at JdbcRDD at 
DeviceDelETL.scala:91,))
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:878)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815)
at 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1426)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Sep 18, 2015 12:22:59 PM INFO: parquet.hadoop.ParquetFileReader: Initiating 
action with parallelism: 5

Any idea?
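
One possible reading of the serialization stack above: the connection function reaches SparkContext through an `$outer` reference, so the closure drags in more than the URL it needs. A minimal sketch in plain Scala (no Spark on the classpath; `ClosureCapture`, `Driver`, `capturingThis`, `capturingValue`, `isSerializable` and the `phoenix.url` key are hypothetical names made up for illustration) shows the general effect, along with the usual workaround of copying the value into a local val before building the closure:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCapture {
  // True if Java serialization accepts the value.
  def isSerializable(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(value)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Stand-in for a driver-side object that is not Serializable,
  // in the way SparkContext is not.
  class Driver(val conf: Map[String, String]) {
    // Reads the field inside the closure, so the closure captures `this`.
    def capturingThis: () => String = () => conf("phoenix.url")

    // Copies the value into a local val first, so only the String is captured.
    def capturingValue: () => String = {
      val url = conf("phoenix.url")
      () => url
    }
  }

  def main(args: Array[String]): Unit = {
    val driver = new Driver(Map("phoenix.url" -> "jdbc:phoenix:localhost:2181"))
    println(isSerializable(driver.capturingThis))
    println(isSerializable(driver.capturingValue))
  }
}
```

Applied to the code above, that would mean something like `val url = confMap(PHOENIX_URL)` before `new JdbcRDD(...)`, with only `url` used inside the connection function. Whether that is enough here depends on what else the enclosing closure references.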



Re: DataGenerator for streaming application

2015-09-21 Thread Saiph Kappa
Thanks a lot. Now it's working fine. I wasn't aware of "socketTextStream",
not sure if it was documented in the spark programming guide.
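
For readers following along: `socketTextStream` treats the incoming bytes as newline-delimited text, which matches what the generator's `PrintWriter.println` produces. The plain-Scala loopback sketch below involves no Spark at all; `LineSocketDemo` and `roundTrip` are hypothetical names. It sends a few lines over a local socket and reads them back one line at a time, which is roughly the framing a line-based receiver relies on.

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.{ServerSocket, Socket}

object LineSocketDemo {
  // Sends `lines` through a local socket and returns what a line-based
  // reader receives on the other end.
  def roundTrip(lines: Seq[String]): List[String] = {
    val server = new ServerSocket(0) // 0 asks the OS for a free port
    val sender = new Thread(new Runnable {
      def run(): Unit = {
        val socket = server.accept()
        val out = new PrintWriter(socket.getOutputStream)
        try {
          lines.foreach(line => out.println(line)) // one record per line
          out.flush() // nothing reaches the reader until the buffer is flushed
        } finally {
          out.close()
          socket.close()
        }
      }
    })
    sender.start()

    val client = new Socket("localhost", server.getLocalPort)
    val in = new BufferedReader(new InputStreamReader(client.getInputStream))
    try {
      // readLine returns null once the sender has closed the connection.
      Iterator.continually(in.readLine()).takeWhile(_ != null).toList
    } finally {
      in.close()
      client.close()
      sender.join()
      server.close()
    }
  }

  def main(args: Array[String]): Unit =
    roundTrip(Seq("1,first record", "2,second record")).foreach(println)
}
```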

On Mon, Sep 21, 2015 at 12:46 PM, Hemant Bhanawat 
wrote:

> Why are you using  rawSocketStream to read the data? I believe
> rawSocketStream waits for a big chunk of data before it can start
> processing it. I think what you are writing is a String and you should use
> socketTextStream which reads the data on a per line basis.
>
> On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa 
> wrote:
>
>> Hi,
>>
>> I am trying to build a data generator that feeds a streaming application.
>> This data generator just reads a file and send its lines through a socket.
>> I get no errors on the logs, and the benchmark bellow always prints
>> "Received 0 records". Am I doing something wrong?
>>
>>
>> object MyDataGenerator {
>>
>>   def main(args: Array[String]) {
>> if (args.length != 3) {
>>   System.err.println("Usage: RawTextSender   ")
>>   System.exit(1)
>> }
>> // Parse the arguments using a pattern match
>> val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)
>>
>> val serverSocket = new ServerSocket(port)
>> println("Listening on port " + port)
>>
>>
>> while (true) {
>>   val socket = serverSocket.accept()
>>   println("Got a new connection")
>>
>>
>>   val out = new PrintWriter(socket.getOutputStream)
>>   try {
>> var count = 0
>> var startTimestamp = -1
>> for (line <- Source.fromFile(file).getLines()) {
>>   val ts = line.substring(2, line.indexOf(',',2)).toInt
>>   if(startTimestamp < 0)
>> startTimestamp = ts
>>
>>   if(ts - startTimestamp <= 30) {
>> out.println(line)
>> count += 1
>>   } else {
>> println(s"Emmited reports: $count")
>> count = 0
>> out.flush()
>> startTimestamp = ts
>> Thread.sleep(sleepMillis)
>>   }
>> }
>>   } catch {
>> case e: IOException =>
>>   println("Client disconnected")
>>   socket.close()
>>   }
>> }
>> }
>> }
>>
>>
>>
>> object Benchmark {
>>   def main(args: Array[String]) {
>> if (args.length != 4) {
>>   System.err.println("Usage: RawNetworkGrep
>> ")
>>   System.exit(1)
>> }
>>
>> val (numStreams, host, port, batchMillis) = (args(0).toInt, args(1), 
>> args(2).toInt, args(3).toInt)
>> val sparkConf = new SparkConf()
>> sparkConf.setAppName("BenchMark")
>> 
>> sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
>> sparkConf.set("spark.serializer", 
>> "org.apache.spark.serializer.KryoSerializer")
>> sparkConf.set("spark.executor.extraJavaOptions", " 
>> -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts 
>> -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")
>> if (sparkConf.getOption("spark.master") == None) {
>>   // Master not set, as this was not launched through Spark-submit. 
>> Setting master as local."
>>   sparkConf.setMaster("local[*]")
>> }
>>
>> // Create the context
>> val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>>
>> val rawStreams = (1 to numStreams).map(_ =>
>>   ssc.rawSocketStream[String](host, port, 
>> StorageLevel.MEMORY_ONLY_SER)).toArray
>> val union = ssc.union(rawStreams)
>> union.count().map(c => s"Received $c records").print()
>> ssc.start()
>> ssc.awaitTermination()
>>   }
>> }
>>
>> Thanks.
>>
>>
>


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Yin Huai
Looks like the problem is that df.rdd does not work very well with limit. In
Scala, df.limit(1).rdd will also trigger the issue you observed. I will add
this to the JIRA.

On Mon, Sep 21, 2015 at 10:44 AM, Jerry Lam  wrote:

> I just noticed you found 1.4 has the same issue. I added that as well in
> the ticket.
>
> On Mon, Sep 21, 2015 at 1:43 PM, Jerry Lam  wrote:
>
>> Hi Yin,
>>
>> You are right! I just tried the scala version with the above lines, it
>> works as expected.
>> I'm not sure if it happens also in 1.4 for pyspark but I thought the
>> pyspark code just calls the scala code via py4j. I didn't expect that this
>> bug is pyspark specific. That surprises me actually a bit. I created a
>> ticket for this (SPARK-10731
>> ).
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Mon, Sep 21, 2015 at 1:01 PM, Yin Huai  wrote:
>>
>>> btw, does 1.4 has the same problem?
>>>
>>> On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai  wrote:
>>>
>>>> Hi Jerry,
>>>>
>>>> Looks like it is a Python-specific issue. Can you create a JIRA?
>>>>
>>>> Thanks,
>>>>
>>>> Yin
>>>>
>>>> On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam
>>>> wrote:
>>>>
>>>>> Hi Spark Developers,
>>>>>
>>>>> I just ran some very simple operations on a dataset. I was surprise by
>>>>> the execution plan of take(1), head() or first().
>>>>>
>>>>> For your reference, this is what I did in pyspark 1.5:
>>>>> df=sqlContext.read.parquet("someparquetfiles")
>>>>> df.head()
>>>>>
>>>>> The above lines take over 15 minutes. I was frustrated because I can
>>>>> do better without using spark :) Since I like spark, so I tried to figure
>>>>> out why. It seems the dataframe requires 3 stages to give me the first
>>>>> row.
>>>>> It reads all data (which is about 1 billion rows) and run Limit twice.
>>>>>
>>>>> Instead of head(), show(1) runs much faster. Not to mention that if I
>>>>> do:
>>>>>
>>>>> df.rdd.take(1) //runs much faster.
>>>>>
>>>>> Is this expected? Why head/first/take is so slow for dataframe? Is it
>>>>> a bug in the optimizer? or I did something wrong?
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> Jerry
>>>>>
>>>>
>>>
>>
>


Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Zhiliang Zhu
Hi Romi,
I sincerely appreciate your kind help.
One more question: I am currently using Spark for financial data analysis, so
lots of operations on R data.frame/matrix and stat/regression functions are
always called. However, SparkR is currently not that strong; most of its
functions come from Spark SQL and MLlib. SQL and DataFrame are not as flexible
and easy to use as R is for operating on data.frame/matrix. Moreover, I cannot
yet tell how much of the functionality in MLlib could be used for R-specific
stat/regression.
I have also thought of operating on the data only through the Spark Java API,
but it is quite hard to make that behave like data.frame/matrix from R. I think
I have put myself at risk with these choices.
Would you comment on these points?
Thank you very much!
Zhiliang


 


 On Tuesday, September 22, 2015 1:21 AM, Sujit Pal  
wrote:
   

 Hi Zhiliang,
Haven't used the Java API but found this Javadoc page, may be helpful to you.
https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html

I think the equivalent Java code snippet might go something like this:
RDDFunctions.fromRDD(rdd1, ClassTag$.apply(Class)).sliding(2)
(the second parameter of fromRDD comes from this discussion thread).
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-construct-a-ClassTag-object-as-a-method-parameter-in-Java-td6768.html

There is also the SlidingRDD decorator:
https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/SlidingRDD.html

So maybe something like this:
new SlidingRDD(rdd1, 2, ClassTag$.apply(Class))
-sujit

On Mon, Sep 21, 2015 at 9:16 AM, Zhiliang Zhu  wrote:

Hi Sujit,
I must appreciate your kind help very much~
It seems to be OK, however, do you know the corresponding spark Java API
achievement...
Is there any java API as scala sliding, and it seemed that I do not find
spark scala's doc about sliding ...
Thank you very much~
Zhiliang


 On Monday, September 21, 2015 11:48 PM, Sujit Pal  
wrote:
   

 Hi Zhiliang, 
Would something like this work?
val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
-sujit

On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu  
wrote:

Hi Romi,
Thanks very much for your kind help comment~~
In fact there is some valid backgroud of the application, it is about R
data analysis.
...
#fund_nav_daily is a M X N (or M X 1) matrix or data.frame, col is each
daily fund return, row is the daily date
#fund_return_daily needs to count the each fund's daily return subtracted
the previous day's return
fund_return_daily <- diff(log(fund_nav_daily))

#the first row would be all 0, since there is no previous row ahead first
row
fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)),
fund_return_daily)
...

I need to exactly code the R program by way of spark, then RDD/DataFrame
is used to replace R data.frame,
however, I just found that it is VERY MUCH diffcult to make the spark
program to flexibly descript & transform R backgroud applications.
I think I have seriously lost myself into risk about this...

Would you help direct me some about the above coding issue... and my risk
about practice in spark/R application...

I must show all my sincere thanks torwards your kind help.

P.S. currently sparkR in spark 1.4.1 , there is many bug in the API
createDataFrame/except/unionAll, and it seems
that spark Java has more functions than sparkR.
Also, no specific R regression algorithmn is including in sparkR .

Best Regards,
Zhiliang


 On Monday, September 21, 2015 7:36 PM, Romi Kuntsman  
wrote:
   

 RDD is a set of data rows (in your case numbers), there is no meaning for the 
order of the items.
What exactly are you trying to accomplish?

Romi Kuntsman, Big Data Engineer
http://www.totango.com

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  
wrote:

Dear , 

I have took lots of days to think into this issue, however, without any
success...
I shall appreciate your all kind help.

There is an RDD rdd1, I would like get a new RDD rdd2, each row
in rdd2[ i ] = rdd1[ i ] - rdd[i - 1] .
What kinds of API or function would I use...

Thanks very much!
John




   



   



  

Re: Exception initializing JavaSparkContext

2015-09-21 Thread Marcelo Vanzin
What Spark package are you using? In particular, which hadoop version?

On Mon, Sep 21, 2015 at 9:14 AM, ekraffmiller
 wrote:
> Hi,
> I’m trying to run a simple test program to access Spark through Java.  I’m
> using JDK 1.8, and Spark 1.5.  I’m getting an Exception from the
> JavaSparkContext constructor.  My initialization code matches all the sample
> code I’ve found online, so not sure what I’m doing wrong.
>
> Here is my code:
>
> SparkConf conf = new SparkConf().setAppName("Simple Application");
> conf.setMaster("local");
> conf.setAppName("my app");
> JavaSparkContext sc = new JavaSparkContext(conf);
>
> The stack trace of the Exception:
>
> java.lang.ExceptionInInitializerError: null
> at java.lang.Class.getField(Class.java:1690)
> at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:220)
> at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
> at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
> at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:189)
> at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
> at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
> at org.apache.spark.storage.DiskBlockManager.addShutdownHook(DiskBlockManager.scala:147)
> at org.apache.spark.storage.DiskBlockManager.<init>(DiskBlockManager.scala:54)
> at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:75)
> at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:173)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:345)
> at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
> at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:276)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:441)
> at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
> at edu.harvard.iq.text.core.spark.SparkControllerTest.testMongoRDD(SparkControllerTest.java:63)
>
> Thanks,
> Ellen
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Exception-initializing-JavaSparkContext-tp24755.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>



-- 
Marcelo




Fwd: Issue with high no of skipped task

2015-09-21 Thread Saurav Sinha
-- Forwarded message --
From: "Saurav Sinha" 
Date: 21-Sep-2015 11:48 am
Subject: Issue with high no of skipped task
To: 
Cc:


Hi Users,

I am new to Spark and have written a flow. When we first deployed our code,
jobs completed in 4-5 min. Now they take 20+ min to complete with almost the
same set of data. Can you please help me figure out the reason?

-- 
Thanks and Regards,

Saurav Sinha

Contact: 9742879062


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Jerry Lam
I just noticed you found 1.4 has the same issue. I added that as well in
the ticket.

On Mon, Sep 21, 2015 at 1:43 PM, Jerry Lam  wrote:

> Hi Yin,
>
> You are right! I just tried the scala version with the above lines, it
> works as expected.
> I'm not sure if it happens also in 1.4 for pyspark but I thought the
> pyspark code just calls the scala code via py4j. I didn't expect that this
> bug is pyspark specific. That surprises me actually a bit. I created a
> ticket for this (SPARK-10731
> ).
>
> Best Regards,
>
> Jerry
>
>
> On Mon, Sep 21, 2015 at 1:01 PM, Yin Huai  wrote:
>
>> btw, does 1.4 has the same problem?
>>
>> On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai  wrote:
>>
>>> Hi Jerry,
>>>
>>> Looks like it is a Python-specific issue. Can you create a JIRA?
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>> On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam  wrote:
>>>
>>>> Hi Spark Developers,
>>>>
>>>> I just ran some very simple operations on a dataset. I was surprise by
>>>> the execution plan of take(1), head() or first().
>>>>
>>>> For your reference, this is what I did in pyspark 1.5:
>>>> df=sqlContext.read.parquet("someparquetfiles")
>>>> df.head()
>>>>
>>>> The above lines take over 15 minutes. I was frustrated because I can do
>>>> better without using spark :) Since I like spark, so I tried to figure out
>>>> why. It seems the dataframe requires 3 stages to give me the first row. It
>>>> reads all data (which is about 1 billion rows) and run Limit twice.
>>>>
>>>> Instead of head(), show(1) runs much faster. Not to mention that if I
>>>> do:
>>>>
>>>> df.rdd.take(1) //runs much faster.
>>>>
>>>> Is this expected? Why head/first/take is so slow for dataframe? Is it a
>>>> bug in the optimizer? or I did something wrong?
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>
>>>
>>
>


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Jerry Lam
Hi Yin,

You are right! I just tried the Scala version of the lines above, and it
works as expected.
I'm not sure whether this also happens in 1.4 for PySpark, but I thought the
PySpark code just calls the Scala code via Py4J, so I didn't expect this bug
to be PySpark-specific. That actually surprises me a bit. I created a
ticket for this (SPARK-10731).

Best Regards,

Jerry


On Mon, Sep 21, 2015 at 1:01 PM, Yin Huai  wrote:

> btw, does 1.4 has the same problem?
>
> On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai  wrote:
>
>> Hi Jerry,
>>
>> Looks like it is a Python-specific issue. Can you create a JIRA?
>>
>> Thanks,
>>
>> Yin
>>
>> On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam  wrote:
>>
>>> Hi Spark Developers,
>>>
>>> I just ran some very simple operations on a dataset. I was surprise by
>>> the execution plan of take(1), head() or first().
>>>
>>> For your reference, this is what I did in pyspark 1.5:
>>> df=sqlContext.read.parquet("someparquetfiles")
>>> df.head()
>>>
>>> The above lines take over 15 minutes. I was frustrated because I can do
>>> better without using spark :) Since I like spark, so I tried to figure out
>>> why. It seems the dataframe requires 3 stages to give me the first row. It
>>> reads all data (which is about 1 billion rows) and run Limit twice.
>>>
>>> Instead of head(), show(1) runs much faster. Not to mention that if I do:
>>>
>>> df.rdd.take(1) //runs much faster.
>>>
>>> Is this expected? Why head/first/take is so slow for dataframe? Is it a
>>> bug in the optimizer? or I did something wrong?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>
>>
>


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Yin Huai
Seems 1.4 has the same issue.

On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai  wrote:

> btw, does 1.4 has the same problem?
>
> On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai  wrote:
>
>> Hi Jerry,
>>
>> Looks like it is a Python-specific issue. Can you create a JIRA?
>>
>> Thanks,
>>
>> Yin
>>
>> On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam  wrote:
>>
>>> Hi Spark Developers,
>>>
>>> I just ran some very simple operations on a dataset. I was surprise by
>>> the execution plan of take(1), head() or first().
>>>
>>> For your reference, this is what I did in pyspark 1.5:
>>> df=sqlContext.read.parquet("someparquetfiles")
>>> df.head()
>>>
>>> The above lines take over 15 minutes. I was frustrated because I can do
>>> better without using spark :) Since I like spark, so I tried to figure out
>>> why. It seems the dataframe requires 3 stages to give me the first row. It
>>> reads all data (which is about 1 billion rows) and run Limit twice.
>>>
>>> Instead of head(), show(1) runs much faster. Not to mention that if I do:
>>>
>>> df.rdd.take(1) //runs much faster.
>>>
>>> Is this expected? Why head/first/take is so slow for dataframe? Is it a
>>> bug in the optimizer? or I did something wrong?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>
>>
>


how to get RDD from two different RDDs with cross column

2015-09-21 Thread Zhiliang Zhu
Dear Romi, Priya, Sujit, Shivaram and all,
I have spent many days thinking about this issue without finding a good enough
solution. I would appreciate all your kind help.
There is an RDD rdd1 and another RDD rdd2
(rdd2 can be a PairRDD, or a DataFrame with two columns as ).
The StringDate column values in rdd1 and rdd2 overlap but are not the same.

I would like to get a new RDD rdd3. The StringDate values in rdd3 would all
come from (be the same as) rdd1, and the float in rdd3 would come from rdd2 if
its StringDate is in rdd2; otherwise NULL would be assigned.
Each row in rdd3[ i ] = ,
where rdd3[ i ].StringDate is the same as rdd1[ i ].StringDate and, where rdd2
has a row with that StringDate, that row's float is assigned to rdd3[ i ].
What kind of API or function should I use?
Thanks very much!
Zhiliang
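
What is described here reads like a left outer join keyed on the date. The sketch below illustrates that idea on local Scala collections only; `LeftJoinSketch`, `leftJoin` and the sample dates are hypothetical, and it assumes each date appears at most once in the second collection. On pair RDDs, `leftOuterJoin` gives a comparable result once rdd1 is keyed by date, with `None` standing in for the NULL case.

```scala
object LeftJoinSketch {
  // Keeps every date from `dates`, in order, and attaches the matching value
  // from `values` when there is one, or None otherwise.
  def leftJoin(dates: Seq[String],
               values: Seq[(String, Float)]): Seq[(String, Option[Float])] = {
    val lookup: Map[String, Float] = values.toMap
    dates.map(date => (date, lookup.get(date)))
  }

  def main(args: Array[String]): Unit = {
    val dates = Seq("2015-09-18", "2015-09-19", "2015-09-21")
    val values = Seq(("2015-09-18", 1.5f), ("2015-09-21", 2.0f), ("2015-09-22", 3.0f))
    leftJoin(dates, values).foreach(println)
  }
}
```

Dates that exist only in the second collection are dropped, which matches the requirement that all StringDate values in rdd3 come from rdd1.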



Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Sujit Pal
Hi Zhiliang,

Haven't used the Java API but found this Javadoc page, may be helpful to
you.

https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html

I think the equivalent Java code snippet might go something like this:

RDDFunctions.fromRDD(rdd1, ClassTag$.apply(Class)).sliding(2)

(the second parameter of fromRDD comes from this discussion thread).
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-construct-a-ClassTag-object-as-a-method-parameter-in-Java-td6768.html

There is also the SlidingRDD decorator:
https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/SlidingRDD.html

So maybe something like this:

new SlidingRDD(rdd1, 2, ClassTag$.apply(Class))

-sujit
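
To make the sliding idea concrete, here is a small sketch on a local Scala collection rather than an RDD (`AdjacentDiff` and `diffWithLeadingZero` are hypothetical names). It subtracts each element's predecessor and prepends a 0, mirroring the `rbind` of a zero row in the R code quoted below. For the log returns in that R code, the values would be passed through `math.log` first.

```scala
object AdjacentDiff {
  // Difference between each element and the one before it, with a leading
  // 0.0 for the first element, which has no predecessor.
  def diffWithLeadingZero(xs: Seq[Double]): Seq[Double] =
    if (xs.isEmpty) Seq.empty
    else 0.0 +: xs.sliding(2).collect { case Seq(prev, cur) => cur - prev }.toList

  def main(args: Array[String]): Unit =
    println(diffWithLeadingZero(Seq(1.0, 4.0, 9.0, 16.0)))
}
```

On an RDD, `sliding` becomes available through `import org.apache.spark.mllib.rdd.RDDFunctions._`, and the windows follow the RDD's current ordering, so the data would need to be sorted by date beforehand.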

On Mon, Sep 21, 2015 at 9:16 AM, Zhiliang Zhu  wrote:

> Hi Sujit,
>
> I must appreciate your kind help very much~
>
> It seems to be OK, however, do you know the corresponding spark Java API
> achievement...
> Is there any java API as scala sliding, and it seemed that I do not find
> spark scala's doc about sliding ...
>
> Thank you very much~
> Zhiliang
>
>
>
> On Monday, September 21, 2015 11:48 PM, Sujit Pal 
> wrote:
>
>
> Hi Zhiliang,
>
> Would something like this work?
>
> val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
>
> -sujit
>
>
> On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu  > wrote:
>
> Hi Romi,
>
> Thanks very much for your kind comment.
>
> There is a concrete background to this application: it is about R
> data analysis.
> ...
> # fund_nav_daily is an M x N (or M x 1) matrix or data.frame; each column
> # is a fund's daily return, each row is a date
> # fund_return_daily is each fund's daily return minus the previous
> # day's return
> fund_return_daily <- diff(log(fund_nav_daily))
>
> # the first row is all 0, since there is no row before the first row
> fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)),
> fund_return_daily)
> ...
>
> I need to reproduce the R program exactly in Spark, with RDD/DataFrame
> replacing the R data.frame.
> However, I have found it VERY difficult to express and transform this
> kind of R application flexibly in Spark.
> I am seriously worried that I have taken on too much risk with this...
>
> Could you give me some direction on the coding issue above, and on the
> risk of doing this kind of Spark/R work in practice?
>
> My sincere thanks for your kind help.
>
> P.S. Currently, SparkR in Spark 1.4.1 has many bugs in the API
> createDataFrame/except/unionAll, and it seems
> that Spark Java has more functions than SparkR.
> Also, no specific R regression algorithm is included in SparkR.
>
> Best Regards,
> Zhiliang
>
>
> On Monday, September 21, 2015 7:36 PM, Romi Kuntsman 
> wrote:
>
>
> RDD is a set of data rows (in your case numbers), there is no meaning for
> the order of the items.
> What exactly are you trying to accomplish?
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  > wrote:
>
> Dear ,
>
> I have spent many days thinking about this issue, without any
> success...
> I would appreciate any help.
>
> There is an RDD rdd1, and I would like to get a new RDD rdd2 where each row
> is rdd2[ i ] = rdd1[ i ] - rdd1[ i - 1 ].
> What kind of API or function should I use?
>
>
> Thanks very much!
> John
>
>
>
>
>
>
>
>


Re: spark + parquet + schema name and metadata

2015-09-21 Thread Cheng Lian
Currently Spark SQL doesn't support customizing the schema name and
metadata. May I ask why these two matter in your use case? Some
Parquet data models, like parquet-avro, do support this, while others
don't (e.g. parquet-hive).


Cheng

On 9/21/15 7:13 AM, Borisa Zivkovic wrote:

Hi,

I am trying to figure out how to write parquet metadata when 
persisting DataFrames to parquet using Spark (1.4.1)


I could not find a way to change schema name (which seems to be 
hardcoded to root) and also how to add data to key/value metadata in 
parquet footer.


org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData

org.apache.parquet.schema.Type#getName

thanks





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Yin Huai
By the way, does 1.4 have the same problem?

On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai  wrote:

> Hi Jerry,
>
> Looks like it is a Python-specific issue. Can you create a JIRA?
>
> Thanks,
>
> Yin
>
> On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam  wrote:
>
>> Hi Spark Developers,
>>
>> I just ran some very simple operations on a dataset. I was surprised by
>> the execution plan of take(1), head() or first().
>>
>> For your reference, this is what I did in pyspark 1.5:
>> df=sqlContext.read.parquet("someparquetfiles")
>> df.head()
>>
>> The above lines take over 15 minutes. I was frustrated because I can do
>> better without using Spark :) Since I like Spark, I tried to figure out
>> why. It seems the DataFrame requires 3 stages to give me the first row. It
>> reads all the data (about 1 billion rows) and runs Limit twice.
>>
>> Instead of head(), show(1) runs much faster. Not to mention that if I do:
>>
>> df.rdd.take(1)  # runs much faster
>>
>> Is this expected? Why are head/first/take so slow for a DataFrame? Is it a
>> bug in the optimizer, or did I do something wrong?
>>
>> Best Regards,
>>
>> Jerry
>>
>
>


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Yin Huai
Hi Jerry,

Looks like it is a Python-specific issue. Can you create a JIRA?

Thanks,

Yin

On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam  wrote:

> Hi Spark Developers,
>
> I just ran some very simple operations on a dataset. I was surprised by the
> execution plan of take(1), head() or first().
>
> For your reference, this is what I did in pyspark 1.5:
> df=sqlContext.read.parquet("someparquetfiles")
> df.head()
>
> The above lines take over 15 minutes. I was frustrated because I can do
> better without using Spark :) Since I like Spark, I tried to figure out
> why. It seems the DataFrame requires 3 stages to give me the first row. It
> reads all the data (about 1 billion rows) and runs Limit twice.
>
> Instead of head(), show(1) runs much faster. Not to mention that if I do:
>
> df.rdd.take(1)  # runs much faster
>
> Is this expected? Why are head/first/take so slow for a DataFrame? Is it a
> bug in the optimizer, or did I do something wrong?
>
> Best Regards,
>
> Jerry
>


Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Zhiliang Zhu
Hi Sujit,
Thanks very much for your kind help. I have found the sliding doc for both Scala
and Java Spark; it is in MLlib's RDDFunctions, though the docs rarely include
enough examples.
Best Regards,
Zhiliang

 


 On Monday, September 21, 2015 11:48 PM, Sujit Pal  
wrote:
   

 Hi Zhiliang, 
Would something like this work?
val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
-sujit

On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu  
wrote:

Hi Romi,
Thanks very much for your kind comment.
There is a concrete background to this application: it is about R data
analysis.
...
# fund_nav_daily is an M x N (or M x 1) matrix or data.frame; each column is a
# fund's daily return, each row is a date
# fund_return_daily is each fund's daily return minus the previous day's return
fund_return_daily <- diff(log(fund_nav_daily))
# the first row is all 0, since there is no row before the first row
fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)),
fund_return_daily)
...
I need to reproduce the R program exactly in Spark, with RDD/DataFrame
replacing the R data.frame. However, I have found it VERY difficult to express
and transform this kind of R application flexibly in Spark.
I am seriously worried that I have taken on too much risk with this...
Could you give me some direction on the coding issue above, and on the risk of
doing this kind of Spark/R work in practice?
My sincere thanks for your kind help.

P.S. Currently, SparkR in Spark 1.4.1 has many bugs in the API
createDataFrame/except/unionAll, and it seems that Spark Java has more
functions than SparkR.
Also, no specific R regression algorithm is included in SparkR.
Best Regards,
Zhiliang


 On Monday, September 21, 2015 7:36 PM, Romi Kuntsman  
wrote:
   

 RDD is a set of data rows (in your case numbers), there is no meaning for the 
order of the items.
What exactly are you trying to accomplish?

Romi Kuntsman, Big Data Engineer
http://www.totango.com

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  
wrote:

Dear , 

I have spent many days thinking about this issue, without any success...
I would appreciate any help.
There is an RDD rdd1, and I would like to get a new RDD rdd2 where each row is
rdd2[ i ] = rdd1[ i ] - rdd1[ i - 1 ].
What kind of API or function should I use?

Thanks very much!
John




   



  

Re: Docker/Mesos with Spark

2015-09-21 Thread Tim Chen
Hi John,

There is no other blog post yet. I'm thinking of doing a series of posts, but
so far I haven't had time to do that.

Running Spark in Docker containers makes distributing Spark versions easy:
it's simple to upgrade, and the image is automatically cached on the slaves, so
the same image just runs right away. Most of the Docker performance cost is
usually related to network and filesystem overheads, but I think that with
recent changes in Spark to make the Mesos sandbox the default temp dir, the
filesystem won't be a big concern, as Spark mostly writes to the Mesos sandbox
mounted into the container. Also, Mesos uses the host network by default, so the
network isn't affected much.

The main cluster mode limitation is that you need to make the Spark job
files available somewhere that all the slaves can access remotely (http,
s3, hdfs, etc) or available on all slaves locally by path.

I'll try to put more effort into docs once I get my existing patches and
testing infra work done.

Let me know if you have more questions,

Tim

On Sat, Sep 19, 2015 at 5:42 AM, John Omernik  wrote:

> I was searching in the 1.5.0 docs on the Docker on Mesos capabilities and
> just found you CAN run it this way.  Are there any user posts, blog posts,
> etc on why and how you'd do this?
>
> Basically, at first I was questioning why you'd run spark in a docker
> container, i.e., if you run with tar balled executor, what are you really
> gaining?  And in this setup, are you losing out on performance somehow? (I
> am guessing smarter people than I have figured that out).
>
> Then I came across a situation where I wanted to use a python library with
> spark, and it had to be installed on every node, and I realized one big
> advantage of dockerized spark would be that spark apps that needed other
> libraries could be contained and built well.
>
> OK, that's huge, let's do that.  Next, I have a lot of questions on how
> this actually works.  Does cluster mode/client mode
> apply here? If so, how?  Is there a good walkthrough on getting this
> set up? Limitations? Gotchas?  Should I just dive in and start working with
> it? Has anyone written any stories/rough documentation? This seems like a
> really helpful feature to scaling out spark, and letting developers truly
> build what they need without tons of admin overhead, so I really want to
> explore.
>
> Thanks!
>
> John
>


Re: Python Packages in Spark w/Mesos

2015-09-21 Thread Tim Chen
Hi John,

Sorry, I haven't had time to respond to your questions over the weekend.

If you're running client mode, then to use the Docker/Mesos integration
you minimally just need to set the image configuration
'spark.mesos.executor.docker.image' as stated in the documentation; Spark
will use this image to run each Spark executor.

Therefore, if you want to include your Python dependencies, you can
pre-install them in that image, and Spark should be able to find them if you
set the PYTHON env variables to point to them. I'm not very familiar with
Python, but I recently got Mesos cluster mode with Python to work and it's
merged into master.
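
As a rough illustration of the settings described above, here is a small Python sketch that turns a dictionary of Spark properties into spark-submit `--conf` arguments. The image name and the PYTHONPATH value are hypothetical placeholders, and reading "the PYTHON env variables" as PYTHONPATH passed through `spark.executorEnv.*` is an assumption, not something stated in this thread.

```python
def build_submit_args(settings):
    """Turn a dict of Spark properties into spark-submit --conf arguments."""
    args = []
    for key in sorted(settings):  # sorted so the output order is stable
        args += ["--conf", f"{key}={settings[key]}"]
    return args


settings = {
    # Property named in the message above; the image name is hypothetical.
    "spark.mesos.executor.docker.image": "example/spark-python:latest",
    # Assumption: point executors at libraries pre-installed in the image.
    "spark.executorEnv.PYTHONPATH": "/opt/python-libs",
}

submit_args = build_submit_args(settings)
print(" ".join(["spark-submit"] + submit_args + ["my_app.py"]))
```

The same properties could presumably also go into spark-defaults.conf; the command line form is used here only because it keeps the example self-contained.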

Tim

On Mon, Sep 21, 2015 at 8:34 AM, John Omernik  wrote:

> Hey all -
>
> Curious about the best way to include Python packages in my Spark
> installation (such as NLTK). Basically I am running on Mesos, and would
> like to find a way to include the package in the binary distribution, since
> I don't want to install packages on all nodes.  We should be able to
> include them in the distribution, right?
>
> I thought of using the Docker Mesos integration, but I have been unable to
> find information on this (see my other question on Docker/Mesos/Spark).
> Any other thoughts on the best way to include packages in Spark WITHOUT
> installing on each node would be appreciated!
>
> John
>


Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Zhiliang Zhu
Hi Sujit,
I very much appreciate your kind help.
That seems to work. However, do you know the corresponding Spark Java API?
Is there a Java equivalent of Scala's sliding? I could not find sliding in the
Spark Scala docs either.
Thank you very much~
Zhiliang


 On Monday, September 21, 2015 11:48 PM, Sujit Pal  
wrote:
   

 Hi Zhiliang, 
Would something like this work?
val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
-sujit

On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu  
wrote:

Hi Romi,
Thanks very much for your kind comment.
There is a concrete background to this application: it is about R data
analysis.
...
# fund_nav_daily is an M x N (or M x 1) matrix or data.frame; each column is a
# fund's daily return, each row is a date
# fund_return_daily is each fund's daily return minus the previous day's return
fund_return_daily <- diff(log(fund_nav_daily))
# the first row is all 0, since there is no row before the first row
fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)),
fund_return_daily)
...
I need to reproduce the R program exactly in Spark, with RDD/DataFrame
replacing the R data.frame. However, I have found it VERY difficult to express
and transform this kind of R application flexibly in Spark.
I am seriously worried that I have taken on too much risk with this...
Could you give me some direction on the coding issue above, and on the risk of
doing this kind of Spark/R work in practice?
My sincere thanks for your kind help.

P.S. Currently, SparkR in Spark 1.4.1 has many bugs in the API
createDataFrame/except/unionAll, and it seems that Spark Java has more
functions than SparkR.
Also, no specific R regression algorithm is included in SparkR.
Best Regards,
Zhiliang


 On Monday, September 21, 2015 7:36 PM, Romi Kuntsman  
wrote:
   

 RDD is a set of data rows (in your case numbers), there is no meaning for the 
order of the items.
What exactly are you trying to accomplish?

Romi Kuntsman, Big Data Engineer
http://www.totango.com

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  
wrote:

Dear , 

I have spent many days thinking about this issue, without any success...
I would appreciate any help.
There is an RDD rdd1, and I would like to get a new RDD rdd2 where each row is
rdd2[ i ] = rdd1[ i ] - rdd1[ i - 1 ].
What kind of API or function should I use?

Thanks very much!
John




   



  

Exception initializing JavaSparkContext

2015-09-21 Thread ekraffmiller
Hi,
I’m trying to run a simple test program to access Spark through Java.  I’m
using JDK 1.8 and Spark 1.5.  I’m getting an exception from the
JavaSparkContext constructor.  My initialization code matches all the sample
code I’ve found online, so I’m not sure what I’m doing wrong.

Here is my code:

SparkConf conf = new SparkConf().setAppName("Simple Application");
conf.setMaster("local");
conf.setAppName("my app");
JavaSparkContext sc = new JavaSparkContext(conf);

The stack trace of the Exception:

java.lang.ExceptionInInitializerError: null
at java.lang.Class.getField(Class.java:1690)
at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:220)
at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:189)
at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
at org.apache.spark.storage.DiskBlockManager.addShutdownHook(DiskBlockManager.scala:147)
at org.apache.spark.storage.DiskBlockManager.<init>(DiskBlockManager.scala:54)
at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:75)
at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:173)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:345)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:276)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:441)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
at edu.harvard.iq.text.core.spark.SparkControllerTest.testMongoRDD(SparkControllerTest.java:63)

Thanks,
Ellen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Exception-initializing-JavaSparkContext-tp24755.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Jerry Lam
Hi Spark Developers,

I just ran some very simple operations on a dataset. I was surprised by the
execution plan of take(1), head() or first().

For your reference, this is what I did in pyspark 1.5:
df=sqlContext.read.parquet("someparquetfiles")
df.head()

The above lines take over 15 minutes. I was frustrated because I can do
better without using Spark :) Since I like Spark, I tried to figure out
why. It seems the DataFrame requires 3 stages to give me the first row. It
reads all the data (about 1 billion rows) and runs Limit twice.

Instead of head(), show(1) runs much faster. Not to mention that if I do:

df.rdd.take(1)  # runs much faster

Is this expected? Why are head/first/take so slow for a DataFrame? Is it a bug
in the optimizer, or did I do something wrong?

Best Regards,

Jerry


Re: Why are executors on slave never used?

2015-09-21 Thread Hemant Bhanawat
When you specify master as local[2], it starts the spark components in a
single jvm. You need to specify the master correctly.
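
To make the point about local[2] concrete, here is a small plain-Python sketch (not part of Spark) that classifies a master string as local, meaning everything runs in one JVM on the submitting machine, or as something that targets a cluster manager. The helper name describe_master is hypothetical, and only the simple local, local[K] and local[*] forms are recognized; anything else is treated as a cluster master for the purposes of this illustration.

```python
import re


def describe_master(master):
    """Classify a Spark master string as ('local', threads) or ('cluster', None)."""
    # Matches "local", "local[4]" and "local[*]" only.
    match = re.fullmatch(r"local(?:\[(\*|\d+)\])?", master)
    if match:
        # Plain "local" means a single worker thread.
        return ("local", match.group(1) or "1")
    return ("cluster", None)


for master in ["local[2]", "local", "local[*]", "yarn-client", "spark://host:7077"]:
    print(master, "->", describe_master(master))
```

With setMaster('local[2]') the job never leaves the machine it was started on, which would explain seeing only the localhost executor. On a YARN-backed cluster the master would likely be a YARN value such as yarn-client instead; that is an assumption here, since the thread does not say which cluster manager is in use.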
I have a default AWS EMR cluster (1 master, 1 slave) with Spark. When I run
a Spark process, it works fine -- but only on the master, as if it were
standalone.

The web UI and logging code show only 1 executor, the localhost.

How can I diagnose this?

(I create SparkConf, in Python, with setMaster('local[2]').)

(Strangely, though I don't think that this causes the problem, there is
almost nothing Spark-related on the slave machine: /usr/lib/spark has a
few jars, but that's it: datanucleus-api-jdo.jar, datanucleus-core.jar,
datanucleus-rdbms.jar, spark-yarn-shuffle.jar. But this is an AWS EMR
cluster as created by create-cluster, so I would assume that the slave
and master are configured OK out of the box.)

Joshua


Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Sujit Pal
Hi Zhiliang,

Would something like this work?

val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
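
To show what the window logic computes, here is a plain-Python sketch over a local list. It is not Spark code: the sliding and adjacent_differences helpers are hypothetical names written for this illustration. The optional leading 0 mirrors the rbind of a zero row in the R snippet quoted below, and the input is assumed to be in a well-defined order already.

```python
def sliding(values, size):
    """Yield consecutive overlapping windows of `size` items."""
    items = list(values)
    for start in range(len(items) - size + 1):
        yield items[start:start + size]


def adjacent_differences(values, pad_first=True):
    """Return values[i] - values[i - 1] for each i, optionally with a leading 0."""
    items = list(values)
    diffs = [window[1] - window[0] for window in sliding(items, 2)]
    # The first row has no predecessor, so it is filled with 0 when padding.
    return ([0] + diffs) if pad_first and items else diffs


print(adjacent_differences([1, 4, 9, 16]))
print(adjacent_differences([1, 4, 9, 16], pad_first=False))
```

Without padding the result has one element fewer than the input, which is also what a window of size 2 gives; applying a log before differencing, as the R code does, would be a separate map step.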

-sujit


On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu 
wrote:

> Hi Romi,
>
> Thanks very much for your kind comment.
>
> There is a concrete background to this application: it is about R
> data analysis.
> ...
> # fund_nav_daily is an M x N (or M x 1) matrix or data.frame; each column
> # is a fund's daily return, each row is a date
> # fund_return_daily is each fund's daily return minus the previous
> # day's return
> fund_return_daily <- diff(log(fund_nav_daily))
>
> # the first row is all 0, since there is no row before the first row
> fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)),
> fund_return_daily)
> ...
>
> I need to reproduce the R program exactly in Spark, with RDD/DataFrame
> replacing the R data.frame.
> However, I have found it VERY difficult to express and transform this
> kind of R application flexibly in Spark.
> I am seriously worried that I have taken on too much risk with this...
>
> Could you give me some direction on the coding issue above, and on the
> risk of doing this kind of Spark/R work in practice?
>
> My sincere thanks for your kind help.
>
> P.S. Currently, SparkR in Spark 1.4.1 has many bugs in the API
> createDataFrame/except/unionAll, and it seems
> that Spark Java has more functions than SparkR.
> Also, no specific R regression algorithm is included in SparkR.
>
> Best Regards,
> Zhiliang
>
>
> On Monday, September 21, 2015 7:36 PM, Romi Kuntsman 
> wrote:
>
>
> RDD is a set of data rows (in your case numbers), there is no meaning for
> the order of the items.
> What exactly are you trying to accomplish?
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  > wrote:
>
> Dear ,
>
> I have spent many days thinking about this issue, without any
> success...
> I would appreciate any help.
>
> There is an RDD rdd1, and I would like to get a new RDD rdd2 where each row
> is rdd2[ i ] = rdd1[ i ] - rdd1[ i - 1 ].
> What kind of API or function should I use?
>
>
> Thanks very much!
> John
>
>
>
>
>


Re: SparkR - calling as.vector() with rdd dataframe causes error

2015-09-21 Thread Ellen Kraffmiller
Thank you for the link! I was using
http://apache-spark-user-list.1001560.n3.nabble.com/, and I didn't see
replies there.

Regarding your code example, I'm doing the same thing and successfully
creating the rdd, but the problem is that when I call a clustering
algorithm like amap::hcluster(), I get an error from as.vector() that the
rdd cannot be coerced into a vector.

On Fri, Sep 18, 2015 at 12:33 PM, Luciano Resende 
wrote:

> I see the thread with all the responses on the bottom at mail-archive :
>
> https://www.mail-archive.com/user%40spark.apache.org/msg36882.html
>
> On Fri, Sep 18, 2015 at 7:58 AM, Ellen Kraffmiller <
> ellen.kraffmil...@gmail.com> wrote:
>
>> Thanks for your response.  Is there a reason why this thread isn't
>> appearing on the mailing list?  So far, I only see my post, with no
>> answers, although I have received 2 answers via email.  It would be nice if
>> other people could see these answers as well.
>>
>> On Thu, Sep 17, 2015 at 2:22 AM, Sun, Rui  wrote:
>>
>>> The existing algorithms operating on R data.frame can't simply operate
>>> on SparkR DataFrame. They have to be re-implemented to be based on SparkR
>>> DataFrame API.
>>>
>>> -Original Message-
>>> From: ekraffmiller [mailto:ellen.kraffmil...@gmail.com]
>>> Sent: Thursday, September 17, 2015 3:30 AM
>>> To: user@spark.apache.org
>>> Subject: SparkR - calling as.vector() with rdd dataframe causes error
>>>
>>> Hi,
>>> I have a library of clustering algorithms that I'm trying to run in the
>>> SparkR interactive shell. (I am working on a proof of concept for a
>>> document classification tool.) Each algorithm takes a term document matrix
>>> in the form of a dataframe.  When I pass the method a local dataframe, the
>>> clustering algorithm works correctly, but when I pass it a spark rdd, it
>>> gives an error trying to coerce the data into a vector.  Here is the code,
>>> that I'm calling within SparkR:
>>>
>>> # get matrix from a file
>>> file <-
>>>
>>> "/Applications/spark-1.5.0-bin-hadoop2.6/examples/src/main/resources/matrix.csv"
>>>
>>> #read it into variable
>>>  raw_data <- read.csv(file,sep=',',header=FALSE)
>>>
>>> #convert to a local dataframe
>>> localDF = data.frame(raw_data)
>>>
>>> # create the rdd
>>> rdd  <- createDataFrame(sqlContext,localDF)
>>>
>>> #call the algorithm with the localDF - this works
>>> result <- galileo(localDF, model='hclust',dist='euclidean',link='ward',K=5)
>>>
>>> #call with the rdd - this produces error
>>> result <- galileo(rdd, model='hclust',dist='euclidean',link='ward',K=5)
>>>
>>> Error in as.vector(data) :
>>>   no method for coercing this S4 class to a vector
>>>
>>>
>>> I get the same error if I try to directly call as.vector(rdd) as well.
>>>
>>> Is there a reason why this works for localDF and not rdd?  Should I be
>>> doing something else to coerce the object into a vector?
>>>
>>> Thanks,
>>> Ellen
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-calling-as-vector-with-rdd-dataframe-causes-error-tp24717.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
>>> additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> Luciano Resende
> http://people.apache.org/~lresende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>


Re: Count for select not matching count for group by

2015-09-21 Thread Richard Hillegas
For what it's worth, I get the expected result that "filter" behaves like
"group by" when I run the same experiment against a DataFrame which was
loaded from a relational store:

import org.apache.spark.sql._
import org.apache.spark.sql.types._

val df = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:derby:/Users/rhillegas/derby/databases/derby1",
  "dbtable" -> "app.outcomes")).load()

df.select("OUTCOME").groupBy("OUTCOME").count.show
#
# returns:
#
# +-------+-----+
# |OUTCOME|count|
# +-------+-----+
# |      A|  128|
# |      B|  256|
# +-------+-----+

df.filter("OUTCOME = 'A'").count
#
# returns:
#
# res1: Long = 128


df.registerTempTable("test_data")
sqlContext.sql("select OUTCOME, count( OUTCOME ) from test_data group by OUTCOME").show
#
# returns:
#
# +-------+---+
# |OUTCOME|_c1|
# +-------+---+
# |      A|128|
# |      B|256|
# +-------+---+

Thanks,
-Rick

Michael Kelly  wrote on 09/21/2015 08:06:29
AM:

> From: Michael Kelly 
> To: user@spark.apache.org
> Date: 09/21/2015 08:08 AM
> Subject: Count for select not matching count for group by
>
> Hi,
>
> I'm seeing some strange behaviour with spark 1.5, I have a dataframe
> that I have built from loading and joining some hive tables stored in
> s3.
>
> The dataframe is cached in memory, using df.cache.
>
> What I'm seeing is that the counts I get when I do a group by on a
> column are different from what I get when I filter/select and count.
>
> df.select("outcome").groupBy("outcome").count.show
> outcome | count
> --
> 'A'   |  100
> 'B'   |  200
>
> df.filter("outcome = 'A'").count
> # 50
>
> df.filter(df("outcome") === "A").count
> # 50
>
> I expect the count of rows that match 'A' in the groupBy to match
> the count when filtering. Any ideas what might be happening?
>
> Thanks,
>
> Michael
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

Python Packages in Spark w/Mesos

2015-09-21 Thread John Omernik
Hey all -

Curious about the best way to include Python packages in my Spark
installation (such as NLTK). Basically I am running on Mesos, and would
like to find a way to include the package in the binary distribution, since
I don't want to install packages on all nodes.  We should be able to
include them in the distribution, right?

I thought of using the Docker Mesos integration, but I have been unable to
find information on this (see my other question on Docker/Mesos/Spark).
Any other thoughts on the best way to include packages in Spark WITHOUT
installing on each node would be appreciated!

John


AWS_CREDENTIAL_FILE

2015-09-21 Thread Michel Lemay
Hi,

It looks like spark does read AWS credentials from environment variable
AWS_CREDENTIAL_FILE like awscli does.


Mike


sqlContext.read.avro broadcasting files from the driver

2015-09-21 Thread Daniel Haviv
Hi,
I'm loading 1000 files using the spark-avro package:
val df = sqlContext.read.avro("/incoming/")

When I'm performing an action on this df it seems like for each file a
broadcast is being created and is sent to the workers (instead of the
workers reading their data-local files):

scala> df.coalesce(4).count
15/09/21 15:11:32 INFO storage.MemoryStore: ensureFreeSpace(261920) called
with curMem=0, maxMem=2223023063
15/09/21 15:11:32 INFO storage.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 255.8 KB, free 2.1 GB)
15/09/21 15:11:32 INFO storage.MemoryStore: ensureFreeSpace(22987) called
with curMem=261920, maxMem=2223023063
15/09/21 15:11:32 INFO storage.MemoryStore: Block broadcast_0_piece0 stored
as bytes in memory (estimated size 22.4 KB, free 2.1 GB)
15/09/21 15:11:32 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
in memory on 192.168.3.4:39736 (size: 22.4 KB, free: 2.1 GB)



15/09/21 15:12:45 INFO storage.MemoryStore: ensureFreeSpace(22987) called
with curMem=294913622, maxMem=2223023063
15/09/21 15:12:45 INFO storage.MemoryStore: Block broadcast_1034_piece0 stored
as bytes in memory (estimated size 22.4 KB, free 1838.8 MB)
15/09/21 15:12:45 INFO storage.BlockManagerInfo: Added
broadcast_1034_piece0 in memory on 192.168.3.4:39736 (size: 22.4 KB, free:
2.0 GB)
15/09/21 15:12:45 INFO spark.SparkContext: Created broadcast 1034 from
hadoopFile at AvroRelation.scala:121
15/09/21 15:12:46 INFO execution.Exchange: Using SparkSqlSerializer2.
15/09/21 15:12:46 INFO spark.SparkContext: Starting job: count at
<console>:25

Am I understanding this wrong?

Thank you.
Daniel


Count for select not matching count for group by

2015-09-21 Thread Michael Kelly
Hi,

I'm seeing some strange behaviour with spark 1.5, I have a dataframe
that I have built from loading and joining some hive tables stored in
s3.

The dataframe is cached in memory, using df.cache.

What I'm seeing is that the counts I get when I do a group by on a
column are different from what I get when I filter/select and count.

df.select("outcome").groupBy("outcome").count.show
outcome | count
--
'A'   |  100
'B'   |  200

df.filter("outcome = 'A'").count
# 50

df.filter(df("outcome") === "A").count
# 50

I expect the count of rows that match 'A' in the groupBy to match
the count when filtering. Any ideas what might be happening?

Thanks,

Michael




Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Zhiliang Zhu
Hi Romi,
Thanks very much for your kind comment.
There is a concrete background to this application: it is about R data
analysis.
...
# fund_nav_daily is an M x N (or M x 1) matrix or data.frame; each column is a
# fund's daily return, each row is a date
# fund_return_daily is each fund's daily return minus the previous day's return
fund_return_daily <- diff(log(fund_nav_daily))
# the first row is all 0, since there is no row before the first row
fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)),
fund_return_daily)
...
I need to reproduce the R program exactly in Spark, with RDD/DataFrame
replacing the R data.frame. However, I have found it VERY difficult to express
and transform this kind of R application flexibly in Spark.
I am seriously worried that I have taken on too much risk with this...
Could you give me some direction on the coding issue above, and on the risk of
doing this kind of Spark/R work in practice?
My sincere thanks for your kind help.

P.S. Currently, SparkR in Spark 1.4.1 has many bugs in the API
createDataFrame/except/unionAll, and it seems that Spark Java has more
functions than SparkR.
Also, no specific R regression algorithm is included in SparkR.
Best Regards,
Zhiliang


 On Monday, September 21, 2015 7:36 PM, Romi Kuntsman  
wrote:
   

 RDD is a set of data rows (in your case numbers), there is no meaning for the 
order of the items.
What exactly are you trying to accomplish?

Romi Kuntsman, Big Data Engineer
http://www.totango.com

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  
wrote:

Dear , 

I have spent many days thinking about this issue, without any success...
I would appreciate any help.
There is an RDD rdd1, and I would like to get a new RDD rdd2 where each row is
rdd2[ i ] = rdd1[ i ] - rdd1[ i - 1 ].
What kind of API or function should I use?

Thanks very much!
John




  

Why are executors on slave never used?

2015-09-21 Thread Joshua Fox
I have a default AWS EMR cluster (1 master, 1 slave) with Spark. When I run
a Spark process, it works fine -- but only on the master, as if it were
standalone.

The web UI and logging code show only 1 executor, the localhost.

How can I diagnose this?

(I create SparkConf, in Python, with setMaster('local[2]').)

(Strangely, though I don't think that this causes the problem, there is
almost nothing Spark-related on the slave machine: /usr/lib/spark has a
few jars, but that's it: datanucleus-api-jdo.jar, datanucleus-core.jar,
datanucleus-rdbms.jar, spark-yarn-shuffle.jar. But this is an AWS EMR
cluster as created by create-cluster, so I would assume that the slave
and master are configured OK out of the box.)

Joshua


Re: passing SparkContext as parameter

2015-09-21 Thread Cody Koeninger
That isn't accurate, I think you're confused about foreach.

Look at

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
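
The pattern that section describes is to create the database connection on the worker, once per partition, rather than creating it on the driver and shipping it to the workers. Below is a plain-Python sketch of that idea, with lists standing in for partitions. FakeConnection and send_partition are hypothetical names for this illustration and not part of Spark or any Cassandra client.

```python
class FakeConnection:
    """Hypothetical stand-in for a database client; records what was sent."""

    opened = 0  # counts how many connections were created in total

    def __init__(self):
        FakeConnection.opened += 1
        self.sent = []
        self.closed = False

    def send(self, record):
        self.sent.append(record)

    def close(self):
        self.closed = True


def send_partition(records, connect=FakeConnection):
    """Open one connection for the whole partition, send each record, then close."""
    connection = connect()
    try:
        for record in records:
            connection.send(record)
    finally:
        connection.close()
    return connection


# Two lists stand in for the two partitions of an RDD.
partitions = [[1, 2, 3], [4, 5]]
results = [send_partition(partition) for partition in partitions]
print(FakeConnection.opened, [c.sent for c in results])
```

In Spark, a function shaped like send_partition would be what gets passed to foreachPartition inside foreachRDD, so each worker opens its own connection and nothing like SparkContext needs to be serialized.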


On Mon, Sep 21, 2015 at 7:36 AM, Romi Kuntsman  wrote:

> foreach is something that runs on the driver, not the workers.
>
> if you want to perform some function on each record from cassandra, you
> need to do cassandraRdd.map(func), which will run distributed on the spark
> workers
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch 
> wrote:
>
>> Yes, but i need to read from cassandra db within a spark
>> transformation..something like..
>>
>> dstream.foreachRDD{
>>
>> rdd=> rdd.foreach {
>>  message =>
>>  sc.cassandraTable()
>>   .
>>   .
>>   .
>> }
>> }
>>
>> Since rdd.foreach gets executed on workers, how can i make sparkContext
>> available on workers ???
>>
>> Regards,
>> Padma Ch
>>
>> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu  wrote:
>>
>>> You can use broadcast variable for passing connection information.
>>>
>>> Cheers
>>>
>>> On Sep 21, 2015, at 4:27 AM, Priya Ch 
>>> wrote:
>>>
>>> can i use this sparkContext on executors ??
>>> In my application, i have scenario of reading from db for certain
>>> records in rdd. Hence I need sparkContext to read from DB (cassandra in our
>>> case),
>>>
>>> If sparkContext couldn't be sent to executors , what is the workaround
>>> for this ??
>>>
>>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak 
>>> wrote:
>>>
>>>> add @transient?
>>>>
>>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <
>>>> learnings.chitt...@gmail.com> wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> How can i pass sparkContext as a parameter to a method in an
>>>>> object. Because passing sparkContext is giving me TaskNotSerializable
>>>>> Exception.
>>>>>
>>>>> How can i achieve this ?
>>>>>
>>>>> Thanks,
>>>>> Padma Ch
>>>>>
>>>>
>>>>
>>>
>>
>


spark + parquet + schema name and metadata

2015-09-21 Thread Borisa Zivkovic
Hi,

I am trying to figure out how to write parquet metadata when persisting
DataFrames to parquet using Spark (1.4.1)

I could not find a way to change schema name (which seems to be hardcoded
to root) and also how to add data to key/value metadata in parquet footer.

org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData

org.apache.parquet.schema.Type#getName

thanks


Re: Spark does not yet support its JDBC component for Scala 2.11.

2015-09-21 Thread Ted Yu
I think the document should be updated to reflect the integration of
SPARK-8013 

Cheers

On Mon, Sep 21, 2015 at 3:48 AM, Petr Novak  wrote:

> Nice, thanks.
>
> So the note in build instruction for 2.11 is obsolete? Or there are still
> some limitations?
>
>
> http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211
>
> On Fri, Sep 11, 2015 at 2:19 PM, Petr Novak  wrote:
>
>> Nice, thanks.
>>
>> So the note in build instruction for 2.11 is obsolete? Or there are still
>> some limitations?
>>
>>
>> http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211
>>
>> On Fri, Sep 11, 2015 at 2:09 PM, Ted Yu  wrote:
>>
>>> Have you looked at:
>>> https://issues.apache.org/jira/browse/SPARK-8013
>>>
>>>
>>>
>>> > On Sep 11, 2015, at 4:53 AM, Petr Novak  wrote:
>>> >
>>> > Does it still apply for 1.5.0?
>>> >
>>> > What actual limitation does it mean when I switch to 2.11? No JDBC
>>> Thriftserver? No JDBC DataSource? No JdbcRDD (which is already obsolete I
>>> believe)? Some more?
>>> >
>>> > What library is the blocker to upgrade JDBC component to 2.11?
>>> >
>>> > Is there any estimate when it could be available for 2.11?
>>> >
>>> > Many thanks,
>>> > Petr
>>>
>>
>>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Romi Kuntsman
foreach is something that runs on the driver, not the workers.

if you want to perform some function on each record from cassandra, you
need to do cassandraRdd.map(func), which will run distributed on the spark
workers

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch 
wrote:

> Yes, but i need to read from cassandra db within a spark
> transformation..something like..
>
> dstream.foreachRDD{
>
> rdd=> rdd.foreach {
>  message =>
>  sc.cassandraTable()
>   .
>   .
>   .
> }
> }
>
> Since rdd.foreach gets executed on workers, how can i make sparkContext
> available on workers ???
>
> Regards,
> Padma Ch
>
> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu  wrote:
>
>> You can use broadcast variable for passing connection information.
>>
>> Cheers
>>
>> On Sep 21, 2015, at 4:27 AM, Priya Ch 
>> wrote:
>>
>> can i use this sparkContext on executors ??
>> In my application, i have scenario of reading from db for certain records
>> in rdd. Hence I need sparkContext to read from DB (cassandra in our case),
>>
>> If sparkContext couldn't be sent to executors , what is the workaround
>> for this ??
>>
>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak  wrote:
>>
>>> add @transient?
>>>
>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch wrote:
>>>
>>>> Hello All,
>>>>
>>>> How can i pass sparkContext as a parameter to a method in an
>>>> object. Because passing sparkContext is giving me TaskNotSerializable
>>>> Exception.
>>>>
>>>> How can i achieve this ?
>>>>
>>>> Thanks,
>>>> Padma Ch
>>>>
>>>
>>>
>>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Priya Ch
Yes, but I need to read from the Cassandra DB within a Spark
transformation, something like:

dstream.foreachRDD {
  rdd => rdd.foreach {
    message =>
      sc.cassandraTable()
      .
      .
      .
  }
}

Since rdd.foreach gets executed on the workers, how can I make the SparkContext
available on the workers?

Regards,
Padma Ch

On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu  wrote:

> You can use broadcast variable for passing connection information.
>
> Cheers
>
> On Sep 21, 2015, at 4:27 AM, Priya Ch 
> wrote:
>
> can i use this sparkContext on executors ??
> In my application, i have scenario of reading from db for certain records
> in rdd. Hence I need sparkContext to read from DB (cassandra in our case),
>
> If sparkContext couldn't be sent to executors , what is the workaround for
> this ??
>
> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak  wrote:
>
>> add @transient?
>>
>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch 
>> wrote:
>>
>>> Hello All,
>>>
>>> How can i pass sparkContext as a parameter to a method in an object.
>>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>>
>>> How can i achieve this ?
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>
>>
>


Re: DataGenerator for streaming application

2015-09-21 Thread Hemant Bhanawat
Why are you using  rawSocketStream to read the data? I believe
rawSocketStream waits for a big chunk of data before it can start
processing it. I think what you are writing is a String and you should use
socketTextStream which reads the data on a per line basis.
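
To illustrate what reading "on a per line basis" means, here is a minimal
sketch in plain Scala that writes newline-terminated records and reads them
back one line at a time, using in-memory streams instead of a real socket. The
object and method names are made up for illustration. In the Benchmark below,
the corresponding change would presumably be to replace
ssc.rawSocketStream[String](host, port, ...) with
ssc.socketTextStream(host, port, StorageLevel.MEMORY_ONLY_SER).

```scala
import java.io.{BufferedReader, ByteArrayInputStream, ByteArrayOutputStream,
  InputStreamReader, OutputStreamWriter, PrintWriter}
import java.nio.charset.StandardCharsets

object LineFraming {
  // Sender side: one record per newline-terminated line, like out.println in the generator.
  def writeLines(records: Seq[String]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new PrintWriter(new OutputStreamWriter(bytes, StandardCharsets.UTF_8))
    records.foreach(r => out.print(r + "\n"))
    out.flush() // buffered lines reach the receiver only after a flush
    bytes.toByteArray
  }

  // Receiver side: records are recovered by splitting the byte stream on line boundaries.
  def readLines(data: Array[Byte]): List[String] = {
    val in = new BufferedReader(
      new InputStreamReader(new ByteArrayInputStream(data), StandardCharsets.UTF_8))
    Iterator.continually(in.readLine()).takeWhile(_ != null).toList
  }

  def main(args: Array[String]): Unit =
    println(readLines(writeLines(Seq("a,1", "b,2"))))
}
```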

On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa  wrote:

> Hi,
>
> I am trying to build a data generator that feeds a streaming application.
> This data generator just reads a file and send its lines through a socket.
> I get no errors in the logs, and the benchmark below always prints
> "Received 0 records". Am I doing something wrong?
>
>
> object MyDataGenerator {
>
>   def main(args: Array[String]) {
> if (args.length != 3) {
>   System.err.println("Usage: RawTextSender   ")
>   System.exit(1)
> }
> // Parse the arguments using a pattern match
> val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)
>
> val serverSocket = new ServerSocket(port)
> println("Listening on port " + port)
>
>
> while (true) {
>   val socket = serverSocket.accept()
>   println("Got a new connection")
>
>
>   val out = new PrintWriter(socket.getOutputStream)
>   try {
> var count = 0
> var startTimestamp = -1
> for (line <- Source.fromFile(file).getLines()) {
>   val ts = line.substring(2, line.indexOf(',',2)).toInt
>   if(startTimestamp < 0)
> startTimestamp = ts
>
>   if(ts - startTimestamp <= 30) {
> out.println(line)
> count += 1
>   } else {
> println(s"Emmited reports: $count")
> count = 0
> out.flush()
> startTimestamp = ts
> Thread.sleep(sleepMillis)
>   }
> }
>   } catch {
> case e: IOException =>
>   println("Client disconnected")
>   socket.close()
>   }
> }
> }
> }
>
>
>
> object Benchmark {
>   def main(args: Array[String]) {
> if (args.length != 4) {
>   System.err.println("Usage: RawNetworkGrep
> ")
>   System.exit(1)
> }
>
> val (numStreams, host, port, batchMillis) = (args(0).toInt, args(1), 
> args(2).toInt, args(3).toInt)
> val sparkConf = new SparkConf()
> sparkConf.setAppName("BenchMark")
> 
> sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
> sparkConf.set("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer")
> sparkConf.set("spark.executor.extraJavaOptions", " -XX:+UseCompressedOops 
> -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 
> -XX:MaxInlineSize=300 ")
> if (sparkConf.getOption("spark.master") == None) {
>   // Master not set, as this was not launched through Spark-submit. 
> Setting master as local."
>   sparkConf.setMaster("local[*]")
> }
>
> // Create the context
> val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>
> val rawStreams = (1 to numStreams).map(_ =>
>   ssc.rawSocketStream[String](host, port, 
> StorageLevel.MEMORY_ONLY_SER)).toArray
> val union = ssc.union(rawStreams)
> union.count().map(c => s"Received $c records").print()
> ssc.start()
> ssc.awaitTermination()
>   }
> }
>
> Thanks.
>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Ted Yu
You can use broadcast variable for passing connection information. 

Cheers

> On Sep 21, 2015, at 4:27 AM, Priya Ch  wrote:
> 
> can i use this sparkContext on executors ??
> In my application, i have scenario of reading from db for certain records in 
> rdd. Hence I need sparkContext to read from DB (cassandra in our case),
> 
> If sparkContext couldn't be sent to executors , what is the workaround for 
> this ??
> 
>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak  wrote:
>> add @transient?
>> 
>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch  
>>> wrote:
>>> Hello All,
>>> 
>>> How can i pass sparkContext as a parameter to a method in an object. 
>>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>> 
>>> How can i achieve this ?
>>> 
>>> Thanks,
>>> Padma Ch
> 


Re: passing SparkContext as parameter

2015-09-21 Thread Romi Kuntsman
SparkContext is available on the driver, not on executors.

To read from Cassandra, you can use something like this:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Sep 21, 2015 at 2:27 PM, Priya Ch 
wrote:

> can i use this sparkContext on executors ??
> In my application, i have scenario of reading from db for certain records
> in rdd. Hence I need sparkContext to read from DB (cassandra in our case),
>
> If sparkContext couldn't be sent to executors , what is the workaround for
> this ??
>
> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak  wrote:
>
>> add @transient?
>>
>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch 
>> wrote:
>>
>>> Hello All,
>>>
>>> How can i pass sparkContext as a parameter to a method in an object.
>>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>>
>>> How can i achieve this ?
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>
>>
>


Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Romi Kuntsman
An RDD is a set of data rows (in your case, numbers); the order of the items
has no meaning.
What exactly are you trying to accomplish?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu 
wrote:

> Dear ,
>
> I have spent many days thinking about this issue, without any
> success.
> I would appreciate any help.
>
> There is an RDD rdd1, and I would like to get a new RDD rdd2, where each row is
> rdd2[ i ] = rdd1[ i ] - rdd1[ i - 1 ].
> What kind of API or function should I use?
>
>
> Thanks very much!
> John
>
>


How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Zhiliang Zhu
Dear , 

I have spent many days thinking about this issue, without any success. 
I would appreciate any help.

There is an RDD rdd1, and I would like to get a new RDD rdd2, where each row is 
rdd2[ i ] = rdd1[ i ] - rdd1[ i - 1 ].
What kind of API or function should I use?

Thanks very much!
John
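
One possible approach, as a minimal sketch: make the order explicit by attaching
an index to every row, shift the index by one, and join the two on the index.
The sketch below uses plain Scala collections so that it runs without a cluster;
on an RDD the corresponding steps would presumably be zipWithIndex, a map that
shifts the key, and a join. The name adjacentDiff is made up for illustration.

```scala
object AdjacentDiff {
  // Returns x(i) - x(i - 1) for every i >= 1, using an explicit index join.
  def adjacentDiff(xs: Seq[Double]): Seq[Double] = {
    // (i, x_i): attach the position to every value
    val indexed = xs.zipWithIndex.map { case (v, i) => (i, v) }
    // (i + 1, x_i): the same values keyed by the position of their successor
    val shifted = indexed.map { case (i, v) => (i + 1, v) }.toMap
    // join on the index; the first row has no predecessor and is dropped
    indexed.collect { case (i, v) if shifted.contains(i) => v - shifted(i) }
  }

  def main(args: Array[String]): Unit =
    println(adjacentDiff(Seq(1.0, 4.0, 9.0, 16.0)))
}
```

Note that this only gives a meaningful result if the rows have a well-defined
order to begin with, for example because they were read from a sorted file.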


Re: passing SparkContext as parameter

2015-09-21 Thread Priya Ch
Can I use this SparkContext on executors?
In my application, I have a scenario of reading from the DB for certain records
in an RDD. Hence I need the SparkContext to read from the DB (Cassandra in our case).

If the SparkContext can't be sent to executors, what is the workaround for
this?

On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak  wrote:

> add @transient?
>
> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch 
> wrote:
>
>> Hello All,
>>
>> How can i pass sparkContext as a parameter to a method in an object.
>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>
>> How can i achieve this ?
>>
>> Thanks,
>> Padma Ch
>>
>
>


Re: KafkaDirectStream can't be recovered from checkpoint

2015-09-21 Thread Petr Novak
We have tried on another cluster installation with the same effect.

Petr

On Mon, Sep 21, 2015 at 10:45 AM, Petr Novak  wrote:

> It might be connected with my problems with gracefulShutdown in Spark
>> 1.5.0 2.11
>> https://mail.google.com/mail/#search/petr/14fb6bd5166f9395
>>
>> Maybe Ctrl+C corrupts checkpoints and breaks gracefulShutdown?
>>
> The provided link is obviously wrong. I haven't found it in the Spark mailing
> list archive for some reason, so you have to search in your mailbox for
> "Spark Streaming stop gracefully doesn't return to command line after
> upgrade to 1.4.0 and beyond"
>
> These 2 issues block us from upgrading to 1.5.0 from 1.3.0. Just having a
> non-graceful shutdown which can recover from the last completed batch would
> be enough because our computation is idempotent. I just wonder why nobody
> has the same issue; it suggests that there is either something wrong on our
> side or that nobody is using KafkaDirectStream with Spark built-in
> checkpointing in production?
>
> I would just need a confirmation from the community that checkpointing and
> graceful shutdown are actually working with KafkaDirectStream on 1.5.0 so
> that I can look for a problem on my side.
>
> Many thanks,
>
> Petr
>
> On Sun, Sep 20, 2015 at 12:58 PM, Petr Novak  wrote:
>
>> Hi Michal,
>> yes, it is there logged twice, it can be seen in attached log in one of
>> previous post with more details:
>>
>> 15/09/17 23:06:37 INFO StreamingContext: Invoking
>> stop(stopGracefully=false) from shutdown hook
>> 15/09/17 23:06:37 INFO StreamingContext: Invoking
>> stop(stopGracefully=false) from shutdown hook
>>
>> Thanks,
>> Petr
>>
>> On Sat, Sep 19, 2015 at 4:01 AM, Michal Čizmazia 
>> wrote:
>>
>>> Hi Petr, after Ctrl+C can you see the following message in the logs?
>>>
>>> Invoking stop(stopGracefully=false)
>>>
>>> Details:
>>> https://github.com/apache/spark/pull/6307
>>>
>>>
>>> On 18 September 2015 at 10:28, Petr Novak  wrote:
>>>
 It might be connected with my problems with gracefulShutdown in Spark
 1.5.0 2.11
 https://mail.google.com/mail/#search/petr/14fb6bd5166f9395

 Maybe Ctrl+C corrupts checkpoints and breaks gracefulShutdown?

 Petr

 On Fri, Sep 18, 2015 at 4:10 PM, Petr Novak 
 wrote:

> ...to ensure it is not something wrong on my cluster.
>
> On Fri, Sep 18, 2015 at 4:09 PM, Petr Novak 
> wrote:
>
>> I have tried it on Spark 1.3.0 2.10 and it works. The same code
>> doesn't on Spark 1.5.0 2.11. It would be nice if anybody could try on
>> another installation to ensure it is something wrong on my cluster.
>>
>> Many thanks,
>> Petr
>>
>> On Fri, Sep 18, 2015 at 4:07 PM, Petr Novak 
>> wrote:
>>
>>> This one is generated, I suppose, after Ctrl+C
>>>
>>> 15/09/18 14:38:25 INFO Worker: Asked to kill executor
>>> app-20150918143823-0001/0
>>> 15/09/18 14:38:25 INFO Worker: Asked to kill executor
>>> app-20150918143823-0001/0
>>> 15/09/18 14:38:25 DEBUG
>>> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
>>> message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
>>> Actor[akka://sparkWorker/deadLetters]
>>> 15/09/18 14:38:25 DEBUG
>>> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
>>> message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
>>> Actor[akka://sparkWorker/deadLetters]
>>> 15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
>>> app-20150918143823-0001/0 interrupted
>>> 15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
>>> app-20150918143823-0001/0 interrupted
>>> 15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
>>> 15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
>>> 15/09/18 14:38:25 ERROR FileAppender: Error writing stream to file
>>> /dfs/spark/work/app-20150918143823-0001/0/stderr
>>> java.io.IOException: Stream closed
>>> at
>>> java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
>>> at
>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
>>> at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>> at java.io.FilterInputStream.read(FilterInputStream.java:107)
>>> at
>>> org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
>>> at
>>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
>>> at
>>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>>> at
>>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>>> at
>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>> at
>>> org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
>>

Re: Spark does not yet support its JDBC component for Scala 2.11.

2015-09-21 Thread Petr Novak
Nice, thanks.

So the note in build instruction for 2.11 is obsolete? Or there are still
some limitations?

http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211

On Fri, Sep 11, 2015 at 2:19 PM, Petr Novak  wrote:

> Nice, thanks.
>
> So the note in build instruction for 2.11 is obsolete? Or there are still
> some limitations?
>
>
> http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211
>
> On Fri, Sep 11, 2015 at 2:09 PM, Ted Yu  wrote:
>
>> Have you looked at:
>> https://issues.apache.org/jira/browse/SPARK-8013
>>
>>
>>
>> > On Sep 11, 2015, at 4:53 AM, Petr Novak  wrote:
>> >
>> > Does it still apply for 1.5.0?
>> >
>> > What actual limitation does it mean when I switch to 2.11? No JDBC
>> Thriftserver? No JDBC DataSource? No JdbcRDD (which is already obsolete I
>> believe)? Some more?
>> >
>> > What library is the blocker to upgrade JDBC component to 2.11?
>> >
>> > Is there any estimate when it could be available for 2.11?
>> >
>> > Many thanks,
>> > Petr
>>
>
>


spark with internal ip

2015-09-21 Thread ZhuGe
Hi there:

We recently added one NIC to each node of the cluster (standalone) for 
larger bandwidth, and we modified the /etc/hosts file so that the hostname points 
to the new NIC's IP address (internal). What we want to achieve is that 
communication between nodes goes through the new NIC. 

The cluster seems to start properly. However, we cannot access the UI 
remotely, as the UI is bound to the internal IP address.
Is there any configuration that could help me solve this issue?

Cheers,
Ge Zhu 

Re: Spark + Druid

2015-09-21 Thread Petr Novak
Great work.

On Fri, Sep 18, 2015 at 6:51 PM, Harish Butani 
wrote:

> Hi,
>
> I have just posted a Blog on this:
> https://www.linkedin.com/pulse/combining-druid-spark-interactive-flexible-analytics-scale-butani
>
> regards,
> Harish Butani.
>
> On Tue, Sep 1, 2015 at 11:46 PM, Paolo Platter 
> wrote:
>
>> Fantastic!!! I will look into that and I hope to contribute
>>
>> Paolo
>>
>> Sent from my Windows Phone
>> --
>> From: Harish Butani 
>> Sent: 02/09/2015 06:04
>> To: user 
>> Subject: Spark + Druid
>>
>> Hi,
>>
>> I am working on the Spark Druid Package:
>> https://github.com/SparklineData/spark-druid-olap.
>> For scenarios where a 'raw event' dataset is being indexed in Druid it
>> enables you to write your Logical Plans(queries/dataflows) against the 'raw
>> event' dataset and it rewrites parts of the plan to execute as a Druid
>> Query. In Spark the configuration of a Druid DataSource is somewhat like
>> configuring an OLAP index in a traditional DB. Early results show
>> significant speedup of pushing slice and dice queries to Druid.
>>
>> It comprises a Druid DataSource that wraps the 'raw event' dataset and
>> has knowledge of the Druid Index; and a DruidPlanner which is a set of plan
>> rewrite strategies to convert Aggregation queries into a Plan having a
>> DruidRDD.
>>
>> Here is a detailed design document, which also describes a benchmark of
>> representative queries on the TPCH dataset.
>>
>> Looking for folks who would be willing to try this out and/or contribute.
>>
>> regards,
>> Harish Butani.
>>
>
>


mongo-hadoop with Spark is slow for me, and adding nodes doesn't seem to make any noticeable difference

2015-09-21 Thread cscarioni
Hi, I would appreciate any help or pointers in the right direction.

My current test scenario is the following.

I want to process a MongoDB collection, anonymising some fields on it and
store it in another Collection.

The size of the collection is around 900 GB with 2.5 million documents

Following is the code.



object Anonymizer extends SparkRunner {

  val sqlContext = new SQLContext(sc)

  MongoDBLoader(conf, sc, "output").load(
    MongoHadoopImplementationReader(conf, sc, "input").rdd,
    (dbObject: BSONObject) => {
      dbObject.put("add_field", "John Macclane")
      val embedded = dbObject.get("embedded").asInstanceOf[BasicDBObject]
      embedded.put("business_name", Name.first_name)
      dbObject.put("embedded", embedded)
      val notesWrapper =
        Option(dbObject.get("embedded_list").asInstanceOf[java.util.ArrayList[BasicDBObject]])
      notesWrapper match {
        case Some(notes) =>
          notes.foreach((note: BasicDBObject) => {
            note.put("text", Name.name)
          })
        case None =>
      }
      dbObject
    }
  )
}...

And




case class MongoHadoopImplementationReader(conf: com.typesafe.config.Config,
                                           sc: SparkContext, collection: String) {
  val mongoConfig = new Configuration()

  mongoConfig.set("mongo.input.uri",
    s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_database")}.${collection}")
  mongoConfig.set("mongo.input.split_size", "50")
  mongoConfig.set("mongo.input.limit", "70")

  def rdd: RDD[(Object, BSONObject)] = {
    val rdd = sc.newAPIHadoopRDD(
      mongoConfig,
      classOf[MongoInputFormat],
      classOf[Object],
      classOf[BSONObject])
    rdd
  }

}


And 


case class MongoDBLoader(conf: com.typesafe.config.Config, sc: SparkContext,
                         collection: String) {

  val mongoConfig = new Configuration()

  mongoConfig.set("mongo.output.uri",
    s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_output_database")}.${collection}")

  def load(rdd: => RDD[(Object, BSONObject)],
           transformer: (BSONObject) => BSONObject) = {

    val mongoRDD = rdd.map[(Object, BSONObject)]((tuple: (Object, BSONObject)) => {
      (null, transformer(tuple._2))
    })

    mongoRDD.saveAsNewAPIHadoopFile(
      "file:///this-is-completely-unused",
      classOf[Object],
      classOf[BSONObject],
      classOf[MongoOutputFormat[Object, BSONObject]],
      mongoConfig)
  }
}


This code runs slowly: it takes 9.5 hours on a 3-machine cluster to process
everything. On a 30-machine cluster I stopped it after 6 hours, as it was only
about half processed.

The machines are EC2 m3.large instances. MongoDB lives on another EC2
instance inside the same VPC and the same subnet.

I tried to look into the configuration options, but it seems that in most
cases the defaults are the way to go (number of cores, memory, etc.).

It looks like I have a bottleneck somewhere, but I am not sure at all. I
am thinking that Mongo may not be able to handle the parallelism?

How are the RDDs stored in memory? When I run it, I see that around 32000
partitions and tasks get created. Then the processing seems to slow down
as it advances (this may be due to the Mongo documents being bigger in the
second half of our DB).

I also see that the split is stored in HDFS by Spark and then read and
bulk-inserted into Mongo. However, there is a lot of HDFS space (about 30 GB
per machine) but just a tiny fraction is used. Wouldn't it be better to fill
this more and only try to insert into Mongo when more data is available?

I also tried to increase the split size, but it complains of not enough
resources on the worker. However, I don't think the splits are big enough to
actually fill the 6 GB of memory of each node, as what it stores on HDFS
is a lot less than that.

Is there anything obvious (or not :)) that I am not doing correctly? Is
this the correct way to transform a collection from Mongo to Mongo? Is
there another way?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mongo-hadoop-with-Spark-is-slow-for-me-and-adding-nodes-doesn-t-seem-to-make-any-noticeable-differene-tp24754.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: passing SparkContext as parameter

2015-09-21 Thread Petr Novak
add @transient?
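
What @transient does and does not solve can be seen with plain Java
serialization, which is what a "not serializable" task error ultimately comes
from. The sketch below uses a made-up class DriverOnlyHandle as a stand-in for
a driver-only object such as SparkContext; none of these names come from Spark.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a driver-only handle such as SparkContext; deliberately not Serializable.
class DriverOnlyHandle

// Serializing this fails because the handle is part of the object graph.
class JobWithHandle(val handle: DriverOnlyHandle) extends Serializable

// @transient tells Java serialization to skip the field.
class JobWithTransientHandle(@transient val handle: DriverOnlyHandle) extends Serializable

object TransientDemo {
  // True if the object can be written with Java serialization.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val handle = new DriverOnlyHandle
    println(canSerialize(new JobWithHandle(handle)))
    println(canSerialize(new JobWithTransientHandle(handle)))
  }
}
```

The caveat is that a transient field is null after deserialization, so this
only removes the exception; code running on the executors still cannot use the
handle.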

On Mon, Sep 21, 2015 at 11:36 AM, Petr Novak  wrote:

> add @transient?
>
> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch 
> wrote:
>
>> Hello All,
>>
>> How can i pass sparkContext as a parameter to a method in an object.
>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>
>> How can i achieve this ?
>>
>> Thanks,
>> Padma Ch
>>
>
>


Re: Deploying spark-streaming application on production

2015-09-21 Thread Petr Novak
In short, there is no direct support for it in Spark AFAIK. You will either
manage it in MQTT or have to add another layer of indirection, either
in-memory based (observable streams, in-mem db) or disk based (Kafka, HDFS
files, db), which will keep your unprocessed events.

Now realizing, there is support for backpressure in v1.5.0, but I don't know
if it could be exploited, i.e. I don't know if it is possible to decouple
event reading into memory from the actual processing code in Spark so that the
latter could be swapped on the fly. Probably not without some custom-built
facility for it.
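
As a rough illustration of the in-memory variant of that layer of indirection,
the following plain Scala sketch puts a bounded buffer between a reader and a
processor. The class name EventBuffer and its methods are made up for
illustration; this is not a Spark or MQTT API.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Bounded in-memory buffer between the event reader and the processing code.
class EventBuffer(capacity: Int) {
  private val queue = new LinkedBlockingQueue[String](capacity)

  // Reader side: returns false when the buffer is full and the event is not stored.
  def receive(event: String): Boolean = queue.offer(event)

  // Processor side: takes everything that accumulated, e.g. after a redeploy.
  def drain(): List[String] =
    Iterator.continually(queue.poll()).takeWhile(_ != null).toList
}

object EventBufferDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new EventBuffer(2)
    println(buffer.receive("e1"))
    println(buffer.receive("e2"))
    println(buffer.receive("e3")) // buffer is full at this point
    println(buffer.drain())
  }
}
```

The sketch also shows the limitation: events that arrive while the buffer is
full are not stored, and the buffer is lost if its process dies, which is why a
durable store such as Kafka is the safer choice.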

Petr

On Mon, Sep 21, 2015 at 11:26 AM, Petr Novak  wrote:

> I should read my posts at least once to avoid so many typos. Hopefully you
> are brave enough to read through.
>
> Petr
>
> On Mon, Sep 21, 2015 at 11:23 AM, Petr Novak  wrote:
>
>> I think you would have to persist events somehow if you don't want to
>> miss them. I don't see any other option there. Either in MQTT, if it is
>> supported there, or by routing them through Kafka.
>>
>> There is WriteAheadLog in Spark, but you would have to decouple MQTT stream
>> reading and processing into 2 separate jobs so that you could upgrade the
>> processing one, assuming the reading one would be stable (without changes)
>> across versions. But it is problematic because there is no easy way to
>> share DStreams between jobs; you would have to develop your own facility for
>> it.
>>
>> Alternatively, the reading job could save MQTT events in their most
>> raw form into files, to limit the need to change code, and then the
>> processing job would work on top of it using Spark Streaming based on
>> files. I think this is inefficient and can get quite complex if you would
>> like to make it reliable.
>>
>> Basically, either MQTT supports persistence (which I don't know) or there
>> is Kafka for this use case.
>>
>> Another option would be, I think, to place observable streams in between
>> MQTT and Spark Streaming with backpressure, so that you could perform
>> the upgrade as long as the buffers don't fill up.
>>
>> I'm sorry that it is not thought out well from my side, it is just a
>> brainstorm but it might lead you somewhere.
>>
>> Regards,
>> Petr
>>
>> On Mon, Sep 21, 2015 at 10:09 AM, Jeetendra Gangele wrote:
>>
>>> Hi All,
>>>
>>> I have a Spark Streaming application with a batch interval (10 ms) which is
>>> reading the MQTT channel and dumping the data from MQTT to HDFS.
>>>
>>> Suppose I have to deploy a new application jar (with changes in the Spark
>>> Streaming application). What is the best way to deploy? Currently I am doing
>>> it as below:
>>>
>>> 1. Killing the running streaming app using yarn application -kill ID
>>> 2. Then starting the application again
>>>
>>> The problem with the above approach is that, since we are not persisting the
>>> events in MQTT, we will miss the events for the period of the deploy.
>>>
>>> How to handle this case?
>>>
>>> regards
>>> jeeetndra
>>>
>>
>>
>

