Re: Remote jar file

2014-12-11 Thread rahulkumar-aws
Put the jar file in HDFS. The URL must be globally visible inside of your
cluster, for instance an hdfs:// path or a file:// path that is present on
all nodes.
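
For example, a hedged spark-submit invocation for a jar that already lives in
HDFS (class name, master URL, and path are assumptions for illustration):

  spark-submit --class com.example.MyApp --master spark://master:7077 \
    hdfs:///user/apps/my-app.jar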



-
Software Developer
SigmoidAnalytics, Bangalore

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Remote-jar-file-tp20649p20650.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: custom spark app name in yarn-cluster mode

2014-12-11 Thread Sandy Ryza
Hi Tomer,

In yarn-cluster mode, the application has already been submitted to YARN by
the time the SparkContext is created, so it's too late to set the app name
there.  I believe passing it with the --name option to spark-submit should
work.
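
For illustration, a hedged example of passing the name on the command line
(class and jar names are assumptions):

  spark-submit --master yarn-cluster --name myCustomName \
    --class com.example.MyApp my-app.jar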

-Sandy

On Thu, Dec 11, 2014 at 10:28 AM, Tomer Benyamini 
wrote:
>
>
>
> On Thu, Dec 11, 2014 at 8:27 PM, Tomer Benyamini 
> wrote:
>
>> Hi,
>>
>> I'm trying to set a custom spark app name when running a java spark app
>> in yarn-cluster mode.
>>
>>  SparkConf sparkConf = new SparkConf();
>>
>>  sparkConf.setMaster(System.getProperty("spark.master"));
>>
>>  sparkConf.setAppName("myCustomName");
>>
>>  sparkConf.set("spark.logConf", "true");
>>
>>  JavaSparkContext sc = new JavaSparkContext(sparkConf);
>>
>>
>> Apparently this only works when running in yarn-client mode; in
>> yarn-cluster mode the app name is the class name, when viewing the app in
>> the cluster manager UI. Any idea?
>>
>>
>> Thanks,
>>
>> Tomer
>>
>>
>>
>


Re: KryoSerializer exception in Spark Streaming JAVA

2014-12-11 Thread Tathagata Das
Also, please make sure you are specifying the fully qualified name of the
registrator class correctly in the SparkConf configuration.
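
For example, a hedged sketch in Scala (the package name is an assumption):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "com.example.MyRegistrator")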
On Dec 11, 2014 5:57 PM, "bonnahu"  wrote:

> class MyRegistrator implements KryoRegistrator {
>
> public void registerClasses(Kryo kryo) {
> kryo.register(ImpressionFactsValue.class);
> }
>
> }
>
> change this class to public and give a try
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/KryoSerializer-exception-in-Spark-Streaming-JAVA-tp15479p20647.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Mllib Error

2014-12-11 Thread MEETHU MATHEW
Hi,
Try this: change spark-mllib to spark-mllib_2.10.
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.1.1",
  "org.apache.spark" % "spark-mllib_2.10" % "1.1.1"
)
Thanks & Regards,
Meethu M 

 On Friday, 12 December 2014 12:22 PM, amin mohebbi 
 wrote:
   

I'm trying to build a very simple Scala standalone app using MLlib, but I
get the following error when trying to build the program:

    Object Mllib is not a member of package org.apache.spark

Then I realized that I have to add MLlib as a dependency, as follows:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.1.0",
  "org.apache.spark" %% "spark-mllib" % "1.1.0"
)

But here I got an error that says:

    unresolved dependency spark-core_2.10.4;1.1.1: not found

so I had to modify it to

  "org.apache.spark" % "spark-core_2.10" % "1.1.1",

But there is still an error that says:

    unresolved dependency spark-mllib;1.1.1: not found

Does anyone know how to add the MLlib dependency in the .sbt file?
Best Regards

...

Amin Mohebbi

PhD candidate in Software Engineering 
 at university of Malaysia  

Tel : +60 18 2040 017



E-Mail : tp025...@ex.apiit.edu.my

  amin_...@me.com

   

Mllib Error

2014-12-11 Thread amin mohebbi
I'm trying to build a very simple Scala standalone app using MLlib, but I
get the following error when trying to build the program:

    Object Mllib is not a member of package org.apache.spark

Then I realized that I have to add MLlib as a dependency, as follows:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.1.0",
  "org.apache.spark" %% "spark-mllib" % "1.1.0"
)

But here I got an error that says:

    unresolved dependency spark-core_2.10.4;1.1.1: not found

so I had to modify it to

  "org.apache.spark" % "spark-core_2.10" % "1.1.1",

But there is still an error that says:

    unresolved dependency spark-mllib;1.1.1: not found

Does anyone know how to add the MLlib dependency in the .sbt file?
Best Regards

...

Amin Mohebbi

PhD candidate in Software Engineering 
 at university of Malaysia  

Tel : +60 18 2040 017



E-Mail : tp025...@ex.apiit.edu.my

  amin_...@me.com

Adding a column to a SchemaRDD

2014-12-11 Thread Nathan Kronenfeld
Hi, there.

I'm trying to understand how to augment data in a SchemaRDD.

I can see how to do it if can express the added values in SQL - just run
"SELECT *,valueCalculation AS newColumnName FROM table"

I've been searching all over for how to do this if my added value is a
scala function, with no luck.

Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a
new column, D, calculated using Utility.process(b, c), and I want (of
course) to pass in the value B and C from each row, ending up with a new
SchemaRDD with columns A, B, C, and D.

Is this possible? If so, how?
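
A hedged sketch of one possible route (Spark 1.1-era API; column types and
names are assumptions) is to register the Scala function as a SQL UDF and use
it from the query form you already know:

  sqlContext.registerFunction("process", (b: Int, c: Int) => Utility.process(b, c))
  val augmented = sqlContext.sql("SELECT A, B, C, process(B, C) AS D FROM table")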

Thanks,
   -Nathan

-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Using Spark at the U.S.Treasury

2014-12-11 Thread Max Funk
Kindly take a moment to look over this proposal to bring Spark into the
U.S. Treasury:
http://www.systemaccounting.org/sparking_the_data_driven_republic


GroupBy and nested Top on

2014-12-11 Thread sparkuser2014
I'm currently new to pyspark, thank you for your patience in advance - my
current problem is the following:

I have an RDD composed of the fields A, B, and count =>

  result1 = rdd.map(lambda x: ((A, B), 1)).reduceByKey(lambda a, b: a + b)

Then I wanted to group the results based on 'A', so I did the following =>

  result2 = result1.map(lambda x: (x[0][0],(x[0][1],x[1]))).groupByKey()

Now, my problem/challenge: with the new RDD, I want to be able to "subsort"
and take the top 50 elements of (B, count) in descending order, and thereafter
print or save that top(50) for every element/instance of A.

i.e. final result =>   

A, B1, 40
A, B2, 30
A, B3, 20
A, B4, 10
A1, C1, 30
A1, C2, 20
A1, C3, 10

Any guidance and help you can provide to help me solve this problem of mine
is much appreciated! Thank you :-)








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GroupBy-and-nested-Top-on-tp20648.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: monitoring for spark standalone

2014-12-11 Thread Otis Gospodnetic
Hi Judy,

SPM monitors Spark.  Here are some screenshots:
http://blog.sematext.com/2014/10/07/apache-spark-monitoring/

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Mon, Dec 8, 2014 at 2:35 AM, Judy Nash 
wrote:

>  Hello,
>
>
>
> Are there ways we can programmatically get health status of master & slave
> nodes, similar to Hadoop Ambari?
>
>
>
> Wiki seems to suggest there are only web UI or instrumentations (
> http://spark.apache.org/docs/latest/monitoring.html).
>
>
>
> Thanks,
> Judy
>
>
>


Error on JavaSparkContext.stop()

2014-12-11 Thread 김태연
(Sorry if this mail is a duplicate, but it seems that my previous mail
could not reach the mailing list.)

Hi,



When my spark program calls JavaSparkContext.stop(), the following errors
occur.



   14/12/11 16:24:19 INFO Main: sc.stop {

   14/12/11 16:24:20 ERROR ConnectionManager: Corresponding
SendingConnection to ConnectionManagerId(cluster02,38918) not found

   14/12/11 16:24:20 ERROR SendingConnection: Exception while
reading SendingConnection to ConnectionManagerId(cluster04,59659)

   java.nio.channels.ClosedChannelException

 at
sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)

 at
sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)

 at
org.apache.spark.network.SendingConnection.read(Connection.scala:390)

 at
org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:205)

 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)

   14/12/11 16:24:20 ERROR ConnectionManager: Corresponding
SendingConnection to ConnectionManagerId(cluster03,59821) not found

   14/12/11 16:24:20 ERROR ConnectionManager: Corresponding
SendingConnection to ConnectionManagerId(cluster02,38918) not found

   14/12/11 16:24:20 WARN ConnectionManager: All connections not
cleaned up

   14/12/11 16:24:20 INFO Main: sc.stop }



How can I fix this?



The configuration is as follows:

- Spark version is 1.1.1

- Client runs on Windows 7

- The cluster is Linux(CentOS 6.5).

- spark.master=yarn-client

- Since Spark has a problem submitting job from Windows to Linux, I applied
my patch to the Spark source code. (Please see
https://github.com/apache/spark/pull/899 )



Spark 1.0.0 did not have this problem.



Thanks.


Re: what is the best way to implement mini batches?

2014-12-11 Thread Ilya Ganelin
Hi all. I've been working on a similar problem. One solution that is
straightforward (if suboptimal) is to do the following.

A.zipWithIndex().filter { case (_, i) => i >= range_start && i < range_end }.
Lastly, just put that in a for loop over the index ranges. I've found that
this approach scales very well.

As Matei said another option is to define a custom partitioner and then use
mapPartitions. Hope that helps!
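
A rough sketch of the index-range approach (RDD name, batch size, and the
processing step are assumptions):

  val indexed = A.zipWithIndex().cache()          // (element, index) pairs
  val total = indexed.count()
  val batchSize = 1000L
  for (start <- 0L until total by batchSize) {
    val batch = indexed
      .filter { case (_, i) => i >= start && i < start + batchSize }
      .map(_._1)
    // run one training step on `batch` here
  }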


On Thu, Dec 11, 2014 at 6:16 PM Imran Rashid  wrote:

> Minor correction:  I think you want iterator.grouped(10) for
> non-overlapping mini batches
> On Dec 11, 2014 1:37 PM, "Matei Zaharia"  wrote:
>
>> You can just do mapPartitions on the whole RDD, and then called sliding()
>> on the iterator in each one to get a sliding window. One problem is that
>> you will not be able to slide "forward" into the next partition at
>> partition boundaries. If this matters to you, you need to do something more
>> complicated to get those, such as the repartition that you said (where you
>> map each record to the partition it should be in).
>>
>> Matei
>>
>> > On Dec 11, 2014, at 10:16 AM, ll  wrote:
>> >
>> > any advice/comment on this would be much appreciated.
>> >
>> >
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini
>> -batches-tp20264p20635.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: Spark Streaming in Production

2014-12-11 Thread Tathagata Das
Spark Streaming takes care of restarting receivers if they fail.
Regarding the fault-tolerance properties and deployment options, we
made some improvements in the upcoming Spark 1.2. Here is a staged
version of the Spark Streaming programming guide that you can read for
the up-to-date explanation of streaming fault-tolerance semantics.

http://people.apache.org/~tdas/spark-1.2-temp/

On Thu, Dec 11, 2014 at 4:03 PM, twizansk  wrote:
> Hi,
>
> I'm looking for resources and examples for the deployment of spark streaming
> in production.  Specifically, I would like to know how high availability and
> fault tolerance of receivers is typically achieved.
>
> The workers are managed by the spark framework and are therefore fault
> tolerant out of the box but it seems like the receiver deployment and
> management is up to me.  Is that correct?
>
> Thanks
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-Production-tp20644.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: what is the best way to implement mini batches?

2014-12-11 Thread Imran Rashid
Minor correction:  I think you want iterator.grouped(10) for
non-overlapping mini batches
On Dec 11, 2014 1:37 PM, "Matei Zaharia"  wrote:

> You can just do mapPartitions on the whole RDD, and then called sliding()
> on the iterator in each one to get a sliding window. One problem is that
> you will not be able to slide "forward" into the next partition at
> partition boundaries. If this matters to you, you need to do something more
> complicated to get those, such as the repartition that you said (where you
> map each record to the partition it should be in).
>
> Matei
>
> > On Dec 11, 2014, at 10:16 AM, ll  wrote:
> >
> > any advice/comment on this would be much appreciated.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20635.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: KryoSerializer exception in Spark Streaming JAVA

2014-12-11 Thread bonnahu
class MyRegistrator implements KryoRegistrator { 

public void registerClasses(Kryo kryo) { 
kryo.register(ImpressionFactsValue.class); 
} 
  
} 

change this class to public and give a try 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KryoSerializer-exception-in-Spark-Streaming-JAVA-tp15479p20647.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: KryoRegistrator exception and Kryo class not found while compiling

2014-12-11 Thread bonnahu
Is the class com.dataken.spark.examples.MyRegistrator public? if not, change
it to public and give a try.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KryoRegistrator-exception-and-Kryo-class-not-found-while-compiling-tp10396p20646.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why is spark + scala code so slow, compared to python?

2014-12-11 Thread Andy Wagner
This is showing a factor of 200 between python and scala and 1400 when
distributed.

Is this really accurate?
If not, what is the real performance difference expected on average between
the 3 cases?


On Thu, Dec 11, 2014 at 11:33 AM, Duy Huynh  wrote:

> just to give some reference point.  with the same algorithm running on
> mnist dataset.
>
> 1.  python implementation:  ~10 miliseconds per iteration (can be faster
> if i switch to gpu)
>
> 2.  local version (scala + breeze):  ~2 seconds per iteration
>
> 3.  distributed version (spark + scala + breeze):  15 seconds per iteration
>
> i love spark and really enjoy writing scala code.  but this huge
> difference in performance makes it really hard to do any kind of machine
> learning work.
>
>
>
>
> On Thu, Dec 11, 2014 at 2:18 PM, Duy Huynh 
> wrote:
>
>> both.
>>
>> first, the distributed version is so much slower than python.  i tried a
>> few things like broadcasting variables, replacing Seq with Array, and a few
>> other little things.  it helps to improve the performance, but still slower
>> than the python code.
>>
>> so, i wrote a local version that's pretty much just running a bunch of
>> breeze/blas operations.  i guess that's purely scala (no spark).  this
>> local version is faster than the distributed version but still much slower
>> than the python code.
>>
>>
>>
>>
>>
>>
>>
>> On Thu, Dec 11, 2014 at 2:09 PM, Natu Lauchande 
>> wrote:
>>
>>> Are you using Scala in a distributed enviroment or in a standalone mode ?
>>>
>>> Natu
>>>
>>> On Thu, Dec 11, 2014 at 8:23 PM, ll  wrote:
>>>
 hi.. i'm converting some of my machine learning python code into scala +
 spark.  i haven't been able to run it on large dataset yet, but on small
 datasets (like http://yann.lecun.com/exdb/mnist/), my spark + scala
 code is
 much slower than my python code (5 to 10 times slower than python)

 i already tried everything to improve my spark + scala code like
 broadcasting variables, caching the RDD, replacing all my matrix/vector
 operations with breeze/blas, etc.  i saw some improvements, but it's
 still a
 lot slower than my python code.

 why is that?

 how do you improve your spark + scala performance today?

 or is spark + scala just not the right tool for small to medium
 datasets?

 when would you use spark + scala vs. python?

 thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/why-is-spark-scala-code-so-slow-compared-to-python-tp20636.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>


Re: Spark Server - How to implement

2014-12-11 Thread Marcelo Vanzin
Hi Manoj,

I'm not aware of any public projects that do something like that,
except for the Ooyala server which you say doesn't cover your needs.

We've been playing with something like that inside Hive, though:

On Thu, Dec 11, 2014 at 5:33 PM, Manoj Samel  wrote:
> Hi,
>
> If spark based services are to be exposed as a continuously available
> server, what are the options?
>
> * The API exposed to client will be proprietary and fine grained (RPC style
> ..), not a Job level API
> * The client API need not be SQL so the Thrift JDBC server does not seem to
> be option .. but I could be wrong here ...
> * Ooyala implementation is a REST API for job submission, but as mentioned
> above; the desired API is a finer grain API, not a job submission
>
> Any existing implementation?
>
> Is it build your own server? Any thoughts on approach to use ?
>
> Thanks,
>
>
>
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Server - How to implement

2014-12-11 Thread Marcelo Vanzin
Oops, sorry, fat fingers.

We've been playing with something like that inside Hive:
https://github.com/apache/hive/tree/spark/spark-client

That seems to have at least a few of the characteristics you're
looking for; but it's a very young project, and at this moment we're
not developing it as a public API, but mostly for internal Hive use.
It can give you a few ideas, though. Also, SPARK-3215.


On Thu, Dec 11, 2014 at 5:41 PM, Marcelo Vanzin  wrote:
> Hi Manoj,
>
> I'm not aware of any public projects that do something like that,
> except for the Ooyala server which you say doesn't cover your needs.
>
> We've been playing with something like that inside Hive, though:
>
> On Thu, Dec 11, 2014 at 5:33 PM, Manoj Samel  wrote:
>> Hi,
>>
>> If spark based services are to be exposed as a continuously available
>> server, what are the options?
>>
>> * The API exposed to client will be proprietary and fine grained (RPC style
>> ..), not a Job level API
>> * The client API need not be SQL so the Thrift JDBC server does not seem to
>> be option .. but I could be wrong here ...
>> * Ooyala implementation is a REST API for job submission, but as mentioned
>> above; the desired API is a finer grain API, not a job submission
>>
>> Any existing implementation?
>>
>> Is it build your own server? Any thoughts on approach to use ?
>>
>> Thanks,
>>
>>
>>
>>
>
>
>
> --
> Marcelo



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Server - How to implement

2014-12-11 Thread Manoj Samel
Hi,

If spark based services are to be exposed as a continuously available
server, what are the options?

* The API exposed to client will be proprietary and fine grained (RPC style
..), not a Job level API
* The client API need not be SQL so the Thrift JDBC server does not seem to
be option .. but I could be wrong here ...
* Ooyala implementation is a REST API for job submission, but as mentioned
above; the desired API is a finer grain API, not a job submission

Any existing implementation?

Is it build your own server? Any thoughts on approach to use ?

Thanks,


Error on JavaSparkContext.stop()

2014-12-11 Thread Taeyun Kim
(Sorry if this mail is duplicate, but it seems that my previous mail could
not reach the mailing list.)

Hi,

 

When my spark program calls JavaSparkContext.stop(), the following errors
occur.

   

   14/12/11 16:24:19 INFO Main: sc.stop {

   14/12/11 16:24:20 ERROR ConnectionManager: Corresponding
SendingConnection to ConnectionManagerId(cluster02,38918) not found

   14/12/11 16:24:20 ERROR SendingConnection: Exception while
reading SendingConnection to ConnectionManagerId(cluster04,59659)

   java.nio.channels.ClosedChannelException

 at
sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)

 at
sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)

 at
org.apache.spark.network.SendingConnection.read(Connection.scala:390)

 at
org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.sca
la:205)

 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:11
45)

 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:6
15)

 at java.lang.Thread.run(Thread.java:745)

   14/12/11 16:24:20 ERROR ConnectionManager: Corresponding
SendingConnection to ConnectionManagerId(cluster03,59821) not found

   14/12/11 16:24:20 ERROR ConnectionManager: Corresponding
SendingConnection to ConnectionManagerId(cluster02,38918) not found

   14/12/11 16:24:20 WARN ConnectionManager: All connections not
cleaned up

   14/12/11 16:24:20 INFO Main: sc.stop }

 

How can I fix this?

 

The configuration is as follows:

- Spark version is 1.1.1

- Client runs on Windows 7

- The cluster is Linux(CentOS 6.5).

- spark.master=yarn-client

- Since Spark has a problem submitting job from Windows to Linux, I applied
my patch to the Spark source code. (Please see
https://github.com/apache/spark/pull/899 )

 

Spark 1.0.0 did not have this problem.

 

Thanks.

 



Spark Streaming in Production

2014-12-11 Thread twizansk
Hi,

I'm looking for resources and examples for the deployment of spark streaming
in production.  Specifically, I would like to know how high availability and
fault tolerance of receivers is typically achieved.

The workers are managed by the spark framework and are therefore fault
tolerant out of the box but it seems like the receiver deployment and
management is up to me.  Is that correct?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-Production-tp20644.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Specifying number of executors in Mesos

2014-12-11 Thread Andrew Ash
Gerard,

Are you familiar with spark.deploy.spreadOut in Standalone mode?  It sounds
like you want the same thing in Mesos mode.

On Thu, Dec 11, 2014 at 6:48 AM, Tathagata Das 
wrote:

> Not that I am aware of. Spark will try to spread the tasks evenly
> across executors; it's not aware of the workers at all. So if the
> executors-to-worker allocation is uneven, I am not sure what can be
> done. Maybe others can get some ideas.
>
> On Tue, Dec 9, 2014 at 6:20 AM, Gerard Maas  wrote:
> > Hi,
> >
> > We've a number of Spark Streaming /Kafka jobs that would benefit of an
> even
> > spread of consumers over physical hosts in order to maximize network
> usage.
> > As far as I can see, the Spark Mesos scheduler accepts resource offers
> until
> > all required Mem + CPU allocation has been satisfied.
> >
> > This basic resource allocation policy results in large executors spread
> over
> > few nodes, resulting in many Kafka consumers in a single node (e.g. from
> 12
> > consumers, I've seen allocations of 7/3/2)
> >
> > Is there a way to tune this behavior to achieve executor allocation on a
> > given number of hosts?
> >
> > -kr, Gerard.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Job status from Python

2014-12-11 Thread Michael Nazario
In PySpark, is there a way to get the status of a job which is currently 
running? My use case is that I have a long running job that users may not know 
whether or not the job is still running. It would be nice to have an idea of 
whether or not the job is progressing even if it isn't very granular.

I've looked into the Application detailed UI which has per-stage information 
(but unfortunately is not in json format), but even at that point I don't 
necessarily know which stages correspond to a job I started.

So I guess my main questions are:

  1.  How do I get the job id of a job started in python?
  2.  If possible, how do I get the stages which correspond to that job?
  3.  Is there any way to get information about currently running stages 
without parsing the Stage UI HTML page?
  4.  Has anyone approached this problem in a different way?


Running spark-submit from a remote machine using a YARN application

2014-12-11 Thread ryaminal
We are trying to submit a Spark application from a Tomcat application running
our business logic. The Tomcat app lives in a separate non-Hadoop cluster.
We first were doing this by using the spark-yarn package to directly call
Client#runApp() but found that the API we were using in Spark is being made
private in future releases. 
 
Now our solution is to make a very simple YARN application which executes
as its command "spark-submit --master yarn-cluster s3n://application/jar.jar
...". This seemed so simple and elegant, but it has some weird issues. We
get "NoClassDefFoundErrors". When we ssh to the box, run the same
spark-submit command it works, but doing this through YARN leads in the
NoClassDefFoundErrors mentioned.
 
Also, examining the environment and Java properties between the working and
broken, we find that they have a different java classpath. So weird...
 
Has anyone had this problem or know a solution? We would be happy to post
our very simple code for creating the YARN application.
 
Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-submit-from-a-remote-machine-using-a-YARN-application-tp20642.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Native library error when trying to use Spark with Snappy files

2014-12-11 Thread Rich Haase
I am running a Hadoop cluster with Spark on YARN.  The cluster is running the
CDH5.2 distribution.  When I try to run spark jobs against snappy compressed 
files I receive the following error.

java.lang.UnsatisfiedLinkError: 
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native 
Method)

org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)

org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:190)

org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:176)

org.apache.hadoop.mapred.LineRecordReader.(LineRecordReader.java:110)

org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:198)
org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:189)
org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:98)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

I have tried to set  JAVA_LIBRARY_PATH, LD_LIBRARY_PATH, 
spark.executor.extraLibraryPath, spark.executor.extraClassPath and more with 
absolutely no luck.

Additionally, I have confirmed that I can run map reduce jobs against snappy 
files without any problem and hadoop checknative looks good:

$ hadoop checknative -a
14/12/11 13:51:07 INFO bzip2.Bzip2Factory: Successfully loaded & initialized 
native-bzip2 library system-native
14/12/11 13:51:07 INFO zlib.ZlibFactory: Successfully loaded & initialized 
native-zlib library
Native library checking:
hadoop:  true /usr/lib/hadoop/lib/native/libhadoop.so.1.0.0
zlib:true /lib/x86_64-linux-gnu/libz.so.1
snappy:  true /usr/lib/hadoop/lib/native/libsnappy.so.1
lz4: true revision:99
bzip2:   true /lib/x86_64-linux-gnu/libbz2.so.1
openssl: true /usr/lib/x86_64-linux-gnu/libcrypto.so.1.0.0

Can anyone give me any suggestions as to why this would not be working or 
better yet, how I can fix this problem?

Thanks!!!

Rich Haase | Sr. Software Engineer | Pandora
m 303.887.1146 | rha...@pandora.com


Re: RDDs being cleaned too fast

2014-12-11 Thread Ranga
I was having similar issues with my persistent RDDs. After some digging
around, I noticed that the partitions were not balanced evenly across the
available nodes. After a "repartition", the RDD was spread evenly across
all available memory. Not sure if that is something that would help your
use-case though.
You could also increase the spark.storage.memoryFraction if that is an
option.
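
For example, a hedged sketch (the fraction and partition count are
illustrative, not recommendations):

  // raise the storage fraction (default is 0.6 in Spark 1.1) when building the context
  val conf = new SparkConf().set("spark.storage.memoryFraction", "0.7")
  // rebalance the persisted RDD across the cluster before caching it
  val balanced = rdd.repartition(sc.defaultParallelism)
    .persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)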


- Ranga

On Wed, Dec 10, 2014 at 10:23 PM, Aaron Davidson  wrote:

> The ContextCleaner uncaches RDDs that have gone out of scope on the
> driver. So it's possible that the given RDD is no longer reachable in your
> program's control flow, or else it'd be a bug in the ContextCleaner.
>
> On Wed, Dec 10, 2014 at 5:34 PM, ankits  wrote:
>
>> I'm using spark 1.1.0 and am seeing persisted RDDs being cleaned up too
>> fast.
>> How can i inspect the size of RDD in memory and get more information about
>> why it was cleaned up. There should be more than enough memory available
>> on
>> the cluster to store them, and by default, the spark.cleaner.ttl is
>> infinite, so I want more information about why this is happening and how
>> to
>> prevent it.
>>
>> Spark just logs this when removing RDDs:
>>
>> [2014-12-11 01:19:34,006] INFO  spark.storage.BlockManager [] [] -
>> Removing
>> RDD 33
>> [2014-12-11 01:19:34,010] INFO  pache.spark.ContextCleaner []
>> [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33
>> [2014-12-11 01:19:34,012] INFO  spark.storage.BlockManager [] [] -
>> Removing
>> RDD 33
>> [2014-12-11 01:19:34,016] INFO  pache.spark.ContextCleaner []
>> [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: broadcast: OutOfMemoryError

2014-12-11 Thread Sameer Farooqui
Is the OOM happening to the Driver JVM or one of the Executor JVMs? What
memory size is each JVM?

How large is the data you're trying to broadcast? If it's large enough, you
may want to consider just persisting the data to distributed storage (like
HDFS) and read it in through the normal read RDD methods like sc.textFile().
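
A hedged sketch of that alternative (paths are assumptions, and it assumes the
array can at least be built on the driver):

  // write the large collection out once, then read it back as an RDD instead
  // of shipping the whole thing to every executor as a broadcast variable
  sc.parallelize(largeArray).saveAsTextFile("hdfs:///tmp/lookup-data")
  val lookupRdd = sc.textFile("hdfs:///tmp/lookup-data")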

Maybe someone else can comment on what the largest recommended data
collection sizes are to use with Broadcast...



On Thu, Dec 11, 2014 at 10:14 AM, ll  wrote:

> hi.  i'm running into this OutOfMemory issue when i'm broadcasting a large
> array.  what is the best way to handle this?
>
> should i split the array into smaller arrays before broadcasting, and then
> combining them locally at each node?
>
> thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/broadcast-OutOfMemoryError-tp20633.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Key not valid / already cancelled using Spark Streaming

2014-12-11 Thread Tathagata Das
Aah yes, that makes sense. You could write first to HDFS, and when
that works, copy from HDFS to S3. That should work, as it won't depend
on the temporary files being in S3.
I am not sure how much you can customize just for S3 in Spark code. In
Spark, since we just use the Hadoop API to write, there isn't much, if
any, code that is customized to a particular file system. An
alternative is to see whether you could do retries efficiently for all
file systems.
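
A rough sketch of the HDFS-first idea (paths are assumptions):

  // write each batch to HDFS, where the temporary output files are reliably visible
  rdd.saveAsTextFile("hdfs:///staging/batch-001")
  // then copy to S3 out of band, for example with:
  //   hadoop distcp hdfs:///staging/batch-001 s3n://my-bucket/batch-001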

TD

On Thu, Dec 11, 2014 at 6:50 AM, Flávio Santos
 wrote:
> Hello guys,
>
> Thank you for your prompt reply.
> I followed Akhil suggestion with no success. Then, I tried again replacing
> S3 by HDFS and the job seems to work properly.
> TD, I'm not using speculative execution.
>
> I think I've just realized what is happening. Due to S3 eventual
> consistency, these temporary files sometimes are found, sometimes they are
> not. I confirmed this hypothesis via s3cmd.
> So, I come up with two questions/suggestions:
>
> 1. Does Spark support these temporary files to be written on HDFS and my
> final output on S3?
> 2. What do you think about adding a property like 'spark.s3.maxRetries' that
> determines the number of retries before assuming that a file is indeed not
> found? I can contribute with this patch if you want.
> (Hadoop already have a similar property 'fs.s3.maxRetries', but for
> IOException and S3Exception.)
>
> Thanks again and I look forward for your comments,
>
> --
> Flávio R. Santos
>
> Chaordic | Platform
> www.chaordic.com.br
> +55 48 3232.3200
>
> On Thu, Dec 11, 2014 at 10:03 AM, Tathagata Das
>  wrote:
>>
>> Following Gerard's thoughts, here are possible things that could be
>> happening.
>>
>> 1. Is there another process in the background that is deleting files
>> in the directory where you are trying to write? Seems like the
>> temporary file generated by one of the tasks is getting delete before
>> it is renamed to the final output file. I suggest trying to not write
>> to S3, rather just count and print (with rest of the computation
>> staying exactly same) and see if the error still occurs. That would
>> narrow down the culprit to what Gerard suggested.
>> 2. Do you have speculative execution turned on? If so, could you turn
>> it off and try?
>>
>> TD
>>
>> On Thu, Dec 11, 2014 at 1:42 AM, Gerard Maas 
>> wrote:
>> > If the timestamps in the logs are to be trusted It looks like your
>> > driver is
>> > dying with that java.io.FileNotFoundException: and therefore the workers
>> > loose their connection and close down.
>> >
>> > -kr, Gerard.
>> >
>> > On Thu, Dec 11, 2014 at 7:39 AM, Akhil Das 
>> > wrote:
>> >>
>> >> Try to add the following to the sparkConf
>> >>
>> >>  .set("spark.core.connection.ack.wait.timeout","6000")
>> >>
>> >>   .set("spark.akka.frameSize","60")
>> >>
>> >> Used to face that issue with spark 1.1.0
>> >>
>> >> Thanks
>> >> Best Regards
>> >>
>> >> On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos
>> >>  wrote:
>> >>>
>> >>> Dear Spark'ers,
>> >>>
>> >>> I'm trying to run a simple job using Spark Streaming (version 1.1.1)
>> >>> and
>> >>> YARN (yarn-cluster), but unfortunately I'm facing many issues. In
>> >>> short, my
>> >>> job does the following:
>> >>> - Consumes a specific Kafka topic
>> >>> - Writes its content to S3 or HDFS
>> >>>
>> >>> Records in Kafka are in the form:
>> >>> {"key": "someString"}
>> >>>
>> >>> This is important because I use the value of "key" to define the
>> >>> output
>> >>> file name in S3.
>> >>> Here are the Spark and Kafka parameters I'm using:
>> >>>
>>  val sparkConf = new SparkConf()
>>    .setAppName("MyDumperApp")
>>    .set("spark.task.maxFailures", "100")
>>    .set("spark.hadoop.validateOutputSpecs", "false")
>>    .set("spark.serializer",
>>  "org.apache.spark.serializer.KryoSerializer")
>>    .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
>>  val kafkaParams = Map(
>>    "zookeeper.connect" -> zkQuorum,
>>    "zookeeper.session.timeout.ms" -> "1",
>>    "rebalance.backoff.ms" -> "8000",
>>    "rebalance.max.retries" -> "10",
>>    "group.id" -> group,
>>    "auto.offset.reset" -> "largest"
>>  )
>> >>>
>> >>>
>> >>> My application is the following:
>> >>>
>>  KafkaUtils.createStream[String, String, StringDecoder,
>>  StringDecoder](ssc, kafkaParams, Map(topic -> 1),
>>  StorageLevel.MEMORY_AND_DISK_SER_2)
>>    .foreachRDD((rdd, time) =>
>>  rdd.map {
>>    case (_, line) =>
>>  val json = parse(line)
>>  val key = extract(json, "key").getOrElse("key_not_found")
>>  (key, dateFormatter.format(time.milliseconds)) -> line
>>  }
>>    .partitionBy(new HashPartitioner(10))
>>    .saveAsHadoopFile[KeyBasedOutput[(String,String),
>>  String]]("s3://BUCKET", classOf[BZip2Codec])
>>    )
>> >>>
>> >>>
>> >>> And the last piece:
>> >>>
>>  class KeyBasedOutput[T >: Null, V <: AnyRef] e

Re: Error: Spark-streaming to Cassandra

2014-12-11 Thread Tathagata Das
These seem to be compilation errors. The second one seems to be that
you are using CassandraJavaUtil.javaFunctions wrong. Look at the
documentation and set the parameter list correctly.

TD


On Mon, Dec 8, 2014 at 9:47 AM,   wrote:
> Hi,
>
> I am intending to save the streaming data from kafka into Cassandra,
> using spark-streaming:
> But there seems to be problem with line
> javaFunctions(data).writerBuilder("testkeyspace", "test_table", 
> mapToRow(TestTable.class)).saveToCassandra();
> I am getting 2 errors.
> the code, the error-log and POM.xml dependencies are listed below:
> Please help me find the reason as to why is this happening.
>
>
> public class SparkStream {
> static int key=0;
> public static void main(String args[]) throws Exception
> {
> if(args.length != 3)
> {
> System.out.println("SparkStream   
> ");
> System.exit(1);
> }
>
> Logger.getLogger("org").setLevel(Level.OFF);
> Logger.getLogger("akka").setLevel(Level.OFF);
> Map<String, Integer> topicMap = new HashMap<String, Integer>();
> String[] topic = args[2].split(",");
> for(String t: topic)
> {
> topicMap.put(t, new Integer(3));
> }
>
> /* Connection to Spark */
> SparkConf conf = new SparkConf();
> JavaSparkContext sc = new JavaSparkContext("local[4]", 
> "SparkStream",conf);
> JavaStreamingContext jssc = new JavaStreamingContext(sc, new 
> Duration(3000));
>
>
>   /* connection to cassandra */
> /*conf.set("spark.cassandra.connection.host", "127.0.0.1:9042");
> CassandraConnector connector = CassandraConnector.apply(sc.getConf());
> Session session = connector.openSession();
> session.execute("CREATE TABLE IF NOT EXISTS testkeyspace.test_table 
> (key INT PRIMARY KEY, value TEXT)");
> */
>
> /* Receive Kafka streaming inputs */
> JavaPairReceiverInputDStream<String, String> messages =
> KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>
>
> /* Create DStream */
> JavaDStream<TestTable> data = messages.map(new
> Function<Tuple2<String, String>, TestTable>()
> {
> public TestTable call(Tuple2<String, String> message)
> {
> return new TestTable(new Integer(++key), message._2() );
> }
> }
> );
>
>
> /* Write to cassandra */
> javaFunctions(data).writerBuilder("testkeyspace", "test_table", 
> mapToRow(TestTable.class)).saveToCassandra();
> //  data.print();
>
>
> jssc.start();
> jssc.awaitTermination();
>
> }
> }
>
> class TestTable implements Serializable
> {
> Integer key;
> String value;
>
> public TestTable() {}
>
> public TestTable(Integer k, String v)
> {
> key=k;
> value=v;
> }
>
> public Integer getKey(){
> return key;
> }
>
> public void setKey(Integer k){
> key=k;
> }
>
> public String getValue(){
> return value;
> }
>
> public void setValue(String v){
> value=v;
> }
>
> public String toString(){
> return MessageFormat.format("TestTable'{'key={0},
> value={1}'}'", key, value);
> }
> }
>
> The output log is:
>
> [INFO] Compiling 1 source file to
> /root/Documents/SparkStreamSample/target/classes
> [INFO] 2 errors
> [INFO] -
> [ERROR] COMPILATION ERROR :
> [INFO] -
> [ERROR] 
> /root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[76,81]
> cannot find symbol
>   symbol:   method mapToRow(java.lang.Class)
>   location: class com.spark.SparkStream
> [ERROR] 
> /root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[76,17]
> no suitable method found for
> javaFunctions(org.apache.spark.streaming.api.java.JavaDStream)
> method 
> com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.api.java.JavaDStream,java.lang.Class)
> is not applicable
>   (cannot infer type-variable(s) T
> (actual and formal argument lists differ in length))
> method 
> com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.dstream.DStream,java.lang.Class)
> is not applicable
>   (cannot infer type-variable(s) T
> (actual and formal argument lists differ in length))
> method 
> com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.api.java.JavaRDD,java.lang.Class)
> is not applicable
>   (cannot infer type-variable(s) T
> (actual and formal argument lists differ in length))
> method 
> com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.rdd.RDD,java.lang.Class)
> is not applicable
>   (cannot infer type-variable(s) T
> (actual and formal argument lists differ in length))
> method 
> com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.sp

Re: Does filter on an RDD scan every data item ?

2014-12-11 Thread dsiegel
Also, you may want to use .lookup() instead of .filter()

def lookup(key: K): Seq[V]
Return the list of values in the RDD for key key. This operation is done
efficiently if the RDD has a known partitioner by only searching the
partition that the key maps to.

You might want to partition your first batch of data with .partitionBy()
using your CustomTuple hash implementation, persist it, and not run any
operations on it which can remove its partitioner object.
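
A hedged sketch (the key field, partition count, and types are assumptions):

  import org.apache.spark.HashPartitioner

  val byKey = firstBatch.map(t => (t.key, t))        // hypothetical: key each record by its CustomTuple key
    .partitionBy(new HashPartitioner(100))
    .persist()
  val hits = byKey.lookup(someKey)                   // only scans the partition that someKey maps to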










--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-filter-on-an-RDD-scan-every-data-item-tp20170p20639.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is there an efficient way to append new data to a registered Spark SQL Table?

2014-12-11 Thread Rakesh Nair
TD,

While looking at the API Ref(version 1.1.0) for SchemaRDD i did find these
two methods:
 def insertInto(tableName: String): Unit
 def insertInto(tableName: String, overwrite: Boolean): Unit

Wouldn't these be a nicer way of appending RDDs to a table, or are these not
recommended as of now? Also, will this apply to a table created using the
"registerTempTable" method?


On Thu, Dec 11, 2014 at 6:46 AM, Tathagata Das 
wrote:
>
> First of all, how long do you want to keep doing this? The data is
> going to increase infinitely and without any bounds, its going to get
> too big for any cluster to handle. If all that is within bounds, then
> try the following.
>
> - Maintain a global variable having the current RDD storing all the
> log data. We are going to keep updating this variable.
> - Every batch interval, take new data and union it with the earlier
> unified RDD (in the global variable) and update the global variable.
> If you want SQL queries on this data, then you will have to
> re-register this new RDD as the named table.
> - With this approach the number of partitions is going to increase
> rapidly. So periodically take the unified RDD and repartition it to a
> smaller set of partitions. This messes up the ordering of data, but
> you maybe fine with if your queries are order agnostic. Also,
> periodically, checkpoint this RDD, otherwise the lineage is going to
> grow indefinitely and everything will start getting slower.
>
> Hope this helps.
>
> TD
>
> On Mon, Dec 8, 2014 at 6:29 PM, Xuelin Cao 
> wrote:
> >
> > Hi,
> >
> >   I'm wondering whether there is an  efficient way to continuously
> > append new data to a registered spark SQL table.
> >
> >   This is what I want:
> >   I want to make an ad-hoc query service to a json formated system
> log.
> > Certainly, the system log is continuously generated. I will use spark
> > streaming to connect the system log as my input, and I want to find a
> way to
> > effectively append the new data into an existed spark SQL table. Further
> > more, I want the whole table being cached in memory/tachyon.
> >
> >   It looks like spark sql supports the "INSERT" method, but only for
> > parquet file. In addition, it is inefficient to insert a single row every
> > time.
> >
> >   I do know that somebody build a similar system that I want (ad-hoc
> > query service to a on growing system log). So, there must be an efficient
> > way. Anyone knows?
> >
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

-- 
Regards
Rakesh Nair


Re: what is the best way to implement mini batches?

2014-12-11 Thread Duy Huynh
the dataset i'm working on has about 100,000 records.  the batch that we're
training on has a size around 10.  can you repartition(10,000) into 10,000
partitions?

On Thu, Dec 11, 2014 at 2:36 PM, Matei Zaharia 
wrote:

> You can just do mapPartitions on the whole RDD, and then called sliding()
> on the iterator in each one to get a sliding window. One problem is that
> you will not be able to slide "forward" into the next partition at
> partition boundaries. If this matters to you, you need to do something more
> complicated to get those, such as the repartition that you said (where you
> map each record to the partition it should be in).
>
> Matei
>
> > On Dec 11, 2014, at 10:16 AM, ll  wrote:
> >
> > any advice/comment on this would be much appreciated.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20635.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
>


Re: why is spark + scala code so slow, compared to python?

2014-12-11 Thread Sean Owen
In general, you would not expect a distributed computation framework
to perform nearly as fast as a non-distributed one, when both are run
on one machine. Spark has so much more overhead that doesn't go away
just because it's on one machine. Of course, that's the very reason it
scales past one machine too.

That said, you may also not be using Spark optimally, whereas you
probably use the tool you know optimally. You may not be comparing
algorithms apples-to-apples either.

Don't use Spark for its own sake. Use it because you need it for the
things it does, like scaling up or integrating with other components.
If you really have a small, isolated ML problem, you probably want to
use your familiar local tools.

On Thu, Dec 11, 2014 at 7:18 PM, Duy Huynh  wrote:
> both.
>
> first, the distributed version is so much slower than python.  i tried a few
> things like broadcasting variables, replacing Seq with Array, and a few
> other little things.  it helps to improve the performance, but still slower
> than the python code.
>
> so, i wrote a local version that's pretty much just running a bunch of
> breeze/blas operations.  i guess that's purely scala (no spark).  this local
> version is faster than the distributed version but still much slower than
> the python code.
>
>
>
>
>
>
>
> On Thu, Dec 11, 2014 at 2:09 PM, Natu Lauchande 
> wrote:
>>
>> Are you using Scala in a distributed enviroment or in a standalone mode ?
>>
>> Natu
>>
>> On Thu, Dec 11, 2014 at 8:23 PM, ll  wrote:
>>>
>>> hi.. i'm converting some of my machine learning python code into scala +
>>> spark.  i haven't been able to run it on large dataset yet, but on small
>>> datasets (like http://yann.lecun.com/exdb/mnist/), my spark + scala code
>>> is
>>> much slower than my python code (5 to 10 times slower than python)
>>>
>>> i already tried everything to improve my spark + scala code like
>>> broadcasting variables, caching the RDD, replacing all my matrix/vector
>>> operations with breeze/blas, etc.  i saw some improvements, but it's
>>> still a
>>> lot slower than my python code.
>>>
>>> why is that?
>>>
>>> how do you improve your spark + scala performance today?
>>>
>>> or is spark + scala just not the right tool for small to medium datasets?
>>>
>>> when would you use spark + scala vs. python?
>>>
>>> thanks!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/why-is-spark-scala-code-so-slow-compared-to-python-tp20636.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Proper way to check SparkContext's status within code

2014-12-11 Thread Edwin
Hi, 
Is there a way to check the status of the SparkContext regarding whether
it's alive or not through the code, not through UI or anything else?

Thanks 
Edwin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Proper-way-to-check-SparkContext-s-status-within-code-tp20638.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: what is the best way to implement mini batches?

2014-12-11 Thread Matei Zaharia
You can just do mapPartitions on the whole RDD, and then call sliding() on
the iterator in each one to get a sliding window. One problem is that you will 
not be able to slide "forward" into the next partition at partition boundaries. 
If this matters to you, you need to do something more complicated to get those, 
such as the repartition that you said (where you map each record to the 
partition it should be in).
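
A rough sketch of the per-partition sliding approach (window size and RDD name
are assumptions; use iterator.grouped(n) instead for non-overlapping mini batches):

  val windows = data.mapPartitions { iter =>
    iter.sliding(10)        // overlapping windows of 10 within each partition
  }
  windows.foreach { batch =>
    // one training step per mini batch (batch is a Seq of records)
  }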

Matei

> On Dec 11, 2014, at 10:16 AM, ll  wrote:
> 
> any advice/comment on this would be much appreciated.  
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20635.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why is spark + scala code so slow, compared to python?

2014-12-11 Thread Duy Huynh
just to give some reference point.  with the same algorithm running on
mnist dataset.

1.  python implementation:  ~10 milliseconds per iteration (can be faster if
i switch to gpu)

2.  local version (scala + breeze):  ~2 seconds per iteration

3.  distributed version (spark + scala + breeze):  15 seconds per iteration

i love spark and really enjoy writing scala code.  but this huge difference
in performance makes it really hard to do any kind of machine learning work.




On Thu, Dec 11, 2014 at 2:18 PM, Duy Huynh  wrote:

> both.
>
> first, the distributed version is so much slower than python.  i tried a
> few things like broadcasting variables, replacing Seq with Array, and a few
> other little things.  it helps to improve the performance, but still slower
> than the python code.
>
> so, i wrote a local version that's pretty much just running a bunch of
> breeze/blas operations.  i guess that's purely scala (no spark).  this
> local version is faster than the distributed version but still much slower
> than the python code.
>
>
>
>
>
>
>
> On Thu, Dec 11, 2014 at 2:09 PM, Natu Lauchande 
> wrote:
>
>> Are you using Scala in a distributed enviroment or in a standalone mode ?
>>
>> Natu
>>
>> On Thu, Dec 11, 2014 at 8:23 PM, ll  wrote:
>>
>>> hi.. i'm converting some of my machine learning python code into scala +
>>> spark.  i haven't been able to run it on large dataset yet, but on small
>>> datasets (like http://yann.lecun.com/exdb/mnist/), my spark + scala
>>> code is
>>> much slower than my python code (5 to 10 times slower than python)
>>>
>>> i already tried everything to improve my spark + scala code like
>>> broadcasting variables, caching the RDD, replacing all my matrix/vector
>>> operations with breeze/blas, etc.  i saw some improvements, but it's
>>> still a
>>> lot slower than my python code.
>>>
>>> why is that?
>>>
>>> how do you improve your spark + scala performance today?
>>>
>>> or is spark + scala just not the right tool for small to medium datasets?
>>>
>>> when would you use spark + scala vs. python?
>>>
>>> thanks!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/why-is-spark-scala-code-so-slow-compared-to-python-tp20636.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: why is spark + scala code so slow, compared to python?

2014-12-11 Thread Duy Huynh
both.

first, the distributed version is so much slower than python.  i tried a
few things like broadcasting variables, replacing Seq with Array, and a few
other little things.  it helps to improve the performance, but still slower
than the python code.

so, i wrote a local version that's pretty much just running a bunch of
breeze/blas operations.  i guess that's purely scala (no spark).  this
local version is faster than the distributed version but still much slower
than the python code.







On Thu, Dec 11, 2014 at 2:09 PM, Natu Lauchande 
wrote:

> Are you using Scala in a distributed enviroment or in a standalone mode ?
>
> Natu
>
> On Thu, Dec 11, 2014 at 8:23 PM, ll  wrote:
>
>> hi.. i'm converting some of my machine learning python code into scala +
>> spark.  i haven't been able to run it on large dataset yet, but on small
>> datasets (like http://yann.lecun.com/exdb/mnist/), my spark + scala code
>> is
>> much slower than my python code (5 to 10 times slower than python)
>>
>> i already tried everything to improve my spark + scala code like
>> broadcasting variables, caching the RDD, replacing all my matrix/vector
>> operations with breeze/blas, etc.  i saw some improvements, but it's
>> still a
>> lot slower than my python code.
>>
>> why is that?
>>
>> how do you improve your spark + scala performance today?
>>
>> or is spark + scala just not the right tool for small to medium datasets?
>>
>> when would you use spark + scala vs. python?
>>
>> thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/why-is-spark-scala-code-so-slow-compared-to-python-tp20636.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: why is spark + scala code so slow, compared to python?

2014-12-11 Thread Natu Lauchande
Are you using Scala in a distributed environment or in a standalone mode?

Natu

On Thu, Dec 11, 2014 at 8:23 PM, ll  wrote:

> hi.. i'm converting some of my machine learning python code into scala +
> spark.  i haven't been able to run it on large dataset yet, but on small
> datasets (like http://yann.lecun.com/exdb/mnist/), my spark + scala code
> is
> much slower than my python code (5 to 10 times slower than python)
>
> i already tried everything to improve my spark + scala code like
> broadcasting variables, caching the RDD, replacing all my matrix/vector
> operations with breeze/blas, etc.  i saw some improvements, but it's still
> a
> lot slower than my python code.
>
> why is that?
>
> how do you improve your spark + scala performance today?
>
> or is spark + scala just not the right tool for small to medium datasets?
>
> when would you use spark + scala vs. python?
>
> thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/why-is-spark-scala-code-so-slow-compared-to-python-tp20636.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark streaming: missing classes when kafka consumer classes

2014-12-11 Thread Flávio Santos
Hi Mario,

Try to include this to your libraryDependencies (in your sbt file):

  "org.apache.kafka" % "kafka_2.10" % "0.8.0"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("org.slf4j", "slf4j-simple")

Regards,


*--Flávio R. Santos*

Chaordic | *Platform*
*www.chaordic.com.br *
+55 48 3232.3200

On Thu, Dec 11, 2014 at 12:32 PM, Mario Pastorelli <
mario.pastore...@teralytics.ch> wrote:

>  Thanks akhil for the answer.
>
> I am using sbt assembly and the build.sbt is in the first email. Do you
> know why those classes are included in that way?
>
>
> Thanks,
> Mario
>
>
> On 11.12.2014 14:51, Akhil Das wrote:
>
>  Yes. You can do/use *sbt assembly* and create a big fat jar with all
> dependencies bundled inside it.
>
>  Thanks
> Best Regards
>
> On Thu, Dec 11, 2014 at 7:10 PM, Mario Pastorelli <
> mario.pastore...@teralytics.ch> wrote:
>
>>  In this way it works but it's not portable and the idea of having a fat
>> jar is to avoid exactly this. Is there any system to create a
>> self-contained portable fatJar?
>>
>>
>> On 11.12.2014 13:57, Akhil Das wrote:
>>
>>  Add these jars while creating the Context.
>>
>> val sc = new SparkContext(conf)
>>
>>
>> sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/
>> *spark-streaming-kafka_2.10-1.1.0.jar*")
>> sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/
>> *zkclient-0.3.jar*")
>>
>> sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/
>> *metrics-core-2.2.0.jar*")
>>
>> sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/
>> *kafka_2.10-0.8.0.jar*")
>>
>> val ssc = new StreamingContext(sc, Seconds(10))
>>
>>
>>  Thanks
>> Best Regards
>>
>> On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli <
>> mario.pastore...@teralytics.ch> wrote:
>>
>>>  Hi,
>>>
>>> I'm trying to use spark-streaming with kafka but I get a strange error
>>> on class that are missing. I would like to ask if my way to build the fat
>>> jar is correct or no. My program is
>>>
>>> val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum,
>>> kafkaGroupId, kafkaTopicsWithThreads)
>>> .map(_._2)
>>>
>>> kafkaStream.foreachRDD((rdd,t) => rdd.foreachPartition {
>>> iter:Iterator[CellWithLAC] =>
>>>   println("time: " ++ t.toString ++ " #received: " ++
>>> iter.size.toString)
>>> })
>>>
>>> I use sbt to manage my project and my build.sbt (with assembly 0.12.0
>>> plugin) is
>>>
>>> name := "spark_example"
>>>
>>> version := "0.0.1"
>>>
>>> scalaVersion := "2.10.4"
>>>
>>> scalacOptions ++= Seq("-deprecation","-feature")
>>>
>>> libraryDependencies ++= Seq(
>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
>>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
>>>   "joda-time" % "joda-time" % "2.6"
>>> )
>>>
>>> assemblyMergeStrategy in assembly := {
>>>   case p if p startsWith "com/esotericsoftware/minlog" =>
>>> MergeStrategy.first
>>>   case p if p startsWith "org/apache/commons/beanutils" =>
>>> MergeStrategy.first
>>>   case p if p startsWith "org/apache/" => MergeStrategy.last
>>>   case "plugin.properties" => MergeStrategy.discard
>>>   case p if p startsWith "META-INF" => MergeStrategy.discard
>>>   case x =>
>>> val oldStrategy = (assemblyMergeStrategy in assembly).value
>>> oldStrategy(x)
>>> }
>>>
>>> I create the jar with sbt assembly and the run with
>>> $SPARK_HOME/bin/spark-submit --master spark://master:7077 --class Main
>>> target/scala-2.10/spark_example-assembly-0.0.1.jar localhost:2181
>>> test-consumer-group test1
>>>
>>> where master:7077 is the spark master, localhost:2181 is zookeeper,
>>> test-consumer-group is kafka groupid and test1 is the kafka topic. The
>>> program starts and keep running but I get an error and nothing is printed.
>>> In the log I found the following stack trace:
>>>
>>> 14/12/11 13:02:08 INFO network.ConnectionManager: Accepted connection
>>> from [10.0.3.1/10.0.3.1:54325]
>>> 14/12/11 13:02:08 INFO network.SendingConnection: Initiating connection
>>> to [jpl-devvax/127.0.1.1:38767]
>>> 14/12/11 13:02:08 INFO network.SendingConnection: Connected to
>>> [jpl-devvax/127.0.1.1:38767], 1 messages pending
>>> 14/12/11 13:02:08 INFO storage.BlockManagerInfo: Added
>>> broadcast_2_piece0 in memory on jpl-devvax:38767 (size: 842.0 B, free:
>>> 265.4 MB)
>>> 14/12/11 13:02:08 INFO scheduler.ReceiverTracker: Registered receiver
>>> for stream 0 from akka.tcp://sparkExecutor@jpl-devvax:46602
>>> 14/12/11 13:02:08 ERROR scheduler.ReceiverTracker: Deregistered receiver
>>> for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>> kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues$1
>>> at
>>> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$Z

Re: RDD.aggregate?

2014-12-11 Thread Gerard Maas
There's some explanation and an example here:
http://stackoverflow.com/questions/26611471/spark-data-processing-with-grouping/26612246#26612246
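A minimal, self-contained illustration of the two functions (a generic sketch,
not taken from that answer): seqOp folds each element into a per-partition
accumulator, and combOp merges the per-partition accumulators.

import org.apache.spark.{SparkConf, SparkContext}

object AggregateSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("aggregate-sketch"))
    val nums = sc.parallelize(1 to 10, 4)

    // compute (sum, count) in a single pass over the RDD
    val (sum, count) = nums.aggregate((0, 0))(
      (acc, n) => (acc._1 + n, acc._2 + 1),   // seqOp: runs inside each partition
      (a, b) => (a._1 + b._1, a._2 + b._2)    // combOp: merges the partition results
    )
    println("mean = " + sum.toDouble / count) // 5.5
    sc.stop()
  }
}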

-kr, Gerard.

On Thu, Dec 11, 2014 at 7:15 PM, ll  wrote:

> any explanation of how aggregate works would be much appreciated.  i
> already
> looked at the spark example and still am confused about the seqop and
> combop... thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-aggregate-tp20434p20634.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: custom spark app name in yarn-cluster mode

2014-12-11 Thread Tomer Benyamini
On Thu, Dec 11, 2014 at 8:27 PM, Tomer Benyamini 
wrote:

> Hi,
>
> I'm trying to set a custom spark app name when running a java spark app in
> yarn-cluster mode.
>
>  SparkConf sparkConf = new SparkConf();
>
>  sparkConf.setMaster(System.getProperty("spark.master"));
>
>  sparkConf.setAppName("myCustomName");
>
>  sparkConf.set("spark.logConf", "true");
>
>  JavaSparkContext sc = new JavaSparkContext(sparkConf);
>
>
> Apparently this only works when running in yarn-client mode; in
> yarn-cluster mode the app name is the class name, when viewing the app in
> the cluster manager UI. Any idea?
>
>
> Thanks,
>
> Tomer
>
>
>


custom spark app name in yarn-cluster mode

2014-12-11 Thread Tomer Benyamini
Hi,

I'm trying to set a custom spark app name when running a java spark app in
yarn-cluster mode.

 SparkConf sparkConf = new SparkConf();

 sparkConf.setMaster(System.getProperty("spark.master"));

 sparkConf.setAppName("myCustomName");

 sparkConf.set("spark.logConf", "true");

 JavaSparkContext sc = new JavaSparkContext(sparkConf);


Apparently this only works when running in yarn-client mode; in
yarn-cluster mode the app name is the class name, when viewing the app in
the cluster manager UI. Any idea?


Thanks,

Tomer


why is spark + scala code so slow, compared to python?

2014-12-11 Thread ll
hi.. i'm converting some of my machine learning python code into scala +
spark.  i haven't been able to run it on large dataset yet, but on small
datasets (like http://yann.lecun.com/exdb/mnist/), my spark + scala code is
much slower than my python code (5 to 10 times slower than python)

i already tried everything to improve my spark + scala code like
broadcasting variables, caching the RDD, replacing all my matrix/vector
operations with breeze/blas, etc.  i saw some improvements, but it's still a
lot slower than my python code.

why is that?  

how do you improve your spark + scala performance today?  

or is spark + scala just not the right tool for small to medium datasets?

when would you use spark + scala vs. python?

thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-is-spark-scala-code-so-slow-compared-to-python-tp20636.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: what is the best way to implement mini batches?

2014-12-11 Thread ll
any advice/comment on this would be much appreciated.  
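One simple way to carve an RDD into mini batches is randomSplit.  A rough
sketch (the update rule is only a placeholder, not a real learning algorithm):

import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}

object MiniBatchSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("minibatch-sketch"))
    // toy (feature, label) pairs
    val data = sc.parallelize(1 to 100000).map(i => (i.toDouble, (i % 2).toDouble)).cache()

    val numBatches = 10
    // split into roughly equal, non-overlapping mini batches
    val batches = data.randomSplit(Array.fill(numBatches)(1.0 / numBatches), seed = 42L)

    var weight = 0.0
    for (batch <- batches) {
      // one gradient-style update per mini batch (placeholder math)
      val grad = batch.map { case (x, y) => (x * weight - y) * x }.mean()
      weight -= 1e-9 * grad
    }
    println("final weight: " + weight)
    sc.stop()
  }
}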



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20635.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD.aggregate?

2014-12-11 Thread ll
any explanation of how aggregate works would be much appreciated.  i already
looked at the spark example and still am confused about the seqop and
combop... thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-aggregate-tp20434p20634.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



broadcast: OutOfMemoryError

2014-12-11 Thread ll
hi.  i'm running into this OutOfMemory issue when i'm broadcasting a large
array.  what is the best way to handle this?

should i split the array into smaller arrays before broadcasting, and then
combining them locally at each node?

thanks!
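If splitting is the route taken, the idea looks roughly like this (sizes are
arbitrary, and depending on the actual cause, simply raising spark.driver.memory
or the executor memory may be the simpler fix):

import org.apache.spark.{SparkConf, SparkContext}

object ChunkedBroadcastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("chunked-broadcast-sketch"))

    val big: Array[Double] = Array.fill(10000000)(1.0)   // ~80 MB of doubles

    // broadcast the array as several smaller pieces instead of one huge object
    val chunkSize = 1000000
    val chunks = big.grouped(chunkSize).map(chunk => sc.broadcast(chunk)).toArray

    val sums = sc.parallelize(1 to 4).map { _ =>
      // on the executor, walk the chunks one by one (or rebuild the full array if needed)
      var sum = 0.0
      for (bc <- chunks; v <- bc.value) sum += v
      sum
    }
    println(sums.collect().mkString(","))
    sc.stop()
  }
}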



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/broadcast-OutOfMemoryError-tp20633.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Different Vertex Ids in Graph and Edges

2014-12-11 Thread th0rsten
Hello all,

I'm using GraphX (1.1.0) to process RDF data. I want to build a graph out
of the data from the Berlin Benchmark (BSBM).
The steps that I'm doing to load the data into a graph are:

*1.* Split the RDF triples
*2.* Get all nodes (union subjects and objects and then distinct them,
/NodesRDD/)
*3.* Zip the nodes (NodesRDD) with "zipWithUniqueId" -> /ZippedNodesRDD/
*4.* Join the subjects and objects with the predicate to get the
corresponding ids for the nodes to build the edges
*5.* Build the graph nodes out of the /ZippedNodesRDD/, create the Java node
attribute
*6.* Build the GraphX graph

My problem is that my nodes (/graph.vertices/) in the graph have different
ids than the nodes (/ZippedNodesRDD/) which I use to build the edges. I
don't know why because I build the final nodes out of the same RDD which I
use to join and this RDD is cached.

*For example:*
graph.vertices says: ID: 35255, Attribute:
bsbm-inst:dataFromVendor33/Offer62164/
ZippedNodesRDD says: ID: 35254 Attribute:
bsbm-inst:dataFromVendor33/Offer62164/

I have no idea why that happens, because the joining is correct; only the ids
are wrong.


Thanks in Advance
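For comparison, a minimal end-to-end sketch of the same pipeline on toy triples
(not BSBM data). The cache() on the zipped RDD is the part that usually matters,
because re-evaluating zipWithUniqueId can assign different ids on each pass:

import org.apache.spark.SparkContext._
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RdfGraphSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdf-graph-sketch"))

    // (subject, predicate, object) triples standing in for the parsed RDF
    val triples: RDD[(String, String, String)] = sc.parallelize(Seq(
      ("s1", "p1", "o1"), ("s1", "p2", "o2"), ("o1", "p3", "o2")))

    // steps 2-3: distinct nodes zipped with unique ids, cached before any reuse
    val zippedNodes: RDD[(String, Long)] =
      (triples.map(_._1) union triples.map(_._3))
        .distinct()
        .zipWithUniqueId()
        .cache()

    // step 4: resolve subject and object strings to their ids to build the edges
    val edges: RDD[Edge[String]] = triples
      .map { case (s, p, o) => (s, (p, o)) }
      .join(zippedNodes)                                    // (s, ((p, o), srcId))
      .map { case (_, ((p, o), srcId)) => (o, (srcId, p)) }
      .join(zippedNodes)                                    // (o, ((srcId, p), dstId))
      .map { case (_, ((srcId, p), dstId)) => Edge(srcId, dstId, p) }

    // steps 5-6: vertices from the same cached RDD, then the graph
    val vertices: RDD[(Long, String)] = zippedNodes.map(_.swap)
    val graph = Graph(vertices, edges)
    println(graph.vertices.count() + " vertices, " + graph.edges.count() + " edges")
    sc.stop()
  }
}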



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Different-Vertex-Ids-in-Graph-and-Edges-tp20632.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Exception using amazonaws library

2014-12-11 Thread Albert Manyà
Hi,

I've made a simple script in Scala that, after running a Spark SQL query,
sends the result to AWS CloudWatch.

I've tested both parts individually (the Spark SQL one and the
CloudWatch one) and they worked fine. The trouble comes when I execute
the script through spark-submit, which gives me the following exception:

Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.http.params.HttpConnectionParams.setSoKeepalive(Lorg/apache/http/params/HttpParams;Z)V
at

com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:95)
at
com.amazonaws.http.AmazonHttpClient.(AmazonHttpClient.java:193)
at

com.amazonaws.AmazonWebServiceClient.(AmazonWebServiceClient.java:120)
at

com.amazonaws.AmazonWebServiceClient.(AmazonWebServiceClient.java:104)
at

com.amazonaws.services.cloudwatch.AmazonCloudWatchClient.(AmazonCloudWatchClient.java:171)
at

com.amazonaws.services.cloudwatch.AmazonCloudWatchClient.(AmazonCloudWatchClient.java:152)
at

com.scmspain.synapse.SynapseMonitor$.sendMetrics(SynapseMonitor.scala:50)
at
com.scmspain.synapse.SynapseMonitor$.main(SynapseMonitor.scala:45)
at
com.scmspain.synapse.SynapseMonitor.main(SynapseMonitor.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

my class is com.scmspain.synapse.SynapseMonitor

I've build my script with sbt assembly, having the following
dependencies:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
  "com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.9.10"
)

I've unzipped the generated jar assembly and searched for
HttpConnectionParams.class; I found it under
org/apache/http/params, with the following signature for
setSoKeepalive:

public static void setSoKeepalive(HttpParams params, boolean
enableKeepalive)

At this point I'm stuck and didn't know where to keep looking... some
help would be greatly appreciated :)

Thank you very much!
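One pattern that has resolved this kind of NoSuchMethodError for others is
shading, the guess being that an older org.apache.http bundled with Spark/Hadoop
wins on the runtime classpath over the version the AWS SDK needs. That cause is
only an assumption, and the syntax below needs sbt-assembly 0.14.0 or newer:

// build.sbt (sketch): rename the httpclient packages inside the fat jar so the
// AWS SDK calls the bundled copy rather than whatever Spark puts on the classpath
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("org.apache.http.**" -> "shaded.org.apache.http.@1").inAll
)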

-- 
  Albert Manyà
  alber...@eml.cc

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming: missing classes when kafka consumer classes

2014-12-11 Thread Akhil Das
Last time i did an sbt assembly and this is how i added the dependencies.


libraryDependencies ++= Seq(
  ("org.apache.spark" % "spark-streaming_2.10" % "1.1.0" % "provided").
exclude("org.eclipse.jetty.orbit", "javax.transaction").
exclude("org.eclipse.jetty.orbit", "javax.mail").
exclude("org.eclipse.jetty.orbit", "javax.activation").
exclude("com.esotericsoftware.minlog", "minlog").
exclude("commons-beanutils", "commons-beanutils-core").
exclude("commons-logging", "commons-logging").
exclude("commons-collections", "commons-collections").
exclude("org.eclipse.jetty.orbit", "javax.servlet")
)

libraryDependencies ++= Seq(
  ("org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.0").
exclude("org.eclipse.jetty.orbit", "javax.transaction").
exclude("org.eclipse.jetty.orbit", "javax.mail").
exclude("org.eclipse.jetty.orbit", "javax.activation").
exclude("com.esotericsoftware.minlog", "minlog").
exclude("commons-beanutils", "commons-beanutils-core").
exclude("commons-logging", "commons-logging").
exclude("commons-collections", "commons-collections").
exclude("org.eclipse.jetty.orbit", "javax.servlet")
)


Those excluded were causing conflicts.

Thanks
Best Regards

On Thu, Dec 11, 2014 at 8:02 PM, Mario Pastorelli <
mario.pastore...@teralytics.ch> wrote:

>  Thanks akhil for the answer.
>
> I am using sbt assembly and the build.sbt is in the first email. Do you
> know why those classes are included in that way?
>
>
> Thanks,
> Mario
>
>
> On 11.12.2014 14:51, Akhil Das wrote:
>
>  Yes. You can do/use *sbt assembly* and create a big fat jar with all
> dependencies bundled inside it.
>
>  Thanks
> Best Regards
>
> On Thu, Dec 11, 2014 at 7:10 PM, Mario Pastorelli <
> mario.pastore...@teralytics.ch> wrote:
>
>>  In this way it works but it's not portable and the idea of having a fat
>> jar is to avoid exactly this. Is there any system to create a
>> self-contained portable fatJar?
>>
>>
>> On 11.12.2014 13:57, Akhil Das wrote:
>>
>>  Add these jars while creating the Context.
>>
>> val sc = new SparkContext(conf)
>>
>>
>> sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/
>> *spark-streaming-kafka_2.10-1.1.0.jar*")
>> sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/
>> *zkclient-0.3.jar*")
>>
>> sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/
>> *metrics-core-2.2.0.jar*")
>>
>> sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/
>> *kafka_2.10-0.8.0.jar*")
>>
>> val ssc = new StreamingContext(sc, Seconds(10))
>>
>>
>>  Thanks
>> Best Regards
>>
>> On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli <
>> mario.pastore...@teralytics.ch> wrote:
>>
>>>  Hi,
>>>
>>> I'm trying to use spark-streaming with kafka but I get a strange error
>>> on class that are missing. I would like to ask if my way to build the fat
>>> jar is correct or no. My program is
>>>
>>> val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum,
>>> kafkaGroupId, kafkaTopicsWithThreads)
>>> .map(_._2)
>>>
>>> kafkaStream.foreachRDD((rdd,t) => rdd.foreachPartition {
>>> iter:Iterator[CellWithLAC] =>
>>>   println("time: " ++ t.toString ++ " #received: " ++
>>> iter.size.toString)
>>> })
>>>
>>> I use sbt to manage my project and my build.sbt (with assembly 0.12.0
>>> plugin) is
>>>
>>> name := "spark_example"
>>>
>>> version := "0.0.1"
>>>
>>> scalaVersion := "2.10.4"
>>>
>>> scalacOptions ++= Seq("-deprecation","-feature")
>>>
>>> libraryDependencies ++= Seq(
>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
>>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
>>>   "joda-time" % "joda-time" % "2.6"
>>> )
>>>
>>> assemblyMergeStrategy in assembly := {
>>>   case p if p startsWith "com/esotericsoftware/minlog" =>
>>> MergeStrategy.first
>>>   case p if p startsWith "org/apache/commons/beanutils" =>
>>> MergeStrategy.first
>>>   case p if p startsWith "org/apache/" => MergeStrategy.last
>>>   case "plugin.properties" => MergeStrategy.discard
>>>   case p if p startsWith "META-INF" => MergeStrategy.discard
>>>   case x =>
>>> val oldStrategy = (assemblyMergeStrategy in assembly).value
>>> oldStrategy(x)
>>> }
>>>
>>> I create the jar with sbt assembly and the run with
>>> $SPARK_HOME/bin/spark-submit --master spark://master:7077 --class Main
>>> target/scala-2.10/spark_example-assembly-0.0.1.jar localhost:2181
>>> test-consumer-group test1
>>>
>>> where master:7077 is the spark master, localhost:2181 is zookeeper,
>>> test-consumer-group is kafka groupid and test1 is the kafka topic. The
>>> program starts and keep running but I get an error and nothing is printed.
>>> In the log I found the following stack trace:
>>>
>>> 14/12/11 13:02:08 INFO network.ConnectionManager: Accepted connection
>>> from [10.0.3.1/10.0.3.1:54325]
>>> 14/12/11 13:02:08 INFO network.SendingConnection: Initiating connection
>>> to [j

RE: "Session" for connections?

2014-12-11 Thread Ashic Mahtab
Yeah, using that currently. Doing:

dstream.foreachRDD(x => x.foreachPartition(y =>
  cassandraConnector.withSessionDo { session =>
    val myHelper = MyHelper(session)
    y.foreach(m => processMessage(m, myHelper))
  }))

Is there a better approach?
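One variation that avoids even the per-partition setup is a lazily initialized
singleton, along the lines suggested further down. A sketch assuming the plain
RabbitMQ Java client (a real version would also have to deal with Channel not
being thread-safe, e.g. by pooling channels):

import com.rabbitmq.client.{Channel, ConnectionFactory}
import org.apache.spark.streaming.dstream.DStream

// one lazily created, long-lived channel per executor JVM
object Rabbit {
  lazy val channel: Channel = {
    val factory = new ConnectionFactory()
    factory.setHost("localhost")               // assumption: point this at your broker
    val connection = factory.newConnection()
    sys.addShutdownHook(connection.close())    // best-effort cleanup when the executor exits
    val ch = connection.createChannel()
    ch.queueDeclare("events", false, false, false, null)
    ch
  }
}

object Publisher {
  def publishAll(dstream: DStream[String]): Unit =
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { messages =>
        // the first task on each executor pays the connection cost; later tasks reuse it
        messages.foreach(m => Rabbit.channel.basicPublish("", "events", null, m.getBytes("UTF-8")))
      }
    }
}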

From: gerard.m...@gmail.com
Date: Thu, 11 Dec 2014 15:35:31 +0100
Subject: Re: "Session" for connections?
To: as...@live.com
CC: tathagata.das1...@gmail.com; user@spark.apache.org

>I'm doing the same thing for using Cassandra,
For Cassandra, use the Spark-Cassandra connector [1], which does the Session 
management, as described by TD, for you.
[1] https://github.com/datastax/spark-cassandra-connector 
-kr, Gerard.
On Thu, Dec 11, 2014 at 1:55 PM, Ashic Mahtab  wrote:



That makes sense. I'll try that.

Thanks :)

> From: tathagata.das1...@gmail.com
> Date: Thu, 11 Dec 2014 04:53:01 -0800
> Subject: Re: "Session" for connections?
> To: as...@live.com
> CC: user@spark.apache.org
> 
> You could create a lazily initialized singleton factory and connection
> > pool. Whenever an executor starts running the first task that needs to
> push out data, it will create the connection pool as a singleton. And
> subsequent tasks running on the executor is going to use the
> connection pool. You will also have to intelligently shutdown the
> > connections because there is not an obvious way to shut them down. You
> could have a usage timeout - shutdown connection after not being used
> for 10 x batch interval.
> 
> TD
> 
> On Thu, Dec 11, 2014 at 4:28 AM, Ashic Mahtab  wrote:
> > Hi,
> > I was wondering if there's any way of having long running session type
> > behaviour in spark. For example, let's say we're using Spark Streaming to
> > listen to a stream of events. Upon receiving an event, we process it, and if
> > certain conditions are met, we wish to send a message to rabbitmq. Now,
> > rabbit clients have the concept of a connection factory, from which you
> > create a connection, from which you create a channel. You use the channel to
> > get a queue, and finally the queue is what you publish messages on.
> >
> > Currently, what I'm doing can be summarised as :
> >
> > dstream.foreachRDD(x => x.forEachPartition(y => {
> >val factory = ..
> >val connection = ...
> >val channel = ...
> >val queue = channel.declareQueue(...);
> >
> >y.foreach(z => Processor.Process(z, queue));
> >
> >cleanup the queue stuff.
> > }));
> >
> > I'm doing the same thing for using Cassandra, etc. Now in these cases, the
> > session initiation is expensive, so doing it per message is not a good idea.
> > However, I can't find a way to say "hey...do this per worker once and only
> > once".
> >
> > Is there a better pattern to do this?
> >
> > Regards,
> > Ashic.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
  

  

Newbie Question

2014-12-11 Thread Fernando O.
Hi guys,
I'm planning to use Spark on a project and I'm facing a problem: I
couldn't find a log that explains what's wrong with what I'm doing.

I have 2 VMs that run a small Hadoop (2.6.0) cluster. I added a file that
has 50 lines of JSON data.

I compiled Spark, all tests passed, I ran some simple scripts and then a YARN
example from the page:

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--num-executors 3 \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
lib/spark-examples*.jar \
10

They all worked, so now it was time to start playing with YARN integration.

I did
./bin/spark-shell  -Dspark-cores-max=1 -Dspark.executor.memory=128m
 --driver-memory 385m --executor-memory 385m --master yarn-client


then

*scala> val file = sc.textFile("hdfs:///logs/events.log")*
14/12/11 09:50:35 INFO storage.MemoryStore: ensureFreeSpace(82156) called
with curMem=0, maxMem=278302556
14/12/11 09:50:35 INFO storage.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 80.2 KB, free 265.3 MB)
14/12/11 09:50:35 INFO storage.MemoryStore: ensureFreeSpace(17472) called
with curMem=82156, maxMem=278302556
14/12/11 09:50:35 INFO storage.MemoryStore: Block broadcast_0_piece0 stored
as bytes in memory (estimated size 17.1 KB, free 265.3 MB)
14/12/11 09:50:35 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
in memory on 172.17.42.1:53158 (size: 17.1 KB, free: 265.4 MB)
14/12/11 09:50:35 INFO storage.BlockManagerMaster: Updated info of block
broadcast_0_piece0
file: org.apache.spark.rdd.RDD[String] = hdfs:///logs/events.log
MappedRDD[1] at textFile at :12

*scala> val newsletters = file.filter(line =>
line.contains("newsletter_id"))*
newsletters: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at
:14

*scala> newsletters.count()*
14/12/11 09:50:47 INFO mapred.FileInputFormat: Total input paths to process
: 1
14/12/11 09:50:47 INFO spark.SparkContext: Starting job: count at
:17
14/12/11 09:50:47 INFO scheduler.DAGScheduler: Got job 0 (count at
:17) with 2 output partitions (allowLocal=false)
14/12/11 09:50:47 INFO scheduler.DAGScheduler: Final stage: Stage 0(count
at :17)
14/12/11 09:50:47 INFO scheduler.DAGScheduler: Parents of final stage:
List()
14/12/11 09:50:47 INFO scheduler.DAGScheduler: Missing parents: List()
14/12/11 09:50:47 INFO scheduler.DAGScheduler: Submitting Stage 0
(FilteredRDD[2] at filter at :14), which has no missing parents
14/12/11 09:50:47 INFO storage.MemoryStore: ensureFreeSpace(2688) called
with curMem=99628, maxMem=278302556
14/12/11 09:50:47 INFO storage.MemoryStore: Block broadcast_1 stored as
values in memory (estimated size 2.6 KB, free 265.3 MB)
14/12/11 09:50:47 INFO storage.MemoryStore: ensureFreeSpace(1687) called
with curMem=102316, maxMem=278302556
14/12/11 09:50:47 INFO storage.MemoryStore: Block broadcast_1_piece0 stored
as bytes in memory (estimated size 1687.0 B, free 265.3 MB)
14/12/11 09:50:47 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
in memory on 172.17.42.1:53158 (size: 1687.0 B, free: 265.4 MB)
14/12/11 09:50:47 INFO storage.BlockManagerMaster: Updated info of block
broadcast_1_piece0
14/12/11 09:50:47 INFO scheduler.DAGScheduler: Submitting 2 missing tasks
from Stage 0 (FilteredRDD[2] at filter at :14)
14/12/11 09:50:47 INFO cluster.YarnClientClusterScheduler: Adding task set
0.0 with 2 tasks
14/12/11 09:50:47 INFO util.RackResolver: Resolved hdfs1 to /default-rack
14/12/11 09:51:02 WARN cluster.YarnClientClusterScheduler: Initial job has
not accepted any resources; check your cluster UI to ensure that workers
are registered and have sufficient memory
14/12/11 09:51:17 WARN cluster.YarnClientClusterScheduler: Initial job has
not accepted any resources; check your cluster UI to ensure that workers
are registered and have sufficient memory


On the node executing

The spark console shows the task:


Active Stages (1), from the Spark UI:
  Stage Id: 0 (kill)   Description: count at :17 (+details)
  Submitted: 2014/12/11 10:18:25   Duration: 5.8 min
  Tasks: Succeeded/Total: 0/2


In the Hadoop node executing the task:
NodeManager information:
  Total Vmem allocated for Containers: 3.10 GB (Vmem enforcement enabled: true)
  Total Pmem allocated for Container: 1.48 GB (Pmem enforcement enabled: true)
  Total VCores allocated for Containers: 1
  NodeHealthyStatus: true
  LastNodeHealthTime: Thu Dec 11 10:17:49 ART 2014
  NodeHealthReport: (empty)
  Node Manager Version: 2.6.0 from e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1 by
  jenkins, source checksum 7e1415f8c555842b6118a192d86f5e8, on 2014-11-13T21:17Z
  Hadoop Version: 2.6.0 from e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1 by
  jenkins, source checksum 18e43357c8f927c0695f1e9522859d6a, on 2014-11-13T21:10Z




Cluster Metrics: Apps Submitted, Apps Pending, Apps Running, Apps Completed,
Containers Running, Memory Used, Memory Total, Memory Reserved, VCores Used,
VCores Total, VCores Reserved

Re: Error outputing to CSV file

2014-12-11 Thread Muhammad Ahsan
Hi 

saveAsTextFile is a member of RDD, whereas
fields.map(_.mkString("|")).mkString("\n") is a string. You have to
transform it into an RDD using something like sc.parallelize(...) before
saveAsTextFile.
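For example (a sketch, assuming fields is a local Seq of rows, which is what
the error suggests; the output path is only an example):

// one row per output line, written out by Spark
sc.parallelize(fields.map(_.mkString("|"))).saveAsTextFile("hdfs:///tmp/out")

// if fields were already an RDD of rows, no parallelize would be needed:
// fields.map(_.mkString("|")).saveAsTextFile("hdfs:///tmp/out")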

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-outputing-to-CSV-file-tp20583p20629.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: parquet file not loading (spark v 1.1.0)

2014-12-11 Thread Muhammad Ahsan
Hi 
It worked for me like this. Just define the case class outside of any class
to write to parquet format successfully. I am using Spark version 1.1.1.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

case class person(id: Int, name: String, fathername: String, officeid: Int)

object Program {
  def main(args: Array[String]) {
    val conf: SparkConf = new SparkConf().setAppName("Test").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // brings the implicit RDD[Product] -> SchemaRDD conversion into scope
    import sqlContext.createSchemaRDD

    val baseperson: RDD[Array[String]] =
      sc.textFile("/home/ahsan/Desktop/1.csv").map(_.split(","))
    // convert the string fields with toInt rather than asInstanceOf,
    // which would fail at runtime with a ClassCastException
    val x: RDD[person] = baseperson.map(p =>
      person(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt))
    x.saveAsParquetFile("/home/ahsan/Desktop/pqt")
  }
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parquet-file-not-loading-spark-v-1-1-0-tp20618p20628.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark-SQL JDBC driver

2014-12-11 Thread Denny Lee
Yes, that is correct. A quick reference on this is the post
https://www.linkedin.com/pulse/20141007143323-732459-an-absolutely-unofficial-way-to-connect-tableau-to-sparksql-spark-1-1?_mSplash=1
with the pertinent section being:

It is important to note that when you create Spark tables (for example, via
the .registerTempTable) these are operating within the Spark environment
which resides in a separate process from the Hive Metastore. This means
that currently tables that are created within the Spark context are not
available through the Thrift server. To achieve this, within the Spark
context save your temporary table into Hive - then the Spark Thrift Server
will be able to see the table.
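A rough sketch of that flow (Spark 1.1, assuming a HiveContext pointed at the
same metastore the Thrift server uses; the table and path names are made up):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// any SchemaRDD will do; parquetFile is just an example source
val countries = hiveContext.parquetFile("/data/countries.parquet")
countries.registerTempTable("countries_tmp")   // visible only inside this Spark process

// persist it into the Hive metastore so the (separate) Thrift server process can see it
countries.saveAsTable("countries")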

HTH!

On Thu, Dec 11, 2014 at 04:09 Anas Mosaad  wrote:

> Actually I came to the conclusion that RDDs have to be persisted in Hive in
> order to be accessible through Thrift.
> Hope I didn't end up with an incorrect conclusion.
> Please someone correct me if I am wrong.
> On Dec 11, 2014 8:53 AM, "Judy Nash" 
> wrote:
>
>>  Looks like you are wondering why you cannot see the RDD table you have
>> created via thrift?
>>
>>
>>
>> Based on my own experience with spark 1.1, RDD created directly via Spark
>> SQL (i.e. Spark Shell or Spark-SQL.sh) is not visible on thrift, since
>> thrift has its own session containing its own RDD.
>>
>> Spark SQL experts on the forum can confirm on this though.
>>
>>
>>
>> *From:* Cheng Lian [mailto:lian.cs@gmail.com]
>> *Sent:* Tuesday, December 9, 2014 6:42 AM
>> *To:* Anas Mosaad
>> *Cc:* Judy Nash; user@spark.apache.org
>> *Subject:* Re: Spark-SQL JDBC driver
>>
>>
>>
>> According to the stacktrace, you were still using SQLContext rather than
>> HiveContext. To interact with Hive, HiveContext *must* be used.
>>
>> Please refer to this page
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables
>>
>>  On 12/9/14 6:26 PM, Anas Mosaad wrote:
>>
>>  Back to the first question, this will mandate that hive is up and
>> running?
>>
>>
>>
>> When I try it, I get the following exception. The documentation says that
>> this method works only on SchemaRDD. I though that countries.saveAsTable
>> did not work for that a reason so I created a tmp that contains the results
>> from the registered temp table. Which I could validate that it's a
>> SchemaRDD as shown below.
>>
>>
>>
>>
>> * @Judy,* I do really appreciate your kind support and I want to
>> understand and off course don't want to wast your time. If you can direct
>> me the documentation describing this details, this will be great.
>>
>>
>>
>> scala> val tmp = sqlContext.sql("select * from countries")
>>
>> tmp: org.apache.spark.sql.SchemaRDD =
>>
>> SchemaRDD[12] at RDD at SchemaRDD.scala:108
>>
>> == Query Plan ==
>>
>> == Physical Plan ==
>>
>> PhysicalRDD
>> [COUNTRY_ID#20,COUNTRY_ISO_CODE#21,COUNTRY_NAME#22,COUNTRY_SUBREGION#23,COUNTRY_SUBREGION_ID#24,COUNTRY_REGION#25,COUNTRY_REGION_ID#26,COUNTRY_TOTAL#27,COUNTRY_TOTAL_ID#28,COUNTRY_NAME_HIST#29],
>> MapPartitionsRDD[9] at mapPartitions at ExistingRDD.scala:36
>>
>>
>>
>> scala> tmp.saveAsTable("Countries")
>>
>> org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
>> Unresolved plan found, tree:
>>
>> 'CreateTableAsSelect None, Countries, false, None
>>
>>  Project
>> [COUNTRY_ID#20,COUNTRY_ISO_CODE#21,COUNTRY_NAME#22,COUNTRY_SUBREGION#23,COUNTRY_SUBREGION_ID#24,COUNTRY_REGION#25,COUNTRY_REGION_ID#26,COUNTRY_TOTAL#27,COUNTRY_TOTAL_ID#28,COUNTRY_NAME_HIST#29]
>>
>>   Subquery countries
>>
>>LogicalRDD
>> [COUNTRY_ID#20,COUNTRY_ISO_CODE#21,COUNTRY_NAME#22,COUNTRY_SUBREGION#23,COUNTRY_SUBREGION_ID#24,COUNTRY_REGION#25,COUNTRY_REGION_ID#26,COUNTRY_TOTAL#27,COUNTRY_TOTAL_ID#28,COUNTRY_NAME_HIST#29],
>> MapPartitionsRDD[9] at mapPartitions at ExistingRDD.scala:36
>>
>>
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:83)
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78)
>>
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
>>
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
>>
>> at
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
>>
>> at
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
>>
>> at
>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>
>> at
>> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
>>
>> at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
>>
>> at
>> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply

Re: Key not valid / already cancelled using Spark Streaming

2014-12-11 Thread Flávio Santos
Hello guys,

Thank you for your prompt reply.
I followed Akhil's suggestion with no success. Then I tried again, replacing
S3 with HDFS, and the job seems to work properly.
TD, I'm not using speculative execution.

I think I've just realized what is happening. Due to S3 eventual
consistency, these temporary files sometimes are found, sometimes they are
not. I confirmed this hypothesis via s3cmd.
So, I come up with two questions/suggestions:

1. Does Spark support these temporary files to be written on HDFS and my
final output on S3?
2. What do you think about adding a property like 'spark.s3.maxRetries'
that determines the number of retries before assuming that a file is indeed
not found? I can contribute with this patch if you want.
(Hadoop already have a similar property 'fs.s3.maxRetries', but for
IOException and S3Exception.)

Thanks again and I look forward for your comments,


*--Flávio R. Santos*

Chaordic | *Platform*
*www.chaordic.com.br *
+55 48 3232.3200

On Thu, Dec 11, 2014 at 10:03 AM, Tathagata Das  wrote:

> Following Gerard's thoughts, here are possible things that could be
> happening.
>
> 1. Is there another process in the background that is deleting files
> in the directory where you are trying to write? Seems like the
> temporary file generated by one of the tasks is getting deleted before
> it is renamed to the final output file. I suggest trying to not write
> to S3, rather just count and print (with rest of the computation
> staying exactly same) and see if the error still occurs. That would
> narrow down the culprit to what Gerard suggested.
> 2. Do you have speculative execution turned on? If so, could you turn
> it off and try?
>
> TD
>
> On Thu, Dec 11, 2014 at 1:42 AM, Gerard Maas 
> wrote:
> > If the timestamps in the logs are to be trusted, it looks like your
> driver is
> > dying with that java.io.FileNotFoundException: and therefore the workers
> > lose their connection and close down.
> >
> > -kr, Gerard.
> >
> > On Thu, Dec 11, 2014 at 7:39 AM, Akhil Das 
> > wrote:
> >>
> >> Try to add the following to the sparkConf
> >>
> >>  .set("spark.core.connection.ack.wait.timeout","6000")
> >>
> >>   .set("spark.akka.frameSize","60")
> >>
> >> Used to face that issue with spark 1.1.0
> >>
> >> Thanks
> >> Best Regards
> >>
> >> On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos
> >>  wrote:
> >>>
> >>> Dear Spark'ers,
> >>>
> >>> I'm trying to run a simple job using Spark Streaming (version 1.1.1)
> and
> >>> YARN (yarn-cluster), but unfortunately I'm facing many issues. In
> short, my
> >>> job does the following:
> >>> - Consumes a specific Kafka topic
> >>> - Writes its content to S3 or HDFS
> >>>
> >>> Records in Kafka are in the form:
> >>> {"key": "someString"}
> >>>
> >>> This is important because I use the value of "key" to define the output
> >>> file name in S3.
> >>> Here are the Spark and Kafka parameters I'm using:
> >>>
>  val sparkConf = new SparkConf()
>    .setAppName("MyDumperApp")
>    .set("spark.task.maxFailures", "100")
>    .set("spark.hadoop.validateOutputSpecs", "false")
>    .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>    .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
>  val kafkaParams = Map(
>    "zookeeper.connect" -> zkQuorum,
>    "zookeeper.session.timeout.ms" -> "1",
>    "rebalance.backoff.ms" -> "8000",
>    "rebalance.max.retries" -> "10",
>    "group.id" -> group,
>    "auto.offset.reset" -> "largest"
>  )
> >>>
> >>>
> >>> My application is the following:
> >>>
>  KafkaUtils.createStream[String, String, StringDecoder,
>  StringDecoder](ssc, kafkaParams, Map(topic -> 1),
>  StorageLevel.MEMORY_AND_DISK_SER_2)
>    .foreachRDD((rdd, time) =>
>  rdd.map {
>    case (_, line) =>
>  val json = parse(line)
>  val key = extract(json, "key").getOrElse("key_not_found")
>  (key, dateFormatter.format(time.milliseconds)) -> line
>  }
>    .partitionBy(new HashPartitioner(10))
>    .saveAsHadoopFile[KeyBasedOutput[(String,String),
>  String]]("s3://BUCKET", classOf[BZip2Codec])
>    )
> >>>
> >>>
> >>> And the last piece:
> >>>
>  class KeyBasedOutput[T >: Null, V <: AnyRef] extends
>  MultipleTextOutputFormat[T , V] {
>    override protected def generateFileNameForKeyValue(key: T, value: V,
>  leaf: String) = key match {
>  case (myKey, batchId) =>
>    "somedir" + "/" + myKey + "/" +
>  "prefix-" + myKey + "_" + batchId + "_" + leaf
>    }
>    override protected def generateActualKey(key: T, value: V) = null
>  }
> >>>
> >>>
> >>> I use batch sizes of 5 minutes with checkpoints activated.
> >>> The job fails nondeterministically (I think it never ran longer than ~5
> >>> hours). I have no clue why, it simply fails.
> >>> Please find below the exceptions thrown by my app

Re: Specifying number of executors in Mesos

2014-12-11 Thread Tathagata Das
Not that I am aware of. Spark will try to spread the tasks evenly
across executors; it's not aware of the workers at all. So if the
executor-to-worker allocation is uneven, I am not sure what can be
done. Maybe others can offer some ideas.

On Tue, Dec 9, 2014 at 6:20 AM, Gerard Maas  wrote:
> Hi,
>
> We've a number of Spark Streaming /Kafka jobs that would benefit of an even
> spread of consumers over physical hosts in order to maximize network usage.
> As far as I can see, the Spark Mesos scheduler accepts resource offers until
> all required Mem + CPU allocation has been satisfied.
>
> This basic resource allocation policy results in large executors spread over
> few nodes, resulting in many Kafka consumers in a single node (e.g. from 12
> consumers, I've seen allocations of 7/3/2)
>
> Is there a way to tune this behavior to achieve executor allocation on a
> given number of hosts?
>
> -kr, Gerard.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is there an efficient way to append new data to a registered Spark SQL Table?

2014-12-11 Thread Tathagata Das
First of all, how long do you want to keep doing this? The data is
going to increase infinitely and without any bounds; it's going to get
too big for any cluster to handle. If all that is within bounds, then
try the following.

- Maintain a global variable having the current RDD storing all the
log data. We are going to keep updating this variable.
- Every batch interval, take new data and union it with the earlier
unified RDD (in the global variable) and update the global variable.
If you want SQL queries on this data, then you will have to
re-register this new RDD as the named table.
- With this approach the number of partitions is going to increase
rapidly. So periodically take the unified RDD and repartition it to a
smaller set of partitions. This messes up the ordering of data, but
you maybe fine with if your queries are order agnostic. Also,
periodically, checkpoint this RDD, otherwise the lineage is going to
grow indefinitely and everything will start getting slower.
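In code, that pattern might look roughly like this (names, intervals and the
checkpoint directory are illustrative, and the parsing is omitted):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

case class LogEntry(ts: Long, line: String)

object GrowingTable {
  def wire(sqlContext: SQLContext, stream: DStream[String]): Unit = {
    import sqlContext.createSchemaRDD
    val sc = sqlContext.sparkContext
    sc.setCheckpointDir("/tmp/spark-checkpoints")   // required for rdd.checkpoint()

    var unified: RDD[LogEntry] = sc.parallelize(Seq.empty[LogEntry])
    var batches = 0

    stream.foreachRDD { rdd =>
      val parsed = rdd.map(line => LogEntry(System.currentTimeMillis(), line)) // real parsing omitted
      unified = unified.union(parsed)
      batches += 1
      if (batches % 20 == 0) {                      // keep partition count and lineage bounded
        unified = unified.repartition(32)
        unified.checkpoint()
      }
      unified.cache()                               // older cached copies could be unpersist()ed
      unified.registerTempTable("logs")             // re-register so queries see the new data
    }
  }
}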

Hope this helps.

TD

On Mon, Dec 8, 2014 at 6:29 PM, Xuelin Cao  wrote:
>
> Hi,
>
>   I'm wondering whether there is an  efficient way to continuously
> append new data to a registered spark SQL table.
>
>   This is what I want:
>   I want to make an ad-hoc query service to a json formated system log.
> Certainly, the system log is continuously generated. I will use spark
> streaming to connect the system log as my input, and I want to find a way to
> effectively append the new data into an existed spark SQL table. Further
> more, I want the whole table being cached in memory/tachyon.
>
>   It looks like spark sql supports the "INSERT" method, but only for
> parquet file. In addition, it is inefficient to insert a single row every
> time.
>
>   I do know that somebody build a similar system that I want (ad-hoc
> query service to a on growing system log). So, there must be an efficient
> way. Anyone knows?
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Locking for shared RDDs

2014-12-11 Thread Tathagata Das
Aditya, I think you have the mental model of spark streaming a little
off the mark. Unlike traditional streaming systems, where any kind of
state is mutable, Spark Streaming is designed on Spark's immutable RDDs.
Streaming data is received and divided into immutable blocks, which then
form immutable RDDs, and then transformations form new immutable RDDs.
It's best that you first read the Spark paper and then the Spark
Streaming paper to understand the model. Once you understand that, you will
realize that since everything is immutable, the question of
consistency does not even arise :)

TD

On Mon, Dec 8, 2014 at 9:44 PM, Raghavendra Pandey
 wrote:
> You don't need to worry about locks as such as one thread/worker is
> responsible exclusively for one partition of the RDD. You can use
> Accumulator variables that spark provides to get the state updates.
>
>
> On Mon Dec 08 2014 at 8:14:28 PM aditya.athalye 
> wrote:
>>
>> I am relatively new to Spark. I am planning to use Spark Streaming for my
>> OLAP use case, but I would like to know how RDDs are shared between
>> multiple
>> workers.
>> If I need to constantly compute some stats on the streaming data,
>> presumably
>> shared state would have to be updated serially by different spark workers. Is
>> this managed by Spark automatically or does the application need to ensure
>> distributed locks are acquired?
>>
>> Thanks
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Locking-for-shared-RDDs-tp20578.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Trouble with cache() and parquet

2014-12-11 Thread Yana Kadiyska
I see -- they are the same in design but  the difference comes from
partitioned Hive tables: when the RDD is generated by querying an external
Hive metastore, the partition is appended as part of the row, and shows up
as part of the schema. Can you shed some light on why this is a problem:

last2HourRdd.first <-- works ok
last2HourRdd.cache()

last2HourRdd.first <-- does not work

​

The first call shows K+1 columns (and so does print schema, where K columns
are from the backing parquet files and the K+1st is the partition inlined).
My impression is that the second call to .first would just force the
cache() call and dump out that RDD to disk (with all of it's K+1 columns
and store the schema info, again with K+1 columns), and then just return a
single entry. I am not sure why the fact that Hive metastore exposes an
extra column over the raw parquet file is a problem since it does so both
on the schema and in the data: last2HourRdd.schema.fields.length reports
K+1, and so does  last2HourRdd.first.length.

I also tried
calling sqlContext.applySchema(last2HourRdd,parquetFile.schema) before
caching but it does not fix the issue. The only workaround I've come up
with so far is to replace select * with a select . But I'd
love to understand a little better why the cache call trips this scenario
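For what it's worth, a quick way to put the two schemas side by side (a sketch;
the path is a placeholder):

val fromMetastore = sqlContext.sql("select * from mytable")
val fromFile = sqlContext.parquetFile("/path/to/mytable")   // raw parquet, no metastore

println(fromMetastore.schema.fields.map(f => f.name + ": " + f.dataType).mkString("\n"))
println(fromFile.schema.fields.map(f => f.name + ": " + f.dataType).mkString("\n"))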



On Wed, Dec 10, 2014 at 3:50 PM, Michael Armbrust 
wrote:

> Have you checked to make sure the schema in the metastore matches the
> schema in the parquet file?  One way to test would be to just use
> sqlContext.parquetFile(...) which infers the schema from the file instead
> of using the metastore.
>
> On Wed, Dec 10, 2014 at 12:46 PM, Yana Kadiyska 
> wrote:
>
>>
>> Hi folks, wondering if anyone has thoughts. Trying to create something
>> akin to a materialized view (sqlContext is a HiveContext connected to
>> external metastore):
>>
>>
>> val last2HourRdd = sqlContext.sql(s"select * from mytable")
>> //last2HourRdd.first prints out a  org.apache.spark.sql.Row = [...] with
>> valid data
>>
>>  last2HourRdd.cache()
>> //last2HourRdd.first now fails in an executor with the following:
>>
>> In the driver:
>>
>> 14/12/10 20:24:01 INFO TaskSetManager: Starting task 0.1 in stage 25.0 (TID 
>> 35, iphere, NODE_LOCAL, 2170 bytes)
>> 14/12/10 20:24:01 INFO TaskSetManager: Lost task 0.1 in stage 25.0 (TID 35) 
>> on executor iphere: java.lang.ClassCastException (null) [duplicate 1]
>>
>> ​
>>
>>
>> And in executor:
>>
>> 14/12/10 19:56:57 ERROR Executor: Exception in task 0.1 in stage 20.0 (TID 
>> 27)
>> java.lang.ClassCastException: java.lang.String cannot be cast to 
>> java.lang.Integer
>>  at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.MutableInt.update(SpecificMutableRow.scala:73)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.update(SpecificMutableRow.scala:231)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setString(SpecificMutableRow.scala:236)
>>  at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:328)
>>  at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:310)
>>  at 
>> org.apache.spark.sql.columnar.compression.RunLengthEncoding$Decoder.next(compressionSchemes.scala:168)
>>  at 
>> org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor$class.extractSingle(CompressibleColumnAccessor.scala:37)
>>  at 
>> org.apache.spark.sql.columnar.NativeColumnAccessor.extractSingle(ColumnAccessor.scala:64)
>>  at 
>> org.apache.spark.sql.columnar.BasicColumnAccessor.extractTo(ColumnAccessor.scala:54)
>>  at 
>> org.apache.spark.sql.columnar.NativeColumnAccessor.org$apache$spark$sql$columnar$NullableColumnAccessor$$super$extractTo(ColumnAccessor.scala:64)
>>  at 
>> org.apache.spark.sql.columnar.NullableColumnAccessor$class.extractTo(NullableColumnAccessor.scala:52)
>>  at 
>> org.apache.spark.sql.columnar.NativeColumnAccessor.extractTo(ColumnAccessor.scala:64)
>>  at 
>> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
>>  at 
>> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:275)
>>  at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>  at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>>  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>  at 
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>  at 
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>  at 
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>  at scala.collection.TraversableOnce$class.to(Traversable

Re: "Session" for connections?

2014-12-11 Thread Gerard Maas
>I'm doing the same thing for using Cassandra,

For Cassandra, use the Spark-Cassandra connector [1], which does the
Session management, as described by TD, for you.

[1] https://github.com/datastax/spark-cassandra-connector

-kr, Gerard.

On Thu, Dec 11, 2014 at 1:55 PM, Ashic Mahtab  wrote:

> That makes sense. I'll try that.
>
> Thanks :)
>
> > From: tathagata.das1...@gmail.com
> > Date: Thu, 11 Dec 2014 04:53:01 -0800
> > Subject: Re: "Session" for connections?
> > To: as...@live.com
> > CC: user@spark.apache.org
>
> >
> > You could create a lazily initialized singleton factory and connection
> > pool. Whenever an executor starts running the first task that needs to
> > push out data, it will create the connection pool as a singleton. And
> > subsequent tasks running on the executor is going to use the
> > connection pool. You will also have to intelligently shutdown the
> > connections because there is not an obvious way to shut them down. You
> > could have a usage timeout - shutdown connection after not being used
> > for 10 x batch interval.
> >
> > TD
> >
> > On Thu, Dec 11, 2014 at 4:28 AM, Ashic Mahtab  wrote:
> > > Hi,
> > > I was wondering if there's any way of having long running session type
> > > behaviour in spark. For example, let's say we're using Spark Streaming
> to
> > > listen to a stream of events. Upon receiving an event, we process it,
> and if
> > > certain conditions are met, we wish to send a message to rabbitmq. Now,
> > > rabbit clients have the concept of a connection factory, from which you
> > > create a connection, from which you create a channel. You use the
> channel to
> > > get a queue, and finally the queue is what you publish messages on.
> > >
> > > Currently, what I'm doing can be summarised as :
> > >
> > > dstream.foreachRDD(x => x.forEachPartition(y => {
> > > val factory = ..
> > > val connection = ...
> > > val channel = ...
> > > val queue = channel.declareQueue(...);
> > >
> > > y.foreach(z => Processor.Process(z, queue));
> > >
> > > cleanup the queue stuff.
> > > }));
> > >
> > > I'm doing the same thing for using Cassandra, etc. Now in these cases,
> the
> > > session initiation is expensive, so doing it per message is not a good
> idea.
> > > However, I can't find a way to say "hey...do this per worker once and
> only
> > > once".
> > >
> > > Is there a better pattern to do this?
> > >
> > > Regards,
> > > Ashic.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: "Session" for connections?

2014-12-11 Thread Tathagata Das
Also, this is covered in the streaming programming guide in bits and pieces.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

On Thu, Dec 11, 2014 at 4:55 AM, Ashic Mahtab  wrote:
> That makes sense. I'll try that.
>
> Thanks :)
>
>> From: tathagata.das1...@gmail.com
>> Date: Thu, 11 Dec 2014 04:53:01 -0800
>> Subject: Re: "Session" for connections?
>> To: as...@live.com
>> CC: user@spark.apache.org
>
>>
>> You could create a lazily initialized singleton factory and connection
>> pool. Whenever an executor starts running the first task that needs to
>> push out data, it will create the connection pool as a singleton. And
>> subsequent tasks running on the executor is going to use the
>> connection pool. You will also have to intelligently shutdown the
>> connections because there is not an obvious way to shut them down. You
>> could have a usage timeout - shutdown connection after not being used
>> for 10 x batch interval.
>>
>> TD
>>
>> On Thu, Dec 11, 2014 at 4:28 AM, Ashic Mahtab  wrote:
>> > Hi,
>> > I was wondering if there's any way of having long running session type
>> > behaviour in spark. For example, let's say we're using Spark Streaming
>> > to
>> > listen to a stream of events. Upon receiving an event, we process it,
>> > and if
>> > certain conditions are met, we wish to send a message to rabbitmq. Now,
>> > rabbit clients have the concept of a connection factory, from which you
>> > create a connection, from which you create a channel. You use the
>> > channel to
>> > get a queue, and finally the queue is what you publish messages on.
>> >
>> > Currently, what I'm doing can be summarised as :
>> >
>> > dstream.foreachRDD(x => x.forEachPartition(y => {
>> > val factory = ..
>> > val connection = ...
>> > val channel = ...
>> > val queue = channel.declareQueue(...);
>> >
>> > y.foreach(z => Processor.Process(z, queue));
>> >
>> > cleanup the queue stuff.
>> > }));
>> >
>> > I'm doing the same thing for using Cassandra, etc. Now in these cases,
>> > the
>> > session initiation is expensive, so doing it per message is not a good
>> > idea.
>> > However, I can't find a way to say "hey...do this per worker once and
>> > only
>> > once".
>> >
>> > Is there a better pattern to do this?
>> >
>> > Regards,
>> > Ashic.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming: missing classes when kafka consumer classes

2014-12-11 Thread Mario Pastorelli

Thanks akhil for the answer.

I am using sbt assembly and the build.sbt is in the first email. Do you 
know why those classes are included in that way?



Thanks,
Mario

On 11.12.2014 14:51, Akhil Das wrote:
Yes. You can do/use *sbt assembly* and create a big fat jar with all 
dependencies bundled inside it.


Thanks
Best Regards

On Thu, Dec 11, 2014 at 7:10 PM, Mario Pastorelli 
> wrote:


In this way it works but it's not portable and the idea of having
a fat jar is to avoid exactly this. Is there any system to create
a self-contained portable fatJar?


On 11.12.2014 13:57, Akhil Das wrote:

Add these jars while creating the Context.

   val sc = new SparkContext(conf)


sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/*spark-streaming-kafka_2.10-1.1.0.jar*")

sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/*zkclient-0.3.jar*")

sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/*metrics-core-2.2.0.jar*")

sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/*kafka_2.10-0.8.0.jar*")
val ssc = new StreamingContext(sc, Seconds(10))


Thanks
Best Regards

On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli <mario.pastore...@teralytics.ch> wrote:

Hi,

I'm trying to use spark-streaming with kafka but I get a
strange error about classes that are missing. I would like to ask
whether my way of building the fat jar is correct or not. My program is

val kafkaStream = KafkaUtils.createStream(ssc,
zookeeperQuorum, kafkaGroupId, kafkaTopicsWithThreads)
.map(_._2)

kafkaStream.foreachRDD((rdd,t) => rdd.foreachPartition {
iter:Iterator[CellWithLAC] =>
  println("time: " ++ t.toString ++ " #received: " ++
iter.size.toString)
})

I use sbt to manage my project and my build.sbt (with
assembly 0.12.0 plugin) is

name := "spark_example"

version := "0.0.1"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-deprecation","-feature")

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
  "joda-time" % "joda-time" % "2.6"
)

assemblyMergeStrategy in assembly := {
  case p if p startsWith "com/esotericsoftware/minlog" =>
MergeStrategy.first
  case p if p startsWith "org/apache/commons/beanutils" =>
MergeStrategy.first
  case p if p startsWith "org/apache/" => MergeStrategy.last
  case "plugin.properties" => MergeStrategy.discard
  case p if p startsWith "META-INF" => MergeStrategy.discard
  case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}

I create the jar with sbt assembly and then run it with
$SPARK_HOME/bin/spark-submit --master spark://master:7077
--class Main
target/scala-2.10/spark_example-assembly-0.0.1.jar
localhost:2181 test-consumer-group test1

where master:7077 is the spark master, localhost:2181 is
zookeeper, test-consumer-group is kafka groupid and test1 is
the kafka topic. The program starts and keeps running, but I
get an error and nothing is printed. In the log I found the
following stack trace:

14/12/11 13:02:08 INFO network.ConnectionManager: Accepted
connection from [10.0.3.1/10.0.3.1:54325]
14/12/11 13:02:08 INFO network.SendingConnection: Initiating
connection to [jpl-devvax/127.0.1.1:38767]
14/12/11 13:02:08 INFO network.SendingConnection: Connected
to [jpl-devvax/127.0.1.1:38767 ], 1
messages pending
14/12/11 13:02:08 INFO storage.BlockManagerInfo: Added
broadcast_2_piece0 in memory on jpl-devvax:38767 (size: 842.0
B, free: 265.4 MB)
14/12/11 13:02:08 INFO scheduler.ReceiverTracker: Registered
receiver for stream 0 from
akka.tcp://sparkExecutor@jpl-devvax:46602
14/12/11 13:02:08 ERROR scheduler.ReceiverTracker:
Deregistered receiver for stream 0: Error starting receiver 0
- java.lang.NoClassDefFoundError:

kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues$1
at

kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(Unknown
Source)
at

kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply

Re: Spark streaming: missing classes when kafka consumer classes

2014-12-11 Thread Akhil Das
Yes. You can do/use *sbt assembly* and create a big fat jar with all
dependencies bundled inside it.

Thanks
Best Regards

On Thu, Dec 11, 2014 at 7:10 PM, Mario Pastorelli <
mario.pastore...@teralytics.ch> wrote:

>  In this way it works but it's not portable and the idea of having a fat
> jar is to avoid exactly this. Is there any system to create a
> self-contained portable fatJar?
>
>
> On 11.12.2014 13:57, Akhil Das wrote:
>
>  Add these jars while creating the Context.
>
> val sc = new SparkContext(conf)
>
>
> sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/
> *spark-streaming-kafka_2.10-1.1.0.jar*")
> sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/
> *zkclient-0.3.jar*")
>
> sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/
> *metrics-core-2.2.0.jar*")
>
> sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/
> *kafka_2.10-0.8.0.jar*")
>
> val ssc = new StreamingContext(sc, Seconds(10))
>
>
>  Thanks
> Best Regards
>
> On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli <
> mario.pastore...@teralytics.ch> wrote:
>
>>  Hi,
>>
>> I'm trying to use spark-streaming with kafka but I get a strange error
>> about classes that are missing. I would like to ask whether my way of
>> building the fat jar is correct or not. My program is
>>
>> val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum,
>> kafkaGroupId, kafkaTopicsWithThreads)
>> .map(_._2)
>>
>> kafkaStream.foreachRDD((rdd,t) => rdd.foreachPartition {
>> iter:Iterator[CellWithLAC] =>
>>   println("time: " ++ t.toString ++ " #received: " ++ iter.size.toString)
>> })
>>
>> I use sbt to manage my project and my build.sbt (with assembly 0.12.0
>> plugin) is
>>
>> name := "spark_example"
>>
>> version := "0.0.1"
>>
>> scalaVersion := "2.10.4"
>>
>> scalacOptions ++= Seq("-deprecation","-feature")
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
>>   "joda-time" % "joda-time" % "2.6"
>> )
>>
>> assemblyMergeStrategy in assembly := {
>>   case p if p startsWith "com/esotericsoftware/minlog" =>
>> MergeStrategy.first
>>   case p if p startsWith "org/apache/commons/beanutils" =>
>> MergeStrategy.first
>>   case p if p startsWith "org/apache/" => MergeStrategy.last
>>   case "plugin.properties" => MergeStrategy.discard
>>   case p if p startsWith "META-INF" => MergeStrategy.discard
>>   case x =>
>> val oldStrategy = (assemblyMergeStrategy in assembly).value
>> oldStrategy(x)
>> }
>>
>> I create the jar with sbt assembly and then run it with
>> $SPARK_HOME/bin/spark-submit --master spark://master:7077 --class Main
>> target/scala-2.10/spark_example-assembly-0.0.1.jar localhost:2181
>> test-consumer-group test1
>>
>> where master:7077 is the spark master, localhost:2181 is zookeeper,
>> test-consumer-group is kafka groupid and test1 is the kafka topic. The
>> program starts and keeps running, but I get an error and nothing is printed.
>> In the log I found the following stack trace:
>>
>> 14/12/11 13:02:08 INFO network.ConnectionManager: Accepted connection
>> from [10.0.3.1/10.0.3.1:54325]
>> 14/12/11 13:02:08 INFO network.SendingConnection: Initiating connection
>> to [jpl-devvax/127.0.1.1:38767]
>> 14/12/11 13:02:08 INFO network.SendingConnection: Connected to
>> [jpl-devvax/127.0.1.1:38767], 1 messages pending
>> 14/12/11 13:02:08 INFO storage.BlockManagerInfo: Added broadcast_2_piece0
>> in memory on jpl-devvax:38767 (size: 842.0 B, free: 265.4 MB)
>> 14/12/11 13:02:08 INFO scheduler.ReceiverTracker: Registered receiver for
>> stream 0 from akka.tcp://sparkExecutor@jpl-devvax:46602
>> 14/12/11 13:02:08 ERROR scheduler.ReceiverTracker: Deregistered receiver
>> for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>> kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues$1
>> at
>> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(Unknown
>> Source)
>> at
>> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Unknown
>> Source)
>> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>> at
>> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown
>> Source)
>> at
>> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown
>> Source)
>> at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
>> at
>> kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown
>> Source)
>> at
>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
>> at
>> org.apache.spark.

Re: Spark streaming: missing classes when kafka consumer classes

2014-12-11 Thread Mario Pastorelli
It works this way, but it's not portable, and the whole idea of having a fat
jar is to avoid exactly this. Is there any way to create a
self-contained, portable fat jar?
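
For what it's worth, one approach that is often suggested for a self-contained,
portable fat jar (an assumption on my part, not something confirmed later in
this thread) is to mark Spark itself as "provided" and exclude it from the
connector's transitive dependencies, so that only the Kafka connector and its
own dependencies (kafka, zkclient, metrics-core) end up in the assembly:

libraryDependencies ++= Seq(
  // Spark is supplied by the cluster at runtime, so keep it out of the assembly
  "org.apache.spark" %% "spark-streaming" % "1.1.1" % "provided",
  // keep the Kafka connector bundled, but stop it from pulling Spark back in
  ("org.apache.spark" %% "spark-streaming-kafka" % "1.1.1")
    .exclude("org.apache.spark", "spark-streaming_2.10"),
  "joda-time" % "joda-time" % "2.6"
)

The resulting jar then only relies on a matching Spark installation on the
cluster, with nothing else to ship to the workers.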


On 11.12.2014 13:57, Akhil Das wrote:

Add these jars while creating the Context.

   val sc = new SparkContext(conf)

sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/*spark-streaming-kafka_2.10-1.1.0.jar*")
sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/*zkclient-0.3.jar*")
sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/*metrics-core-2.2.0.jar*")
sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/*kafka_2.10-0.8.0.jar*")
val ssc = new StreamingContext(sc, Seconds(10))


Thanks
Best Regards

On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli wrote:


Hi,

I'm trying to use spark-streaming with kafka but I get a strange
error about classes that are missing. I would like to ask whether my way of
building the fat jar is correct or not. My program is

val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum,
kafkaGroupId, kafkaTopicsWithThreads)
.map(_._2)

kafkaStream.foreachRDD((rdd,t) => rdd.foreachPartition {
iter:Iterator[CellWithLAC] =>
  println("time: " ++ t.toString ++ " #received: " ++
iter.size.toString)
})

I use sbt to manage my project and my build.sbt (with assembly
0.12.0 plugin) is

name := "spark_example"

version := "0.0.1"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-deprecation","-feature")

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
  "joda-time" % "joda-time" % "2.6"
)

assemblyMergeStrategy in assembly := {
  case p if p startsWith "com/esotericsoftware/minlog" =>
MergeStrategy.first
  case p if p startsWith "org/apache/commons/beanutils" =>
MergeStrategy.first
  case p if p startsWith "org/apache/" => MergeStrategy.last
  case "plugin.properties" => MergeStrategy.discard
  case p if p startsWith "META-INF" => MergeStrategy.discard
  case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}

I create the jar with sbt assembly and then run it with
$SPARK_HOME/bin/spark-submit --master spark://master:7077 --class
Main target/scala-2.10/spark_example-assembly-0.0.1.jar
localhost:2181 test-consumer-group test1

where master:7077 is the spark master, localhost:2181 is
zookeeper, test-consumer-group is kafka groupid and test1 is the
kafka topic. The program starts and keeps running, but I get an
error and nothing is printed. In the log I found the following
stack trace:

14/12/11 13:02:08 INFO network.ConnectionManager: Accepted
connection from [10.0.3.1/10.0.3.1:54325]
14/12/11 13:02:08 INFO network.SendingConnection: Initiating
connection to [jpl-devvax/127.0.1.1:38767 ]
14/12/11 13:02:08 INFO network.SendingConnection: Connected to
[jpl-devvax/127.0.1.1:38767 ], 1 messages
pending
14/12/11 13:02:08 INFO storage.BlockManagerInfo: Added
broadcast_2_piece0 in memory on jpl-devvax:38767 (size: 842.0 B,
free: 265.4 MB)
14/12/11 13:02:08 INFO scheduler.ReceiverTracker: Registered
receiver for stream 0 from akka.tcp://sparkExecutor@jpl-devvax:46602
14/12/11 13:02:08 ERROR scheduler.ReceiverTracker: Deregistered
receiver for stream 0: Error starting receiver 0 -
java.lang.NoClassDefFoundError:

kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues$1
at

kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(Unknown
Source)
at

kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Unknown
Source)
at
scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at

kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown
Source)
at

kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown
Source)
at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown
Source)
at
kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown
Source)
at

org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
at

org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at

org.ap

Standalone app: IOException due to broadcast.destroy()

2014-12-11 Thread Alberto Garcia
Hello.

I'm pretty new to Spark.
I am developing a Spark application and testing it locally prior to
deploying it on a cluster. I have a problem with a broadcast variable. The
application raises

"Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task serialization failed: java.io.IOException: unexpected
exception type" 
java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:994)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
...
...
if I try to destroy that broadcast (I tried both the synchronous and
asynchronous modes).

The application follows an iterative pattern:

-Create accumulator

-for i = 0 ... 1000
   -foreach() --> add to the accumulator
   -broadcast(accumulator.value())
   -...
   -use broadcasted value
   -...
   -broadcast.destroy()
   -accumulator.setValue(zeroValue)
endfor
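
Written out as concrete Scala, the loop looks roughly like the sketch below
(the data set, the accumulator type, and the per-iteration work are
placeholders, not the real application's code):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("broadcast-loop-sketch"))
val data = sc.parallelize(1 to 1000000)
val acc = sc.accumulator(0L)            // the accumulator filled on each pass

for (i <- 0 to 1000) {
  data.foreach(_ => acc += 1L)          // foreach() --> add to the accumulator
  val bc = sc.broadcast(acc.value)      // broadcast(accumulator.value())
  data.map(_ + bc.value).count()        // use the broadcasted value
  bc.destroy()                          // per the report below, removing this destroy() avoids the error
  acc.setValue(0L)                      // accumulator.setValue(zeroValue)
}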

The exception is raised at i=1, on the foreach line. If I comment out the
destroy line, the application reaches more than 200 iterations before
failing (but that's another issue).

Where could my mistake be?

Spark 1.1.0
Hadoop 2.2.0
java version "1.8.0_25"
Java(TM) SE Runtime Environment (build 1.8.0_25-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.25-b02, mixed mode)

If you need more info please ask me.

Thanks for the help.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Standalone-app-IOException-due-to-broadcast-destroy-tp20627.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can spark job have sideeffects (write files to FileSystem)

2014-12-11 Thread Daniel Darabos
Yes, this is perfectly "legal". This is what RDD.foreach() is for! You may
be encountering an IO exception while writing, and maybe using() suppresses
it. (?) I'd try writing the files with java.nio.file.Files.write() -- I'd
expect there is less that can go wrong with that simple call.

On Thu, Dec 11, 2014 at 12:50 PM, Paweł Szulc  wrote:

> Imagine a simple Spark job that stores each line of the RDD in a
> separate file
>
>
> val lines = sc.parallelize(1 to 100).map(n => s"this is line $n")
> lines.foreach(line => writeToFile(line))
>
> def writeToFile(line: String) = {
> def filePath = "file://..."
> val file = new File(new URI(filePath).getPath)
> // the using() helper simply closes the output stream when done
> using(new FileOutputStream(file)) { output =>
>   output.write(line.getBytes)
> }
> }
>
>
> Now, the example above works 99.9% of the time. Files are generated for each
> line, each file contains that particular line.
>
> However, when dealing with large number of data, we encounter situations
> where some of the files are empty! Files are generated, but there is no
> content inside of them (0 bytes).
>
> Now the question is: can a Spark job have side effects? Is it even legal to
> write such code?
> If no, then what other choice do we have when we want to save data from
> our RDD?
> If yes, then do you guys see what could be the reason for this job acting
> in this strange manner 0.1% of the time?
>
>
> disclaimer: we are fully aware of .saveAsTextFile method in the API,
> however the example above is a simplification of our code - normally we
> produce PDF files.
>
>
> Best regards,
> Paweł Szulc
>
>
>
>
>
>
>


Spark SQL Vs CQL performance on Cassandra

2014-12-11 Thread Ajay
Hi,

To test Spark SQL Vs CQL performance on Cassandra, I did the following:

1) Cassandra standalone server (1 server in a cluster)
2) Spark Master and 1 Worker
Both running in a Thinkpad laptop with 4 cores and 8GB RAM.
3) Written Spark SQL code using Cassandra-Spark Driver from Cassandra
(JavaApiDemo.java. Run with spark://127.0.0.1:7077 127.0.0.1)
4) Written CQL code using the Java driver from Cassandra
(CassandraJavaApiDemo.java)
In both cases, I create 1 million rows and query for one

Observation:
1) It takes less than 10 milliseconds using CQL (SELECT * FROM users WHERE
name='Anna')
2) It takes around 0.6 seconds using Spark (either SELECT * FROM users WHERE
name='Anna' or javaFunctions(sc).cassandraTable("test", "people",
mapRowTo(Person.class)).where("name=?", "Anna");

Please let me know if I am missing something in Spark configuration or
Cassandra-Spark Driver.

Thanks
Ajay Garga
package com.datastax.demo;

import java.text.SimpleDateFormat;
import java.util.Date;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ExecutionInfo;
import com.datastax.driver.core.QueryTrace;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;

public class CassandraJavaApiDemo {
	private static SimpleDateFormat format = new SimpleDateFormat(
			"HH:mm:ss.SSS");

	public static void main(String[] args) {
		Cluster cluster = null;
		Session session = null;

		try {
			cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
			session = cluster.connect();

			session.execute("DROP KEYSPACE IF EXISTS test2");
			session.execute("CREATE KEYSPACE test2 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}");
			session.execute("CREATE TABLE test2.users (id INT, name TEXT, birth_date  TIMESTAMP, PRIMARY KEY (id) )");
			session.execute("CREATE INDEX people_name_idx2 ON test2.users(name)");

			session = cluster.connect("test2");
			Statement insert = null;
			for (int i = 0; i < 100; i++) {
insert = QueryBuilder.insertInto("users").value("id", i)
		.value("name", "Anna" + i)
		.value("birth_date", new Date());
session.execute(insert);
			}

			long start = System.currentTimeMillis();
			Statement scan = new SimpleStatement(
	"SELECT * FROM users WHERE name='Anna0';");
			scan.enableTracing();
			ResultSet results = session.execute(scan);
			for (Row row : results) {
System.out.format("%d %s\n", row.getInt("id"),
		row.getString("name"));
			}
			long end = System.currentTimeMillis();
			System.out.println(" Time Taken " +  (end - start));
			ExecutionInfo executionInfo = results.getExecutionInfo();
			QueryTrace queryTrace = executionInfo.getQueryTrace();

			System.out.printf("%-38s | %-12s | %-10s | %-12s\n", "activity",
	"timestamp", "source", "source_elapsed");
			System.out
	.println("---+--++--");
			for (QueryTrace.Event event : queryTrace.getEvents()) {
System.out.printf("%38s | %12s | %10s | %12s\n",
		event.getDescription(),
		millis2Date(event.getTimestamp()), event.getSource(),
		event.getSourceElapsedMicros());
			}

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (session != null) {
session.close();
			}
			if (cluster != null) {
cluster.close();
			}
		}
	}

	private static Object millis2Date(long timestamp) {
		return format.format(timestamp);
	}
}
package com.datastax.spark.connector.demo;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import com.google.common.base.Objects;

import org.apache.hadoop.util.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.cassandra.CassandraSQLContext;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

/**
 * This Spark application demonstrates how to use Spark Cassandra Connector with
 * Java.
 * 
 * In order to run it, you will need to run Cassandra database, and create the
 * following keyspace, table and secondary index:
 * 
 * 
 * 
 * CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
 * 
 * CREATE TABLE test.people (
 *  id  INT,
 *  nameTEXT,
 *  birth_date  T

Re: KafkaUtils explicit acks

2014-12-11 Thread Tathagata Das
I am updating the docs right now. Here is a staged copy that you can
have a sneak peek of. This will be part of Spark 1.2.

http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html

The updated fault-tolerance section tries to simplify the explanation
of when and what data can be lost, and how to prevent that using the
new experimental feature of write ahead logs.
Any feedback will be much appreciated.

TD
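
For anyone reading this later: based on that staged guide, turning the write
ahead log on in 1.2 should look roughly like the sketch below. The property
name and the advice to drop in-memory replication are my reading of the staged
docs, so double-check them against the final 1.2 documentation.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("reliable-kafka-ingest")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // the new 1.2 feature
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///checkpoints/reliable-kafka") // WAL data is kept under the checkpoint dir
// With the WAL on, received data is already durable, so replication can be dropped:
val stream = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group",
  Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)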

On Wed, Dec 10, 2014 at 2:42 AM,   wrote:
> [sorry for the botched half-message]
>
> Hi Mukesh,
>
> There’s been some great work on Spark Streaming reliability lately.
> https://www.youtube.com/watch?v=jcJq3ZalXD8
> Look at the links from:
> https://issues.apache.org/jira/browse/SPARK-3129
>
> I’m not aware of any doc yet (did I miss something ?) but you can look at
> the ReliableKafkaReceiver’s test suite:
>
> external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
>
> —
> FG
>
>
> On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha 
> wrote:
>>
>> Hello Guys,
>>
>> Any insights on this??
>> If I'm not clear enough, my question is: how can I use the kafka consumer and
>> not lose any data in case of failures with spark-streaming?
>>
>> On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha 
>> wrote:
>>>
>>> Hello Experts,
>>>
>>> I'm working on a spark app which reads data from kafka & persists it in
>>> hbase.
>>>
>>> Spark documentation states the below [1], that in case of worker failure
>>> we can lose some data. If not, how can I make my kafka stream more reliable?
>>> I have seen there is a simple consumer [2] but I'm not sure if it has
>>> been used/tested extensively.
>>>
>>> I was wondering if there is a way to explicitly acknowledge the kafka
>>> offsets once they are replicated in memory of other worker nodes (if it's
>>> not already done) to tackle this issue.
>>>
>>> Any help is appreciated in advance.
>>>
>>>
>>> Using any input source that receives data through a network - For
>>> network-based data sources like Kafka and Flume, the received input data is
>>> replicated in memory between nodes of the cluster (default replication
>>> factor is 2). So if a worker node fails, then the system can recompute the
>>> lost data from the leftover copy of the input data. However, if the worker
>>> node where a network receiver was running fails, then a tiny bit of data may
>>> be lost, that is, the data received by the system but not yet replicated to
>>> other node(s). The receiver will be started on a different node and it will
>>> continue to receive data.
>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>
>>> Txz,
>>>
>>> Mukesh Jha
>>
>>
>>
>>
>> --
>>
>>
>> Thanks & Regards,
>>
>> Mukesh Jha
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming: missing classes when kafka consumer classes

2014-12-11 Thread Akhil Das
Add these jars while creating the Context.

   val sc = new SparkContext(conf)


sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/
*spark-streaming-kafka_2.10-1.1.0.jar*")
sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/
*zkclient-0.3.jar*")

sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/
*metrics-core-2.2.0.jar*")
sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/
*kafka_2.10-0.8.0.jar*")

val ssc = new StreamingContext(sc, Seconds(10))


Thanks
Best Regards

On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli <
mario.pastore...@teralytics.ch> wrote:

>  Hi,
>
> I'm trying to use spark-streaming with kafka but I get a strange error
> about classes that are missing. I would like to ask whether my way of
> building the fat jar is correct or not. My program is
>
> val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum,
> kafkaGroupId, kafkaTopicsWithThreads)
> .map(_._2)
>
> kafkaStream.foreachRDD((rdd,t) => rdd.foreachPartition {
> iter:Iterator[CellWithLAC] =>
>   println("time: " ++ t.toString ++ " #received: " ++ iter.size.toString)
> })
>
> I use sbt to manage my project and my build.sbt (with assembly 0.12.0
> plugin) is
>
> name := "spark_example"
>
> version := "0.0.1"
>
> scalaVersion := "2.10.4"
>
> scalacOptions ++= Seq("-deprecation","-feature")
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
>   "joda-time" % "joda-time" % "2.6"
> )
>
> assemblyMergeStrategy in assembly := {
>   case p if p startsWith "com/esotericsoftware/minlog" =>
> MergeStrategy.first
>   case p if p startsWith "org/apache/commons/beanutils" =>
> MergeStrategy.first
>   case p if p startsWith "org/apache/" => MergeStrategy.last
>   case "plugin.properties" => MergeStrategy.discard
>   case p if p startsWith "META-INF" => MergeStrategy.discard
>   case x =>
> val oldStrategy = (assemblyMergeStrategy in assembly).value
> oldStrategy(x)
> }
>
> I create the jar with sbt assembly and then run it with
> $SPARK_HOME/bin/spark-submit --master spark://master:7077 --class Main
> target/scala-2.10/spark_example-assembly-0.0.1.jar localhost:2181
> test-consumer-group test1
>
> where master:7077 is the spark master, localhost:2181 is zookeeper,
> test-consumer-group is kafka groupid and test1 is the kafka topic. The
> program starts and keeps running, but I get an error and nothing is printed.
> In the log I found the following stack trace:
>
> 14/12/11 13:02:08 INFO network.ConnectionManager: Accepted connection from
> [10.0.3.1/10.0.3.1:54325]
> 14/12/11 13:02:08 INFO network.SendingConnection: Initiating connection to
> [jpl-devvax/127.0.1.1:38767]
> 14/12/11 13:02:08 INFO network.SendingConnection: Connected to [jpl-devvax/
> 127.0.1.1:38767], 1 messages pending
> 14/12/11 13:02:08 INFO storage.BlockManagerInfo: Added broadcast_2_piece0
> in memory on jpl-devvax:38767 (size: 842.0 B, free: 265.4 MB)
> 14/12/11 13:02:08 INFO scheduler.ReceiverTracker: Registered receiver for
> stream 0 from akka.tcp://sparkExecutor@jpl-devvax:46602
> 14/12/11 13:02:08 ERROR scheduler.ReceiverTracker: Deregistered receiver
> for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
> kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues$1
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(Unknown
> Source)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Unknown
> Source)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown
> Source)
> at
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown
> Source)
> at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
> at
> kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown
> Source)
> at
> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
> at
> org.apache.spark.SparkC

RE: "Session" for connections?

2014-12-11 Thread Ashic Mahtab
That makes sense. I'll try that.

Thanks :)

> From: tathagata.das1...@gmail.com
> Date: Thu, 11 Dec 2014 04:53:01 -0800
> Subject: Re: "Session" for connections?
> To: as...@live.com
> CC: user@spark.apache.org
> 
> You could create a lazily initialized singleton factory and connection
> pool. Whenever an executor starts running the first task that needs to
> push out data, it will create the connection pool as a singleton. And
> subsequent tasks running on the executor are going to use the
> connection pool. You will also have to intelligently shut down the
> connections because there is not an obvious way to shut them down. You
> could have a usage timeout - shut down a connection after it has not
> been used for 10 x the batch interval.
> 
> TD
> 
> On Thu, Dec 11, 2014 at 4:28 AM, Ashic Mahtab  wrote:
> > Hi,
> > I was wondering if there's any way of having long running session type
> > behaviour in spark. For example, let's say we're using Spark Streaming to
> > listen to a stream of events. Upon receiving an event, we process it, and if
> > certain conditions are met, we wish to send a message to rabbitmq. Now,
> > rabbit clients have the concept of a connection factory, from which you
> > create a connection, from which you create a channel. You use the channel to
> > get a queue, and finally the queue is what you publish messages on.
> >
> > Currently, what I'm doing can be summarised as :
> >
> > dstream.foreachRDD(x => x.forEachPartition(y => {
> >val factory = ..
> >val connection = ...
> >val channel = ...
> >val queue = channel.declareQueue(...);
> >
> >y.foreach(z => Processor.Process(z, queue));
> >
> >cleanup the queue stuff.
> > }));
> >
> > I'm doing the same thing for using Cassandra, etc. Now in these cases, the
> > session initiation is expensive, so doing it per message is not a good idea.
> > However, I can't find a way to say "hey...do this per worker once and only
> > once".
> >
> > Is there a better pattern to do this?
> >
> > Regards,
> > Ashic.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
  

Re: "Session" for connections?

2014-12-11 Thread Tathagata Das
You could create a lazily initialized singleton factory and connection
pool. Whenever an executor starts running the first task that needs to
push out data, it will create the connection pool as a singleton. And
subsequent tasks running on the executor are going to use the
connection pool. You will also have to intelligently shut down the
connections because there is not an obvious way to shut them down. You
could have a usage timeout - shut down a connection after it has not
been used for 10 x the batch interval.

TD
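
A minimal sketch of that pattern, with a placeholder Connection trait standing
in for the real client (RabbitMQ channel, Cassandra session, ...), `dstream`
being the stream from the question below, and the usage-timeout shutdown left
out:

import java.util.concurrent.ConcurrentLinkedQueue

// Stand-in for the real client type; replace with your actual connection/session.
trait Connection { def send(msg: String): Unit; def close(): Unit }

object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  // The expensive session setup goes here; this stub just prints.
  private def create(): Connection = new Connection {
    def send(msg: String): Unit = println(msg)
    def close(): Unit = ()
  }

  // The first task on an executor finds the queue empty and creates a connection;
  // later tasks (and later batches) reuse whatever has been returned to the pool.
  def borrow(): Connection = Option(pool.poll()).getOrElse(create())
  def giveBack(c: Connection): Unit = pool.add(c)
}

dstream.foreachRDD(rdd => rdd.foreachPartition { partition =>
  val conn = ConnectionPool.borrow()
  partition.foreach(event => conn.send(event.toString))
  ConnectionPool.giveBack(conn)   // return to the pool instead of closing it
})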

On Thu, Dec 11, 2014 at 4:28 AM, Ashic Mahtab  wrote:
> Hi,
> I was wondering if there's any way of having long running session type
> behaviour in spark. For example, let's say we're using Spark Streaming to
> listen to a stream of events. Upon receiving an event, we process it, and if
> certain conditions are met, we wish to send a message to rabbitmq. Now,
> rabbit clients have the concept of a connection factory, from which you
> create a connection, from which you create a channel. You use the channel to
> get a queue, and finally the queue is what you publish messages on.
>
> Currently, what I'm doing can be summarised as :
>
> dstream.foreachRDD(x => x.forEachPartition(y => {
>val factory = ..
>val connection = ...
>val channel = ...
>val queue = channel.declareQueue(...);
>
>y.foreach(z => Processor.Process(z, queue));
>
>cleanup the queue stuff.
> }));
>
> I'm doing the same thing for using Cassandra, etc. Now in these cases, the
> session initiation is expensive, so doing it per message is not a good idea.
> However, I can't find a way to say "hey...do this per worker once and only
> once".
>
> Is there a better pattern to do this?
>
> Regards,
> Ashic.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark streaming: missing classes when kafka consumer classes

2014-12-11 Thread Mario Pastorelli

Hi,

I'm trying to use spark-streaming with kafka but I get a strange error
about classes that are missing. I would like to ask whether my way of building
the fat jar is correct or not. My program is


val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum, 
kafkaGroupId, kafkaTopicsWithThreads)

.map(_._2)

kafkaStream.foreachRDD((rdd,t) => rdd.foreachPartition { 
iter:Iterator[CellWithLAC] =>

  println("time: " ++ t.toString ++ " #received: " ++ iter.size.toString)
})

I use sbt to manage my project and my build.sbt (with assembly 0.12.0 
plugin) is


name := "spark_example"

version := "0.0.1"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-deprecation","-feature")

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
  "joda-time" % "joda-time" % "2.6"
)

assemblyMergeStrategy in assembly := {
  case p if p startsWith "com/esotericsoftware/minlog" => 
MergeStrategy.first
  case p if p startsWith "org/apache/commons/beanutils" => 
MergeStrategy.first

  case p if p startsWith "org/apache/" => MergeStrategy.last
  case "plugin.properties" => MergeStrategy.discard
  case p if p startsWith "META-INF" => MergeStrategy.discard
  case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}

I create the jar with sbt assembly and then run it with
$SPARK_HOME/bin/spark-submit --master spark://master:7077 --class Main 
target/scala-2.10/spark_example-assembly-0.0.1.jar localhost:2181 
test-consumer-group test1


where master:7077 is the spark master, localhost:2181 is zookeeper, 
test-consumer-group is kafka groupid and test1 is the kafka topic. The 
program starts and keeps running, but I get an error and nothing is
printed. In the log I found the following stack trace:


14/12/11 13:02:08 INFO network.ConnectionManager: Accepted connection 
from [10.0.3.1/10.0.3.1:54325]
14/12/11 13:02:08 INFO network.SendingConnection: Initiating connection 
to [jpl-devvax/127.0.1.1:38767]
14/12/11 13:02:08 INFO network.SendingConnection: Connected to 
[jpl-devvax/127.0.1.1:38767], 1 messages pending
14/12/11 13:02:08 INFO storage.BlockManagerInfo: Added 
broadcast_2_piece0 in memory on jpl-devvax:38767 (size: 842.0 B, free: 
265.4 MB)
14/12/11 13:02:08 INFO scheduler.ReceiverTracker: Registered receiver 
for stream 0 from akka.tcp://sparkExecutor@jpl-devvax:46602
14/12/11 13:02:08 ERROR scheduler.ReceiverTracker: Deregistered receiver 
for stream 0: Error starting receiver 0 - 
java.lang.NoClassDefFoundError: 
kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues$1
at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(Unknown 
Source)
at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Unknown 
Source)

at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown 
Source)
at 
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown 
Source)

at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
at 
kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown 
Source)
at 
org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
at 
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at 
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

I have searched inside the fat jar and I found that that class is not in 
it:


> jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar | grep 
"kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector"

>

The problem is the double dollar before anonfun: if y

Re: ERROR YarnClientClusterScheduler: Lost executor Akka client disassociated

2014-12-11 Thread Muhammad Ahsan
--
Code
--
scala> import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.SchemaRDD

scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext

scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}

scala> val hiveContext: HiveContext = new HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@2de76244

scala> val numDays = 2
numDays: Int = 2

scala> case class Click(
/* about 20 fields of type STRING */
)
defined class Click

scala> val inputRDD = new Array[SchemaRDD](numDays)
inputRDD: Array[org.apache.spark.sql.SchemaRDD] = Array(null, null)

scala> for (i <- 1 to numDays) {
 | if (i < 10) {
 | inputRDD(i - 1) =
hiveContext.parquetFile("hdfs://" + i)
 | } else {
 | inputRDD(i - 1) =
hiveContext.parquetFile("hdfs://" + i)
 | }
 | 
 | }
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.

scala> var unionRDD = inputRDD(1)
unionRDD: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[1] at RDD at SchemaRDD.scala:104

scala> for (i <- 1 to inputRDD.length - 1) {
 | unionRDD = unionRDD.unionAll(inputRDD(i))
 | }

scala> val inputRDD = unionRDD
inputRDD: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[2] at RDD at SchemaRDD.scala:104
scala> 

scala> inputRDD.registerTempTable("urlInfo")

scala> val clickstreamRDD = hiveContext.sql("select * from urlInfo "
+
 | "where guid regexp '^[0-9a-f-]{36}$' " +
 | "AND ((callerid  > 3 AND callerid <1) OR callerid >
10 " +
 | "OR (callerid=3 AND browsertype = 'IE')) " +
 | "AND countrycode regexp '^[A-Z]{2}$'")
clickstreamRDD: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[3] at RDD at SchemaRDD.scala:104
scala> 

scala> clickstreamRDD.registerTempTable("clickstream")

scala> clickstreamRDD.cache()
res4: clickstreamRDD.type = 
SchemaRDD[3] at RDD at SchemaRDD.scala:104

scala> val guidClickRDD = clickstreamRDD.map(row =>
(row(7).asInstanceOf[String], {
 | val value = Click(row(0).asInstanceOf[String],
 | row(1).asInstanceOf[String],
row(2).asInstanceOf[String],
 | row(3).asInstanceOf[String],
row(4).asInstanceOf[String],
 | row(5).asInstanceOf[String],
row(6).asInstanceOf[String],
 | row(7).asInstanceOf[String],
row(8).asInstanceOf[String],
 | row(9).asInstanceOf[String],
row(10).asInstanceOf[String],
 | row(11).asInstanceOf[String],
row(12).asInstanceOf[String],
 | row(13).asInstanceOf[String],
row(14).asInstanceOf[String],
 | row(15).asInstanceOf[String],
row(16).asInstanceOf[String],
 | row(17).asInstanceOf[String],
row(18).asInstanceOf[String],
 | row(19).asInstanceOf[String])
 | value
 | }))
guidClickRDD: org.apache.spark.rdd.RDD[(String, Click)] = MappedRDD[14] at
map at :25

scala> val blackList: RDD[(String, Click)] =
guidClickRDD.groupByKey().filter(row => row._2.size == 1).map(row =>
 | (row._1.asInstanceOf[String], Click("", "", "", "", "",
"", "", "", "", "", "", "", "", "", "", "", "", "", "", "")))
blackList: org.apache.spark.rdd.RDD[(String, Click)] = MappedRDD[27] at map
at :27

scala> val guidClickFRDD = guidClickRDD.subtractByKey(blackList)
guidClickFRDD: org.apache.spark.rdd.RDD[(String, Click)] = SubtractedRDD[28]
at subtractByKey at :29

scala> guidClickFRDD.reduceByKey((x, y) => {
 | /* commutative and associative function */
 | Click("US", "US", "US", "US", "US", "US", "US", "US",
"US", "US", "US", "US", "US", "US", "US", "US", "US", "US", "US", "US")
 | }).take(200).foreach(println)

--
EXPECTED OUTPUT
--

(Key_A,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
(Key_B,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
(Key_C,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
(Key_D,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-YarnClientClusterScheduler-Lost-executor-Akka-

"Session" for connections?

2014-12-11 Thread Ashic Mahtab
Hi,
I was wondering if there's any way of having long running session type 
behaviour in spark. For example, let's say we're using Spark Streaming to 
listen to a stream of events. Upon receiving an event, we process it, and if 
certain conditions are met, we wish to send a message to rabbitmq. Now, rabbit 
clients have the concept of a connection factory, from which you create a 
connection, from which you create a channel. You use the channel to get a 
queue, and finally the queue is what you publish messages on.

Currently, what I'm doing can be summarised as :

dstream.foreachRDD(x => x.forEachPartition(y => {
   val factory = ..
   val connection = ...
   val channel = ...
   val queue = channel.declareQueue(...);

   y.foreach(z => Processor.Process(z, queue));
   
   cleanup the queue stuff. 
}));

I'm doing the same thing for using Cassandra, etc. Now in these cases, the 
session initiation is expensive, so doing it per message is not a good idea.
However, I can't find a way to say "hey...do this per worker once and only 
once".

Is there a better pattern to do this?

Regards,
Ashic.
  

Re: Standalone spark cluster. Can't submit job programmatically -> java.io.InvalidClassException

2014-12-11 Thread sivarani
Not able to get it. How exactly did you fix it? I am using a Maven build.

I downloaded Spark 1.1.1 and then packaged it with mvn -Dhadoop.version=1.2.1
-DskipTests clean package, but I keep getting InvalidClassException errors.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Standalone-spark-cluster-Can-t-submit-job-programmatically-java-io-InvalidClassException-tp13456p20624.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark steaming : work with collect() but not without collect()

2014-12-11 Thread Tathagata Das
What does process do? Maybe when this process function is being run in
the Spark executor, it is causing some static initialization, which
fails, causing this exception. Per the Oracle documentation, an
ExceptionInInitializerError is thrown to indicate that an exception
occurred during evaluation of a static initializer or the initializer
for a static variable.

TD
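
To illustrate the failure mode (a made-up example, not the code from the
question): the body of a Scala object is effectively a static initializer, so
something like

object Processor {
  // Runs once per JVM on the first reference to Processor. If this file only
  // exists on the driver machine, every executor fails right here, and the
  // first call to process() in a task surfaces as ExceptionInInitializerError.
  private val config: String = scala.io.Source.fromFile("/etc/myapp/app.conf").mkString

  def process(key: String, value: String): Unit =
    println(s"$config -> $key = $value")
}

works with rdd.collect() because process() then runs in the driver JVM, where
the initializer happens to succeed, but fails once the closure runs inside the
executors.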

On Thu, Dec 11, 2014 at 1:36 AM, Gerard Maas  wrote:
> Have you tried with kafkaStream.foreachRDD(rdd => {rdd.foreach(...)}) ?
> Would that make a difference?
>
>
> On Thu, Dec 11, 2014 at 10:24 AM, david  wrote:
>>
>> Hi,
>>
>>   We use the following Spark Streaming code to collect and process Kafka
>> events:
>>
>> kafkaStream.foreachRDD(rdd => {
>>   rdd.collect().foreach(event => {
>>   process(event._1, event._2)
>>   })
>> })
>>
>> This works fine.
>>
>> But without the collect() function, the following exception is raised for
>> the call to the process function:
>> *Loss was due to java.lang.ExceptionInInitializerError*
>>
>>
>>   We attempted to rewrite it like this, but the same exception is raised:
>>
>>  kafkaStream.foreachRDD(rdd => {
>>   rdd.foreachPartition(iter =>
>> iter.foreach (event => {
>> process(event._1, event._2)
>>   })
>>   )
>> })
>>
>>
>> Can anybody explain to us why this happens and how to solve this issue?
>>
>> Thanks
>>
>> Regards
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-steaming-work-with-collect-but-not-without-collect-tp20622.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark-SQL JDBC driver

2014-12-11 Thread Anas Mosaad
Actually, I came to the conclusion that RDDs have to be persisted in Hive in
order to be accessible through Thrift.
I hope I didn't end up with an incorrect conclusion.
Please, someone correct me if I am wrong.
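
For completeness, here is roughly what the HiveContext route discussed below
looks like (paths and table names are placeholders; this is a sketch of my
understanding, not a verified recipe). Tables written with saveAsTable through
a HiveContext land in the Hive metastore, which is the catalog the Thrift
server reads, whereas registerTempTable stays local to the session that
created it:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val countries = hiveContext.parquetFile("hdfs:///data/countries.parquet") // placeholder input
countries.registerTempTable("countries_tmp")            // visible only inside this session
hiveContext.sql("SELECT * FROM countries_tmp")
  .saveAsTable("countries")                             // persisted via the metastore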
On Dec 11, 2014 8:53 AM, "Judy Nash" 
wrote:

>  Looks like you are wondering why you cannot see the RDD table you have
> created via thrift?
>
>
>
> Based on my own experience with Spark 1.1, an RDD created directly via Spark
> SQL (i.e. Spark Shell or Spark-SQL.sh) is not visible on Thrift, since
> Thrift has its own session containing its own RDDs.
>
> Spark SQL experts on the forum can confirm on this though.
>
>
>
> *From:* Cheng Lian [mailto:lian.cs@gmail.com]
> *Sent:* Tuesday, December 9, 2014 6:42 AM
> *To:* Anas Mosaad
> *Cc:* Judy Nash; user@spark.apache.org
> *Subject:* Re: Spark-SQL JDBC driver
>
>
>
> According to the stacktrace, you were still using SQLContext rather than
> HiveContext. To interact with Hive, HiveContext *must* be used.
>
> Please refer to this page
> http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables
>
>  On 12/9/14 6:26 PM, Anas Mosaad wrote:
>
>  Back to the first question, this will mandate that hive is up and
> running?
>
>
>
> When I try it, I get the following exception. The documentation says that
> this method works only on SchemaRDD. I thought that countries.saveAsTable
> did not work for that reason, so I created a tmp that contains the results
> from the registered temp table, which I could validate is a
> SchemaRDD as shown below.
>
>
>
>
> * @Judy,* I do really appreciate your kind support and I want to
> understand, and of course don't want to waste your time. If you can direct
> me to the documentation describing these details, that would be great.
>
>
>
> scala> val tmp = sqlContext.sql("select * from countries")
>
> tmp: org.apache.spark.sql.SchemaRDD =
>
> SchemaRDD[12] at RDD at SchemaRDD.scala:108
>
> == Query Plan ==
>
> == Physical Plan ==
>
> PhysicalRDD
> [COUNTRY_ID#20,COUNTRY_ISO_CODE#21,COUNTRY_NAME#22,COUNTRY_SUBREGION#23,COUNTRY_SUBREGION_ID#24,COUNTRY_REGION#25,COUNTRY_REGION_ID#26,COUNTRY_TOTAL#27,COUNTRY_TOTAL_ID#28,COUNTRY_NAME_HIST#29],
> MapPartitionsRDD[9] at mapPartitions at ExistingRDD.scala:36
>
>
>
> scala> tmp.saveAsTable("Countries")
>
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
> plan found, tree:
>
> 'CreateTableAsSelect None, Countries, false, None
>
>  Project
> [COUNTRY_ID#20,COUNTRY_ISO_CODE#21,COUNTRY_NAME#22,COUNTRY_SUBREGION#23,COUNTRY_SUBREGION_ID#24,COUNTRY_REGION#25,COUNTRY_REGION_ID#26,COUNTRY_TOTAL#27,COUNTRY_TOTAL_ID#28,COUNTRY_NAME_HIST#29]
>
>   Subquery countries
>
>LogicalRDD
> [COUNTRY_ID#20,COUNTRY_ISO_CODE#21,COUNTRY_NAME#22,COUNTRY_SUBREGION#23,COUNTRY_SUBREGION_ID#24,COUNTRY_REGION#25,COUNTRY_REGION_ID#26,COUNTRY_TOTAL#27,COUNTRY_TOTAL_ID#28,COUNTRY_NAME_HIST#29],
> MapPartitionsRDD[9] at mapPartitions at ExistingRDD.scala:36
>
>
>
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:83)
>
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78)
>
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
>
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
>
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
>
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
>
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
>
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
>
> at
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>
> at
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
>
> at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
>
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
>
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
>
> at scala.collection.immutable.List.foreach(List.scala:318)
>
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
>
> at
> org.apache.spar

Re: Key not valid / already cancelled using Spark Streaming

2014-12-11 Thread Tathagata Das
Following Gerard's thoughts, here are possible things that could be happening.

1. Is there another process in the background that is deleting files
in the directory where you are trying to write? Seems like the
temporary file generated by one of the tasks is getting delete before
it is renamed to the final output file. I suggest trying to not write
to S3, rather just count and print (with rest of the computation
staying exactly same) and see if the error still occurs. That would
narrow down the culprit to what Gerard suggested.
2. Do you have speculative execution turned on? If so, could you turn
it off and try?

TD
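
For the second point, a quick way to rule speculation out while debugging (a
sketch to add to the SparkConf shown further down in the thread):

val sparkConf = new org.apache.spark.SparkConf()
  .setAppName("MyDumperApp")
  .set("spark.speculation", "false") // defaults to false, but set it explicitly while investigating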

On Thu, Dec 11, 2014 at 1:42 AM, Gerard Maas  wrote:
> If the timestamps in the logs are to be trusted, it looks like your driver is
> dying with that java.io.FileNotFoundException, and therefore the workers
> lose their connection and close down.
>
> -kr, Gerard.
>
> On Thu, Dec 11, 2014 at 7:39 AM, Akhil Das 
> wrote:
>>
>> Try to add the following to the sparkConf
>>
>>  .set("spark.core.connection.ack.wait.timeout","6000")
>>
>>   .set("spark.akka.frameSize","60")
>>
>> Used to face that issue with spark 1.1.0
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos
>>  wrote:
>>>
>>> Dear Spark'ers,
>>>
>>> I'm trying to run a simple job using Spark Streaming (version 1.1.1) and
>>> YARN (yarn-cluster), but unfortunately I'm facing many issues. In short, my
>>> job does the following:
>>> - Consumes a specific Kafka topic
>>> - Writes its content to S3 or HDFS
>>>
>>> Records in Kafka are in the form:
>>> {"key": "someString"}
>>>
>>> This is important because I use the value of "key" to define the output
>>> file name in S3.
>>> Here are the Spark and Kafka parameters I'm using:
>>>
 val sparkConf = new SparkConf()
   .setAppName("MyDumperApp")
   .set("spark.task.maxFailures", "100")
   .set("spark.hadoop.validateOutputSpecs", "false")
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
 val kafkaParams = Map(
   "zookeeper.connect" -> zkQuorum,
   "zookeeper.session.timeout.ms" -> "1",
   "rebalance.backoff.ms" -> "8000",
   "rebalance.max.retries" -> "10",
   "group.id" -> group,
   "auto.offset.reset" -> "largest"
 )
>>>
>>>
>>> My application is the following:
>>>
 KafkaUtils.createStream[String, String, StringDecoder,
 StringDecoder](ssc, kafkaParams, Map(topic -> 1),
 StorageLevel.MEMORY_AND_DISK_SER_2)
   .foreachRDD((rdd, time) =>
 rdd.map {
   case (_, line) =>
 val json = parse(line)
 val key = extract(json, "key").getOrElse("key_not_found")
 (key, dateFormatter.format(time.milliseconds)) -> line
 }
   .partitionBy(new HashPartitioner(10))
   .saveAsHadoopFile[KeyBasedOutput[(String,String),
 String]]("s3://BUCKET", classOf[BZip2Codec])
   )
>>>
>>>
>>> And the last piece:
>>>
 class KeyBasedOutput[T >: Null, V <: AnyRef] extends
 MultipleTextOutputFormat[T , V] {
   override protected def generateFileNameForKeyValue(key: T, value: V,
 leaf: String) = key match {
 case (myKey, batchId) =>
   "somedir" + "/" + myKey + "/" +
 "prefix-" + myKey + "_" + batchId + "_" + leaf
   }
   override protected def generateActualKey(key: T, value: V) = null
 }
>>>
>>>
>>> I use batch sizes of 5 minutes with checkpoints activated.
>>> The job fails nondeterministically (I think it never ran longer than ~5
>>> hours). I have no clue why, it simply fails.
>>> Please find below the exceptions thrown by my application.
>>>
>>> I really appreciate any kind of hint.
>>> Thank you very much in advance.
>>>
>>> Regards,
>>> -- Flávio
>>>
>>>  Executor 1
>>>
>>> 2014-12-10 19:05:15,150 INFO  [handle-read-write-executor-3]
>>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>>> SendingConnection
>>>  to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>>> 2014-12-10 19:05:15,201 INFO  [Thread-6] storage.MemoryStore
>>> (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with
>>> curMem=194463488,
>>>  maxMem=4445479895
>>> 2014-12-10 19:05:15,202 INFO  [Thread-6] storage.MemoryStore
>>> (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes in
>>> memor
>>> y (estimated size 96.4 KB, free 4.0 GB)
>>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2]
>>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>>> ReceivingConnecti
>>> on to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-1]
>>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>>> SendingConnection
>>>  to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2]
>>

Re: SchemaRDD partition on specific column values?

2014-12-11 Thread nitin
Can we take this as a performance improvement task in Spark-1.2.1? I can help
contribute to this.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-partition-on-specific-column-values-tp20350p20623.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Can spark job have sideeffects (write files to FileSystem)

2014-12-11 Thread Paweł Szulc
Imagine a simple Spark job that stores each line of the RDD in a
separate file


import java.io.{File, FileOutputStream}
import java.net.URI

val lines = sc.parallelize(1 to 100).map(n => s"this is line $n")
lines.foreach(line => writeToFile(line))

def writeToFile(line: String) = {
  val filePath = "file://..."
  val file = new File(new URI(filePath).getPath)
  // the using helper simply closes the output stream when the block finishes
  using(new FileOutputStream(file)) { output =>
    output.write(line.getBytes)
  }
}


Now, the example above works 99.9% of the time. Files are generated for each
line, and each file contains that particular line.

However, when dealing with a large amount of data, we encounter situations
where some of the files are empty! The files are generated, but there is no
content inside them (0 bytes).

Now the question is: can a Spark job have side effects? Is it even legal to
write such code?
If not, then what other choice do we have when we want to save data from our
RDD?
If yes, then do you guys see what could be the reason for this job acting in
this strange manner 0.1% of the time?


Disclaimer: we are fully aware of the .saveAsTextFile method in the API;
however, the example above is a simplification of our code - normally we
produce PDF files.
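
One pattern that tends to avoid the empty-file symptom is to write each record
to a temporary file, flush and close it, and only then rename it to its final
name, so a retried or speculative task never leaves a half-written (or 0-byte)
file behind. The sketch below is only illustrative: the helper names and the
/tmp path scheme are made up, and it assumes local-file output as in the
simplified example above.

import java.io.{File, FileOutputStream}
import java.net.URI

import org.apache.spark.SparkContext

object SideEffectWriteSketch {

  // Hypothetical path scheme, just for the sketch.
  def targetUriFor(line: String): String =
    s"file:///tmp/lines/line-${line.hashCode}.txt"

  def writeLineAtomically(line: String): Unit = {
    val target = new File(new URI(targetUriFor(line)).getPath)
    val tmp = new File(target.getParentFile, target.getName + ".tmp")
    tmp.getParentFile.mkdirs()
    val out = new FileOutputStream(tmp)
    try {
      out.write(line.getBytes("UTF-8"))
      out.flush()
      out.getFD.sync()   // force the bytes to disk before renaming
    } finally {
      out.close()
    }
    // rename is atomic on most local file systems, so readers (and retries)
    // see either no file or a complete one, never an empty or truncated one
    if (!tmp.renameTo(target)) sys.error(s"could not rename $tmp to $target")
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "side-effect-write-sketch")
    val lines = sc.parallelize(1 to 100).map(n => s"this is line $n")
    lines.foreachPartition(_.foreach(writeLineAtomically))
    sc.stop()
  }
}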


Best regards,
Paweł Szulc


spark logging issue

2014-12-11 Thread Sourav Chandra
Hi,

I am using Spark 1.1.0 and setting the below properties while creating the
Spark context.


*spark.executor.logs.rolling.maxRetainedFiles = 10*

*spark.executor.logs.rolling.size.maxBytes = 104857600*
*spark.executor.logs.rolling.strategy = size*

Even though I am setting them to roll over after 100 MB, the log file (i.e.
stderr) is not rolled over.

What am I missing here?
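
If this is a standalone cluster, one thing worth checking (a guess only, based
on how the standalone Worker handles executor stderr/stdout in Spark 1.1.x):
the spark.executor.logs.rolling.* properties are read by the Worker process
that launches the executors, so setting them on the application's SparkConf
may simply never reach it. Something like the following in conf/spark-env.sh
on every worker node, followed by a worker restart, is one way to pass them
through (the values below are just the ones from this mail):

# conf/spark-env.sh on each worker node
SPARK_WORKER_OPTS="-Dspark.executor.logs.rolling.strategy=size \
  -Dspark.executor.logs.rolling.size.maxBytes=104857600 \
  -Dspark.executor.logs.rolling.maxRetainedFiles=10"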

-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Error on JavaSparkContext.stop()

2014-12-11 Thread Taeyun Kim
Hi,

 

When my Spark program calls JavaSparkContext.stop(), the following errors
occur.

   

   14/12/11 16:24:19 INFO Main: sc.stop {
   14/12/11 16:24:20 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(cluster02,38918) not found
   14/12/11 16:24:20 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(cluster04,59659)
   java.nio.channels.ClosedChannelException
 at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
 at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
 at org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:205)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
   14/12/11 16:24:20 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(cluster03,59821) not found
   14/12/11 16:24:20 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(cluster02,38918) not found
   14/12/11 16:24:20 WARN ConnectionManager: All connections not cleaned up
   14/12/11 16:24:20 INFO Main: sc.stop }

 

How can I fix this?

 

The configuration is as follows:

- Spark version is 1.1.1

- Client runs on Windows 7

- The cluster is Linux(CentOS 6.5).

- spark.master=yarn-client

- Since Spark has a problem submitting jobs from Windows to Linux, I applied
my patch to the Spark source code. (Please see
https://github.com/apache/spark/pull/899 )

 

Spark 1.0.0 did not have this problem.

 

Thanks.

 



Re: Key not valid / already cancelled using Spark Streaming

2014-12-11 Thread Gerard Maas
If the timestamps in the logs are to be trusted, it looks like your driver
is dying with that *java.io.FileNotFoundException*, and therefore the
workers lose their connection and close down.

-kr, Gerard.

On Thu, Dec 11, 2014 at 7:39 AM, Akhil Das 
wrote:

> Try to add the following to the sparkConf
>
>  .set("spark.core.connection.ack.wait.timeout","6000")
>
>   .set("spark.akka.frameSize","60")
>
> Used to face that issue with spark 1.1.0
>
> Thanks
> Best Regards
>
> On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos  > wrote:
>
>> Dear Spark'ers,
>>
>> I'm trying to run a simple job using Spark Streaming (version 1.1.1) and
>> YARN (yarn-cluster), but unfortunately I'm facing many issues. In short, my
>> job does the following:
>> - Consumes a specific Kafka topic
>> - Writes its content to S3 or HDFS
>>
>> Records in Kafka are in the form:
>> {"key": "someString"}
>>
>> This is important because I use the value of "key" to define the output
>> file name in S3.
>> Here are the Spark and Kafka parameters I'm using:
>>
>> val sparkConf = new SparkConf()
>>>   .setAppName("MyDumperApp")
>>>   .set("spark.task.maxFailures", "100")
>>>   .set("spark.hadoop.validateOutputSpecs", "false")
>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>   .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
>>> val kafkaParams = Map(
>>>   "zookeeper.connect" -> zkQuorum,
>>>   "zookeeper.session.timeout.ms" -> "1",
>>>   "rebalance.backoff.ms" -> "8000",
>>>   "rebalance.max.retries" -> "10",
>>>   "group.id" -> group,
>>>   "auto.offset.reset" -> "largest"
>>> )
>>
>>
>> My application is the following:
>>
>> KafkaUtils.createStream[String, String, StringDecoder,
>>> StringDecoder](ssc, kafkaParams, Map(topic -> 1),
>>> StorageLevel.MEMORY_AND_DISK_SER_2)
>>>   .foreachRDD((rdd, time) =>
>>> rdd.map {
>>>   case (_, line) =>
>>> val json = parse(line)
>>> val key = extract(json, "key").getOrElse("key_not_found")
>>> (key, dateFormatter.format(time.milliseconds)) -> line
>>> }
>>>   .partitionBy(new HashPartitioner(10))
>>>   .saveAsHadoopFile[KeyBasedOutput[(String,String),
>>> String]]("s3://BUCKET", classOf[BZip2Codec])
>>>   )
>>
>>
>> And the last piece:
>>
>> class KeyBasedOutput[T >: Null, V <: AnyRef] extends
>>> MultipleTextOutputFormat[T , V] {
>>>   override protected def generateFileNameForKeyValue(key: T, value: V,
>>> leaf: String) = key match {
>>> case (myKey, batchId) =>
>>>   "somedir" + "/" + myKey + "/" +
>>> "prefix-" + myKey + "_" + batchId + "_" + leaf
>>>   }
>>>   override protected def generateActualKey(key: T, value: V) = null
>>> }
>>
>>
>> I use batch sizes of 5 minutes with checkpoints activated.
>> The job fails nondeterministically (I think it never ran longer than ~5
>> hours). I have no clue why, it simply fails.
>> Please find below the exceptions thrown by my application.
>>
>> I really appreciate any kind of hint.
>> Thank you very much in advance.
>>
>> Regards,
>> -- Flávio
>>
>>  Executor 1
>>
>> 2014-12-10 19:05:15,150 INFO  [handle-read-write-executor-3]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection
>>  to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>> 2014-12-10 19:05:15,201 INFO  [Thread-6] storage.MemoryStore
>> (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with
>> curMem=194463488,
>>  maxMem=4445479895
>> 2014-12-10 19:05:15,202 INFO  [Thread-6] storage.MemoryStore
>> (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes
>> in memor
>> y (estimated size 96.4 KB, free 4.0 GB)
>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> ReceivingConnecti
>> on to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-1]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection
>>  to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection
>>  to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> ReceivingConnecti
>> on to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-3]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection
>>  to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0]
>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
>> SendingConnection
>>  to ConnectionManagerId(ec2-EXECUTOR.compute

Re: Spark steaming : work with collect() but not without collect()

2014-12-11 Thread Gerard Maas
Have you tried with kafkaStream.foreachRDD(rdd => { rdd.foreach(...) }) instead?
Would that make a difference?
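
For what it's worth, java.lang.ExceptionInInitializerError at this point
usually means that a static/object initializer needed by process() blew up on
an executor JVM; with collect() the same code runs on the driver, which is why
that variant appears to work. A minimal sketch of the executor-side pattern is
below; it is not the original code, and SimpleDateFormat merely stands in for
whatever non-serializable resource process() really needs:

import java.text.SimpleDateFormat
import java.util.Date

kafkaStream.foreachRDD { rdd =>
  rdd.foreachPartition { events =>
    // Initialize per partition, on the executor, so any failure here surfaces
    // with its real cause instead of ExceptionInInitializerError.
    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    events.foreach { case (key, value) =>
      println(s"${formatter.format(new Date())} $key -> ${value.length}")
    }
  }
}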


On Thu, Dec 11, 2014 at 10:24 AM, david  wrote:

> Hi,
>
>   We use the following Spark Streaming code to collect and process Kafka
> events:
>
> kafkaStream.foreachRDD(rdd => {
>   rdd.collect().foreach(event => {
>   process(event._1, event._2)
>   })
> })
>
> This works fine.
>
> But without the /collect()/ call, the following exception is raised when
> calling the process function:
> *Loss was due to java.lang.ExceptionInInitializerError*
>
>
>   We attempted to rewrite it like this, but the same exception is raised:
>
>  kafkaStream.foreachRDD(rdd => {
>   rdd.foreachPartition(iter =>
> iter.foreach (event => {
> process(event._1, event._2)
>   })
>   )
> })
>
>
> Can anybody explain to us why this happens and how to solve this issue?
>
> Thanks
>
> Regards
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-steaming-work-with-collect-but-not-without-collect-tp20622.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark steaming : work with collect() but not without collect()

2014-12-11 Thread david
Hi,

  We use the following Spark Streaming code to collect and process Kafka
events:

kafkaStream.foreachRDD(rdd => {
  rdd.collect().foreach(event => {
  process(event._1, event._2)
  })
})

This works fine.

But without the /collect()/ call, the following exception is raised when
calling the process function:
*Loss was due to java.lang.ExceptionInInitializerError*


  We attempted to rewrite it like this, but the same exception is raised:

 kafkaStream.foreachRDD(rdd => {
  rdd.foreachPartition(iter =>
iter.foreach (event => {
process(event._1, event._2)
  })
  )
})


Can anybody explain to us why this happens and how to solve this issue?

Thanks

Regards






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-steaming-work-with-collect-but-not-without-collect-tp20622.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Decision Tree with libsvmtools datasets

2014-12-11 Thread Sean Owen
The implementation assumes classes are 0-indexed, not 1-indexed. You
should set numClasses = 3 and change your labels to 0, 1, 2.
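
A quick sketch of that shift, assuming the iris labels really are exactly 1.0,
2.0 and 3.0 as in the libsvmtools file, and reusing the parameters from the
code below:

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.util.MLUtils

val raw = MLUtils.loadLibSVMFile(sc, "data/iris.scale.txt")
// shift the 1-indexed libsvm labels (1, 2, 3) down to 0-indexed (0, 1, 2)
val data = raw.map(lp => LabeledPoint(lp.label - 1, lp.features))

val numClasses = 3
val categoricalFeaturesInfo = Map[Int, Int]()
val model = DecisionTree.trainClassifier(
  data, numClasses, categoricalFeaturesInfo, "gini", 5, 100)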

On Thu, Dec 11, 2014 at 3:40 AM, Ge, Yao (Y.)  wrote:
> I am testing decision tree using iris.scale data set
> (http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass.html#iris)
>
> In the data set there are three class labels: 1, 2, and 3. However, in the
> following code, I have to set numClasses = 4; I get an
> ArrayIndexOutOfBoundsException if I set numClasses = 3. Why?
>
>
>
> var conf = new SparkConf().setAppName("DecisionTree")
>
> var sc = new SparkContext(conf)
>
>
>
> val data = MLUtils.loadLibSVMFile(sc,"data/iris.scale.txt");
>
> val numClasses = 4;
>
> val categoricalFeaturesInfo = Map[Int,Int]();
>
> val impurity = "gini";
>
> val maxDepth = 5;
>
> val maxBins = 100;
>
>
>
> val model = DecisionTree.trainClassifier(data, numClasses,
> categoricalFeaturesInfo, impurity, maxDepth, maxBins);
>
>
>
> val labelAndPreds = data.map{ point =>
>
>   val prediction = model.predict(point.features);
>
>   (point.label, prediction)
>
> }
>
>
>
> val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
> data.count;
>
> println("Training Error = " + trainErr);
>
> println("Learned classification tree model:\n" + model);
>
>
>
> -Yao

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Compare performance of sqlContext.jsonFile and sqlContext.jsonRDD

2014-12-11 Thread Cheng Lian
There are several overloaded versions of both jsonFile and jsonRDD.
Schema inference is kinda expensive since it requires an extra Spark
job. You can avoid it by storing the inferred schema and then using it
together with the following two methods:

 * def jsonFile(path: String, schema: StructType): SchemaRDD
 * def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD

You can use StructType.json / StructType.prettyJson and
DataType.fromJson to store and load the schema.
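
A rough sketch of that round trip, assuming an existing SparkContext sc (the
exact import location of StructType/DataType moved between 1.x releases, and
the HDFS paths below are just examples):

import org.apache.spark.sql._
// depending on the release, DataType/StructType may instead live under
// org.apache.spark.sql.catalyst.types

val sqlContext = new SQLContext(sc)

// First run: infer the schema once and keep its JSON form somewhere durable.
val first = sqlContext.jsonFile("hdfs:///data/events/day1")
sc.parallelize(Seq(first.schema.json), 1).saveAsTextFile("hdfs:///schemas/events")

// Later runs: load the JSON, rebuild the StructType, and skip inference.
val savedJson = sc.textFile("hdfs:///schemas/events").first()
val schema = DataType.fromJson(savedJson).asInstanceOf[StructType]
val later = sqlContext.jsonRDD(sc.textFile("hdfs:///data/events/day2"), schema)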


Cheng

On 12/11/14 12:50 PM, Rakesh Nair wrote:


Couple of questions:
1. "sqlContext.jsonFile" reads a json file, infers the schema for the
data stored, and then returns a SchemaRDD. Now, I could also create a
SchemaRDD by reading a file as text (which returns RDD[String]) and
then use the "jsonRDD" method. My question: is the "jsonFile" way of
creating a SchemaRDD slower than the second method I mentioned (maybe
because jsonFile needs to infer the schema and jsonRDD just applies
the schema to a dataset)?

The workflow I am thinking of is: 1. For the first data set, use
"jsonFile" and infer the schema. 2. Save the schema somewhere. 3. For
later data sets, create an RDD[String] and then use the "jsonRDD" method to
convert the RDD[String] to a SchemaRDD.

2. What is the best way to store a schema, or rather how can I
serialize StructType and store it in HDFS, so that I can load it later?


--
Regards
Rakesh Nair

