Re: local class incompatible: stream classdesc serialVersionUID

2016-01-29 Thread Jason Plurad
I agree with you, Ted, if RDD had a serial version UID this might not be an
issue. So that could be a JIRA to submit to help avoid version mismatches
in future Spark versions, but that doesn't help my current situation
between 1.5.1 and 1.5.2.

Any other ideas? Thanks.
On Thu, Jan 28, 2016 at 5:06 PM Ted Yu  wrote:

> I am not a Scala expert.
>
> RDD extends Serializable but doesn't have @SerialVersionUID() annotation.
> This may explain what you described.
>
> One approach is to add @SerialVersionUID so that RDDs have a stable serial
> version UID.
>
> Cheers
>
> On Thu, Jan 28, 2016 at 1:38 PM, Jason Plurad  wrote:
>
>> I've searched through the mailing list archive. It seems that if you try
>> to run, for example, a Spark 1.5.2 program against a Spark 1.5.1 standalone
>> server, you will run into an exception like this:
>>
>> WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 0.0 in stage
>> 0.0 (TID 0, 192.168.14.103): java.io.InvalidClassException:
>> org.apache.spark.rdd.RDD; local class incompatible: stream classdesc
>> serialVersionUID = -3343649307726848892, local class serialVersionUID =
>> -3996494161745401652
>>
>> If my application is using a library that builds against Spark 1.5.2,
>> does that mean that my application is now tied to that same Spark
>> standalone server version?
>>
>> Is there a recommended way for that library to have a Spark dependency
>> but keep it compatible against a wider set of versions, i.e. any version
>> 1.5.x?
>>
>> Thanks!
>>
>
>
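
A minimal Scala sketch of the annotation Ted describes, applied to a hypothetical
class rather than Spark's actual RDD source: pinning serialVersionUID keeps the
serialized form stable across recompilations, as long as the class layout stays
compatible with the stream being read.

// Hypothetical example class, not Spark's RDD. With a pinned UID, two builds of
// this class can deserialize each other's streams instead of failing with
// "local class incompatible" when the compiler-generated UID changes.
@SerialVersionUID(1L)
class MyRecord(val id: Int, val payload: String) extends Serializable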


Re: ZlibFactor warning

2016-01-29 Thread Ted Yu
Did the stack trace look like the one from:
https://issues.apache.org/jira/browse/HADOOP-12638

Cheers

> On Jan 27, 2016, at 1:29 AM, Eli Super  wrote:
> 
> 
> Hi 
> 
> I'm running spark locally on win 2012 R2 server
> 
> No hadoop installed
> 
> I'm getting the following warning:
> 
> WARN ZlibFactory: Failed to load/initialize native-zlib library
> 
> Is it something to worry about?
> 
> Thanks !


Re: building spark 1.6.0 fails

2016-01-29 Thread Sean Owen
You're somehow building with Java 6. At least this is what the error means.

On Fri, Jan 29, 2016, 05:25 Carlile, Ken  wrote:

> I am attempting to build Spark 1.6.0 from source on EL 6.3, using Oracle
> jdk 1.8.0.45, Python 2.7.6, and Scala 2.10.3. When I try to issue
> build/mvn -DskipTests clean package, I get the following:
>
> [INFO] Using zinc server for incremental compilation
> [info] Compiling 3 Java sources to
> /misc/local/spark-versions/spark-1.6.0-patched/spark-1.6.0/tags/target/scala-2.10/classes...
> [error] javac: invalid source release: 1.7
> [error] Usage: javac <options> <source files>
> [error] use -help for a list of possible options
> [error] Compile failed at Jan 28, 2016 8:56:36 AM [0.113s]
>
> I tried changing the pom.xml to have java version as 1.8, but I just got
> the same error with invalid source release: 1.8 instead of 1.7.
>
> My java -version and javac -version are reporting as 1.8.0.45, and I have
> the JAVA_HOME env set. Anyone have any ideas?
>
> Incidentally, building 2.0.0 from source worked fine…
>
> Thanks,
> Ken


Re: Spark Algorithms as WEB Application

2016-01-29 Thread Ted Yu
Have you looked at:
http://wiki.apache.org/tomcat/OutOfMemory

Cheers

> On Jan 29, 2016, at 2:44 AM, rahulganesh  wrote:
> 
> Hi,
> I am currently working on a web application which will call the Spark MLlib
> algorithms using Jersey (REST API). The problem that I am facing is that I
> am frequently getting a PermGen space Java out-of-memory exception, and also I
> am not able to save decision tree models using model.save(sc, path).
> 
> I am currently using Java for creating the Spark algorithms and Tomcat for
> running my web application.
> 
> 
> ensureFreeSpace(230688) called with curMem=0, maxMem=2061647216
> Block broadcast_96 stored as values in memory (estimated size 225.3 KB, free
> 1965.9 MB)
> Servlet.service() for servlet [abc] in context with path [/abc] threw
> exception [org.glassfish.jersey.server.ContainerException:
> java.lang.OutOfMemoryError: PermGen space] with root cause
> java.lang.OutOfMemoryError: PermGen space
> 
> java.lang.OutOfMemoryError: PermGen space
> 
> 
> Thank you !
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Algorithms-as-WEB-Application-tp26099.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 



Re: Number of batches in the Streaming Statistics visualization screen

2016-01-29 Thread Terry Hoo
Yes, the data is stored in driver memory.
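
A hedged sketch of raising the spark.streaming.ui.retainedBatches value discussed
in this thread (the value below is illustrative): the retained batch metadata is
held by the driver, so a much larger setting mainly costs driver heap and UI
rendering time.

import org.apache.spark.SparkConf

// Keep more finished batches in the Spark UI / status APIs (the default is 1000).
val conf = new SparkConf()
  .setAppName("streaming-ui-retention")
  .set("spark.streaming.ui.retainedBatches", "5000")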

Mehdi Ben Haj Abbes wrote on Friday, 29 January 2016 at 18:13:

> Thanks Terry for the quick answer.
>
> I did not try it. Let's say I increase the value to 2, what
> side effects should I expect? In fact, the explanation of the property says "How
> many finished batches the Spark UI and status APIs remember before garbage
> collecting." So the data is stored in memory, but in the memory of which
> component ... the driver, I imagine?
>
> regards,
>
> On Fri, Jan 29, 2016 at 10:52 AM, Terry Hoo  wrote:
>
>> Hi Mehdi,
>>
>> Have you tried a larger value of "spark.streaming.ui.retainedBatches" (the
>> default is 1000)?
>>
>> Regards,
>> - Terry
>>
>> On Fri, Jan 29, 2016 at 5:45 PM, Mehdi Ben Haj Abbes <
>> mehdi.ab...@gmail.com> wrote:
>>
>>> Hi folks,
>>>
>>> I have a streaming job running for more than 24 hours. It seems that
>>> there is a limit on the number of batches displayed in the Streaming
>>> Statistics visualization screen. For example, if I launch a job on Friday I
>>> will not be able to see the statistics for what happened during Saturday.
>>> I will only have the batches that ran in the previous 24 hours, and today it
>>> was more like only the previous 3 hours.
>>>
>>> Any help will be very appreciated.
>>> --
>>> Mehdi BEN HAJ ABBES
>>>
>>>
>>
>
>
> --
> Mehdi BEN HAJ ABBES
>
>


Spark Algorithms as WEB Application

2016-01-29 Thread rahulganesh
Hi,
I am currently working on a web application which will call the Spark MLlib
algorithms using Jersey (REST API). The problem that I am facing is that I
am frequently getting a PermGen space Java out-of-memory exception, and also I
am not able to save decision tree models using model.save(sc, path).

I am currently using Java for creating the Spark algorithms and Tomcat for
running my web application.


ensureFreeSpace(230688) called with curMem=0, maxMem=2061647216
Block broadcast_96 stored as values in memory (estimated size 225.3 KB, free
1965.9 MB)
Servlet.service() for servlet [abc] in context with path [/abc] threw
exception [org.glassfish.jersey.server.ContainerException:
java.lang.OutOfMemoryError: PermGen space] with root cause
java.lang.OutOfMemoryError: PermGen space

java.lang.OutOfMemoryError: PermGen space


Thank you !



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Algorithms-as-WEB-Application-tp26099.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




mapWithState / stateSnapshots() yielding empty rdds?

2016-01-29 Thread Sebastian Piu
Hi All,

I'm playing with the new mapWithState functionality but I can't get it
quite to work yet.

I'm doing two print() calls on the stream:
1. after mapWithState() call, first batch shows results - next batches
yield empty
2. after stateSnapshots(), always yields an empty RDD

Any pointers on what might be wrong?

This is the code I'm running:

final StateSpec state = StateSpec.function(UseCase::trackState);
JavaPairDStream pairs = messages.mapToPair(UseCase::mapToPair);
JavaMapWithStateDStream stateMap = pairs.mapWithState(state);

stateMap.print(5);
stateMap.stateSnapshots()
.print(5);

stream.context().remember(minutes(120));
stream.context().checkpoint("/rsl/tmp/fxo-checkpoint");

private static Optional> trackState(Time
batchTime, GroupingKey key, Optional value, State
state) {
Double current = state.exists() ? state.get() : 0.0;
Double sum = current + value.or(0.0);
return Optional.of(new Tuple2<>(key, sum));
}


Cheers,

Seb


Re: streaming textFileStream problem - got only ONE line

2016-01-29 Thread patcharee

I moved them every interval to the monitored directory.

Patcharee

On 25. jan. 2016 22:30, Shixiong(Ryan) Zhu wrote:
Did you move the file into "hdfs://helmhdfs/user/patcharee/cerdata/", 
or write into it directly? `textFileStream` requires that files must 
be written to the monitored directory by "moving" them from another 
location within the same file system.
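
A minimal Scala sketch of that pattern, with hypothetical paths: write the file
into a staging directory on the same HDFS filesystem first, then rename (move) it
into the monitored directory so textFileStream picks up a complete file.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical staging location on the same filesystem as the monitored directory.
val staged    = new Path("hdfs://helmhdfs/user/patcharee/staging/datetime_19617.txt")
val monitored = new Path("hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt")
val fs = monitored.getFileSystem(new Configuration())
// rename() is a move within one filesystem, so the file appears atomically.
fs.rename(staged, monitored)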


On Mon, Jan 25, 2016 at 6:30 AM, patcharee > wrote:


Hi,

My streaming application is receiving data from file system and
just prints the input count every 1 sec interval, as the code below:

val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Milliseconds(interval_ms))
val lines = ssc.textFileStream(args(0))
lines.count().print()

The problem is sometimes the data received from ssc.textFileStream
is ONLY ONE line. But in fact there are multiple lines in the new
file found in that interval. See log below which shows three
intervals. In the 2nd interval, the new file is:
hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt. This
file contains 6288 lines. The ssc.textFileStream returns ONLY ONE
line (the header).

Any ideas/suggestions what the problem is?


-
SPARK LOG

-

16/01/25 15:11:11 INFO FileInputDStream: Cleared 1 old files that
were older than 1453731011000 ms: 145373101 ms
16/01/25 15:11:11 INFO FileInputDStream: Cleared 0 old files that
were older than 1453731011000 ms:
16/01/25 15:11:12 INFO FileInputDStream: Finding new files took 4 ms
16/01/25 15:11:12 INFO FileInputDStream: New files at time
1453731072000 ms:
hdfs://helmhdfs/user/patcharee/cerdata/datetime_19616.txt
---
Time: 1453731072000 ms
---
6288

16/01/25 15:11:12 INFO FileInputDStream: Cleared 1 old files that
were older than 1453731012000 ms: 1453731011000 ms
16/01/25 15:11:12 INFO FileInputDStream: Cleared 0 old files that
were older than 1453731012000 ms:
16/01/25 15:11:13 INFO FileInputDStream: Finding new files took 4 ms
16/01/25 15:11:13 INFO FileInputDStream: New files at time
1453731073000 ms:
hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt
---
Time: 1453731073000 ms
---
1

16/01/25 15:11:13 INFO FileInputDStream: Cleared 1 old files that
were older than 1453731013000 ms: 1453731012000 ms
16/01/25 15:11:13 INFO FileInputDStream: Cleared 0 old files that
were older than 1453731013000 ms:
16/01/25 15:11:14 INFO FileInputDStream: Finding new files took 3 ms
16/01/25 15:11:14 INFO FileInputDStream: New files at time
1453731074000 ms:
hdfs://helmhdfs/user/patcharee/cerdata/datetime_19618.txt
---
Time: 1453731074000 ms
---
6288


Thanks,
Patcharee






Re: TTransportException when using Spark 1.6.0 on top of Tachyon 0.8.2

2016-01-29 Thread cc
Hey, Jia Zou

I'm curious about this exception. The error log you showed indicates that the
exception is related to unlockBlock. Could you upload your full master.log
and worker.log from the tachyon/logs directory?

Best,
Cheng

On Friday, January 29, 2016 at 11:11:19 AM UTC+8, Calvin Jia wrote:
>
> Hi,
>
> Thanks for the detailed information. How large is the dataset you are 
> running against? Also did you change any Tachyon configurations?
>
> Thanks,
> Calvin
>


Re: Number of batches in the Streaming Statistics visualization screen

2016-01-29 Thread Mehdi Ben Haj Abbes
Thanks Terry for the quick answer.

I did not try it. Let's say I increase the value to 2, what side
effects should I expect? In fact, the explanation of the property says "How
many finished batches the Spark UI and status APIs remember before garbage
collecting." So the data is stored in memory, but in the memory of which
component ... the driver, I imagine?

regards,

On Fri, Jan 29, 2016 at 10:52 AM, Terry Hoo  wrote:

> Hi Mehdi,
>
> Have you tried a larger value of "spark.streaming.ui.retainedBatches" (the
> default is 1000)?
>
> Regards,
> - Terry
>
> On Fri, Jan 29, 2016 at 5:45 PM, Mehdi Ben Haj Abbes <
> mehdi.ab...@gmail.com> wrote:
>
>> Hi folks,
>>
>> I have a streaming job running for more than 24 hours. It seems that
>> there is a limit on the number of batches displayed in the Streaming
>> Statistics visualization screen. For example, if I launch a job on Friday I
>> will not be able to see the statistics for what happened during Saturday.
>> I will only have the batches that ran in the previous 24 hours, and today it
>> was more like only the previous 3 hours.
>>
>> Any help will be very appreciated.
>> --
>> Mehdi BEN HAJ ABBES
>>
>>
>


-- 
Mehdi BEN HAJ ABBES


Re: GraphX can show graph?

2016-01-29 Thread Balachandar R.A.
Thanks... Will look into that

- Bala

On 28 January 2016 at 15:36, Sahil Sareen  wrote:

> Try Neo4j for visualization; GraphX does a pretty good job at distributed
> graph processing.
>
> On Thu, Jan 28, 2016 at 12:42 PM, Balachandar R.A. <
> balachandar...@gmail.com> wrote:
>
>> Hi
>>
>> I am new to GraphX. I have a simple csv file which I could load and
>> compute a few graph statistics from. However, I am not sure whether it is possible
>> to create and show a graph (for visualization purposes) using GraphX. Any
>> pointer to a tutorial or information related to this would be really helpful.
>>
>> Thanks and regards
>> Bala
>>
>
>


Re: Spark GraphX + TitanDB + Cassandra?

2016-01-29 Thread Nick Pentreath
Hi Joe

A while ago I was running a Titan + HBase datastore to store graph data. I
then used Spark (via TitanHBaseInputFormat, you could use the Cassandra
version) to access an RDD[Vertex] that I performed analytics and machine
learning on. That could form the basis of putting the data into a form
usable in GraphX.

The talk here gives a bit of info on this including a little code snippet:
https://spark-summit.org/2014/using-spark-and-shark-to-power-a-real-time-recommendation-and-customer-intelligence-platform

Titan also provides Faunus (or I think it is now Gremlin-Hadoop), though
that is Hadoop-only at the moment.

On Tue, Jan 26, 2016 at 10:19 PM, Joe Bako  wrote:

> I’ve found some references online to various implementations (such as
> Dendrite) leveraging HDFS via TitanDB + HBase for graph processing.
> GraphLab also uses HDFS/Hadoop.  I am wondering if (and how) one might use
> TitanDB + Cassandra as the data source for Spark GraphX?  The Gremlin
> language seems more targeted towards basic traversals rather than
> analytics, and I’m unsure the performance of attempting to use Gremlin to
> load sub-graphs up into GraphX for analysis.  For example, if I have a
> large property graph and wish to run algorithms to find similar sub-graphs
> within, would TitanDB/Gremlin even be a consideration?  The underlying data
> model that Titan uses in Cassandra does not seem accessible for direct
> querying via CQL/Thrift.
>
> Any guidance around this nebulous subject is much appreciated!
>
> Joe Bako
> Software Architect
> Gracenote, Inc.
> Mobile: 925.818.2230
> http://www.gracenote.com/
>
>
>


Re: Number of batches in the Streaming Statistics visualization screen

2016-01-29 Thread Terry Hoo
Hi Mehdi,

Have you tried a larger value of "spark.streaming.ui.retainedBatches" (the
default is 1000)?

Regards,
- Terry

On Fri, Jan 29, 2016 at 5:45 PM, Mehdi Ben Haj Abbes 
wrote:

> Hi folks,
>
> I have a streaming job running for more than 24 hours. It seems that there
> is a limit on the number of batches displayed in the Streaming Statistics
> visualization screen. For example, if I launch a job on Friday I will not
> be able to see the statistics for what happened during Saturday. I will
> only have the batches that ran in the previous 24 hours, and today it was
> more like only the previous 3 hours.
>
> Any help will be very appreciated.
> --
> Mehdi BEN HAJ ABBES
>
>


Re: mapWithState / stateSnapshots() yielding empty rdds?

2016-01-29 Thread Sebastian Piu
Just saw I'm not calling state.update() in my trackState function. I
guess that is the issue!
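
For reference, a minimal Scala sketch (illustrative names, not the original UseCase
code) of a mapWithState function that does call state.update(); without that call
the state is never persisted, so stateSnapshots() keeps returning empty RDDs.

import org.apache.spark.streaming.{State, StateSpec}

// Running sum per key; state.update() is what makes the state visible to later
// batches and to stateSnapshots().
def trackState(key: String, value: Option[Double], state: State[Double]): (String, Double) = {
  val sum = state.getOption().getOrElse(0.0) + value.getOrElse(0.0)
  state.update(sum)
  (key, sum)
}

// Usage on a DStream[(String, Double)] named pairs (hypothetical):
// val stateDStream = pairs.mapWithState(StateSpec.function(trackState _))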

On Fri, Jan 29, 2016 at 9:36 AM, Sebastian Piu 
wrote:

> Hi All,
>
> I'm playing with the new mapWithState functionality but I can't get it
> quite to work yet.
>
> I'm doing two print() calls on the stream:
> 1. after mapWithState() call, first batch shows results - next batches
> yield empty
> 2. after stateSnapshots(), always yields an empty RDD
>
> Any pointers on what might be wrong?
>
> This is the code I'm running:
>
> final StateSpec state = StateSpec.function(UseCase::trackState);
> JavaPairDStream pairs = messages.mapToPair(UseCase::mapToPair);
> JavaMapWithStateDStream stateMap = pairs.mapWithState(state);
>
> stateMap.print(5);
> stateMap.stateSnapshots()
> .print(5);
>
> stream.context().remember(minutes(120));
> stream.context().checkpoint("/rsl/tmp/fxo-checkpoint");
>
> private static Optional> trackState(Time 
> batchTime, GroupingKey key, Optional value, State state) {
> Double current = state.exists() ? state.get() : 0.0;
> Double sum = current + value.or(0.0);
> return Optional.of(new Tuple2<>(key, sum));
> }
>
>
> Cheers,
>
> Seb
>
>


Number of batches in the Streaming Statistics visualization screen

2016-01-29 Thread Mehdi Ben Haj Abbes
Hi folks,

I have a streaming job running for more than 24 hours. It seems that there
is a limit on the number of batches displayed in the Streaming Statistics
visualization screen. For example, if I launch a job on Friday I will not
be able to see the statistics for what happened during Saturday. I will
only have the batches that ran in the previous 24 hours, and today it was
more like only the previous 3 hours.

Any help will be very appreciated.
-- 
Mehdi BEN HAJ ABBES


Re: Dataframe, Spark SQL - Drops First 8 Characters of String on Amazon EMR

2016-01-29 Thread Daniel Darabos
Hi Andrew,

If you still see this with Spark 1.6.0, it would be very helpful if you
could file a bug about it at https://issues.apache.org/jira/browse/SPARK with
as much detail as you can. This issue could be a nasty source of silent
data corruption in a case where some intermediate data loses 8 characters
but it is not obvious in the final output. Thanks!
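
A hedged Scala sketch of the quick checks suggested further down this thread
(assumes a spark-shell session where sc and sqlContext exist; the column name and
sample values are illustrative): build a DataFrame programmatically, then compare
groupings and string lengths to see where the characters go missing.

import sqlContext.implicits._
import org.apache.spark.sql.functions.length

val probe = sc.parallelize(Seq("asdfasdfasdf", "primary", "tertiary")).toDF("education")

// Are "primary" and "tertiary" still separate groups, or already collapsed?
probe.groupBy("education").count().show()

// Rule out printing issues by looking at the string lengths directly.
probe.select($"education", length($"education")).show()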

On Fri, Jan 29, 2016 at 7:53 AM, Jonathan Kelly 
wrote:

> Just FYI, Spark 1.6 was released on emr-4.3.0 a couple days ago:
> https://aws.amazon.com/blogs/aws/emr-4-3-0-new-updated-applications-command-line-export/
>
> On Thu, Jan 28, 2016 at 7:30 PM Andrew Zurn  wrote:
>
>> Hey Daniel,
>>
>> Thanks for the response.
>>
>> After playing around for a bit, it looks like it's probably something
>> similar to the first situation you mentioned, with the Parquet format
>> causing issues. Both a programmatically created dataset and a dataset pulled
>> off the internet (rather than out of S3 and put into HDFS/Hive) behaved with
>> DataFrames as one would expect (printed out everything, grouped properly,
>> etc.).
>>
>> It looks like there is more than likely an outstanding bug that causes
>> issues with data coming from S3 and converted into the Parquet format
>> (I found an article highlighting that it was around in 1.4, and I guess it
>> wouldn't be out of the realm of possibility for it to still exist). Link to the
>> article:
>> https://www.appsflyer.com/blog/the-bleeding-edge-spark-parquet-and-s3/
>>
>> Hopefully a little more stability will come out with the upcoming Spark
>> 1.6 release on EMR (I think that is happening sometime soon).
>>
>> Thanks again for the advice on where to dig further into. Much
>> appreciated.
>>
>> Andrew
>>
>> On Tue, Jan 26, 2016 at 9:18 AM, Daniel Darabos <
>> daniel.dara...@lynxanalytics.com> wrote:
>>
>>> Have you tried setting spark.emr.dropCharacters to a lower value? (It
>>> defaults to 8.)
>>>
>>> :) Just joking, sorry! Fantastic bug.
>>>
>>> What data source do you have for this DataFrame? I could imagine for
>>> example that it's a Parquet file and on EMR you are running with the wrong
>>> version of the Parquet library and it messes up strings. It should be easy
>>> enough to try a different data format. You could also try what happens if
>>> you just create the DataFrame programmatically, e.g.
>>> sc.parallelize(Seq("asdfasdfasdf")).toDF.
>>>
>>> To understand better at which point the characters are lost you could
>>> try grouping by a string attribute. I see "education" ends up either as ""
>>> (empty string) or "y" in the printed output. But are the characters already
>>> lost when you try grouping by the attribute? Will there be a single ""
>>> category, or will you have separate categories for "primary" and "tertiary"?
>>>
>>> I think the correct output through the RDD suggests that the issue
>>> happens at the very end. So it will probably happen also with different
>>> data sources, and grouping will create separate groups for "primary" and
>>> "tertiary" even though they are printed as the same string at the end. You
>>> should also check the data from "take(10)" to rule out any issues with
>>> printing. You could try the same "groupBy" trick after "take(10)". Or you
>>> could print the lengths of the strings.
>>>
>>> Good luck!
>>>
>>> On Tue, Jan 26, 2016 at 3:53 AM, awzurn  wrote:
>>>
 Sorry for the bump, but wondering if anyone else has seen this before.
 We're
 hoping to either resolve this soon, or move on with further steps to
 move
 this into an issue.

 Thanks in advance,

 Andrew Zurn



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Dataframe-Spark-SQL-Drops-First-8-Characters-of-String-on-Amazon-EMR-tp26022p26065.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



>>>
>>


Re: local class incompatible: stream classdesc serialVersionUID

2016-01-29 Thread Ted Yu
I logged SPARK-13084

For the moment, please consider running with 1.5.2 on all the nodes.

On Fri, Jan 29, 2016 at 5:29 AM, Jason Plurad  wrote:

> I agree with you, Ted, if RDD had a serial version UID this might not be
> an issue. So that could be a JIRA to submit to help avoid version
> mismatches in future Spark versions, but that doesn't help my current
> situation between 1.5.1 and 1.5.2.
>
> Any other ideas? Thanks.
> On Thu, Jan 28, 2016 at 5:06 PM Ted Yu  wrote:
>
>> I am not a Scala expert.
>>
>> RDD extends Serializable but doesn't have @SerialVersionUID() annotation.
>> This may explain what you described.
>>
>> One approach is to add @SerialVersionUID so that RDDs have a stable
>> serial version UID.
>>
>> Cheers
>>
>> On Thu, Jan 28, 2016 at 1:38 PM, Jason Plurad  wrote:
>>
>>> I've searched through the mailing list archive. It seems that if you try
>>> to run, for example, a Spark 1.5.2 program against a Spark 1.5.1 standalone
>>> server, you will run into an exception like this:
>>>
>>> WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 0.0 in
>>> stage 0.0 (TID 0, 192.168.14.103): java.io.InvalidClassException:
>>> org.apache.spark.rdd.RDD; local class incompatible: stream classdesc
>>> serialVersionUID = -3343649307726848892, local class serialVersionUID =
>>> -3996494161745401652
>>>
>>> If my application is using a library that builds against Spark 1.5.2,
>>> does that mean that my application is now tied to that same Spark
>>> standalone server version?
>>>
>>> Is there a recommended way for that library to have a Spark dependency
>>> but keep it compatible against a wider set of versions, i.e. any version
>>> 1.5.x?
>>>
>>> Thanks!
>>>
>>
>>


Spark Streaming from existing RDD

2016-01-29 Thread Sateesh Karuturi
Can anyone please help me out with how to create a DStream from an existing RDD? My
code is:

JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD rddd = ctx.parallelize(arraylist);

Now I need to use this *rddd* as input to a *JavaStreamingContext*.


Pyspark filter not empty

2016-01-29 Thread patcharee

Hi,

In PySpark, how do I filter rows where a DataFrame column is not empty?

I tried:

dfNotEmpty = df.filter(df['msg']!='')

It did not work.

Thanks,
Patcharee




Re: how to correctly run scala script using spark-shell through stdin (spark v1.0.0)

2016-01-29 Thread Iulian Dragoș
On Fri, Jan 29, 2016 at 5:22 PM, Iulian Dragoș 
wrote:

> I found the issue in the 2.11 version of the REPL, PR will follow shortly.
>


https://github.com/apache/spark/pull/10984



>
> The 2.10 version of Spark doesn't have this issue, so you could use that
> in the mean time.
>
> iulian
>
> On Wed, Jan 27, 2016 at 3:17 PM,  wrote:
>
>> So far, I still cannot find a way of running a small Scala script right
>> after starting the shell while keeping the shell open. Is there a way
>> of doing this?
>>
>> Feels like a simple/naive question but really couldn’t find an answer.
>>
>>
>>
>> *From:* Fernandez, Andres
>> *Sent:* Tuesday, January 26, 2016 2:53 PM
>> *To:* 'Ewan Leith'; Iulian Dragoș
>> *Cc:* user
>> *Subject:* RE: how to correctly run scala script using spark-shell
>> through stdin (spark v1.0.0)
>>
>>
>>
>> True, thank you. Is there a way to keep the shell from closing (i.e., how to
>> avoid the :quit statement)? Thank you both.
>>
>>
>>
>> Andres
>>
>>
>>
>> *From:* Ewan Leith [mailto:ewan.le...@realitymine.com
>> ]
>> *Sent:* Tuesday, January 26, 2016 1:50 PM
>> *To:* Iulian Dragoș; Fernandez, Andres
>> *Cc:* user
>> *Subject:* RE: how to correctly run scala script using spark-shell
>> through stdin (spark v1.0.0)
>>
>>
>>
>> I’ve just tried running this using a normal stdin redirect:
>>
>>
>>
>> ~/spark/bin/spark-shell < simple.scala
>>
>>
>>
>> Which worked, it started spark-shell, executed the script, the stopped
>> the shell.
>>
>>
>>
>> Thanks,
>>
>> Ewan
>>
>>
>>
>> *From:* Iulian Dragoș [mailto:iulian.dra...@typesafe.com
>> ]
>> *Sent:* 26 January 2016 15:00
>> *To:* fernandrez1987 
>> *Cc:* user 
>> *Subject:* Re: how to correctly run scala script using spark-shell
>> through stdin (spark v1.0.0)
>>
>>
>>
>> I don’t see -i in the output of spark-shell --help. Moreover, in master
>> I get an error:
>>
>> $ bin/spark-shell -i test.scala
>>
>> bad option: '-i'
>>
>> iulian
>>
>> ​
>>
>>
>>
>> On Tue, Jan 26, 2016 at 3:47 PM, fernandrez1987 <
>> andres.fernan...@wellsfargo.com> wrote:
>>
>> spark-shell -i file.scala is not working for me in Spark 1.6.0, was this
>> removed or what do I have to take into account? The script does not get
>> run
>> at all. What can be happening?
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n26071/script.png
>> >
>>
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n26071/shell-call.png
>> >
>>
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n26071/no-println.png
>> >
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-correctly-run-scala-script-using-spark-shell-through-stdin-spark-v1-0-0-tp12972p26071.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>>
>>
>>
>> --
>>
>>
>> --
>> Iulian Dragos
>>
>>
>>
>> --
>> Reactive Apps on the JVM
>> www.typesafe.com
>>
>>
>>
>
>
>
> --
>
> --
> Iulian Dragos
>
> --
> Reactive Apps on the JVM
> www.typesafe.com
>
>


-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Getting Exceptions/WARN during random runs for same dataset

2016-01-29 Thread Khusro Siddiqui
Hi Everyone,

Environment used: Datastax Enterprise 4.8.3 which is bundled with Spark
1.4.1 and scala 2.10.5.

I am using Dataframes to query Cassandra, do processing and store the
result back into Cassandra. The job is being submitted using spark-submit
on a cluster of 3 nodes. While doing so I get three WARN messages:

WARN  2016-01-28 19:08:18 org.apache.spark.scheduler.TaskSetManager: Lost
task 99.0 in stage 2.0 (TID 107, 10.2.1.82): java.io.InvalidClassException:
org.apache.spark.sql.types.TimestampType$; unable to create instance

Caused by: java.lang.reflect.InvocationTargetException

Caused by: java.lang.UnsupportedOperationException: tail of empty list


For example, if I am running the same job, for the same input set of data,
say 20 times,

- 11 times it will run successfully without any WARN messages

- 4 times it will run successfully with the above messages

- 6 times it will run successfully by randomly giving one or two of the
exceptions above


In all the 20 runs, the output data is coming as expected and there is no
error in that. My concern is, why is it not giving these messages every
time I do a spark-submit but only at times. Also, the stack trace does not
point to any specific point in my line of code. Full stack trace is as
follows. Please let me know if you need any other information


WARN  2016-01-28 19:08:24 org.apache.spark.scheduler.TaskSetManager: Lost
task 188.0 in stage 16.0 (TID 637, 10.2.1.82):
java.io.InvalidClassException: org.apache.spark.sql.types.TimestampType$;
unable to create instance

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1788)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)

at scala.collection.immutable.$colon$colon.readObject(List.scala:362)

at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)

at scala.collection.immutable.$colon$colon.readObject(List.scala:362)

at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at 

Re: Hive on Spark knobs

2016-01-29 Thread Ruslan Dautkhanov
Yep, I tried that. It seems you're right. I got an error saying that the execution
engine has to be set to mr:

hive.execution.engine = mr

I did not keep the exact error message/stack trace. It's probably disabled explicitly.


-- 
Ruslan Dautkhanov

On Thu, Jan 28, 2016 at 7:03 AM, Todd  wrote:

> Did you run hive on spark with spark 1.5 and hive 1.1?
> I think hive on spark doesn't support spark 1.5. There are compatibility
> issues.
>
>
> At 2016-01-28 01:51:43, "Ruslan Dautkhanov"  wrote:
>
>
> https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started
>
> There are quite a lot of knobs to tune for Hive on Spark.
>
> Above page recommends following settings:
>
> mapreduce.input.fileinputformat.split.maxsize=75000
>> hive.vectorized.execution.enabled=true
>> hive.cbo.enable=true
>> hive.optimize.reducededuplication.min.reducer=4
>> hive.optimize.reducededuplication=true
>> hive.orc.splits.include.file.footer=false
>> hive.merge.mapfiles=true
>> hive.merge.sparkfiles=false
>> hive.merge.smallfiles.avgsize=1600
>> hive.merge.size.per.task=25600
>> hive.merge.orcfile.stripe.level=true
>> hive.auto.convert.join=true
>> hive.auto.convert.join.noconditionaltask=true
>> hive.auto.convert.join.noconditionaltask.size=894435328
>> hive.optimize.bucketmapjoin.sortedmerge=false
>> hive.map.aggr.hash.percentmemory=0.5
>> hive.map.aggr=true
>> hive.optimize.sort.dynamic.partition=false
>> hive.stats.autogather=true
>> hive.stats.fetch.column.stats=true
>> hive.vectorized.execution.reduce.enabled=false
>> hive.vectorized.groupby.checkinterval=4096
>> hive.vectorized.groupby.flush.percent=0.1
>> hive.compute.query.using.stats=true
>> hive.limit.pushdown.memory.usage=0.4
>> hive.optimize.index.filter=true
>> hive.exec.reducers.bytes.per.reducer=67108864
>> hive.smbjoin.cache.rows=1
>> hive.exec.orc.default.stripe.size=67108864
>> hive.fetch.task.conversion=more
>> hive.fetch.task.conversion.threshold=1073741824
>> hive.fetch.task.aggr=false
>> mapreduce.input.fileinputformat.list-status.num-threads=5
>> spark.kryo.referenceTracking=false
>>
>> spark.kryo.classesToRegister=org.apache.hadoop.hive.ql.io.HiveKey,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
>
>
> Did it work for everybody? It may take days if not weeks to try to tune
> all of these parameters for a specific job.
>
> We're on Spark 1.5 / Hive 1.1.
>
>
> P.S. We have a job that we can't get working well as a Hive job, so we thought to
> use Hive on Spark instead (a 3-table full outer join with group by +
> collect_list). Spark should handle this much better.
>
>
> Ruslan
>
>
>


Re: how to correctly run scala script using spark-shell through stdin (spark v1.0.0)

2016-01-29 Thread Iulian Dragoș
I found the issue in the 2.11 version of the REPL, PR will follow shortly.

The 2.10 version of Spark doesn't have this issue, so you could use that in
the mean time.

iulian

On Wed, Jan 27, 2016 at 3:17 PM,  wrote:

> So far, I still cannot find a way of running a small Scala script right
> after starting the shell while keeping the shell open. Is there a way
> of doing this?
>
> Feels like a simple/naive question but really couldn’t find an answer.
>
>
>
> *From:* Fernandez, Andres
> *Sent:* Tuesday, January 26, 2016 2:53 PM
> *To:* 'Ewan Leith'; Iulian Dragoș
> *Cc:* user
> *Subject:* RE: how to correctly run scala script using spark-shell
> through stdin (spark v1.0.0)
>
>
>
> True, thank you. Is there a way to keep the shell from closing (i.e., how to
> avoid the :quit statement)? Thank you both.
>
>
>
> Andres
>
>
>
> *From:* Ewan Leith [mailto:ewan.le...@realitymine.com
> ]
> *Sent:* Tuesday, January 26, 2016 1:50 PM
> *To:* Iulian Dragoș; Fernandez, Andres
> *Cc:* user
> *Subject:* RE: how to correctly run scala script using spark-shell
> through stdin (spark v1.0.0)
>
>
>
> I’ve just tried running this using a normal stdin redirect:
>
>
>
> ~/spark/bin/spark-shell < simple.scala
>
>
>
> Which worked, it started spark-shell, executed the script, the stopped the
> shell.
>
>
>
> Thanks,
>
> Ewan
>
>
>
> *From:* Iulian Dragoș [mailto:iulian.dra...@typesafe.com
> ]
> *Sent:* 26 January 2016 15:00
> *To:* fernandrez1987 
> *Cc:* user 
> *Subject:* Re: how to correctly run scala script using spark-shell
> through stdin (spark v1.0.0)
>
>
>
> I don’t see -i in the output of spark-shell --help. Moreover, in master I
> get an error:
>
> $ bin/spark-shell -i test.scala
>
> bad option: '-i'
>
> iulian
>
> ​
>
>
>
> On Tue, Jan 26, 2016 at 3:47 PM, fernandrez1987 <
> andres.fernan...@wellsfargo.com> wrote:
>
> spark-shell -i file.scala is not working for me in Spark 1.6.0, was this
> removed or what do I have to take into account? The script does not get run
> at all. What can be happening?
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n26071/script.png
> >
>
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n26071/shell-call.png
> >
>
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n26071/no-println.png
> >
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-correctly-run-scala-script-using-spark-shell-through-stdin-spark-v1-0-0-tp12972p26071.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
>
>
>
> --
>
>
> --
> Iulian Dragos
>
>
>
> --
> Reactive Apps on the JVM
> www.typesafe.com
>
>
>



-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Spark Caching Kafka Metadata

2016-01-29 Thread Cody Koeninger
The kafka direct stream doesn't do any explicit caching.  I haven't looked
through the underlying simple consumer code in the kafka project in detail,
but I doubt it does either.

Honestly, I'd recommend not using auto created topics (it makes it too easy
to pollute your topics if someone fat-fingers something when interacting
with kafka), and instead explicitly creating topics before using them.

If you're trying to create the topic in your spark job right before using
it with direct stream, I can see how there might possibly be a race
condition - you're using the ZK api, but the direct stream is talking only
to the broker api.

On Thu, Jan 28, 2016 at 6:07 PM, asdf zxcv  wrote:

> Does Spark cache which kafka topics exist? A service incorrectly assumes
> all the relevant topics exist, even if they are empty, causing it to fail.
> Fortunately the service is automatically restarted and by default, kafka
> creates the topic after it is requested.
>
> I'm trying to create the topic if it doesn't exist using
> AdminUtils.createTopic:
>
>   val zkClient = new ZkClient("localhost:2181", 1, 1,
> ZKStringSerializer)
>   while (!AdminUtils.topicExists(zkClient, topic)) {
> AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties())
>   }
>
> But I still get an Error getting partition metadata for 'topic-name'.
> Does the topic exist? when I execute KafkaUtils.createDirectStream
>
> I've also tried to implement a retry with a wait such that the retry
> should occur after Kafka has created the requested topic with 
> auto.create.topics.enable
> = true, but this still doesn't work consistently.
>
> This is a bit frustrating to debug as well since the topic is successfully
> created about 50% of the time; other times I get the message "Does the topic
> exist?". My thinking is that Spark may be caching the list of extant kafka
> topics, ignoring that I've added a new one. Is this the case? Am I missing
> something?
>
>
> Ben
>


Re: Spark Streaming from existing RDD

2016-01-29 Thread Shixiong(Ryan) Zhu
Do you just want to write some unit tests? If so, you can use "queueStream"
to create a DStream from a queue of RDDs. However, because it doesn't
support metadata checkpointing, it's better to only use it in unit tests.
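
A minimal Scala sketch of that approach (a local StreamingContext and illustrative
data are assumed): queueStream turns a queue of existing RDDs into a DStream, which
is handy for tests but, as noted, has no metadata checkpointing.

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("queue-stream-sketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// An existing RDD, built much like the parallelize(arraylist) call in the question.
val existing: RDD[String] = ssc.sparkContext.parallelize(Seq("a", "b", "c"))

// Each batch interval dequeues one RDD from this queue.
val stream = ssc.queueStream(mutable.Queue(existing))
stream.print()

ssc.start()
ssc.awaitTerminationOrTimeout(5000)
ssc.stop()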

On Fri, Jan 29, 2016 at 7:35 AM, Sateesh Karuturi <
sateesh.karutu...@gmail.com> wrote:

> Can anyone please help me out with how to create a DStream from an existing RDD? My
> code is:
>
> JavaSparkContext ctx = new JavaSparkContext(conf);
> JavaRDD rddd = ctx.parallelize(arraylist);
>
> Now I need to use this *rddd* as input to a *JavaStreamingContext*.
>


Re: Broadcast join on multiple dataframes

2016-01-29 Thread Srikanth
Michael,

Output of DF.queryExecution is saved to
https://www.dropbox.com/s/1vizuwpswza1e3x/plan.txt?dl=0
I don't see anything in this to suggest a switch in strategy. Hopefully you
find this helpful.

Srikanth

On Thu, Jan 28, 2016 at 4:43 PM, Michael Armbrust 
wrote:

> Can you provide the analyzed and optimized plans (explain(true))?
>
> On Thu, Jan 28, 2016 at 12:26 PM, Srikanth  wrote:
>
>> Hello,
>>
>> I have a use case where one large table has to be joined with several
>> smaller tables.
>> I've added broadcast hint for all small tables in the joins.
>>
>> val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")
>>
>> val metaActionDF = sqlContext.read.format("json")
>> val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
>> val metaLocationDF =
>> sqlContext.read.format("json").option("samplingRatio","0.1").load(metaLocationFile)
>>.join(broadcast(metaActionDF),
>> "campaign_id")
>>.join(broadcast(cidOrgDF),
>> List("organization_id"), "left_outer")
>>
>> val metaCreativeDF = sqlContext.read.format("json")
>> val metaExchangeDF = sqlContext.read.format("json")
>> val localizationDF =
>> sqlContext.read.format("com.databricks.spark.csv")
>> val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")
>>
>> val joinedBidderDF = largeTableDF.as("BID")
>> .join(broadcast(metaLocationDF),
>> "strategy_id")
>> .join(broadcast(metaCreativeDF),
>> "creative_id")
>> .join(broadcast(metaExchangeDF),
>> $"exchange_id" === $"id" , "left_outer")
>> .join(broadcast(techKeyDF).as("TK"),
>> $"BID.tech_id" === $"TK.tech_key" , "left_outer")
>> .join(broadcast(localizationDF).as("BL"),
>> $"BID.language" === $"BL.id" , "left_outer")
>>
>> When I look at the execution plan, all the joins are marked as
>> broadcastjoin.
>> But when I look at the spark job UI, the DAG visualization shows that
>> some joins are sortmerged with shuffle involved.
>> The ones that I've highlighted in yellow were shuffled.
>> DAG can be viewed here -
>> https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0
>>
>> Why is the actual execution as seen in the DAG different from the
>> physical plan pasted below.
>> I'm trying not to shuffle my largeTable. Any idea what is causing this?
>>
>> == Physical Plan ==
>>
>> BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None
>>
>> :- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None
>>
>> :  :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L],
>> LeftOuter, None
>>
>> :  :  :- Project [...]
>>
>> :  :  :  +- BroadcastHashJoin [cast(creative_id#9 as bigint)],
>> [creative_id#131L], BuildRight
>>
>> :  :  : :- Project [...]
>>
>> :  :  : :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)],
>> [strategy_id#127L], BuildRight
>>
>> :  :  : : :- ConvertToUnsafe
>>
>> :  :  : : :  +- Scan
>> CsvRelation(,Some(file:///shared/data/bidder/*.lzo),false,
>>
>> :  :  : : +- Project [...]
>>
>> :  :  : :+- BroadcastHashOuterJoin [organization_id#90L],
>> [cast(organization_id#102 as bigint)], LeftOuter, None
>>
>> :  :  : :   :- Project [...]
>>
>> :  :  : :   :  +- BroadcastHashJoin [campaign_id#105L],
>> [campaign_id#75L], BuildRight
>>
>> :  :  : :   : :- Project [...]
>>
>> :  :  : :   : :  +- Scan
>> JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths:
>> file:/shared/data/t1_meta/t1_meta_strategy.jsonl
>>
>> :  :  : :   : +- Scan JSONRelation[] InputPaths:
>> file:/shared/data/t1_meta/t1_meta_campaign.jsonl
>>
>> :  :  : :   +- ConvertToUnsafe
>>
>> :  :  : :  +- Scan
>> CsvRelation(,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,,
>>
>> :  :  : +- Scan
>> JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130]
>> InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl
>>
>> :  :  +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths:
>> file:/shared/data/t1_meta/t1_meta_exchange.jsonl
>>
>> :  +- ConvertToUnsafe
>>
>> : +- Scan
>> CsvRelation(,Some(file:///shared/data/t1_meta/technology_key.txt),false,
>>
>>
>> +- ConvertToUnsafe
>>
>>+- Scan
>> CsvRelation(,Some(file:///shared/data/t1_meta/browser_languages.osv),false
>>
>>
>>
>> Srikanth
>>
>
>


Re: Spark, Mesos, Docker and S3

2016-01-29 Thread Mao Geng
Sathish,

The constraint you described is Marathon's, not Mesos's :)

Spark.mesos.constraints are applied to slave attributes like tachyon=true
;us-east-1=false, as described in
https://issues.apache.org/jira/browse/SPARK-6707.

Cheers,
-Mao

On Fri, Jan 29, 2016 at 2:51 PM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> Hi
>
> Quick question. How do I pass the constraint [["hostname", "CLUSTER", "
> specific.node.com"]] to Mesos?
>
> I was trying --conf spark.mesos.constraints=hostname:specific.node.com,
> but it didn't seem to work.
>
>
> Please help
>
>
> Thanks
>
> Sathish
>
> On Thu, Jan 28, 2016 at 6:52 PM Mao Geng  wrote:
>
>> From my limited knowledge, only limited options such as network mode,
>> volumes, portmaps can be passed through. See
>> https://github.com/apache/spark/pull/3074/files.
>>
>> https://issues.apache.org/jira/browse/SPARK-8734 is open for exposing
>> all docker options to spark.
>>
>> -Mao
>>
>> On Thu, Jan 28, 2016 at 1:55 PM, Sathish Kumaran Vairavelu <
>> vsathishkuma...@gmail.com> wrote:
>>
>>> Thank you, I figured it out. I set the executor memory to a minimal value and
>>> it works.
>>>
>>> Another issue has come up: I have to pass the --add-host option while running
>>> containers on slave nodes. Is there any option to pass docker run
>>> parameters from Spark?
>>> On Thu, Jan 28, 2016 at 12:26 PM Mao Geng  wrote:
>>>
 Sathish,

 I guess the mesos resources are not enough to run your job. You might
 want to check the mesos log to figure out why.

 I tried to run the docker image with "--conf spark.mesos.coarse=false"
 and "true". Both are fine.

 Best,
 Mao

 On Wed, Jan 27, 2016 at 5:00 PM, Sathish Kumaran Vairavelu <
 vsathishkuma...@gmail.com> wrote:

> Hi,
>
> On the same Spark/Mesos/Docker setup, I am getting warning "Initial
> Job has not accepted any resources; check your cluster UI to ensure that
> workers are registered and have sufficient resources". I am running in
> coarse grained mode. Any pointers on how to fix this issue? Please help. I
> have updated both docker.properties and spark-default.conf with  
> spark.mesos.executor.docker.image
> and other properties.
>
>
> Thanks
>
> Sathish
>
> On Wed, Jan 27, 2016 at 9:58 AM Sathish Kumaran Vairavelu <
> vsathishkuma...@gmail.com> wrote:
>
>> Thanks a lot for your info! I will try this today.
>> On Wed, Jan 27, 2016 at 9:29 AM Mao Geng  wrote:
>>
>>> Hi Sathish,
>>>
>>> The docker image is normal, no AWS profile included.
>>>
>>> When the driver container runs with --net=host, the driver host's
>>> AWS profile will take effect so that the driver can access the 
>>> protected s3
>>> files.
>>>
>>> Similarly,  Mesos slaves also run Spark executor docker container in
>>> --net=host mode, so that the AWS profile of Mesos slaves will take 
>>> effect.
>>>
>>> Hope it helps,
>>> Mao
>>>
>>> On Jan 26, 2016, at 9:15 PM, Sathish Kumaran Vairavelu <
>>> vsathishkuma...@gmail.com> wrote:
>>>
>>> Hi Mao,
>>>
>>> I want to check on accessing the S3 from Spark docker in Mesos.  The
>>> EC2 instance that I am using has the AWS profile/IAM included.  Should 
>>> we
>>> build the docker image with any AWS profile settings or --net=host 
>>> docker
>>> option takes care of it?
>>>
>>> Please help
>>>
>>>
>>> Thanks
>>>
>>> Sathish
>>>
>>> On Tue, Jan 26, 2016 at 9:04 PM Mao Geng  wrote:
>>>
 Thank you very much, Jerry!

 I changed to "--jars
 /opt/spark/lib/hadoop-aws-2.7.1.jar,/opt/spark/lib/aws-java-sdk-1.7.4.jar"
 then it worked like a charm!

 From Mesos task logs below, I saw Mesos executor downloaded the
 jars from the driver, which is a bit unnecessary (as the docker image
 already has them), but that's ok - I am happy seeing Spark + Mesos + 
 Docker
 + S3 worked together!

 Thanks,
 Mao

 16/01/27 02:54:45 INFO Executor: Using REPL class URI: 
 http://172.16.3.98:33771
 16/01/27 02:55:12 INFO CoarseGrainedExecutorBackend: Got assigned task  0
 16/01/27 02:55:12 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
 16/01/27 02:55:12 INFO Executor: Fetching 
 http://172.16.3.98:3850/jars/hadoop-aws-2.7.1.jar with timestamp 
 1453863280432
 16/01/27 02:55:12 INFO Utils: Fetching 
 http://172.16.3.98:3850/jars/hadoop-aws-2.7.1.jar to 
 /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/fetchFileTemp1518118694295619525.tmp
 16/01/27 02:55:12 INFO Utils: Copying 
 /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/-19880839621453863280432_cache

Re: Reading lzo+index with spark-csv (Splittable reads)

2016-01-29 Thread syepes
Well, looking at the source it looks like it's not implemented:

https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/util/TextFile.scala#L34-L36





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reading-lzo-index-with-spark-csv-Splittable-reads-tp26103p26105.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Reading multiple avro files from a dir - Spark 1.5.1

2016-01-29 Thread Ajinkya Kale
Trying to load avro from hdfs. I have around 1000 part avro files in a dir.
I am using this to read them -

 val df =
sqlContext.read.format("com.databricks.spark.avro").load("path/to/avro/dir")
 df.select("QUERY").take(50).foreach(println)

It works if I pass only 1 or 2 avro files in the path. But if I pass a
dir with 400+ files I get this error. Each avro file is around 300 MB.

org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem closed
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:275)
at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:248)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:216)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at
com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4$$anon$1.advanceNextRecord(AvroRelation.scala:157)
at
com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4$$anon$1.hasNext(AvroRelation.scala:166)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
at scala.collection.AbstractIterator.to(Iterator.scala:1194)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:905)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:905)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:707)
at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:776)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.avro.mapred.FsInput.read(FsInput.java:46)
at
org.apache.avro.file.DataFileReader$SeekableInputStream.read(DataFileReader.java:210)
at
org.apache.avro.io.BinaryDecoder$InputStreamByteSource.tryReadRaw(BinaryDecoder.java:839)
at org.apache.avro.io.BinaryDecoder.isEnd(BinaryDecoder.java:444)
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:261)
... 36 more


Re: stopping spark stream app

2016-01-29 Thread agateaaa
Hi,

We recently started working on trying to use Spark Streaming to fetch and
process data from Kafka (direct streaming, not receiver based, Spark 1.5.2).
We want to be able to stop the streaming application and tried implementing
the approach suggested above, using a stopping thread and calling
ssc.stop(True, True) so that we don't lose any data that is being processed.
We are seeing that the Spark application tries to shut down but never exits.

16/01/29 18:14:47 INFO scheduler.JobGenerator: Stopping JobGenerator
gracefully
16/01/29 18:14:47 INFO scheduler.JobGenerator: Waiting for all received
blocks to be consumed for job generation
16/01/29 18:14:47 INFO scheduler.JobGenerator: Waited for all received
blocks to be consumed for job generation

[...]

92.168.10.4:38821 in memory (size: 3.3 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO spark.ContextCleaner: Cleaned accumulator 6
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on 192.168.10.4:38821 in memory (size: 3.3 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on 192.168.10.5:46085 in memory (size: 3.3 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO spark.ContextCleaner: Cleaned accumulator 5
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0
on 192.168.10.4:38821 in memory (size: 5.6 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0
on 192.168.10.4:41701 in memory (size: 5.6 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO spark.ContextCleaner: Cleaned accumulator 4
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0
on 192.168.10.4:38821 in memory (size: 3.3 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0
on 192.168.10.4:41701 in memory (size: 3.3 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO spark.ContextCleaner: Cleaned accumulator 3
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0
on 192.168.10.4:38821 in memory (size: 3.3 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0
on 192.168.10.4:41701 in memory (size: 3.3 KB, free: 530.3 MB)
16/01/29 18:20:11 INFO spark.ContextCleaner: Cleaned accumulator 2

At this point we had to kill the spark app and kill the driver and
spark-submit process.

Would appreciate it if anyone can give any pointers on how to shut down a
streaming app.

Thanks
Agatea



On Wed, Aug 12, 2015 at 7:54 PM, Tathagata Das  wrote:

> stop() is a blocking method when stopGraceful is set to true. In that
> case, it obviously waits for all batches with data to complete processing.
> Why are you joining on the thread in streaming listener? The listener is
> just a callback listener and is NOT supposed to do any long running
> blocking stuff.
> If your intention is to call stop() just in
> listener.onBatchCompleted to prevent the next batch from starting, that is
> WRONG. The listener is issued callbacks asynchronously to the processing loop
> of the context.
> As I said earlier, the ssc.stop() does not need to be (and in fact, most
> cases, should not be) called from the listener. It should be called from
> some other thread. If you have to make sure that the main program waits for
> stop to complete (especially in the case of graceful stop), then make the
> main program thread wait for stopping-thread.join(). Under no circumstances
> should you do blocking calls in the listener events.
>
> On Wed, Aug 12, 2015 at 12:13 PM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Is streamingContext.stop() a blocking method? I mean, does it wait
>> for all batches to complete and for all StreamingListeners to finish?
>> Since it may happen in a new thread, by the time sc.stop() is called a new
>> batch may already have started because of a race condition, so it will wait
>> for that new batch to complete as well.
>>
>> I was actually joining the streaming listener to the new thread, which caused
>> the deadlock - since sc.stop() is blocking and it waits for all streaming
>> listeners to complete as well - right?
>>
>> On Thu, Aug 13, 2015 at 12:33 AM, Tathagata Das 
>> wrote:
>>
>>> Well, system.exit will not ensure all data was processed before
>>> shutdown.
>>> There should not be a deadlock if onBatchCompleted just starts the
>>> thread (that runs stop()) and completes.
>>>
>>> On Wed, Aug 12, 2015 at 1:50 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Calling jssc.stop(false/true, false/true) from the StreamingListener causes a
 deadlock, so I created another thread and called jssc.stop from that, but
 that too caused a deadlock if onBatchCompleted has not completed before
 jssc.stop().

 So is it safe if I call System.exit(1) from another thread without
 calling jssc.stop(), since that leads to a deadlock?


 On Tue, Aug 11, 2015 at 9:54 PM, Shushant Arora <
 shushantaror...@gmail.com> 

Re: How to use DStream repartition()?

2016-01-29 Thread Andy Davidson
The following code seems to do what I want. I repartition the RDD, not the
DStream. I wonder if this has to do with the way windows work?

    private static void saveTweetsCSV(JavaSparkContext jsc,
            JavaDStream<TidyTwitterMLPojo> tidy, String outputURI) {

        tidy.foreachRDD(new VoidFunction2<JavaRDD<TidyTwitterMLPojo>, Time>() {

            private static final long serialVersionUID = 1L;

            // Typically we use the CSV file format for data a human needs to work with.
            // We want to repartition the data so that we write the smallest number
            // of files possible, however the max number of rows in a given CSV
            // file is small enough for a human to work with easily.
            final long maxNumRowsPerFile = 100;

            @Override
            public void call(JavaRDD<TidyTwitterMLPojo> rdd, Time time) throws Exception {
                long count = rdd.count();
                // if (!rdd.isEmpty()) {
                if (count > 0) {
                    long numPartitions = count / maxNumRowsPerFile + 1;
                    Long tmp = numPartitions;
                    rdd = rdd.repartition(tmp.intValue());
                    String dirPath = outputURI + "_CSV" + "-" + time.milliseconds();

                    // http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
                    // Get the singleton instance of SQLContext
                    SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

                    DataFrame df = sqlContext.createDataFrame(rdd, TidyTwitterMLPojo.class);
                    TidyPojo.saveCSV(df, dirPath);
                }
            }
        });
    }


From:  Andrew Davidson 
Date:  Friday, January 29, 2016 at 1:54 PM
To:  "user @spark" 
Subject:  How to use DStream repartition()?

> My Streaming app has a requirement that my output be saved in the smallest
> number of files possible such that each file does not exceed a max number of
> rows. Based on my experience it appears that each partition will be written to
> a separate output file.
> 
> This was really easy to do in my batch processing using data frames and RDDs.
> It's easy to call count(), then decide how many partitions I want and
> finally call repartition().
> 
> I am having a heck of a time trying to figure out how to do the same thing
> using Spark Streaming.
> 
> 
> JavaDStream tidy = …
> 
> JavaDStream counts = tidy.count();
> 
> 
> 
> Below is the documentation for count(). I do not see how I can use this to
> figure out how many partitions I need. DStream does not provide a collect(),
> and foreachRDD() cannot return a value. I tried using an accumulator but that
> did not work.
> 
> 
> 
> Any suggestions would be greatly appreciated.
> 
> 
> http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html
> count
> JavaDStream count()
> Return a new DStream in which each RDD has a single element generated by
> counting each RDD of this DStream.
> Returns: (undocumented)
> 




Re: Spark streaming and ThreadLocal

2016-01-29 Thread N B
Fixed a typo in the code to avoid any confusion. Please comment on the
code below...

dstream.map( p -> { ThreadLocal<SomeClass> d = new ThreadLocal<SomeClass>() {
 public SomeClass initialValue() { return new SomeClass(); }
};
somefunc(p, d.get());
d.remove();
return p;
});

On Fri, Jan 29, 2016 at 4:32 PM, N B  wrote:

> So this use of ThreadLocal will be inside the code of a function executing
> on the workers i.e. within a call from one of the lambdas. Would it just
> look like this then:
>
> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>  public SomeClass initialValue() { return new SomeClass(); }
> };
> somefunc(p, d.get());
> d.remove();
> return p;
> }; );
>
> Will this make sure that all threads inside the worker clean up the
> ThreadLocal once they are done with processing this task?
>
> Thanks
> NB
>
>
> On Fri, Jan 29, 2016 at 1:00 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Spark Streaming uses threadpools so you need to remove ThreadLocal when
>> it's not used.
>>
>> On Fri, Jan 29, 2016 at 12:55 PM, N B  wrote:
>>
>>> Thanks for the response Ryan. So I would say that it is in fact the
>>> purpose of a ThreadLocal i.e. to have a copy of the variable as long as the
>>> thread lives. I guess my concern is around usage of threadpools and whether
>>> Spark streaming will internally create many threads that rotate between
>>> tasks on purpose thereby holding onto ThreadLocals that may actually never
>>> be used again.
>>>
>>> Thanks
>>>
>>> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
 Of cause. If you use a ThreadLocal in a long living thread and forget
 to remove it, it's definitely a memory leak.

 On Thu, Jan 28, 2016 at 9:31 PM, N B  wrote:

> Hello,
>
> Does anyone know if there are any potential pitfalls associated with
> using ThreadLocal variables in a Spark streaming application? One things I
> have seen mentioned in the context of app servers that use thread pools is
> that ThreadLocals can leak memory. Could this happen in Spark streaming
> also?
>
> Thanks
> Nikunj
>
>

>>>
>>
>


How to use DStream repartition()?

2016-01-29 Thread Andy Davidson
My Streaming app has a requirement that my output be saved in the smallest
number of files possible such that each file does not exceed a max number of
rows. Based on my experience it appears that each partition will be written
to a separate output file.

This was really easy to do in my batch processing using data frames and RDDs.
It's easy to call count(), then decide how many partitions I want and
finally call repartition().

I am having a heck of a time trying to figure out how to do the same thing
using Spark Streaming.


JavaDStream tidy = …

JavaDStream counts = tidy.count();



Below is the documentation for count(). I do not see how I can use this to
figure out how many partitions I need. DStream does not provide a collect(),
and foreachRDD() cannot return a value. I tried using an accumulator but that
did not work.



Any suggestions would be greatly appreciated


http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html
count
JavaDStream count()
Return a new DStream in which each RDD has a single element generated by
counting each RDD of this DStream.
Returns: (undocumented)





Garbage collections issue on MapPartitions

2016-01-29 Thread rcollich
Hi all,

I currently have a mapPartitions job which is flatMapping each value in the
iterator, and I'm running into an issue where there will be major GC costs
on certain executions. Some executors will take 20 minutes, 15 of which are
pure garbage collection, and I believe that a lot of it has to do with the
ArrayBuffer that I am outputting. Does anyone have any suggestions as to how
I can do some form of a stream output?
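
For what it's worth, one pattern that can help here is to keep the
per-partition output lazy instead of materializing it into an ArrayBuffer. A
small self-contained Scala sketch (splitting on spaces stands in for the real
flatMap logic; the names and data are made up):

import org.apache.spark.{SparkConf, SparkContext}

object LazyMapPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("lazy-mappartitions").setMaster("local[*]"))
    val rdd = sc.parallelize(Seq("a b c", "d e", "f"))

    // Instead of appending every expanded record to an ArrayBuffer and
    // returning buf.iterator, return a lazy iterator: downstream consumes
    // records as they are produced, so only a handful of objects are live
    // per partition at any time.
    val out = rdd.mapPartitions(iter => iter.flatMap(line => line.split(" ").iterator))

    out.collect().foreach(println)
    sc.stop()
  }
}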

Also, does anyone have any advice in general for tracking down/addressing GC
issues in spark?






GoogleAnalytics GAData

2016-01-29 Thread Andrés Ivaldi
Hello, I'm using the Google API to retrieve Google Analytics JSON.

I'd like to use Spark to load the JSON, but toString truncates the value. I
could save it to disk and then retrieve it, but I'd be losing performance. Is
there any other way?
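
If the payload is already in memory as a string, one option is to parallelize
it and hand it straight to the JSON reader so it never touches disk. A sketch
against the Spark 1.x API (the sample record and app name are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object JsonFromStringSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("json-from-string").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // One JSON document per RDD element; this stands in for the payload
    // returned by the Google Analytics API.
    val payload = Seq("""{"pagePath": "/home", "pageviews": 123}""")

    // read.json accepts an RDD[String], so nothing has to be written to disk.
    val df = sqlContext.read.json(sc.parallelize(payload))
    df.show()

    sc.stop()
  }
}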

Regards



-- 
Ing. Ivaldi Andres


Re: Spark streaming and ThreadLocal

2016-01-29 Thread N B
So this use of ThreadLocal will be inside the code of a function executing
on the workers i.e. within a call from one of the lambdas. Would it just
look like this then:

dstream.map( p -> { ThreadLocal<SomeClass> d = new ThreadLocal<SomeClass>() {
 public SomeClass initialValue() { return new SomeClass(); }
};
somefunc(p, d.get());
d.remove();
return p;
});

Will this make sure that all threads inside the worker clean up the
ThreadLocal once they are done with processing this task?

Thanks
NB


On Fri, Jan 29, 2016 at 1:00 PM, Shixiong(Ryan) Zhu  wrote:

> Spark Streaming uses threadpools so you need to remove ThreadLocal when
> it's not used.
>
> On Fri, Jan 29, 2016 at 12:55 PM, N B  wrote:
>
>> Thanks for the response Ryan. So I would say that it is in fact the
>> purpose of a ThreadLocal i.e. to have a copy of the variable as long as the
>> thread lives. I guess my concern is around usage of threadpools and whether
>> Spark streaming will internally create many threads that rotate between
>> tasks on purpose thereby holding onto ThreadLocals that may actually never
>> be used again.
>>
>> Thanks
>>
>> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Of cause. If you use a ThreadLocal in a long living thread and forget to
>>> remove it, it's definitely a memory leak.
>>>
>>> On Thu, Jan 28, 2016 at 9:31 PM, N B  wrote:
>>>
 Hello,

 Does anyone know if there are any potential pitfalls associated with
 using ThreadLocal variables in a Spark streaming application? One things I
 have seen mentioned in the context of app servers that use thread pools is
 that ThreadLocals can leak memory. Could this happen in Spark streaming
 also?

 Thanks
 Nikunj


>>>
>>
>


Re: Spark streaming and ThreadLocal

2016-01-29 Thread N B
Well, won't the code in the lambda execute inside multiple threads in the worker,
because it has to process many records? I would just want to have a single
copy of SomeClass instantiated per thread rather than once per record
being processed. That was what triggered this thought anyway.

Thanks
NB


On Fri, Jan 29, 2016 at 5:09 PM, Shixiong(Ryan) Zhu  wrote:

> It looks weird. Why don't you just pass "new SomeClass()" to "somefunc"?
> You don't need to use ThreadLocal if there are no multiple threads in your
> codes.
>
> On Fri, Jan 29, 2016 at 4:39 PM, N B  wrote:
>
>> Fixed a typo in the code to avoid any confusion Please comment on the
>> code below...
>>
>> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>>  public SomeClass initialValue() { return new SomeClass(); }
>> };
>> somefunc(p, d.get());
>> d.remove();
>> return p;
>> }; );
>>
>> On Fri, Jan 29, 2016 at 4:32 PM, N B  wrote:
>>
>>> So this use of ThreadLocal will be inside the code of a function
>>> executing on the workers i.e. within a call from one of the lambdas. Would
>>> it just look like this then:
>>>
>>> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>>>  public SomeClass initialValue() { return new SomeClass(); }
>>> };
>>> somefunc(p, d.get());
>>> d.remove();
>>> return p;
>>> }; );
>>>
>>> Will this make sure that all threads inside the worker clean up the
>>> ThreadLocal once they are done with processing this task?
>>>
>>> Thanks
>>> NB
>>>
>>>
>>> On Fri, Jan 29, 2016 at 1:00 PM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
 Spark Streaming uses threadpools so you need to remove ThreadLocal when
 it's not used.

 On Fri, Jan 29, 2016 at 12:55 PM, N B  wrote:

> Thanks for the response Ryan. So I would say that it is in fact the
> purpose of a ThreadLocal i.e. to have a copy of the variable as long as 
> the
> thread lives. I guess my concern is around usage of threadpools and 
> whether
> Spark streaming will internally create many threads that rotate between
> tasks on purpose thereby holding onto ThreadLocals that may actually never
> be used again.
>
> Thanks
>
> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Of cause. If you use a ThreadLocal in a long living thread and forget
>> to remove it, it's definitely a memory leak.
>>
>> On Thu, Jan 28, 2016 at 9:31 PM, N B  wrote:
>>
>>> Hello,
>>>
>>> Does anyone know if there are any potential pitfalls associated with
>>> using ThreadLocal variables in a Spark streaming application? One 
>>> things I
>>> have seen mentioned in the context of app servers that use thread pools 
>>> is
>>> that ThreadLocals can leak memory. Could this happen in Spark streaming
>>> also?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>
>

>>>
>>
>


Re: Spark, Mesos, Docker and S3

2016-01-29 Thread Sathish Kumaran Vairavelu
Hi

Quick question. How to pass constraint [["hostname", "CLUSTER", "
specific.node.com"]] to mesos?

I was trying --conf spark.mesos.constraints=hostname:specific.node.com, but
it didn't seem to work.


Please help


Thanks

Sathish
On Thu, Jan 28, 2016 at 6:52 PM Mao Geng  wrote:

> From my limited knowledge, only limited options such as network mode,
> volumes, portmaps can be passed through. See
> https://github.com/apache/spark/pull/3074/files.
>
> https://issues.apache.org/jira/browse/SPARK-8734 is open for exposing all
> docker options to spark.
>
> -Mao
>
> On Thu, Jan 28, 2016 at 1:55 PM, Sathish Kumaran Vairavelu <
> vsathishkuma...@gmail.com> wrote:
>
>> Thank you, I figured it out. I set the executor memory to the minimum and
>> it works.
>>
>> Another issue has come up: I have to pass the --add-host option while running
>> containers on the slave nodes. Is there any option to pass docker run
>> parameters from Spark?
>> On Thu, Jan 28, 2016 at 12:26 PM Mao Geng  wrote:
>>
>>> Sathish,
>>>
>>> I guess the mesos resources are not enough to run your job. You might
>>> want to check the mesos log to figure out why.
>>>
>>> I tried to run the docker image with "--conf spark.mesos.coarse=false"
>>> and "true". Both are fine.
>>>
>>> Best,
>>> Mao
>>>
>>> On Wed, Jan 27, 2016 at 5:00 PM, Sathish Kumaran Vairavelu <
>>> vsathishkuma...@gmail.com> wrote:
>>>
 Hi,

 On the same Spark/Mesos/Docker setup, I am getting warning "Initial Job
 has not accepted any resources; check your cluster UI to ensure that
 workers are registered and have sufficient resources". I am running in
 coarse grained mode. Any pointers on how to fix this issue? Please help. I
 have updated both docker.properties and spark-default.conf with  
 spark.mesos.executor.docker.image
 and other properties.


 Thanks

 Sathish

 On Wed, Jan 27, 2016 at 9:58 AM Sathish Kumaran Vairavelu <
 vsathishkuma...@gmail.com> wrote:

> Thanks a lot for your info! I will try this today.
> On Wed, Jan 27, 2016 at 9:29 AM Mao Geng  wrote:
>
>> Hi Sathish,
>>
>> The docker image is normal, no AWS profile included.
>>
>> When the driver container runs with --net=host, the driver host's AWS
>> profile will take effect so that the driver can access the protected s3
>> files.
>>
>> Similarly,  Mesos slaves also run Spark executor docker container in
>> --net=host mode, so that the AWS profile of Mesos slaves will take 
>> effect.
>>
>> Hope it helps,
>> Mao
>>
>> On Jan 26, 2016, at 9:15 PM, Sathish Kumaran Vairavelu <
>> vsathishkuma...@gmail.com> wrote:
>>
>> Hi Mao,
>>
>> I want to check on accessing the S3 from Spark docker in Mesos.  The
>> EC2 instance that I am using has the AWS profile/IAM included.  Should we
>> build the docker image with any AWS profile settings or --net=host docker
>> option takes care of it?
>>
>> Please help
>>
>>
>> Thanks
>>
>> Sathish
>>
>> On Tue, Jan 26, 2016 at 9:04 PM Mao Geng  wrote:
>>
>>> Thank you very much, Jerry!
>>>
>>> I changed to "--jars
>>> /opt/spark/lib/hadoop-aws-2.7.1.jar,/opt/spark/lib/aws-java-sdk-1.7.4.jar"
>>> then it worked like a charm!
>>>
>>> From Mesos task logs below, I saw Mesos executor downloaded the jars
>>> from the driver, which is a bit unnecessary (as the docker image already
>>> has them), but that's ok - I am happy seeing Spark + Mesos + Docker + S3
>>> worked together!
>>>
>>> Thanks,
>>> Mao
>>>
>>> 16/01/27 02:54:45 INFO Executor: Using REPL class URI: 
>>> http://172.16.3.98:33771
>>> 16/01/27 02:55:12 INFO CoarseGrainedExecutorBackend: Got assigned task 0
>>> 16/01/27 02:55:12 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
>>> 16/01/27 02:55:12 INFO Executor: Fetching 
>>> http://172.16.3.98:3850/jars/hadoop-aws-2.7.1.jar with timestamp 
>>> 1453863280432
>>> 16/01/27 02:55:12 INFO Utils: Fetching 
>>> http://172.16.3.98:3850/jars/hadoop-aws-2.7.1.jar to 
>>> /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/fetchFileTemp1518118694295619525.tmp
>>> 16/01/27 02:55:12 INFO Utils: Copying 
>>> /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/-19880839621453863280432_cache
>>>  to /./hadoop-aws-2.7.1.jar
>>> 16/01/27 02:55:12 INFO Executor: Adding file:/./hadoop-aws-2.7.1.jar to 
>>> class loader
>>> 16/01/27 02:55:12 INFO Executor: Fetching 
>>> http://172.16.3.98:3850/jars/aws-java-sdk-1.7.4.jar with timestamp 
>>> 1453863280472
>>> 16/01/27 02:55:12 INFO Utils: Fetching 
>>> http://172.16.3.98:3850/jars/aws-java-sdk-1.7.4.jar to 
>>> /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/fetchFileTemp8868621397726761921.tmp
>>> 16/01/27 02:55:12 INFO Utils: 

saveAsTextFile is not writing to local fs

2016-01-29 Thread Siva
Hi Everyone,

We are using Spark 1.4.1 and we have a requirement of writing data to the
local fs instead of HDFS.

When trying to save an RDD to the local fs with saveAsTextFile, it just writes
a _SUCCESS file in the folder with no part- files, and there are no error or
warning messages on the console.

Is there any place to look at to fix this problem?
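
In case it helps narrow this down, a hedged sketch of one thing to check (the
paths and data below are made up): with a file:// destination each task writes
its part-* files on the node where it runs, while the _SUCCESS marker is
written from the driver, so on a multi-node cluster the directory on the
submitting machine can end up containing only _SUCCESS.

import java.nio.file.{Files, Paths}
import org.apache.spark.{SparkConf, SparkContext}

object LocalFsWriteSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("local-fs-write").setMaster("local[*]"))
    val rdd = sc.parallelize(1 to 10).map(_.toString)

    // Each task writes its part-* files on the node it runs on; only the
    // _SUCCESS marker is created from the driver side.
    rdd.saveAsTextFile("file:///tmp/spark-output")

    // If the result is small enough, collecting to the driver and writing
    // from there guarantees a single copy on the submitting machine.
    Files.write(Paths.get("/tmp/spark-output.txt"),
      rdd.collect().mkString("\n").getBytes("UTF-8"))

    sc.stop()
  }
}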

Thanks,
Sivakumar Bhavanari.


Reading lzo+index with spark-csv (Splittable reads)

2016-01-29 Thread syepes
Hello,
​
I have managed to speed up the read stage when loading CSV files using the
classic "newAPIHadoopFile" method. The issue is that I would like to use the
spark-csv package, and it seems that it's not taking into consideration the
LZO index file / splittable reads.

/# Using the classic method the read is fully parallelized (Splittable)/
sc.newAPIHadoopFile("/user/sy/data.csv.lzo",  ).count

/# When spark-csv is used the file is read only from one node (No Splittable
reads)/
sqlContext.read.format("com.databricks.spark.csv").options(Map("path" ->
"/user/sy/data.csv.lzo", "header" -> "true", "inferSchema" ->
"false")).load().count()

Does anyone know if this is currently supported?








[MLlib] What is the best way to forecast the next month page visit?

2016-01-29 Thread diplomatic Guru
Hello guys,

I'm trying to understand how I could predict the next month's page views based
on the previous access pattern.

For example, I've collected statistics on page views:

e.g.
Page,UniqueView
-
pageA, 1
pageB, 999
...
pageZ,200

I aggregate the statistics monthly.

I've prepared a file containing last 3 months as this:

e.g.
Page,UV_NOV, UV_DEC, UV_JAN
---
pageA, 1,9989,11000
pageB, 999,500,700
...
pageZ,200,50,34


Based on the above information, I want to predict the next month (FEB).

Which algorithm do you think will suit this best? I think linear regression is
the safe bet. However, I'm struggling to prepare this data for LR in MLlib,
especially how to prepare the X,Y relationship.

The Y is easy (unique visitors), but I'm not sure about the X (it should be
Page, right?). And how do I fit those three months of data in?

Could you give me an example based on the above example data?



Page,UV_NOV, UV_DEC, UV_JAN
---
1, 1,9989,11000
2, 999,500,700
...
26,200,50,34
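
One possible framing, sketched with MLlib's LinearRegressionWithSGD (the
numbers are made up, the step size is arbitrary, and in practice the features
should be scaled first): use the earlier months as the features X and the most
recent month as the label Y, then slide the window forward one month to score
FEB.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.{SparkConf, SparkContext}

object PageViewForecastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("pv-forecast").setMaster("local[*]"))

    // (page, UV_NOV, UV_DEC, UV_JAN) -- made-up numbers
    val history = sc.parallelize(Seq(
      ("pageA", 10000.0, 9989.0, 11000.0),
      ("pageB", 999.0, 500.0, 700.0),
      ("pageZ", 200.0, 50.0, 34.0)))

    // Train on label = JAN, features = (NOV, DEC). Unscaled features force
    // a tiny SGD step size; scaling them first would be better.
    val training = history.map { case (_, nov, dec, jan) =>
      LabeledPoint(jan, Vectors.dense(nov, dec))
    }.cache()
    val model = LinearRegressionWithSGD.train(training, 100, 1e-9)

    // Forecast FEB by sliding the window: features = (DEC, JAN).
    val febForecast = history.map { case (page, _, dec, jan) =>
      (page, model.predict(Vectors.dense(dec, jan)))
    }
    febForecast.collect().foreach(println)

    sc.stop()
  }
}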


Re: Spark streaming and ThreadLocal

2016-01-29 Thread Shixiong(Ryan) Zhu
It looks weird. Why don't you just pass "new SomeClass()" to "somefunc"?
You don't need to use ThreadLocal if there are no multiple threads in your
code.

On Fri, Jan 29, 2016 at 4:39 PM, N B  wrote:

> Fixed a typo in the code to avoid any confusion Please comment on the
> code below...
>
> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>  public SomeClass initialValue() { return new SomeClass(); }
> };
> somefunc(p, d.get());
> d.remove();
> return p;
> }; );
>
> On Fri, Jan 29, 2016 at 4:32 PM, N B  wrote:
>
>> So this use of ThreadLocal will be inside the code of a function
>> executing on the workers i.e. within a call from one of the lambdas. Would
>> it just look like this then:
>>
>> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>>  public SomeClass initialValue() { return new SomeClass(); }
>> };
>> somefunc(p, d.get());
>> d.remove();
>> return p;
>> }; );
>>
>> Will this make sure that all threads inside the worker clean up the
>> ThreadLocal once they are done with processing this task?
>>
>> Thanks
>> NB
>>
>>
>> On Fri, Jan 29, 2016 at 1:00 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Spark Streaming uses threadpools so you need to remove ThreadLocal when
>>> it's not used.
>>>
>>> On Fri, Jan 29, 2016 at 12:55 PM, N B  wrote:
>>>
 Thanks for the response Ryan. So I would say that it is in fact the
 purpose of a ThreadLocal i.e. to have a copy of the variable as long as the
 thread lives. I guess my concern is around usage of threadpools and whether
 Spark streaming will internally create many threads that rotate between
 tasks on purpose thereby holding onto ThreadLocals that may actually never
 be used again.

 Thanks

 On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
 shixi...@databricks.com> wrote:

> Of cause. If you use a ThreadLocal in a long living thread and forget
> to remove it, it's definitely a memory leak.
>
> On Thu, Jan 28, 2016 at 9:31 PM, N B  wrote:
>
>> Hello,
>>
>> Does anyone know if there are any potential pitfalls associated with
>> using ThreadLocal variables in a Spark streaming application? One things 
>> I
>> have seen mentioned in the context of app servers that use thread pools 
>> is
>> that ThreadLocals can leak memory. Could this happen in Spark streaming
>> also?
>>
>> Thanks
>> Nikunj
>>
>>
>

>>>
>>
>


Re: Spark streaming and ThreadLocal

2016-01-29 Thread Shixiong(Ryan) Zhu
I see. Then you should use `mapPartitions` rather than using ThreadLocal.
E.g.,

dstream.mapPartitions { iter =>
  val d = new SomeClass()  // one instance per partition, no ThreadLocal needed
  iter.map { p =>
    somefunc(p, d)
    p
  }
}


On Fri, Jan 29, 2016 at 5:29 PM, N B  wrote:

> Well won't the code in lambda execute inside multiple threads in the
> worker because it has to process many records? I would just want to have a
> single copy of SomeClass instantiated per thread rather than once per each
> record being processed. That was what triggered this thought anyways.
>
> Thanks
> NB
>
>
> On Fri, Jan 29, 2016 at 5:09 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> It looks weird. Why don't you just pass "new SomeClass()" to "somefunc"?
>> You don't need to use ThreadLocal if there are no multiple threads in your
>> codes.
>>
>> On Fri, Jan 29, 2016 at 4:39 PM, N B  wrote:
>>
>>> Fixed a typo in the code to avoid any confusion Please comment on
>>> the code below...
>>>
>>> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>>>  public SomeClass initialValue() { return new SomeClass(); }
>>> };
>>> somefunc(p, d.get());
>>> d.remove();
>>> return p;
>>> }; );
>>>
>>> On Fri, Jan 29, 2016 at 4:32 PM, N B  wrote:
>>>
 So this use of ThreadLocal will be inside the code of a function
 executing on the workers i.e. within a call from one of the lambdas. Would
 it just look like this then:

 dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
  public SomeClass initialValue() { return new SomeClass(); }
 };
 somefunc(p, d.get());
 d.remove();
 return p;
 }; );

 Will this make sure that all threads inside the worker clean up the
 ThreadLocal once they are done with processing this task?

 Thanks
 NB


 On Fri, Jan 29, 2016 at 1:00 PM, Shixiong(Ryan) Zhu <
 shixi...@databricks.com> wrote:

> Spark Streaming uses threadpools so you need to remove ThreadLocal
> when it's not used.
>
> On Fri, Jan 29, 2016 at 12:55 PM, N B  wrote:
>
>> Thanks for the response Ryan. So I would say that it is in fact the
>> purpose of a ThreadLocal i.e. to have a copy of the variable as long as 
>> the
>> thread lives. I guess my concern is around usage of threadpools and 
>> whether
>> Spark streaming will internally create many threads that rotate between
>> tasks on purpose thereby holding onto ThreadLocals that may actually 
>> never
>> be used again.
>>
>> Thanks
>>
>> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Of cause. If you use a ThreadLocal in a long living thread and
>>> forget to remove it, it's definitely a memory leak.
>>>
>>> On Thu, Jan 28, 2016 at 9:31 PM, N B  wrote:
>>>
 Hello,

 Does anyone know if there are any potential pitfalls associated
 with using ThreadLocal variables in a Spark streaming application? One
 things I have seen mentioned in the context of app servers that use 
 thread
 pools is that ThreadLocals can leak memory. Could this happen in Spark
 streaming also?

 Thanks
 Nikunj


>>>
>>
>

>>>
>>
>


RE: saveAsTextFile is not writing to local fs

2016-01-29 Thread Mohammed Guller
Is it a multi-node cluster, or are you running Spark on a single machine?

You can change Spark’s logging level to INFO or DEBUG to see what is going on.

Mohammed
Author: Big Data Analytics with 
Spark

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 2016 3:38 PM
To: spark users
Subject: saveAsTextFile is not writing to local fs

Hi Everyone,

We are using spark 1.4.1 and we have a requirement of writing data local fs 
instead of hdfs.

When trying to save rdd to local fs with saveAsTextFile, it is just writing 
_SUCCESS file in the folder with no part- files and also no error or warning 
messages on console.

Is there any place to look at to fix this problem?

Thanks,
Sivakumar Bhavanari.


Re: saveAsTextFile is not writing to local fs

2016-01-29 Thread Siva
Hi Mohammed,

Thanks for your quick response. I'm submitting the Spark job to YARN in
"yarn-client" mode on a 6-node cluster. I ran the job after turning on DEBUG
mode. I see the below exception, but this exception occurred after the
saveAsTextFile function finished.

16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at
org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at
org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
org.spark-project.jetty.io.EofException

Do you think this is what is causing this?

Thanks,
Sivakumar Bhavanari.

On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller 
wrote:

> Is it a multi-node cluster or you running Spark on a single machine?
>
>
>
> You can change Spark’s logging level to INFO or DEBUG to see what is going
> on.
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> 
>
>
>
> *From:* Siva [mailto:sbhavan...@gmail.com]
> *Sent:* Friday, January 29, 2016 3:38 PM
> *To:* spark users
> *Subject:* saveAsTextFile is not writing to local fs
>
>
>
> Hi Everyone,
>
>
>
> We are using spark 1.4.1 and we have a requirement of writing data local
> fs instead of hdfs.
>
>
>
> When trying to save rdd to local fs with saveAsTextFile, it is just
> writing _SUCCESS file in the folder with no part- files and also no error
> or warning messages on console.
>
>
>
> Is there any place to look at to fix this problem?
>
>
> Thanks,
>
> Sivakumar Bhavanari.
>


mapWithState: multiple operations on the same stream

2016-01-29 Thread Udo Fholl
Hi,

I have a stream from which I need to process events and send them to another
service, and then remove the key from the state. I'm storing state because
some events are delayed.

My current approach is to consolidate the state and store it with a
mapWithState invocation. Then, rather than using a foreachRDD to send it, I
use a transform invocation. I do that in order to mark the event as failed
or successfully sent. The service that I'm sending data to is quite faulty.

What would be the best approach for this workflow:
* Obtain data
* Consolidate state
* Try to send
* If it succeeds remove from state otherwise keep it in the state

Thanks.

Best regards,
Udo.


Re: mapWithState: remove key

2016-01-29 Thread Shixiong(Ryan) Zhu
1. To remove a state, you need to call "state.remove()". If you return a
None in the function, it just means don't output it as the DStream's
output, but the state won't be removed if you don't call "state.remove()".

2. For NoSuchElementException, here is the doc for "State.get":

  /**
   * Get the state if it exists, otherwise it will throw
`java.util.NoSuchElementException`.
   * Check with `exists()` whether the state exists or not before calling
`get()`.
   *
   * @throws java.util.NoSuchElementException If the state does not exist.
   */
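
Putting those two points together, a small sketch of a mapping function (the
String/Int/Long types and the "done" condition are placeholders, not from the
original code):

import org.apache.spark.streaming.{State, StateSpec, Time}

object MapWithStateSketch {
  // Placeholder types: String keys, Int events, Long running total.
  val spec = StateSpec.function(
    (time: Time, key: String, value: Option[Int], state: State[Long]) => {
      val current = if (state.exists()) state.get() else 0L // check exists() before get()
      val updated = current + value.getOrElse(0)
      if (updated >= 100L) {   // hypothetical "this key is finished" condition
        state.remove()         // actually drops the key from the state store
        None                   // only suppresses the output record
      } else {
        state.update(updated)
        Some((key, updated))
      }
    })

  // Used as: pairDStream.mapWithState(spec)
}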




On Fri, Jan 29, 2016 at 10:45 AM, Udo Fholl  wrote:

> Hi,
>
> From the signature of the "mapWithState" method I infer that by returning
> a "None.type" (in Scala) the key is removed from the state. Is that so?
> Sorry if it is in the docs, but it wasn't entirely clear to me.
>
> I'm chaining operations and calling "mapWithState" twice (one to
> consolidate, then I perform some operations that might, or might not
> succeed, and invoke "mapWithState" again). I'm getting this error[1] which
> I suppose is because I'm returning "None" in the "mapWithState" function.
>
> Thank you.
>
> Best regards,
> Udo.
>
> [1]: java.util.NoSuchElementException: State is not set
> at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>


mapWithState: remove key

2016-01-29 Thread Udo Fholl
Hi,

From the signature of the "mapWithState" method I infer that by returning a
"None.type" (in Scala) the key is removed from the state. Is that so? Sorry
if it is in the docs, but it wasn't entirely clear to me.

I'm chaining operations and calling "mapWithState" twice (one to
consolidate, then I perform some operations that might, or might not
succeed, and invoke "mapWithState" again). I'm getting this error[1] which
I suppose is because I'm returning "None" in the "mapWithState" function.

Thank you.

Best regards,
Udo.

[1]: java.util.NoSuchElementException: State is not set
at org.apache.spark.streaming.StateImpl.get(State.scala:150)


Re: Spark 2.0.0 release plan

2016-01-29 Thread Mark Hamstra
https://github.com/apache/spark/pull/10608

On Fri, Jan 29, 2016 at 11:50 AM, Jakob Odersky  wrote:

> I'm not an authoritative source but I think it is indeed the plan to
> move the default build to 2.11.
>
> See this discussion for more detail
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/A-proposal-for-Spark-2-0-td15122.html
>
> On Fri, Jan 29, 2016 at 11:43 AM, Deenar Toraskar
>  wrote:
> > A related question. Are the plans to move the default Spark builds to
> Scala
> > 2.11 with Spark 2.0?
> >
> > Regards
> > Deenar
> >
> > On 27 January 2016 at 19:55, Michael Armbrust 
> > wrote:
> >>
> >> We do maintenance releases on demand when there is enough to justify
> doing
> >> one.  I'm hoping to cut 1.6.1 soon, but have not had time yet.
> >>
> >> On Wed, Jan 27, 2016 at 8:12 AM, Daniel Siegmann
> >>  wrote:
> >>>
> >>> Will there continue to be monthly releases on the 1.6.x branch during
> the
> >>> additional time for bug fixes and such?
> >>>
> >>> On Tue, Jan 26, 2016 at 11:28 PM, Koert Kuipers 
> >>> wrote:
> 
>  thanks thats all i needed
> 
>  On Tue, Jan 26, 2016 at 6:19 PM, Sean Owen 
> wrote:
> >
> > I think it will come significantly later -- or else we'd be at code
> > freeze for 2.x in a few days. I haven't heard anyone discuss this
> > officially but had batted around May or so instead informally in
> > conversation. Does anyone have a particularly strong opinion on that?
> > That's basically an extra 3 month period.
> >
> > https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage
> >
> > On Tue, Jan 26, 2016 at 10:00 PM, Koert Kuipers 
> > wrote:
> > > Is the idea that spark 2.0 comes out roughly 3 months after 1.6? So
> > > quarterly release as usual?
> > > Thanks
> 
> 
> >>>
> >>
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Getting Exceptions/WARN during random runs for same dataset

2016-01-29 Thread Shixiong(Ryan) Zhu
It's a known issue. See https://issues.apache.org/jira/browse/SPARK-10719

On Thu, Jan 28, 2016 at 5:44 PM, Khusro Siddiqui  wrote:

> It is happening on random executors on random nodes, not on any specific
> node every time.
> Or not happening at all.
>
> On Thu, Jan 28, 2016 at 7:42 PM, Ted Yu  wrote:
>
>> Did the UnsupportedOperationException's happen from the executors on all the
>> nodes or only one node ?
>>
>> Thanks
>>
>> On Thu, Jan 28, 2016 at 5:13 PM, Khusro Siddiqui 
>> wrote:
>>
>>> Hi Everyone,
>>>
>>> Environment used: Datastax Enterprise 4.8.3 which is bundled with Spark
>>> 1.4.1 and scala 2.10.5.
>>>
>>> I am using Dataframes to query Cassandra, do processing and store the
>>> result back into Cassandra. The job is being submitted using spark-submit
>>> on a cluster of 3 nodes. While doing so I get three WARN messages:
>>>
>>> WARN  2016-01-28 19:08:18 org.apache.spark.scheduler.TaskSetManager:
>>> Lost task 99.0 in stage 2.0 (TID 107, 10.2.1.82):
>>> java.io.InvalidClassException: org.apache.spark.sql.types.TimestampType$;
>>> unable to create instance
>>>
>>> Caused by: java.lang.reflect.InvocationTargetException
>>>
>>> Caused by: java.lang.UnsupportedOperationException: tail of empty list
>>>
>>>
>>> For example, if I am running the same job, for the same input set of
>>> data, say 20 times,
>>>
>>> - 11 times it will run successfully without any WARN messages
>>>
>>> - 4 times it will run successfully with the above messages
>>>
>>> - 6 times it will run successfully by randomly giving one or two of
>>> the exceptions above
>>>
>>>
>>> In all the 20 runs, the output data is coming as expected and there is
>>> no error in that. My concern is, why is it not giving these messages every
>>> time I do a spark-submit but only at times. Also, the stack trace does not
>>> point to any specific point in my line of code. Full stack trace is as
>>> follows. Please let me know if you need any other information
>>>
>>>
>>> WARN  2016-01-28 19:08:24 org.apache.spark.scheduler.TaskSetManager:
>>> Lost task 188.0 in stage 16.0 (TID 637, 10.2.1.82):
>>> java.io.InvalidClassException: org.apache.spark.sql.types.TimestampType$;
>>> unable to create instance
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1788)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>
>>> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
>>>
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>>
>>> at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>
>>> at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>>>
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>>
>>> at
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>

Re: GraphX can show graph?

2016-01-29 Thread Russell Jurney
Maybe check out Gephi. It is a program that does what you need out of the
box.

On Friday, January 29, 2016, Balachandar R.A. 
wrote:

> Thanks... Will look into that
>
> - Bala
>
> On 28 January 2016 at 15:36, Sahil Sareen  > wrote:
>
>> Try Neo4j for visualization; GraphX does a pretty good job at distributed
>> graph processing.
>>
>> On Thu, Jan 28, 2016 at 12:42 PM, Balachandar R.A. <
>> balachandar...@gmail.com
>> > wrote:
>>
>>> Hi
>>>
>>> I am new to GraphX. I have a simple csv file which I could load and
>>> compute a few graph statistics on. However, I am not sure whether it is possible
>>> to create and show a graph (for visualization purposes) using GraphX. Any
>>> pointer to a tutorial or information connected to this would be really helpful.
>>>
>>> Thanks and regards
>>> Bala
>>>
>>
>>
>

-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io


How to control the number of files for dynamic partition in Spark SQL?

2016-01-29 Thread Benyi Wang
I want to insert into a partitioned table using dynamic partitioning, but I
don’t want to have 200 files per partition because the files would be
small in my case.

sqlContext.sql(  """
|insert overwrite table event
|partition(eventDate)
|select
| user,
| detail,
| eventDate
|from event_wk
  """.stripMargin)

The table “event_wk” is registered via registerTempTable from a DataFrame
that is built with some joins. If I set spark.sql.shuffle.partitions=2, the
joins' performance will be bad because that property seems to be global.

I can do something like this:

event_wk.reparitition(2).write.partitionBy("eventDate").format("parquet").save(path)

but I have to handle adding partitions by myself.

Is there a way you can control the number of files just for this last
insert step?
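
One thing that might be worth trying (a sketch reusing the names above and the
same sqlContext; whether it really caps the per-partition file count depends on
the final plan, so treat it as an experiment rather than a guarantee):
repartition only the DataFrame behind the temp table right before the insert,
so the joins keep the default shuffle partitions.

// Shrink only the input of the final insert; the joins that build event_wk
// still run with the default spark.sql.shuffle.partitions.
val eventWkSmall = sqlContext.table("event_wk").repartition(2)
eventWkSmall.registerTempTable("event_wk_small")

sqlContext.sql(
  """
    |insert overwrite table event
    |partition(eventDate)
    |select user, detail, eventDate
    |from event_wk_small
  """.stripMargin)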


Re: Spark 2.0.0 release plan

2016-01-29 Thread Jakob Odersky
I'm not an authoritative source but I think it is indeed the plan to
move the default build to 2.11.

See this discussion for more detail
http://apache-spark-developers-list.1001551.n3.nabble.com/A-proposal-for-Spark-2-0-td15122.html

On Fri, Jan 29, 2016 at 11:43 AM, Deenar Toraskar
 wrote:
> A related question. Are the plans to move the default Spark builds to Scala
> 2.11 with Spark 2.0?
>
> Regards
> Deenar
>
> On 27 January 2016 at 19:55, Michael Armbrust 
> wrote:
>>
>> We do maintenance releases on demand when there is enough to justify doing
>> one.  I'm hoping to cut 1.6.1 soon, but have not had time yet.
>>
>> On Wed, Jan 27, 2016 at 8:12 AM, Daniel Siegmann
>>  wrote:
>>>
>>> Will there continue to be monthly releases on the 1.6.x branch during the
>>> additional time for bug fixes and such?
>>>
>>> On Tue, Jan 26, 2016 at 11:28 PM, Koert Kuipers 
>>> wrote:

 thanks thats all i needed

 On Tue, Jan 26, 2016 at 6:19 PM, Sean Owen  wrote:
>
> I think it will come significantly later -- or else we'd be at code
> freeze for 2.x in a few days. I haven't heard anyone discuss this
> officially but had batted around May or so instead informally in
> conversation. Does anyone have a particularly strong opinion on that?
> That's basically an extra 3 month period.
>
> https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage
>
> On Tue, Jan 26, 2016 at 10:00 PM, Koert Kuipers 
> wrote:
> > Is the idea that spark 2.0 comes out roughly 3 months after 1.6? So
> > quarterly release as usual?
> > Thanks


>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 2.0.0 release plan

2016-01-29 Thread Deenar Toraskar
A related question: are there plans to move the default Spark builds to Scala
2.11 with Spark 2.0?

Regards
Deenar

On 27 January 2016 at 19:55, Michael Armbrust 
wrote:

> We do maintenance releases on demand when there is enough to justify doing
> one.  I'm hoping to cut 1.6.1 soon, but have not had time yet.
>
> On Wed, Jan 27, 2016 at 8:12 AM, Daniel Siegmann <
> daniel.siegm...@teamaol.com> wrote:
>
>> Will there continue to be monthly releases on the 1.6.x branch during the
>> additional time for bug fixes and such?
>>
>> On Tue, Jan 26, 2016 at 11:28 PM, Koert Kuipers 
>> wrote:
>>
>>> thanks thats all i needed
>>>
>>> On Tue, Jan 26, 2016 at 6:19 PM, Sean Owen  wrote:
>>>
 I think it will come significantly later -- or else we'd be at code
 freeze for 2.x in a few days. I haven't heard anyone discuss this
 officially but had batted around May or so instead informally in
 conversation. Does anyone have a particularly strong opinion on that?
 That's basically an extra 3 month period.

 https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage

 On Tue, Jan 26, 2016 at 10:00 PM, Koert Kuipers 
 wrote:
 > Is the idea that spark 2.0 comes out roughly 3 months after 1.6? So
 > quarterly release as usual?
 > Thanks

>>>
>>>
>>
>


Re: Spark 2.0.0 release plan

2016-01-29 Thread Michael Armbrust
Its already underway: https://github.com/apache/spark/pull/10608

On Fri, Jan 29, 2016 at 11:50 AM, Jakob Odersky  wrote:

> I'm not an authoritative source but I think it is indeed the plan to
> move the default build to 2.11.
>
> See this discussion for more detail
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/A-proposal-for-Spark-2-0-td15122.html
>
> On Fri, Jan 29, 2016 at 11:43 AM, Deenar Toraskar
>  wrote:
> > A related question. Are the plans to move the default Spark builds to
> Scala
> > 2.11 with Spark 2.0?
> >
> > Regards
> > Deenar
> >
> > On 27 January 2016 at 19:55, Michael Armbrust 
> > wrote:
> >>
> >> We do maintenance releases on demand when there is enough to justify
> doing
> >> one.  I'm hoping to cut 1.6.1 soon, but have not had time yet.
> >>
> >> On Wed, Jan 27, 2016 at 8:12 AM, Daniel Siegmann
> >>  wrote:
> >>>
> >>> Will there continue to be monthly releases on the 1.6.x branch during
> the
> >>> additional time for bug fixes and such?
> >>>
> >>> On Tue, Jan 26, 2016 at 11:28 PM, Koert Kuipers 
> >>> wrote:
> 
>  thanks thats all i needed
> 
>  On Tue, Jan 26, 2016 at 6:19 PM, Sean Owen 
> wrote:
> >
> > I think it will come significantly later -- or else we'd be at code
> > freeze for 2.x in a few days. I haven't heard anyone discuss this
> > officially but had batted around May or so instead informally in
> > conversation. Does anyone have a particularly strong opinion on that?
> > That's basically an extra 3 month period.
> >
> > https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage
> >
> > On Tue, Jan 26, 2016 at 10:00 PM, Koert Kuipers 
> > wrote:
> > > Is the idea that spark 2.0 comes out roughly 3 months after 1.6? So
> > > quarterly release as usual?
> > > Thanks
> 
> 
> >>>
> >>
> >
>


Re: Spark streaming and ThreadLocal

2016-01-29 Thread Shixiong(Ryan) Zhu
Of course. If you use a ThreadLocal in a long-living thread and forget to
remove it, it's definitely a memory leak.

On Thu, Jan 28, 2016 at 9:31 PM, N B  wrote:

> Hello,
>
> Does anyone know if there are any potential pitfalls associated with using
> ThreadLocal variables in a Spark streaming application? One things I have
> seen mentioned in the context of app servers that use thread pools is that
> ThreadLocals can leak memory. Could this happen in Spark streaming also?
>
> Thanks
> Nikunj
>
>


Re: Spark streaming and ThreadLocal

2016-01-29 Thread N B
Thanks for the response Ryan. So I would say that it is in fact the purpose
of a ThreadLocal i.e. to have a copy of the variable as long as the thread
lives. I guess my concern is around usage of threadpools and whether Spark
streaming will internally create many threads that rotate between tasks on
purpose thereby holding onto ThreadLocals that may actually never be used
again.

Thanks

On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> Of cause. If you use a ThreadLocal in a long living thread and forget to
> remove it, it's definitely a memory leak.
>
> On Thu, Jan 28, 2016 at 9:31 PM, N B  wrote:
>
>> Hello,
>>
>> Does anyone know if there are any potential pitfalls associated with
>> using ThreadLocal variables in a Spark streaming application? One things I
>> have seen mentioned in the context of app servers that use thread pools is
>> that ThreadLocals can leak memory. Could this happen in Spark streaming
>> also?
>>
>> Thanks
>> Nikunj
>>
>>
>


Re: Spark streaming and ThreadLocal

2016-01-29 Thread Shixiong(Ryan) Zhu
Spark Streaming uses threadpools so you need to remove ThreadLocal when
it's not used.

On Fri, Jan 29, 2016 at 12:55 PM, N B  wrote:

> Thanks for the response Ryan. So I would say that it is in fact the
> purpose of a ThreadLocal i.e. to have a copy of the variable as long as the
> thread lives. I guess my concern is around usage of threadpools and whether
> Spark streaming will internally create many threads that rotate between
> tasks on purpose thereby holding onto ThreadLocals that may actually never
> be used again.
>
> Thanks
>
> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Of cause. If you use a ThreadLocal in a long living thread and forget to
>> remove it, it's definitely a memory leak.
>>
>> On Thu, Jan 28, 2016 at 9:31 PM, N B  wrote:
>>
>>> Hello,
>>>
>>> Does anyone know if there are any potential pitfalls associated with
>>> using ThreadLocal variables in a Spark streaming application? One things I
>>> have seen mentioned in the context of app servers that use thread pools is
>>> that ThreadLocals can leak memory. Could this happen in Spark streaming
>>> also?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>
>