Re: Alternatives for dataframe collectAsList()

2017-04-04 Thread lucas.g...@gmail.com
As Keith said, it depends on what you want to do with your data.

From a pipelining perspective, the general flow (YMMV) is:

Load dataset(s) -> Transform and/or Join -> Aggregate -> Write dataset

Each step in the pipeline does something distinct with the data.

The final step is usually loading the data into something that can
display / query it (i.e., a DBMS of some sort).

That's where you'd start doing your queries etc.

There's generally no good reason, IMO, to be collecting your data on the
driver except for testing / validation / exploratory work.
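
To make that concrete, here is a minimal sketch of that flow (the paths, table and column names are hypothetical), writing the aggregated result out rather than collecting it on the driver:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("pipeline-sketch").getOrCreate()

// Load dataset(s)
val orders    = spark.read.parquet("/data/orders")
val customers = spark.read.parquet("/data/customers")

// Transform and/or Join
val joined = orders.join(customers, "customer_id")

// Aggregate
val totals = joined
  .groupBy(col("customer_id"), col("order_date"))
  .agg(sum("amount").as("total_amount"))

// Write dataset (hand it to whatever will display / query it)
totals.write.mode("overwrite").parquet("/data/daily_totals")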

I hope that helps.

Gary Lucas

On 4 April 2017 at 12:12, Keith Chapman  wrote:

> As Paul said, it really depends on what you want to do with your data;
> perhaps writing it to a file would be a better option, but again, that depends
> on what you intend to do with the data you collect.
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>
> On Tue, Apr 4, 2017 at 7:38 AM, Eike von Seggern <
> eike.segg...@sevenval.com> wrote:
>
>> Hi,
>>
>> depending on what you're trying to achieve `RDD.toLocalIterator()` might
>> help you.
>>
>> Best
>>
>> Eike
>>
>>
>> 2017-03-29 21:00 GMT+02:00 szep.laszlo.it :
>>
>>> Hi,
>>>
>>> after I created a dataset
>>>
>>> Dataset df = sqlContext.sql("query");
>>>
>>> I need the result values, so I call the method collectAsList():
>>>
>>> List list = df.collectAsList();
>>>
>>> But it's very slow when I work with large datasets (20-30 million
>>> records). I know the results aren't present in the driver app, which is why
>>> it takes a long time: collectAsList() collects all the data from the worker nodes.
>>>
>>> But then what is the right way to get the result values? Is there another
>>> way to iterate over the rows of a result dataset, or to get the values? Can anyone
>>> post a small, working example?
>>>
>>> Thanks & Regards,
>>> Laszlo Szep
>>>
>>
>


Re: how do i force unit test to do whole stage codegen

2017-04-04 Thread Koert Kuipers
got it. that's good to know. thanks!

On Wed, Apr 5, 2017 at 12:07 AM, Kazuaki Ishizaki 
wrote:

> Hi,
> The page in the URL explains the old style of physical plan output.
> The current style adds "*" as a prefix of each operation that
> whole-stage codegen can be applied to.
>
> So, in your test case, whole-stage codegen has already been enabled!
>
> FYI. I think that it is a good topic for d...@spark.apache.org.
>
> Kazuaki Ishizaki
>
>
>
> From:Koert Kuipers 
> To:"user@spark.apache.org" 
> Date:2017/04/05 05:12
> Subject:how do i force unit test to do whole stage codegen
> --
>
>
>
> i wrote my own expression with eval and doGenCode, but doGenCode never
> gets called in tests.
>
> also as a test i ran this in a unit test:
> spark.range(10).select('id as 'asId).where('id === 4).explain
> according to
>
> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-whole-stage-codegen.html
> this is supposed to show:
> == Physical Plan ==
> WholeStageCodegen
> :  +- Project [id#0L AS asId#3L]
> : +- Filter (id#0L = 4)
> :+- Range 0, 1, 8, 10, [id#0L]
>
> but it doesn't. instead it shows:
>
> == Physical Plan ==
> *Project [id#12L AS asId#15L]
> +- *Filter (id#12L = 4)
>   +- *Range (0, 10, step=1, splits=Some(4))
>
> so i am again missing the WholeStageCodegen. any idea why?
>
> i create spark session for unit tests simply as:
> val session = SparkSession.builder
>  .master("local[*]")
>  .appName("test")
>  .config("spark.sql.shuffle.partitions", 4)
>  .getOrCreate()
>
>
>


spark stages UI page has 'gc time' column empty

2017-04-04 Thread satishl
Hi, I am using Spark 1.6 in YARN cluster mode. When my application runs, I am
unable to see GC time metrics in the Spark UI (Application
UI -> Stages -> Tasks). I am attaching the screenshot here.
Is this a bug in the Spark UI, or is this expected?

 






Re: how do i force unit test to do whole stage codegen

2017-04-04 Thread Kazuaki Ishizaki
Hi,
The page in the URL explains the old style of physical plan output.
The current style adds "*" as a prefix of each operation that
whole-stage codegen can be applied to.

So, in your test case, whole-stage codegen has already been enabled!
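
For what it's worth, here is a rough sketch (assuming Spark 2.x, where WholeStageCodegenExec lives in org.apache.spark.sql.execution) of how a unit test could assert this programmatically instead of eyeballing the explain output:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.WholeStageCodegenExec

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("codegen-test")
  .getOrCreate()
import spark.implicits._

val df = spark.range(10).select('id as 'asId).where('id === 4)

// Look for a WholeStageCodegenExec node in the executed physical plan;
// it is the node rendered with the "*" prefix in explain() output.
val codegenApplied = df.queryExecution.executedPlan
  .find(_.isInstanceOf[WholeStageCodegenExec])
  .isDefined

assert(codegenApplied, "expected a WholeStageCodegenExec stage in the plan")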

FYI. I think that it is a good topic for d...@spark.apache.org.

Kazuaki Ishizaki



From:   Koert Kuipers 
To: "user@spark.apache.org" 
Date:   2017/04/05 05:12
Subject:how do i force unit test to do whole stage codegen



i wrote my own expression with eval and doGenCode, but doGenCode never 
gets called in tests.

also as a test i ran this in a unit test:
spark.range(10).select('id as 'asId).where('id === 4).explain
according to
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-whole-stage-codegen.html
this is supposed to show:
== Physical Plan ==
WholeStageCodegen
:  +- Project [id#0L AS asId#3L]
: +- Filter (id#0L = 4)
:+- Range 0, 1, 8, 10, [id#0L]

but it doesn't. instead it shows:

== Physical Plan ==
*Project [id#12L AS asId#15L]
+- *Filter (id#12L = 4)
   +- *Range (0, 10, step=1, splits=Some(4))

so i am again missing the WholeStageCodegen. any idea why?

i create spark session for unit tests simply as:
val session = SparkSession.builder
  .master("local[*]")
  .appName("test")
  .config("spark.sql.shuffle.partitions", 4)
  .getOrCreate()





With Twitter4j API, why am I not able to pull tweets with certain keywords?

2017-04-04 Thread Gaurav1809
I am using Spark Streaming with the twitter4j API to pull tweets.

I am able to pull tweets for some keywords but not for others. If I
explicitly tweet with those keywords, even then the API does not pull them. For
some keywords it works smoothly. Has anyone encountered this issue before? Please
suggest a solution. Thanks.
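
For reference, a minimal sketch of keyword tracking with the spark-streaming-twitter connector (this assumes the connector artifact is on the classpath and the twitter4j OAuth credentials are set as system properties; note that the keyword filtering is done server-side by Twitter's streaming endpoint, not by Spark):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

val conf = new SparkConf().setMaster("local[2]").setAppName("twitter-keywords")
val ssc  = new StreamingContext(conf, Seconds(10))

// Keywords are passed straight through to Twitter's streaming endpoint as "track" filters.
val keywords = Seq("spark", "bigdata")
val tweets   = TwitterUtils.createStream(ssc, None, keywords)

tweets.map(_.getText).print()

ssc.start()
ssc.awaitTermination()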






Re: Why do we ever run out of memory in Spark Structured Streaming?

2017-04-04 Thread Tathagata Das
Are you referring to the memory usage of stateful operations like
aggregations, or the new mapGroupsWithState?
The current implementation of the internal state store (which maintains the
stateful aggregates) keeps all of the data in executor memory. It uses an
HDFS-compatible file system for checkpointing, but as of now all of the state
is still held in executor memory. This is something we will improve in the
future.

That said, you can enable watermarking in your query, which will
automatically clear old, unnecessary state and thus limit the total memory
used for stateful operations.
Furthermore, you can also monitor the size of the state and get alerts if
the state is growing too large.

Read more in the programming guide.
Watermarking -
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
Monitoring -
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
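
As a rough illustration (hypothetical source and column names), a windowed count whose state is bounded by a watermark could look like this:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("watermark-sketch").getOrCreate()
import spark.implicits._

// Assumed streaming source with an event-time column `eventTime` and a `word` column.
val events = spark.readStream
  .format("socket")                 // placeholder source, for illustration only
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .select(current_timestamp().as("eventTime"), $"value".as("word"))

// State for windows more than 10 minutes older than the max event time seen is dropped.
val counts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window($"eventTime", "5 minutes"), $"word")
  .count()

val query = counts.writeStream
  .outputMode("append")
  .format("console")
  .start()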

In case you were referring to something else, please give us more context and
details - which query you are running, and what symptoms you are observing.

On Tue, Apr 4, 2017 at 5:17 PM, kant kodali  wrote:

> Why do we ever run out of memory in Spark Structured Streaming, especially
> when memory can always spill to disk? Until the disk is full we shouldn't
> be out of memory, right? Sure, thrashing will happen more frequently and
> degrade performance, but why would we ever run out of memory, even when
> maintaining state for 6 months or a year?
>
> Thanks!
>


Why do we ever run out of memory in Spark Structured Streaming?

2017-04-04 Thread kant kodali
Why do we ever run out of memory in Spark Structured Streaming, especially
when memory can always spill to disk? Until the disk is full we shouldn't
be out of memory, right? Sure, thrashing will happen more frequently and
degrade performance, but why would we ever run out of memory, even when
maintaining state for 6 months or a year?

Thanks!


Re: bug with PYTHONHASHSEED

2017-04-04 Thread Jeff Zhang
It is fixed in https://issues.apache.org/jira/browse/SPARK-13330



Holden Karau wrote on Wed, Apr 5, 2017 at 12:03 AM:

> Which version of Spark is this (or is it a dev build)? We've recently made
> some improvements with PYTHONHASHSEED propagation.
>
> On Tue, Apr 4, 2017 at 7:49 AM Eike von Seggern wrote:
>
> 2017-04-01 21:54 GMT+02:00 Paul Tremblay :
>
> When I try to do a groupByKey() in my spark environment, I get the
> error described here:
>
>
> http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh
>
> In order to attempt to fix the problem, I set up my ipython environment
> with the additional line:
>
> PYTHONHASHSEED=1
>
> When I fire up my ipython shell, and do:
>
> In [7]: hash("foo")
> Out[7]: -2457967226571033580
>
> In [8]: hash("foo")
> Out[8]: -2457967226571033580
>
> So my hash function is now seeded so it returns consistent values. But
> when I do a groupByKey(), I get the same error:
>
>
> Exception: Randomness of hash of string should be disabled via
> PYTHONHASHSEED
>
> Anyone know how to fix this problem in python 3.4?
>
>
> Independent of the python version, you have to ensure that Python on
> spark-master and -workers is started with PYTHONHASHSEED set, e.g. by
> adding it to the environment of the spark processes.
>
> Best
>
> Eike
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


how do i force unit test to do whole stage codegen

2017-04-04 Thread Koert Kuipers
i wrote my own expression with eval and doGenCode, but doGenCode never gets
called in tests.

also as a test i ran this in a unit test:
spark.range(10).select('id as 'asId).where('id === 4).explain
according to
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-whole-stage-codegen.html
this is supposed to show:

== Physical Plan ==
WholeStageCodegen
:  +- Project [id#0L AS asId#3L]
: +- Filter (id#0L = 4)
:+- Range 0, 1, 8, 10, [id#0L]

but it doesn't. instead it shows:

== Physical Plan ==
*Project [id#12L AS asId#15L]
+- *Filter (id#12L = 4)
   +- *Range (0, 10, step=1, splits=Some(4))

so i am again missing the WholeStageCodegen. any idea why?

i create spark session for unit tests simply as:
val session = SparkSession.builder
  .master("local[*]")
  .appName("test")
  .config("spark.sql.shuffle.partitions", 4)
  .getOrCreate()


Re: Alternatives for dataframe collectAsList()

2017-04-04 Thread Keith Chapman
As Paul said, it really depends on what you want to do with your data;
perhaps writing it to a file would be a better option, but again, that depends
on what you intend to do with the data you collect.

Regards,
Keith.

http://keith-chapman.com

On Tue, Apr 4, 2017 at 7:38 AM, Eike von Seggern 
wrote:

> Hi,
>
> depending on what you're trying to achieve `RDD.toLocalIterator()` might
> help you.
>
> Best
>
> Eike
>
>
> 2017-03-29 21:00 GMT+02:00 szep.laszlo.it :
>
>> Hi,
>>
>> after I created a dataset
>>
>> Dataset df = sqlContext.sql("query");
>>
>> I need the result values, so I call the method collectAsList():
>>
>> List list = df.collectAsList();
>>
>> But it's very slow when I work with large datasets (20-30 million
>> records). I know the results aren't present in the driver app, which is why
>> it takes a long time: collectAsList() collects all the data from the worker nodes.
>>
>> But then what is the right way to get the result values? Is there another
>> way to iterate over the rows of a result dataset, or to get the values? Can anyone
>> post a small, working example?
>>
>> Thanks & Regards,
>> Laszlo Szep
>>
>


Re: map transform on array in spark sql

2017-04-04 Thread Michael Armbrust
If you can find the name of the struct field from the schema you can just
do:

df.select($"arrayField.a")

Selecting a field from an array returns an array with that field selected
from each element.
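
A small self-contained sketch (made-up data, so the struct fields come out named _1 and _2):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("array-field").getOrCreate()
import spark.implicits._

// An array column whose elements are structs (fields _1 and _2 from the tuples).
val df = Seq((1, Seq(("a1", "b1"), ("a2", "b2")))).toDF("id", "arrayField")

// Selecting a nested field of the array column extracts it from every element,
// so the resulting column is an array of that field's type (array<string> here).
df.select($"arrayField._1").show(false)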

On Mon, Apr 3, 2017 at 8:18 PM, Koert Kuipers  wrote:

> i have a DataFrame where one column has type:
>
> ArrayType(StructType(Seq(
>   StructField("a", typeA, nullableA),
>   StructField("b", typeB, nullableB)
> )))
>
> i would like to map over this array to pick the first field in the
> struct. so the result should be an ArrayType(typeA, nullableA). i realize i
> can do this with a scala udf if i know typeA. but what if i don't know typeA?
>
> basically i would like to do an expression like:
> map(col("x"), _(0)))
>
> any suggestions?
>
>


Re: bug with PYTHONHASHSEED

2017-04-04 Thread Holden Karau
Which version of Spark is this (or is it a dev build)? We've recently made
some improvements with PYTHONHASHSEED propagation.

On Tue, Apr 4, 2017 at 7:49 AM Eike von Seggern 
wrote:

2017-04-01 21:54 GMT+02:00 Paul Tremblay :

When I try to do a groupByKey() in my spark environment, I get the error
described here:

http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh

In order to attempt to fix the problem, I set up my ipython environment
with the additional line:

PYTHONHASHSEED=1

When I fire up my ipython shell, and do:

In [7]: hash("foo")
Out[7]: -2457967226571033580

In [8]: hash("foo")
Out[8]: -2457967226571033580

So my hash function is now seeded so it returns consistent values. But when
I do a groupByKey(), I get the same error:


Exception: Randomness of hash of string should be disabled via
PYTHONHASHSEED

Anyone know how to fix this problem in python 3.4?


Independent of the python version, you have to ensure that Python on
spark-master and -workers is started with PYTHONHASHSEED set, e.g. by
adding it to the environment of the spark processes.

Best

Eike

-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: bug with PYTHONHASHSEED

2017-04-04 Thread Paul Tremblay
So that means I have to pass that bash variable to the EMR clusters when I
spin them up, not afterwards. I'll give that a go.

Thanks!

Henry

On Tue, Apr 4, 2017 at 7:49 AM, Eike von Seggern 
wrote:

> 2017-04-01 21:54 GMT+02:00 Paul Tremblay :
>
>> When I try to do a groupByKey() in my spark environment, I get the
>> error described here:
>>
>> http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh
>>
>> In order to attempt to fix the problem, I set up my ipython environment
>> with the additional line:
>>
>> PYTHONHASHSEED=1
>>
>> When I fire up my ipython shell, and do:
>>
>> In [7]: hash("foo")
>> Out[7]: -2457967226571033580
>>
>> In [8]: hash("foo")
>> Out[8]: -2457967226571033580
>>
>> So my hash function is now seeded so it returns consistent values. But
>> when I do a groupByKey(), I get the same error:
>>
>>
>> Exception: Randomness of hash of string should be disabled via
>> PYTHONHASHSEED
>>
>> Anyone know how to fix this problem in python 3.4?
>>
>
> Independent of the python version, you have to ensure that Python on
> spark-master and -workers is started with PYTHONHASHSEED set, e.g. by
> adding it to the environment of the spark processes.
>
> Best
>
> Eike
>



-- 
Paul Henry Tremblay
Robert Half Technology


Re: bug with PYTHONHASHSEED

2017-04-04 Thread Eike von Seggern
2017-04-01 21:54 GMT+02:00 Paul Tremblay :

> When I try to do a groupByKey() in my spark environment, I get the
> error described here:
>
> http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh
>
> In order to attempt to fix the problem, I set up my ipython environment
> with the additional line:
>
> PYTHONHASHSEED=1
>
> When I fire up my ipython shell, and do:
>
> In [7]: hash("foo")
> Out[7]: -2457967226571033580
>
> In [8]: hash("foo")
> Out[8]: -2457967226571033580
>
> So my hash function is now seeded so it returns consistent values. But
> when I do a groupByKey(), I get the same error:
>
>
> Exception: Randomness of hash of string should be disabled via
> PYTHONHASHSEED
>
> Anyone know how to fix this problem in python 3.4?
>

Independent of the python version, you have to ensure that Python on
spark-master and -workers is started with PYTHONHASHSEED set, e.g. by
adding it to the environment of the spark processes.

Best

Eike


Re: Alternatives for dataframe collectAsList()

2017-04-04 Thread Eike von Seggern
Hi,

depending on what you're trying to achieve `RDD.toLocalIterator()` might
help you.
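
For example, a minimal sketch (with a hypothetical query) that iterates over the rows one partition at a time instead of materializing everything with collectAsList():

import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("local-iterator").getOrCreate()

val df = spark.sql("SELECT * FROM some_table")   // hypothetical query

// Only one partition at a time has to fit in driver memory.
val rows: Iterator[Row] = df.rdd.toLocalIterator
rows.take(10).foreach(println)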

Best

Eike


2017-03-29 21:00 GMT+02:00 szep.laszlo.it :

> Hi,
>
> after I created a dataset
>
> Dataset df = sqlContext.sql("query");
>
> I need the result values, so I call the method collectAsList():
>
> List list = df.collectAsList();
>
> But it's very slow when I work with large datasets (20-30 million records).
> I know the results aren't present in the driver app, which is why it takes a
> long time: collectAsList() collects all the data from the worker nodes.
>
> But then what is the right way to get the result values? Is there another
> way to iterate over the rows of a result dataset, or to get the values? Can anyone
> post a small, working example?
>
> Thanks & Regards,
> Laszlo Szep
>


Re: Executor unable to pick postgres driver in Spark standalone cluster

2017-04-04 Thread Sam Elamin
Hi Rishikesh,

Sounds like the Postgres driver isn't being loaded onto the classpath. To
debug it, try submitting the application with --jars (note that options like
--jars must come before the application jar),

e.g.

spark-submit --jars /home/ubuntu/downloads/postgres/postgresql-9.4-1200-jdbc41.jar \
  {application.jar}


If that does not work, then there is a problem in the application itself, and
the reason it works locally is that you have the dependency on your local
classpath.
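
For completeness, a minimal sketch (hypothetical connection details) of a JDBC read; the driver class is resolved on the executors, so the jar has to reach them via --jars / spark.jars or via extraClassPath on every node:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("postgres-read").getOrCreate()

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/mydb")
  .option("dbtable", "public.my_table")
  .option("user", "db_user")
  .option("password", "db_password")
  .option("driver", "org.postgresql.Driver")
  .load()

df.show(5)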


Regards
Sam

On Mon, Apr 3, 2017 at 2:43 PM, Rishikesh Teke 
wrote:

>
> Hi all,
>
> I was submitting a Play application to a Spark 2.1 standalone cluster. The
> Postgres dependency is also added in the Play application, and the application
> works with local Spark libraries. But at run time on the standalone cluster it
> gives me this error:
>
> o.a.s.s.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 1, 172.31.21.3,
> executor 1): java.lang.ClassNotFoundException: org.postgresql
> .Driver
>
> I have placed following in spark-defaults.conf directory
>
> spark.executor.extraClassPath
> /home/ubuntu/downloads/postgres/postgresql-9.4-1200-jdbc41.jar
> spark.driver.extraClassPath
> /home/ubuntu/downloads/postgres/postgresql-9.4-1200-jdbc41.jar
>
> Still, the executors are unable to pick up the driver.
> Am I missing something? Need help.
> Thanks.
>
>
>
>
>
>
>