pyspark.daemon exhausts a lot of memory

2018-04-09 Thread Niu Zhaojie
Hi All,

We are running Spark 2.1.1 on Hadoop YARN 2.6.5.

We found that the pyspark.daemon process consumes more than 300 GB of memory.

However, according to
https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals, the
daemon process shouldn't have this problem.

Also, we find that the daemon process is forked by the container process. It
obviously already exceeds the container memory limit, so why doesn't YARN
kill this container?
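
For reference, a minimal sketch (not an answer to the YARN question above) of
the settings that typically bound PySpark worker memory on YARN; the property
names are Spark 2.1-era and the values are placeholders:

# Hedged sketch: memory settings commonly tuned when pyspark.daemon workers
# grow large on YARN. Values below are placeholders, not recommendations.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("pyspark-memory-sketch")
        # Per-Python-worker memory used during aggregation before spilling to disk.
        .set("spark.python.worker.memory", "1g")
        # Off-heap headroom (in MB) YARN adds to each executor container; the
        # forked pyspark.daemon/worker processes count toward the container limit.
        .set("spark.yarn.executor.memoryOverhead", "4096"))

sc = SparkContext(conf=conf)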

-- 
*Regards,*
*Zhaojie*


Correlated subqueries in the DataFrame API

2018-04-09 Thread Nicholas Chammas
I just submitted SPARK-23945 but wanted to double check here to make sure I
didn't miss something fundamental.

Correlated subqueries are tracked at a high level in SPARK-18455, but it's not
clear to me whether they are "design-appropriate" for the DataFrame API.

Are correlated subqueries a thing we can expect to have in the DataFrame
API?
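
For context, a hedged illustration with hypothetical tables of the gap in
question: Spark SQL accepts the correlated form, while the DataFrame API
currently requires a join rewrite:

# Illustrative sketch (hypothetical tables): the SQL form uses a correlated
# EXISTS subquery; the DataFrame API expresses the same filter as a semi join.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("correlated-subquery-sketch").getOrCreate()

orders = spark.createDataFrame([(1, 100), (2, 200)], ["customer_id", "amount"])
customers = spark.createDataFrame([(1, "alice"), (3, "carol")], ["id", "name"])
orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")

# Correlated subquery in SQL (supported since Spark 2.0).
spark.sql("""
    SELECT *
    FROM orders o
    WHERE EXISTS (SELECT 1 FROM customers c WHERE c.id = o.customer_id)
""").show()

# The join-based rewrite the DataFrame API requires today.
orders.join(customers, orders.customer_id == customers.id, "left_semi").show()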

Nick


Re: Fair scheduler pool leak

2018-04-09 Thread Imran Rashid
If I understand what you're trying to do correctly, I think you really just
want one pool, but you want to change the mode *within* the pool to be FAIR
as well:

https://spark.apache.org/docs/latest/job-scheduling.html#configuring-pool-properties

You'd still need to change the conf file to set up that pool, but that
should be fairly straightforward? Another approach to what you're asking
might be to expose the scheduler configuration as command-line confs as
well, which seems reasonable and simple.
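
A minimal sketch of that setup, assuming PySpark and a throwaway file path;
the property names and XML format are the ones from the docs linked above:

# Sketch: define a single "default" pool whose intra-pool mode is FAIR via an
# allocation file, point Spark at it, and route jobs to it per thread.
import textwrap
from pyspark.sql import SparkSession

pool_file = "/tmp/fairscheduler.xml"  # hypothetical location
with open(pool_file, "w") as f:
    f.write(textwrap.dedent("""\
        <?xml version="1.0"?>
        <allocations>
          <pool name="default">
            <schedulingMode>FAIR</schedulingMode>
            <weight>1</weight>
            <minShare>0</minShare>
          </pool>
        </allocations>
        """))

spark = (SparkSession.builder
         .appName("fair-default-pool-sketch")
         .config("spark.scheduler.mode", "FAIR")
         .config("spark.scheduler.allocation.file", pool_file)
         .getOrCreate())

# Jobs submitted from this thread land in the "default" pool.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "default")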

On Sat, Apr 7, 2018 at 5:55 PM, Matthias Boehm  wrote:

> well, the point was "in a programmatic way without the need for
> additional configuration files which is a hassle for a library" -
> anyway, I appreciate your comments.
>
> Regards,
> Matthias
>
> On Sat, Apr 7, 2018 at 3:43 PM, Mark Hamstra 
> wrote:
> >> Providing a way to set the mode of the default scheduler would be
> >> awesome.
> >
> >
> > That's trivial: Just use the pool configuration XML file and define a
> > pool named "default" with the characteristics that you want (including
> > schedulingMode FAIR).
> >
> > You only get the default construction of the pool named "default" if you
> > don't define your own "default".
> >
> > On Sat, Apr 7, 2018 at 2:32 PM, Matthias Boehm 
> wrote:
> >>
> >> No, these pools are not created per job but per parfor worker and
> >> thus, used to execute many jobs. For all scripts with a single
> >> top-level parfor this is equivalent to static initialization. However,
> >> yes we create these pools dynamically on demand to avoid unnecessary
> >> initialization and handle scenarios of nested parfor.
> >>
> >> At the end of the day, we just want to configure fair scheduling in a
> >> programmatic way without the need for additional configuration files
> >> which is a hassle for a library that is meant to work out-of-the-box.
> >> Simply setting 'spark.scheduler.mode' to FAIR does not do the trick
> >> because we end up with a single default fair scheduler pool in FIFO
> >> mode, which is equivalent to FIFO. Providing a way to set the mode of
> >> the default scheduler would be awesome.
> >>
> >> Regarding why fair scheduling showed generally better performance for
> >> out-of-core datasets, I don't have a good answer. My guess was
> >> isolated job scheduling and better locality of in-memory partitions.
> >>
> >> Regards,
> >> Matthias
> >>
> >> On Sat, Apr 7, 2018 at 8:50 AM, Mark Hamstra 
> >> wrote:
> >> > Sorry, but I'm still not understanding this use case. Are you somehow
> >> > creating additional scheduling pools dynamically as Jobs execute? If so,
> >> > that is a very unusual thing to do. Scheduling pools are intended to be
> >> > statically configured -- initialized, living and dying with the
> >> > Application.
> >> >
> >> > On Sat, Apr 7, 2018 at 12:33 AM, Matthias Boehm 
> >> > wrote:
> >> >>
> >> >> Thanks for the clarification Imran - that helped. I was mistakenly
> >> >> assuming that these pools are removed via weak references, as the
> >> >> ContextCleaner does for RDDs, broadcasts, accumulators, etc. For
> >> >> the time being, we'll just work around it, but I'll file a
> >> >> nice-to-have improvement JIRA. Also, you're right, we do indeed see
> >> >> these warnings, but they're usually hidden when running at the ERROR
> >> >> log level or lost in the overwhelming output at INFO.
> >> >>
> >> >> Just to give the context: We use these scheduler pools in SystemML's
> >> >> parallel for loop construct (parfor), which allows combining data- and
> >> >> task-parallel computation. If the data fits into the remote memory
> >> >> budget, the optimizer may decide to execute the entire loop as a
> >> >> single spark job (with groups of iterations mapped to spark tasks). If
> >> >> the data is too large and non-partitionable, the parfor loop is
> >> >> executed as a multi-threaded operator in the driver and each worker
> >> >> might spawn several data-parallel spark jobs in the context of the
> >> >> worker's scheduler pool, for operations that don't fit into the
> >> >> driver.
> >> >>
> >> >> We decided to use these fair scheduler pools (w/ fair scheduling
> >> >> across pools, FIFO per pool) instead of the default FIFO scheduler
> >> >> because it gave us better and more robust performance back in the
> >> >> Spark 1.x line. This was especially true for concurrent jobs over
> >> shared input data (e.g., for hyperparameter tuning) and when the data
> >> size exceeded aggregate memory. The only downside was that we had to
> >> guard against scenarios where concurrent jobs would lazily pull a
> >> shared RDD into cache, because that led to thread contention on the
> >> executors' block managers and spurious replicated in-memory
> >> partitions.
> >> >>
> >> >> Regards,
> >> >> Matthias
> >> >>
> >> >> On Fri, Apr 6, 2018 at 8:08 AM, Imran Rashid 
> 

Re: Clarify window behavior in Spark SQL

2018-04-09 Thread Sandor Murakozi
Hi Li,
You might find my pending PR useful:
https://github.com/apache/spark/pull/20045/files

It contains a large set of test cases covering the windowing functionality,
showing and checking the behavior of a number of special cases.
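
For what it's worth, a small PySpark sketch of the frame defaults discussed
downthread, with the frame also spelled out explicitly (toy data mirrors
Li's example at the end of the thread):

# Sketch of the default-frame behavior: no ORDER BY vs. ORDER BY vs. an
# explicitly unbounded frame. Column names here are just illustrative.
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import mean

spark = SparkSession.builder.appName("window-frame-sketch").getOrCreate()
df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')

# No ORDER BY: every row is a peer, so the frame is the whole partition.
w_partition = Window.partitionBy('id')

# With ORDER BY but no explicit frame, the default becomes
# RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW (a running aggregate).
w_running = Window.partitionBy('id').orderBy('v')

# Spelling out an unbounded frame restores the whole-partition aggregate.
w_explicit = (Window.partitionBy('id').orderBy('v')
              .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

(df.withColumn('mean_partition', mean(df.v).over(w_partition))
   .withColumn('mean_running', mean(df.v).over(w_running))
   .withColumn('mean_explicit', mean(df.v).over(w_explicit))
   .show())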

On Wed, Apr 4, 2018 at 4:26 AM, Reynold Xin  wrote:

> Thanks Li!
>
> On Tue, Apr 3, 2018 at 7:23 PM Li Jin  wrote:
>
>> Thanks all for the explanation. I am happy to update the API doc.
>>
>> https://issues.apache.org/jira/browse/SPARK-23861
>>
>> On Tue, Apr 3, 2018 at 8:54 PM, Reynold Xin  wrote:
>>
>>> Ah ok. Thanks for commenting. Every day I learn something new about SQL.
>>>
>>> For others to follow, SQL Server has a good explanation of the behavior:
>>> https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql
>>>
>>>
>>> Can somebody (Li?) update the API documentation to specify the gotchas,
>>> in case users are not familiar with SQL window function semantics?
>>>
>>>
>>>
>>> General Remarks
>>> 
>>>
>>> More than one window function can be used in a single query with a
>>> single FROM clause. The OVER clause for each function can differ in
>>> partitioning and ordering.
>>>
>>> If PARTITION BY is not specified, the function treats all rows of the
>>> query result set as a single group.
>>> Important!
>>> 
>>>
>>> If ROWS/RANGE is specified and <unsigned value specification> is used for
>>> <window frame preceding> (short syntax) then this specification is used for
>>> the window frame boundary starting point and CURRENT ROW is used for the
>>> boundary ending point. For example “ROWS 5 PRECEDING” is equal to “ROWS
>>> BETWEEN 5 PRECEDING AND CURRENT ROW”.
>>>
>>> Note
>>>
>>> If ORDER BY is not specified, the entire partition is used for the window
>>> frame. This applies only to functions that do not require an ORDER BY clause.
>>> If ROWS/RANGE is not specified but ORDER BY is specified, RANGE UNBOUNDED
>>> PRECEDING AND CURRENT ROW is used as the default window frame. This applies
>>> only to functions that can accept an optional ROWS/RANGE specification.
>>> For example, ranking functions cannot accept ROWS/RANGE, therefore this
>>> window frame is not applied even though ORDER BY is present and ROWS/RANGE
>>> is not.
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Apr 3, 2018 at 5:50 PM, Xingbo Jiang 
>>> wrote:
>>>
 This is actually by design: without an `ORDER BY` clause, all rows are
 considered peers of the current row, which means that the frame is
 effectively the entire partition. This behavior follows the window
 syntax of PGSQL.
 You can refer to the comment by yhuai:
 https://github.com/apache/spark/pull/5604#discussion_r157931911 :)

 2018-04-04 6:27 GMT+08:00 Reynold Xin :

> Do other (non-Hive) SQL systems do the same thing?
>
> On Tue, Apr 3, 2018 at 3:16 PM, Herman van Hövell tot Westerflier <
> her...@databricks.com> wrote:
>
>> This is something we inherited from Hive:
>> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics
>>
>> When ORDER BY is specified with missing WINDOW clause, the WINDOW
>>> specification defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND
>>> CURRENT ROW.
>>
>> When both ORDER BY and WINDOW clauses are missing, the WINDOW
>>> specification defaults to ROW BETWEEN UNBOUNDED PRECEDING AND
>>> UNBOUNDED FOLLOWING.
>>
>>
>> It sort of makes sense if you think about it. If there is no ordering,
>> there is no way to have a bounded frame. If there is ordering, we default to
>> the most commonly used deterministic frame.
>>
>>
>> On Tue, Apr 3, 2018 at 11:09 PM, Reynold Xin 
>> wrote:
>>
>>> Seems like a bug.
>>>
>>>
>>>
>>> On Tue, Apr 3, 2018 at 1:26 PM, Li Jin 
>>> wrote:
>>>
 Hi Devs,

 I am seeing some behavior with window functions that is a bit
 unintuitive and would like to get some clarification.

 When using aggregation function with window, the frame boundary
 seems to change depending on the order of the window.

 Example:
 (1)

 df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')

 w1 = Window.partitionBy('id')

 df.withColumn('v2', mean(df.v).over(w1)).show()

 +---+---+---+
 | id|  v| v2|
 +---+---+---+
 |  0|  1|2.0|
 |  0|  2|2.0|
 |  0|  3|2.0|
 +---+---+---+

 (2)
 df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id',