Re: Clarify window behavior in Spark SQL

2018-04-03 Thread Reynold Xin
Thanks Li!

On Tue, Apr 3, 2018 at 7:23 PM Li Jin  wrote:

> Thanks all for the explanation. I am happy to update the API doc.
>
> https://issues.apache.org/jira/browse/SPARK-23861
>
> On Tue, Apr 3, 2018 at 8:54 PM, Reynold Xin  wrote:
>
>> Ah ok. Thanks for commenting. Every day I learn something new about SQL.
>>
>> For others to follow, SQL Server has a good explanation of the behavior:
>> https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql
>>
>>
>> Can somebody (Li?) update the API documentation to specify the gotchas,
>> in case users are not familiar with SQL window function semantics?
>>
>>
>>
>> General Remarks
>> 
>>
>> More than one window function can be used in a single query with a single
>> FROM clause. The OVER clause for each function can differ in partitioning
>> and ordering.
>>
>> If PARTITION BY is not specified, the function treats all rows of the
>> query result set as a single group.
>> Important!
>> 
>>
>> If ROWS/RANGE is specified and <window frame preceding> is used for
>> <window frame extent> (short syntax), then this specification is used for
>> the window frame boundary starting point and CURRENT ROW is used for the
>> boundary ending point. For example, “ROWS 5 PRECEDING” is equal to “ROWS
>> BETWEEN 5 PRECEDING AND CURRENT ROW”.
>>
>> Note
>>
>> If ORDER BY is not specified, the entire partition is used for the window
>> frame. This applies only to functions that do not require an ORDER BY
>> clause. If ROWS/RANGE is not specified but ORDER BY is specified, RANGE
>> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW is used as the default window
>> frame. This applies only to functions that can accept an optional
>> ROWS/RANGE specification. For example, ranking functions cannot accept
>> ROWS/RANGE, therefore this window frame is not applied even though ORDER
>> BY is present and ROWS/RANGE is not.
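
For readers who want the whole-partition frame together with an ordering in
PySpark, here is a minimal sketch (reusing the toy `id`/`v` DataFrame from Li's
example further down in this thread; the explicit `rowsBetween` call is the
workaround, not something quoted here):

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import mean

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')

# Ordered window, but with the frame spelled out explicitly so the default
# (unboundedPreceding, currentRow) frame does not kick in.
w = (Window.partitionBy('id')
           .orderBy('v')
           .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

# v2 is 2.0 for every row, matching the unordered case (1) below.
df.withColumn('v2', mean(df.v).over(w)).show()
```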
>>
>>
>>
>>
>>
>> On Tue, Apr 3, 2018 at 5:50 PM, Xingbo Jiang 
>> wrote:
>>
>>> This is actually by design: without an `ORDER BY` clause, all rows are
>>> considered peers of the current row, which means that the frame is
>>> effectively the entire partition. This behavior follows the window
>>> syntax of PGSQL.
>>> You can refer to the comment by yhuai:
>>> https://github.com/apache/spark/pull/5604#discussion_r157931911
>>> :)
>>>
>>> 2018-04-04 6:27 GMT+08:00 Reynold Xin :
>>>
 Do other (non-Hive) SQL systems do the same thing?

 On Tue, Apr 3, 2018 at 3:16 PM, Herman van Hövell tot Westerflier <
 her...@databricks.com> wrote:

> This is something we inherited from Hive:
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics
>
> When ORDER BY is specified with missing WINDOW clause, the WINDOW
>> specification defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND
>> CURRENT ROW.
>
> When both ORDER BY and WINDOW clauses are missing, the WINDOW
>> specification defaults to ROW BETWEEN UNBOUNDED PRECEDING AND
>> UNBOUNDED FOLLOWING.
>
>
> It sort of makes sense if you think about it. If there is no ordering
> there is no way to have a bound frame. If there is ordering we default to
> the most commonly used deterministic frame.
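
To make the two defaults quoted above concrete, here is a small sketch run
through PySpark's SQL interface (the temp view name `t` and the queries are
illustrative additions, not taken from the thread); `v2` and `v2_explicit`
should come out identical, and the second query reproduces the
whole-partition average:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.createDataFrame([[0, 1], [0, 2], [0, 3]], ['id', 'v']) \
     .createOrReplaceTempView('t')

# ORDER BY present, frame omitted: defaults to
# RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW -> running average 1.0, 1.5, 2.0
spark.sql("""
    SELECT id, v,
           avg(v) OVER (PARTITION BY id ORDER BY v) AS v2,
           avg(v) OVER (PARTITION BY id ORDER BY v
                        RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS v2_explicit
    FROM t
""").show()

# Neither ORDER BY nor a frame: the whole partition is the frame -> 2.0 everywhere
spark.sql("SELECT id, v, avg(v) OVER (PARTITION BY id) AS v2 FROM t").show()
```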
>
>
> On Tue, Apr 3, 2018 at 11:09 PM, Reynold Xin 
> wrote:
>
>> Seems like a bug.
>>
>>
>>
>> On Tue, Apr 3, 2018 at 1:26 PM, Li Jin  wrote:
>>
>>> Hi Devs,
>>>
>>> I am seeing some behavior with window functions that is a bit
>>> unintuitive and would like to get some clarification.
>>>
>>> When using aggregation function with window, the frame boundary
>>> seems to change depending on the order of the window.
>>>
>>> Example:
>>> (1)
>>>
>>> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>>>
>>> w1 = Window.partitionBy('id')
>>>
>>> df.withColumn('v2', mean(df.v).over(w1)).show()
>>>
>>> +---+---+---+
>>>
>>> | id|  v| v2|
>>>
>>> +---+---+---+
>>>
>>> |  0|  1|2.0|
>>>
>>> |  0|  2|2.0|
>>>
>>> |  0|  3|2.0|
>>>
>>> +---+---+---+
>>>
>>> (2)
>>> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>>>
>>> w2 = Window.partitionBy('id').orderBy('v')
>>>
>>> df.withColumn('v2', mean(df.v).over(w2)).show()
>>>
>>> +---+---+---+
>>>
>>> | id|  v| v2|
>>>
>>> +---+---+---+
>>>
>>> |  0|  1|1.0|
>>>
>>> |  0|  2|1.5|
>>>
>>> |  0|  3|2.0|
>>>
>>> +---+---+---+
>>>
>>> Seems like orderBy('v') in the example (2) also changes the frame
>>> boundaries from (
>>>
>>> unboundedPreceding, unboundedFollowing) to (unboundedPreceding,
>>> currentRow).
>>>
>>>
>>> I found this behavior a bit unintuitive. I wonder if this behavior is
>>> by design and if so, what's the specific rule that orderBy() interacts
>>> with frame boundaries?
>>>
>>> Thanks,
>>>
>>> Li

Re: Clarify window behavior in Spark SQL

2018-04-03 Thread Li Jin
Thanks all for the explanation. I am happy to update the API doc.

https://issues.apache.org/jira/browse/SPARK-23861

On Tue, Apr 3, 2018 at 8:54 PM, Reynold Xin  wrote:

> Ah ok. Thanks for commenting. Every day I learn something new about SQL.
>
> For others to follow, SQL Server has a good explanation of the behavior:
> https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql
>
>
> Can somebody (Li?) update the API documentation to specify the gotchas, in
> case users are not familiar with SQL window function semantics?
>
>
>
> General Remarks
> 
>
> More than one window function can be used in a single query with a single
> FROM clause. The OVER clause for each function can differ in partitioning
> and ordering.
>
> If PARTITION BY is not specified, the function treats all rows of the
> query result set as a single group.
> Important!
> 
>
> If ROWS/RANGE is specified and <window frame preceding> is used for
> <window frame extent> (short syntax), then this specification is used for
> the window frame boundary starting point and CURRENT ROW is used for the
> boundary ending point. For example, “ROWS 5 PRECEDING” is equal to “ROWS
> BETWEEN 5 PRECEDING AND CURRENT ROW”.
>
> Note
>
> If ORDER BY is not specified, the entire partition is used for the window
> frame. This applies only to functions that do not require an ORDER BY
> clause. If ROWS/RANGE is not specified but ORDER BY is specified, RANGE
> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW is used as the default window
> frame. This applies only to functions that can accept an optional
> ROWS/RANGE specification. For example, ranking functions cannot accept
> ROWS/RANGE, therefore this window frame is not applied even though ORDER
> BY is present and ROWS/RANGE is not.
>
>
>
>
>
> On Tue, Apr 3, 2018 at 5:50 PM, Xingbo Jiang 
> wrote:
>
>> This is actually by design: without an `ORDER BY` clause, all rows are
>> considered peers of the current row, which means that the frame is
>> effectively the entire partition. This behavior follows the window
>> syntax of PGSQL.
>> You can refer to the comment by yhuai:
>> https://github.com/apache/spark/pull/5604#discussion_r157931911
>> :)
>>
>> 2018-04-04 6:27 GMT+08:00 Reynold Xin :
>>
>>> Do other (non-Hive) SQL systems do the same thing?
>>>
>>> On Tue, Apr 3, 2018 at 3:16 PM, Herman van Hövell tot Westerflier <
>>> her...@databricks.com> wrote:
>>>
 This is something we inherited from Hive:
 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics

 When ORDER BY is specified with missing WINDOW clause, the WINDOW
> specification defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND
> CURRENT ROW.

 When both ORDER BY and WINDOW clauses are missing, the WINDOW
> specification defaults to ROW BETWEEN UNBOUNDED PRECEDING AND
> UNBOUNDED FOLLOWING.


 It sort of makes sense if you think about it. If there is no ordering
 there is no way to have a bound frame. If there is ordering we default to
 the most commonly used deterministic frame.


 On Tue, Apr 3, 2018 at 11:09 PM, Reynold Xin 
 wrote:

> Seems like a bug.
>
>
>
> On Tue, Apr 3, 2018 at 1:26 PM, Li Jin  wrote:
>
>> Hi Devs,
>>
>> I am seeing some behavior with window functions that is a bit
>> unintuitive and would like to get some clarification.
>>
>> When using aggregation function with window, the frame boundary seems
>> to change depending on the order of the window.
>>
>> Example:
>> (1)
>>
>> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>>
>> w1 = Window.partitionBy('id')
>>
>> df.withColumn('v2', mean(df.v).over(w1)).show()
>>
>> +---+---+---+
>>
>> | id|  v| v2|
>>
>> +---+---+---+
>>
>> |  0|  1|2.0|
>>
>> |  0|  2|2.0|
>>
>> |  0|  3|2.0|
>>
>> +---+---+---+
>>
>> (2)
>> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>>
>> w2 = Window.partitionBy('id').orderBy('v')
>>
>> df.withColumn('v2', mean(df.v).over(w2)).show()
>>
>> +---+---+---+
>>
>> | id|  v| v2|
>>
>> +---+---+---+
>>
>> |  0|  1|1.0|
>>
>> |  0|  2|1.5|
>>
>> |  0|  3|2.0|
>>
>> +---+---+---+
>>
>> Seems like orderBy('v') in the example (2) also changes the frame
>> boundaries from (
>>
>> unboundedPreceding, unboundedFollowing) to (unboundedPreceding,
>> currentRow).
>>
>>
>> I found this behavior a bit unintuitive. I wonder if this behavior is
>> by design and if so, what's the specific rule that orderBy() interacts 
>> with
>> frame boundaries?
>>
>>
>> Thanks,
>>
>> Li

Re: Clarify window behavior in Spark SQL

2018-04-03 Thread Reynold Xin
Ah ok. Thanks for commenting. Every day I learn something new about SQL.

For others to follow, SQL Server has a good explanation of the behavior:
https://docs.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql


Can somebody (Li?) update the API documentation to specify the gotchas, in
case users are not familiar with SQL window function semantics?



General Remarks


More than one window function can be used in a single query with a single
FROM clause. The OVER clause for each function can differ in partitioning
and ordering.

If PARTITION BY is not specified, the function treats all rows of the query
result set as a single group.
Important!


If ROWS/RANGE is specified and <window frame preceding> is used for
<window frame extent> (short syntax), then this specification is used for
the window frame boundary starting point and CURRENT ROW is used for the
boundary ending point. For example, “ROWS 5 PRECEDING” is equal to “ROWS
BETWEEN 5 PRECEDING AND CURRENT ROW”.

Note

If ORDER BY is not specified, the entire partition is used for the window
frame. This applies only to functions that do not require an ORDER BY
clause. If ROWS/RANGE is not specified but ORDER BY is specified, RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW is used as the default window
frame. This applies only to functions that can accept an optional
ROWS/RANGE specification. For example, ranking functions cannot accept
ROWS/RANGE, therefore this window frame is not applied even though ORDER
BY is present and ROWS/RANGE is not.





On Tue, Apr 3, 2018 at 5:50 PM, Xingbo Jiang  wrote:

> This is actually by design: without an `ORDER BY` clause, all rows are
> considered peers of the current row, which means that the frame is
> effectively the entire partition. This behavior follows the window
> syntax of PGSQL.
> You can refer to the comment by yhuai:
> https://github.com/apache/spark/pull/5604#discussion_r157931911
> :)
>
> 2018-04-04 6:27 GMT+08:00 Reynold Xin :
>
>> Do other (non-Hive) SQL systems do the same thing?
>>
>> On Tue, Apr 3, 2018 at 3:16 PM, Herman van Hövell tot Westerflier <
>> her...@databricks.com> wrote:
>>
>>> This is something we inherited from Hive:
>>> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics
>>>
>>> When ORDER BY is specified with missing WINDOW clause, the WINDOW
 specification defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND
 CURRENT ROW.
>>>
>>> When both ORDER BY and WINDOW clauses are missing, the WINDOW
 specification defaults to ROW BETWEEN UNBOUNDED PRECEDING AND
 UNBOUNDED FOLLOWING.
>>>
>>>
>>> It sort of makes sense if you think about it. If there is no ordering
>>> there is no way to have a bound frame. If there is ordering we default to
>>> the most commonly used deterministic frame.
>>>
>>>
>>> On Tue, Apr 3, 2018 at 11:09 PM, Reynold Xin 
>>> wrote:
>>>
 Seems like a bug.



 On Tue, Apr 3, 2018 at 1:26 PM, Li Jin  wrote:

> Hi Devs,
>
> I am seeing some behavior with window functions that is a bit
> unintuitive and would like to get some clarification.
>
> When using aggregation function with window, the frame boundary seems
> to change depending on the order of the window.
>
> Example:
> (1)
>
> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>
> w1 = Window.partitionBy('id')
>
> df.withColumn('v2', mean(df.v).over(w1)).show()
>
> +---+---+---+
>
> | id|  v| v2|
>
> +---+---+---+
>
> |  0|  1|2.0|
>
> |  0|  2|2.0|
>
> |  0|  3|2.0|
>
> +---+---+---+
>
> (2)
> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>
> w2 = Window.partitionBy('id').orderBy('v')
>
> df.withColumn('v2', mean(df.v).over(w2)).show()
>
> +---+---+---+
>
> | id|  v| v2|
>
> +---+---+---+
>
> |  0|  1|1.0|
>
> |  0|  2|1.5|
>
> |  0|  3|2.0|
>
> +---+---+---+
>
> Seems like orderBy('v') in the example (2) also changes the frame
> boundaries from (
>
> unboundedPreceding, unboundedFollowing) to (unboundedPreceding,
> currentRow).
>
>
> I found this behavior a bit unintuitive. I wonder if this behavior is
> by design and if so, what's the specific rule that orderBy() interacts 
> with
> frame boundaries?
>
>
> Thanks,
>
> Li
>
>

>>>
>>
>


Re: Clarify window behavior in Spark SQL

2018-04-03 Thread Xingbo Jiang
This is actually by design: without an `ORDER BY` clause, all rows are
considered peers of the current row, which means that the frame is
effectively the entire partition. This behavior follows the window
syntax of PGSQL.
You can refer to the comment by yhuai:
https://github.com/apache/spark/pull/5604#discussion_r157931911
:)

2018-04-04 6:27 GMT+08:00 Reynold Xin :

> Do other (non-Hive) SQL systems do the same thing?
>
> On Tue, Apr 3, 2018 at 3:16 PM, Herman van Hövell tot Westerflier <
> her...@databricks.com> wrote:
>
>> This is something we inherited from Hive:
>> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics
>>
>> When ORDER BY is specified with missing WINDOW clause, the WINDOW
>>> specification defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT
>>> ROW.
>>
>> When both ORDER BY and WINDOW clauses are missing, the WINDOW
>>> specification defaults to ROW BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
>>> FOLLOWING.
>>
>>
>> It sort of makes sense if you think about it. If there is no ordering
>> there is no way to have a bound frame. If there is ordering we default to
>> the most commonly used deterministic frame.
>>
>>
>> On Tue, Apr 3, 2018 at 11:09 PM, Reynold Xin  wrote:
>>
>>> Seems like a bug.
>>>
>>>
>>>
>>> On Tue, Apr 3, 2018 at 1:26 PM, Li Jin  wrote:
>>>
 Hi Devs,

 I am seeing some behavior with window functions that is a bit
 unintuitive and would like to get some clarification.

 When using aggregation function with window, the frame boundary seems
 to change depending on the order of the window.

 Example:
 (1)

 df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')

 w1 = Window.partitionBy('id')

 df.withColumn('v2', mean(df.v).over(w1)).show()

 +---+---+---+

 | id|  v| v2|

 +---+---+---+

 |  0|  1|2.0|

 |  0|  2|2.0|

 |  0|  3|2.0|

 +---+---+---+

 (2)
 df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')

 w2 = Window.partitionBy('id').orderBy('v')

 df.withColumn('v2', mean(df.v).over(w2)).show()

 +---+---+---+

 | id|  v| v2|

 +---+---+---+

 |  0|  1|1.0|

 |  0|  2|1.5|

 |  0|  3|2.0|

 +---+---+---+

 Seems like orderBy('v') in the example (2) also changes the frame
 boundaries from (

 unboundedPreceding, unboundedFollowing) to (unboundedPreceding,
 currentRow).


 I found this behavior a bit unintuitive. I wonder if this behavior is
 by design and if so, what's the specific rule that orderBy() interacts with
 frame boundaries?


 Thanks,

 Li


>>>
>>
>


Re: Welcome Zhenhua Wang as a Spark committer

2018-04-03 Thread Bhupendra Mishra
Welcome and congratulation Zhenhua. Cheers

On Mon, Apr 2, 2018 at 10:58 AM, Wenchen Fan  wrote:

> Hi all,
>
> The Spark PMC recently added Zhenhua Wang as a committer on the project.
> Zhenhua is the major contributor of the CBO project, and has been
> contributing across several areas of Spark for a while, focusing especially
> on the analyzer and optimizer in Spark SQL. Please join me in welcoming Zhenhua!
>
> Wenchen
>


Re: Welcome Zhenhua Wang as a Spark committer

2018-04-03 Thread Josh Goldsborough
Congrats Zhenhua!

On Tue, Apr 3, 2018 at 5:38 PM, Dilip Biswal  wrote:

> Congrats, Zhenhua!  Very well deserved !!
>
>
> Regards,
> Dilip Biswal
>
>
>
>
> - Original message -
> From: Nick Pentreath 
> To: "wangzhenhua (G)" 
> Cc: Spark dev list 
> Subject: Re: Welcome Zhenhua Wang as a Spark committer
> Date: Mon, Apr 2, 2018 11:13 PM
>
> Congratulations!
>
> On Tue, 3 Apr 2018 at 05:34 wangzhenhua (G) 
> wrote:
>
>
>
> Thanks everyone! It’s my great pleasure to be part of such a professional
> and innovative community!
>
>
>
>
>
> best regards,
>
> -Zhenhua(Xander)
>
>
>
>
>
> - To
> unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>


Re: [Kubernetes] Resource requests and limits for Driver and Executor Pods

2018-04-03 Thread Kimoon Kim
> I'm also wondering if we should support running in other QoS classes -
https://kubernetes.io/docs/tasks/configure-pod-container/quality-service-pod/#qos-classes,
like maybe best-effort as well,
i.e. launching in a configuration that has neither the limit nor the
request specified. I haven't seen a use-case but I can imagine this is a
way for people to achieve better utilization with low priority long-running
jobs.

That's interesting. Like you said, it can be a good option for low priority
jobs. But I wonder if this has implications for the recovery code inside the
driver. When many executor pods get killed by K8s because the job was
BestEffort, we probably want the driver to tolerate a higher number of
casualties, so that the job could still go on and eventually finish.

Thanks,
Kimoon

On Mon, Apr 2, 2018 at 11:11 AM, Anirudh Ramanathan 
wrote:

> In summary, it looks like a combination of David's PR (#20943) and Yinan's
> PR (#20553) is a good solution here.
> Agreed on the importance of requesting memoryoverhead up front.
>
> I'm also wondering if we should support running in other QoS classes -
> https://kubernetes.io/docs/tasks/configure-pod-container/quality-service-pod/#qos-classes,
> like maybe best-effort as well,
> i.e. launching in a configuration that has neither the limit nor the
> request specified. I haven't seen a use-case but I can imagine this is a
> way for people to achieve better utilization with low priority long-running
> jobs.
>
> On Fri, Mar 30, 2018 at 3:06 PM Yinan Li  wrote:
>
>> Yes, the PR allows you to set say 1.5. The New configuration property
>> defaults to spark.executor.cores, which defaults to 1.
>>
>> On Fri, Mar 30, 2018, 3:03 PM Kimoon Kim  wrote:
>>
>>> David, glad it helped! And thanks for your clear example.
>>>
>>> > The only remaining question would then be what a sensible default for
>>> *spark.kubernetes.executor.cores *would be. Seeing that I wanted more
>>> than 1 and Yinan wants less, leaving it at 1 might be best.
>>>
>>> 1 as default SGTM.
>>>
>>> Thanks,
>>> Kimoon
>>>
>>> On Fri, Mar 30, 2018 at 1:38 PM, David Vogelbacher <
>>> dvogelbac...@palantir.com> wrote:
>>>
 Thanks for linking that PR Kimoon.


 It actually does mostly address the issue I was referring to. As the
 issue  I
 linked in my first email states, one physical cpu might not be enough to
 execute a task in a performant way.



 So if I set *spark.executor.cores=1* and *spark.task.cpus=1* , I will
 get 1 core from Kubernetes and execute one task per Executor and run into
 performance problems.

 Being able to specify `spark.kubernetes.executor.cores=1.2` would fix
 the issue (1.2 is just an example).
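
 As a rough illustration of the knobs under discussion, a hedged sketch
 (assumption: `spark.kubernetes.executor.cores` is the property name used in
 this thread and in PR #20553; the name in a released Spark version may
 differ, so treat it as illustrative only):

```python
from pyspark import SparkConf

# Illustrative only: the fractional Kubernetes cpu-request property below is
# the name discussed in this thread / PR #20553 and may differ once merged.
conf = (SparkConf()
        .set("spark.executor.cores", "1")                # task slots per executor (integer)
        .set("spark.task.cpus", "1")                     # cpus reserved per task
        .set("spark.kubernetes.executor.cores", "1.2"))  # cpu request sent to Kubernetes
```

 With `spark.executor.cores=1` each executor still runs one task at a time,
 while the fractional value would only change the cpu request placed in the
 executor pod spec.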

 I am curious as to why you, Yinan, would want to use this property to
 request less than 1 physical cpu (that is how it sounds to me on the PR).

 Do you have testing that indicates that less than 1 physical CPU is
 enough for executing tasks?



 In the end it boils down to the question proposed by Yinan:

 > A relevant question is should Spark on Kubernetes really be
 opinionated on how to set the cpu request and limit and even try to
 determine this automatically?



 And I completely agree with your answer Kimoon, we should provide
 sensible defaults and make it configurable, as Yinan’s PR does.

 The only remaining question would then be what a sensible default for 
 *spark.kubernetes.executor.cores
 *would be. Seeing that I wanted more than 1 and Yinan wants less,
 leaving it at 1 might be best.



 Thanks,

 David



 *From: *Kimoon Kim 
 *Date: *Friday, March 30, 2018 at 4:28 PM
 *To: *Yinan Li 
 *Cc: *David Vogelbacher , "
 dev@spark.apache.org" 
 *Subject: *Re: [Kubernetes] Resource requests and limits for Driver
 and Executor Pods



 I see. Good to learn the interaction between spark.task.cpus and
 spark.executor.cores. But am I right to say that PR #20553 can be still
 used as an additional knob on top of those two? Say a user wants 1.5 core
 per executor from Kubernetes, not the rounded up integer value 2?



 > A relevant question is should Spark on Kubernetes really be
 opinionated on how to set the cpu request and limit and even try to
 determine this automatically?



 Personally, I don't see how this can be auto-determined at all. I think
 the best we can do is to come up with sensible default values for the most
 common case, and provide and well-document other knobs for edge cases.


 Thanks,

 Kimoon



 On Fri, Mar 30, 2018 at 12:37 PM, Yinan Li 
 wrote:

 PR #20553

Re: Welcome Zhenhua Wang as a Spark committer

2018-04-03 Thread Dilip Biswal
Congrats, Zhenhua!  Very well deserved !!
 
 
Regards,
Dilip Biswal


- Original message -
From: Nick Pentreath
To: "wangzhenhua (G)"
Cc: Spark dev list
Subject: Re: Welcome Zhenhua Wang as a Spark committer
Date: Mon, Apr 2, 2018 11:13 PM
Congratulations! 

On Tue, 3 Apr 2018 at 05:34 wangzhenhua (G)  wrote:
 
Thanks everyone! It’s my great pleasure to be part of such a professional and innovative community! 
 
 
best regards,
-Zhenhua(Xander)
 
 


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Clarify window behavior in Spark SQL

2018-04-03 Thread Reynold Xin
Do other (non-Hive) SQL systems do the same thing?

On Tue, Apr 3, 2018 at 3:16 PM, Herman van Hövell tot Westerflier <
her...@databricks.com> wrote:

> This is something we inherited from Hive:
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics
>
> When ORDER BY is specified with missing WINDOW clause, the WINDOW
>> specification defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT
>> ROW.
>
> When both ORDER BY and WINDOW clauses are missing, the WINDOW
>> specification defaults to ROW BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
>> FOLLOWING.
>
>
> It sort of makes sense if you think about it. If there is no ordering
> there is no way to have a bound frame. If there is ordering we default to
> the most commonly used deterministic frame.
>
>
> On Tue, Apr 3, 2018 at 11:09 PM, Reynold Xin  wrote:
>
>> Seems like a bug.
>>
>>
>>
>> On Tue, Apr 3, 2018 at 1:26 PM, Li Jin  wrote:
>>
>>> Hi Devs,
>>>
>>> I am seeing some behavior with window functions that is a bit
>>> unintuitive and would like to get some clarification.
>>>
>>> When using aggregation function with window, the frame boundary seems to
>>> change depending on the order of the window.
>>>
>>> Example:
>>> (1)
>>>
>>> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>>>
>>> w1 = Window.partitionBy('id')
>>>
>>> df.withColumn('v2', mean(df.v).over(w1)).show()
>>>
>>> +---+---+---+
>>>
>>> | id|  v| v2|
>>>
>>> +---+---+---+
>>>
>>> |  0|  1|2.0|
>>>
>>> |  0|  2|2.0|
>>>
>>> |  0|  3|2.0|
>>>
>>> +---+---+---+
>>>
>>> (2)
>>> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>>>
>>> w2 = Window.partitionBy('id').orderBy('v')
>>>
>>> df.withColumn('v2', mean(df.v).over(w2)).show()
>>>
>>> +---+---+---+
>>>
>>> | id|  v| v2|
>>>
>>> +---+---+---+
>>>
>>> |  0|  1|1.0|
>>>
>>> |  0|  2|1.5|
>>>
>>> |  0|  3|2.0|
>>>
>>> +---+---+---+
>>>
>>> Seems like orderBy('v') in the example (2) also changes the frame
>>> boundaries from (
>>>
>>> unboundedPreceding, unboundedFollowing) to (unboundedPreceding,
>>> currentRow).
>>>
>>>
>>> I found this behavior a bit unintuitive. I wonder if this behavior is by
>>> design and if so, what's the specific rule that orderBy() interacts with
>>> frame boundaries?
>>>
>>>
>>> Thanks,
>>>
>>> Li
>>>
>>>
>>
>


Re: Clarify window behavior in Spark SQL

2018-04-03 Thread Li Jin
Here is the original code and comments:
https://github.com/apache/spark/commit/b6b50efc854f298d5b3e11c05dca995a85bec962#diff-4a8f00ca33a80744965463dcc6662c75L277

Seems this is intentional. Although I am not really sure why - maybe to
match other SQL systems behavior?

On Tue, Apr 3, 2018 at 5:09 PM, Reynold Xin  wrote:

> Seems like a bug.
>
>
>
> On Tue, Apr 3, 2018 at 1:26 PM, Li Jin  wrote:
>
>> Hi Devs,
>>
>> I am seeing some behavior with window functions that is a bit unintuitive
>> and would like to get some clarification.
>>
>> When using aggregation function with window, the frame boundary seems to
>> change depending on the order of the window.
>>
>> Example:
>> (1)
>>
>> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>>
>> w1 = Window.partitionBy('id')
>>
>> df.withColumn('v2', mean(df.v).over(w1)).show()
>>
>> +---+---+---+
>>
>> | id|  v| v2|
>>
>> +---+---+---+
>>
>> |  0|  1|2.0|
>>
>> |  0|  2|2.0|
>>
>> |  0|  3|2.0|
>>
>> +---+---+---+
>>
>> (2)
>> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>>
>> w2 = Window.partitionBy('id').orderBy('v')
>>
>> df.withColumn('v2', mean(df.v).over(w2)).show()
>>
>> +---+---+---+
>>
>> | id|  v| v2|
>>
>> +---+---+---+
>>
>> |  0|  1|1.0|
>>
>> |  0|  2|1.5|
>>
>> |  0|  3|2.0|
>>
>> +---+---+---+
>>
>> Seems like orderBy('v') in the example (2) also changes the frame
>> boundaries from (
>>
>> unboundedPreceding, unboundedFollowing) to (unboundedPreceding,
>> currentRow).
>>
>>
>> I found this behavior a bit unintuitive. I wonder if this behavior is by
>> design and if so, what's the specific rule that orderBy() interacts with
>> frame boundaries?
>>
>>
>> Thanks,
>>
>> Li
>>
>>
>


Re: Clarify window behavior in Spark SQL

2018-04-03 Thread Reynold Xin
Seems like a bug.



On Tue, Apr 3, 2018 at 1:26 PM, Li Jin  wrote:

> Hi Devs,
>
> I am seeing some behavior with window functions that is a bit unintuitive
> and would like to get some clarification.
>
> When using aggregation function with window, the frame boundary seems to
> change depending on the order of the window.
>
> Example:
> (1)
>
> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>
> w1 = Window.partitionBy('id')
>
> df.withColumn('v2', mean(df.v).over(w1)).show()
>
> +---+---+---+
>
> | id|  v| v2|
>
> +---+---+---+
>
> |  0|  1|2.0|
>
> |  0|  2|2.0|
>
> |  0|  3|2.0|
>
> +---+---+---+
>
> (2)
> df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')
>
> w2 = Window.partitionBy('id').orderBy('v')
>
> df.withColumn('v2', mean(df.v).over(w2)).show()
>
> +---+---+---+
>
> | id|  v| v2|
>
> +---+---+---+
>
> |  0|  1|1.0|
>
> |  0|  2|1.5|
>
> |  0|  3|2.0|
>
> +---+---+---+
>
> Seems like orderBy('v') in the example (2) also changes the frame
> boundaries from (
>
> unboundedPreceding, unboundedFollowing) to (unboundedPreceding,
> currentRow).
>
>
> I found this behavior a bit unintuitive. I wonder if this behavior is by
> design and if so, what's the specific rule that orderBy() interacts with
> frame boundaries?
>
>
> Thanks,
>
> Li
>
>


Re: saveAsNewAPIHadoopDataset must not enable speculation for parquet file?

2018-04-03 Thread Steve Loughran


> On 3 Apr 2018, at 11:19, cane  wrote:
> 
> Now, if we use saveAsNewAPIHadoopDataset with speculation enabled, it may
> cause data loss.
> I checked the comment of this API:
> 
>  We should make sure our tasks are idempotent when speculation is enabled,
> i.e. do
>   * not use output committer that writes data directly.
>   * There is an example in
> https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
>   * result of using direct output committer with speculation enabled.
>   */
> 
> But is this the rule we must follow?
> For example, for Parquet it will get ParquetOutputCommitter.
> In this case, must speculation be disabled for Parquet?
> 
> Does anyone know the history?
> Thanks very much!


If you are writing to HDFS or object stores other than S3, and you make sure 
that you are using the FileOutputFormat commit algorithm, you can use 
speculation without problems. 

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1

If you use the version 2 algorithm, then you are vulnerable to a failure during 
task commit, but only during task commit, and only if speculative/repeated tasks 
generate output files with different names.

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

If you are using S3 as a direct destination of work, then, in the absence of a 
consistency layer (S3mper, EMR consistent S3, Hadoop 3.x + S3Guard) or an 
S3-specific committer, you are always at risk of data loss. Don't do that.
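
To make the advice above concrete, here is a hedged PySpark sketch of pinning
the v1 commit algorithm while keeping speculation on. The app name and output
path are placeholders, and the DataFrame Parquet writer is used for brevity
rather than saveAsNewAPIHadoopDataset itself; the property names are the ones
quoted above.

```python
from pyspark.sql import SparkSession

# Sketch only: pin the v1 FileOutputCommitter algorithm, which is safe with
# speculation when writing to HDFS or another consistent store.
spark = (SparkSession.builder
         .appName("speculative-parquet-write")   # placeholder app name
         .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
         .config("spark.speculation", "true")
         .getOrCreate())

# The Parquet write then goes through the normal FileOutputFormat commit path.
spark.range(1000).write.mode("overwrite").parquet("hdfs:///tmp/speculation-demo")
```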

Further reading

https://github.com/steveloughran/zero-rename-committer/releases/download/tag_draft_003/a_zero_rename_committer.pdf


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Hadoop 3 support

2018-04-03 Thread Steve Loughran


On 3 Apr 2018, at 01:30, Saisai Shao <sai.sai.s...@gmail.com> wrote:

Yes, the main blocking issue is that the Hive version used in Spark (1.2.1.spark) 
doesn't support running on Hadoop 3. Hive will check the Hadoop version at 
runtime [1]. Besides this, I think some pom changes should be enough to support 
Hadoop 3.

If we want to use Hadoop 3 shaded client jar, then the pom requires lots of 
changes, but this is not necessary.


[1] 
https://github.com/apache/hive/blob/6751225a5cde4c40839df8b46e8d241fdda5cd34/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java#L144


I don't think the hadoop-shaded JAR is complete enough for spark yet...it was 
very much driven by HBase's needs. But there's only one way to get Hadoop to 
fix that: try the move, find the problems, complain noisily. Then Hadoop 3.2 
and/or a 3.1.x for x>=1 can have the broader shading

Assume my name is next to the "Shade hadoop-cloud-storage" problem, though 
there the fact that aws-java-sdk-bundle is 50 MB already, I don't plan to shade 
that at all. The AWS shading already isolates everything from amazon's choice 
of Jackson, which was one of the sore points.

-Steve


Re: Hadoop 3 support

2018-04-03 Thread Steve Loughran


On 3 Apr 2018, at 01:30, Saisai Shao <sai.sai.s...@gmail.com> wrote:

Yes, the main blocking issue is that the Hive version used in Spark (1.2.1.spark) 
doesn't support running on Hadoop 3. Hive will check the Hadoop version at 
runtime [1]. Besides this, I think some pom changes should be enough to support 
Hadoop 3.

If we want to use Hadoop 3 shaded client jar, then the pom requires lots of 
changes, but this is not necessary.


[1] 
https://github.com/apache/hive/blob/6751225a5cde4c40839df8b46e8d241fdda5cd34/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java#L144

2018-04-03 4:57 GMT+08:00 Marcelo Vanzin <van...@cloudera.com>:
Saisai filed SPARK-23534, but the main blocking issue is really SPARK-18673.


On Mon, Apr 2, 2018 at 1:00 PM, Reynold Xin <r...@databricks.com> wrote:
> Does anybody know what needs to be done in order for Spark to support Hadoop
> 3?
>


To be ruthless, I'd view Hadoop 3.1 as the first one to play with...3.0.x was 
more of a wide-version check. Hadoop 3.1RC0 is out this week, making it the 
ideal (last!) time to find showstoppers.

1. I've got a PR which adds a profile to build spark against hadoop 3, with 
some fixes for zk import along with better hadoop-cloud profile

https://github.com/apache/spark/pull/20923


Apply that and patch and both mvn and sbt can build with the RC0 from the ASF 
staging repo:

build/sbt -Phadoop-3,hadoop-cloud,yarn -Psnapshots-and-staging



2. Everything Marcelo says about hive.

You can build hadoop locally with a -Dhadoop.version=2.11 and the hive 
1.2.1.-spark version check goes through. You can't safely bring up HDFS like 
that, but you can run spark standalone against things

Some strategies

Short term: build a new hive-1.2.x-spark which fixes up the version check and 
merges in those critical patches that Cloudera, Hortonworks, Databricks, and 
anyone else have got in for their production systems. I don't think we have that 
many.

That leaves a "how to release" story, as the ASF will want it to come out under 
the ASF auspices, and, given the liability disclaimers, so should everyone. The 
Hive team could be "invited" to publish it as their own if people ask nicely.

Long term
 -do something about that subclassing to get the thrift endpoint to work. That 
can include fixing hive's service to be subclass friendly.
 -move to hive 2

That's a major piece of work.


Clarify window behavior in Spark SQL

2018-04-03 Thread Li Jin
Hi Devs,

I am seeing some behavior with window functions that is a bit unintuitive
and would like to get some clarification.

When using aggregation function with window, the frame boundary seems to
change depending on the order of the window.

Example:
(1)

df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')

w1 = Window.partitionBy('id')

df.withColumn('v2', mean(df.v).over(w1)).show()

+---+---+---+

| id|  v| v2|

+---+---+---+

|  0|  1|2.0|

|  0|  2|2.0|

|  0|  3|2.0|

+---+---+---+

(2)
df = spark.createDataFrame([[0, 1], [0, 2], [0, 3]]).toDF('id', 'v')

w2 = Window.partitionBy('id').orderBy('v')

df.withColumn('v2', mean(df.v).over(w2)).show()

+---+---+---+

| id|  v| v2|

+---+---+---+

|  0|  1|1.0|

|  0|  2|1.5|

|  0|  3|2.0|

+---+---+---+

Seems like orderBy('v') in the example (2) also changes the frame
boundaries from (

unboundedPreceding, unboundedFollowing) to (unboundedPreceding, currentRow).


I found this behavior a bit unintuitive. I wonder if this behavior is by
design and if so, what's the specific rule that orderBy() interacts with
frame boundaries?


Thanks,

Li


Re: [build system] experiencing network issues, git fetch timeouts likely

2018-04-03 Thread shane knapp
...and we're back!

On Tue, Apr 3, 2018 at 8:10 AM, shane knapp  wrote:

> this apparently caused jenkins to get wedged overnight.  i'll restart
> it now.
>
> On Mon, Apr 2, 2018 at 9:12 PM, shane knapp  wrote:
>
>> the problem was identified and fixed, and we should be good as of about
>> an hour ago.
>>
>> sorry for any inconvenience!
>>
>> On Mon, Apr 2, 2018 at 4:15 PM, shane knapp  wrote:
>>
>>> hey all!
>>>
>>> we're having network issues on campus right now, and the jenkins workers
>>> are experiencing up to 40% packet loss on our pings to github.
>>>
>>> this can cause builds to time out when attempting to git fetch.
>>>
>>> i'll post an update on the network status when i find out more about
>>> what's going on.
>>>
>>> shane
>>> --
>>> Shane Knapp
>>> UC Berkeley EECS Research / RISELab Staff Technical Lead
>>> https://rise.cs.berkeley.edu
>>>
>>
>>
>>
>> --
>> Shane Knapp
>> UC Berkeley EECS Research / RISELab Staff Technical Lead
>> https://rise.cs.berkeley.edu
>>
>
>
>
> --
> Shane Knapp
> UC Berkeley EECS Research / RISELab Staff Technical Lead
> https://rise.cs.berkeley.edu
>



-- 
Shane Knapp
UC Berkeley EECS Research / RISELab Staff Technical Lead
https://rise.cs.berkeley.edu


Re: [build system] experiencing network issues, git fetch timeouts likely

2018-04-03 Thread shane knapp
this apparently caused jenkins to get wedged overnight.  i'll restart it
now.

On Mon, Apr 2, 2018 at 9:12 PM, shane knapp  wrote:

> the problem was identified and fixed, and we should be good as of about an
> hour ago.
>
> sorry for any inconvenience!
>
> On Mon, Apr 2, 2018 at 4:15 PM, shane knapp  wrote:
>
>> hey all!
>>
>> we're having network issues on campus right now, and the jenkins workers
>> are experiencing up to 40% packet loss on our pings to github.
>>
>> this can cause builds to time out when attempting to git fetch.
>>
>> i'll post an update on the network status when i find out more about
>> what's going on.
>>
>> shane
>> --
>> Shane Knapp
>> UC Berkeley EECS Research / RISELab Staff Technical Lead
>> https://rise.cs.berkeley.edu
>>
>
>
>
> --
> Shane Knapp
> UC Berkeley EECS Research / RISELab Staff Technical Lead
> https://rise.cs.berkeley.edu
>



-- 
Shane Knapp
UC Berkeley EECS Research / RISELab Staff Technical Lead
https://rise.cs.berkeley.edu


saveAsNewAPIHadoopDataset must not enable speculation for parquet file?

2018-04-03 Thread cane
Now, if we use saveAsNewAPIHadoopDataset with speculation enabled, it may cause
data loss.
I checked the comment of this API:

  We should make sure our tasks are idempotent when speculation is enabled,
i.e. do
   * not use output committer that writes data directly.
   * There is an example in
https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
   * result of using direct output committer with speculation enabled.
   */

But is this the rule we must follow?
For example, for Parquet it will get ParquetOutputCommitter.
In this case, must speculation be disabled for Parquet?

Does anyone know the history?
Thanks very much!




--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org