Re: [vote] Apache Spark 3.0 RC3

2020-06-08 Thread Michael Armbrust
+1 (binding)

On Mon, Jun 8, 2020 at 1:22 PM DB Tsai  wrote:

> +1 (binding)
>
> Sincerely,
>
> DB Tsai
> --
> Web: https://www.dbtsai.com
> PGP Key ID: 42E5B25A8F7A82C1
>
> On Mon, Jun 8, 2020 at 1:03 PM Dongjoon Hyun 
> wrote:
> >
> > +1
> >
> > Thanks,
> > Dongjoon.
> >
> > On Mon, Jun 8, 2020 at 6:37 AM Russell Spitzer <
> russell.spit...@gmail.com> wrote:
> >>
> >> +1 (non-binding) ran the new SCC DSV2 suite and all other tests, no
> issues
> >>
> >> On Sun, Jun 7, 2020 at 11:12 PM Yin Huai  wrote:
> >>>
> >>> Hello everyone,
> >>>
> >>> I am wondering if it makes more sense to not count Saturday and
> Sunday. I doubt that any serious testing work was done during this past
> weekend. Can we only count business days in the voting process?
> >>>
> >>> Thanks,
> >>>
> >>> Yin
> >>>
> >>> On Sun, Jun 7, 2020 at 3:24 PM Denny Lee 
> wrote:
> 
>  +1 (non-binding)
> 
>  On Sun, Jun 7, 2020 at 3:21 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
> >
> > I'm seeing the effort to include the correctness issue SPARK-28067
> [1] in 3.0.0 via SPARK-31894 [2]. That doesn't seem to be a regression, so
> technically it doesn't block the release. While it'd be good to weigh its
> worth (it requires some Structured Streaming users to discard their state, so
> requiring that in a major version upgrade might be less frightening), including
> SPARK-28067 in 3.0.0 looks to be optional.
> >
> > Besides, I see all blockers look to be resolved, thanks all for the
> amazing efforts!
> >
> > +1 (non-binding) if the decision of SPARK-28067 is "later".
> >
> > 1. https://issues.apache.org/jira/browse/SPARK-28067
> > 2. https://issues.apache.org/jira/browse/SPARK-31894
> >
> > On Mon, Jun 8, 2020 at 5:23 AM Matei Zaharia <
> matei.zaha...@gmail.com> wrote:
> >>
> >> +1
> >>
> >> Matei
> >>
> >> On Jun 7, 2020, at 6:53 AM, Maxim Gekk 
> wrote:
> >>
> >> +1 (non-binding)
> >>
> >> On Sun, Jun 7, 2020 at 2:34 PM Takeshi Yamamuro <
> linguin@gmail.com> wrote:
> >>>
> >>> +1 (non-binding)
> >>>
> >>> I don't see any ongoing PR to fix critical bugs in my area.
> >>> Bests,
> >>> Takeshi
> >>>
> >>> On Sun, Jun 7, 2020 at 7:24 PM Mridul Muralidharan <
> mri...@gmail.com> wrote:
> 
>  +1
> 
>  Regards,
>  Mridul
> 
>  On Sat, Jun 6, 2020 at 1:20 PM Reynold Xin 
> wrote:
> >
> > Apologies for the mistake. The vote is open till 11:59pm Pacific
> time on Mon June 9th.
> >
> > On Sat, Jun 6, 2020 at 1:08 PM Reynold Xin 
> wrote:
> >>
> >> Please vote on releasing the following candidate as Apache
> Spark version 3.0.0.
> >>
> >> The vote is open until [DUE DAY] and passes if a majority +1
> PMC votes are cast, with a minimum of 3 +1 votes.
> >>
> >> [ ] +1 Release this package as Apache Spark 3.0.0
> >> [ ] -1 Do not release this package because ...
> >>
> >> To learn more about Apache Spark, please see
> http://spark.apache.org/
> >>
> >> The tag to be voted on is v3.0.0-rc3 (commit
> 3fdfce3120f307147244e5eaf46d61419a723d50):
> >> https://github.com/apache/spark/tree/v3.0.0-rc3
> >>
> >> The release files, including signatures, digests, etc. can be
> found at:
> >> https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc3-bin/
> >>
> >> Signatures used for Spark RCs can be found in this file:
> >> https://dist.apache.org/repos/dist/dev/spark/KEYS
> >>
> >> The staging repository for this release can be found at:
> >>
> https://repository.apache.org/content/repositories/orgapachespark-1350/
> >>
> >> The documentation corresponding to this release can be found at:
> >> https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc3-docs/
> >>
> >> The list of bug fixes going into 3.0.0 can be found at the
> following URL:
> >> https://issues.apache.org/jira/projects/SPARK/versions/12339177
> >>
> >> This release is using the release script of the tag v3.0.0-rc3.
> >>
> >> FAQ
> >>
> >> =
> >> How can I help test this release?
> >> =
> >>
> >> If you are a Spark user, you can help us test this release by
> taking
> >> an existing Spark workload and running on this release
> candidate, then
> >> reporting any regressions.
> >>
> >> If you're working in PySpark you can set up a virtual env and install
> >> the current RC and see if anything important breaks; in Java/Scala
> >> you can add the staging repository to your project's resolvers and test
> >> with the RC (make sure to clean up 
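
For Java/Scala, a minimal sbt sketch of that setup (the resolver URL is the
staging repository listed earlier in this email; the spark-sql dependency is
just an example of an artifact you might compile against):

    // build.sbt -- illustrative only
    resolvers += "Apache Spark 3.0.0 RC3 staging" at
      "https://repository.apache.org/content/repositories/orgapachespark-1350/"
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0" % Provided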

[jira] [Commented] (SPARK-29358) Make unionByName optionally fill missing columns with nulls

2020-04-02 Thread Michael Armbrust (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073951#comment-17073951
 ] 

Michael Armbrust commented on SPARK-29358:
--

Sure, but it is very easy to make this not a behavior change.  Add an optional 
boolean parameter, {{allowMissingColumns}} (or something) that defaults to 
{{false}}.
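
For illustration, a rough sketch of how that could look from the caller's side
(the parameter name is just the placeholder suggested above; the last statement
shows today's equivalent workaround):

{code:java}
import org.apache.spark.sql.functions.lit
import spark.implicits._

val df1 = Seq(1, 2, 3).toDF("x")
val df2 = Seq("a", "b", "c").toDF("y")

// Default keeps the current behavior:
// df1.unionByName(df2)                             // AnalysisException, as today
// Proposed opt-in, filling missing columns with nulls:
// df1.unionByName(df2, allowMissingColumns = true)

// Roughly what the opt-in would do, expressed with today's API:
val padded = df1.withColumn("y", lit(null))
  .unionByName(df2.withColumn("x", lit(null)))
{code}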

> Make unionByName optionally fill missing columns with nulls
> ---
>
> Key: SPARK-29358
> URL: https://issues.apache.org/jira/browse/SPARK-29358
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Mukul Murthy
>Priority: Major
>
> Currently, unionByName requires two DataFrames to have the same set of 
> columns (even though the order can be different). It would be good to add 
> either an option to unionByName or a new type of union which fills in missing 
> columns with nulls. 
> {code:java}
> val df1 = Seq(1, 2, 3).toDF("x")
> val df2 = Seq("a", "b", "c").toDF("y")
> df1.unionByName(df2){code}
> This currently throws 
> {code:java}
> org.apache.spark.sql.AnalysisException: Cannot resolve column name "x" among 
> (y);
> {code}
> Ideally, there would be a way to make this return a DataFrame containing:
> {code:java}
> +----+----+
> |   x|   y|
> +----+----+
> |   1|null|
> |   2|null|
> |   3|null|
> |null|   a|
> |null|   b|
> |null|   c|
> +----+----+
> {code}
> Currently the workaround to make this possible is by using unionByName, but 
> this is clunky:
> {code:java}
> df1.withColumn("y", lit(null)).unionByName(df2.withColumn("x", lit(null)))
> {code}






[jira] [Commented] (SPARK-29358) Make unionByName optionally fill missing columns with nulls

2020-03-31 Thread Michael Armbrust (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17071968#comment-17071968
 ] 

Michael Armbrust commented on SPARK-29358:
--

I think we should reconsider closing this as won't fix:
 - I think the semantics of this operation make sense. We already have this 
with writing JSON or parquet data. It is just a really inefficient way to 
accomplish the end goal.
 - I don't think it is a problem to move "away from SQL union". This is a 
clearly named, different operation. IMO this one makes *more* sense than SQL 
union. It is much more likely that columns with the same name are semantically 
equivalent than columns at the same ordinal with different names.
 - We are not breaking the behavior of unionByName. Currently it throws an 
exception in these cases. We are making more data transformations possible, but 
anything that was working before will continue to work. You could add a boolean 
flag if you were really concerned, but I think I would skip that.
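
To illustrate the point above about name-based vs. ordinal matching (purely an
illustration, not part of the proposal):

{code:java}
import spark.implicits._

val a = Seq(("1", "alice")).toDF("id", "name")
val b = Seq(("bob", "2")).toDF("name", "id")

// SQL-style union matches columns by position, so "bob" silently lands in `id`:
a.union(b).show()
// unionByName matches columns by name, so the rows line up as intended:
a.unionByName(b).show()
{code}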

> Make unionByName optionally fill missing columns with nulls
> ---
>
> Key: SPARK-29358
> URL: https://issues.apache.org/jira/browse/SPARK-29358
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Mukul Murthy
>Priority: Major
>
> Currently, unionByName requires two DataFrames to have the same set of 
> columns (even though the order can be different). It would be good to add 
> either an option to unionByName or a new type of union which fills in missing 
> columns with nulls. 
> {code:java}
> val df1 = Seq(1, 2, 3).toDF("x")
> val df2 = Seq("a", "b", "c").toDF("y")
> df1.unionByName(df2){code}
> This currently throws 
> {code:java}
> org.apache.spark.sql.AnalysisException: Cannot resolve column name "x" among 
> (y);
> {code}
> Ideally, there would be a way to make this return a DataFrame containing:
> {code:java}
> +----+----+
> |   x|   y|
> +----+----+
> |   1|null|
> |   2|null|
> |   3|null|
> |null|   a|
> |null|   b|
> |null|   c|
> +----+----+
> {code}
> Currently the workaround to make this possible is by using unionByName, but 
> this is clunky:
> {code:java}
> df1.withColumn("y", lit(null)).unionByName(df2.withColumn("x", lit(null)))
> {code}






Re: FYI: The evolution on `CHAR` type behavior

2020-03-17 Thread Michael Armbrust
>
> What I'd oppose is to just ban char for the native data sources, and do
> not have a plan to address this problem systematically.
>

+1


> Just forget about padding, like what Snowflake and MySQL have done.
> Document that char(x) is just an alias for string. And then move on. Almost
> no work needs to be done...
>

+1


[jira] [Commented] (SPARK-31136) Revert SPARK-30098 Use default datasource as provider for CREATE TABLE syntax

2020-03-12 Thread Michael Armbrust (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17058287#comment-17058287
 ] 

Michael Armbrust commented on SPARK-31136:
--

How hard would it be to add support for "LOAD DATA INPATH" rather than revert 
this? Is that the only workflow that breaks?

> Revert SPARK-30098 Use default datasource as provider for CREATE TABLE syntax
> -
>
> Key: SPARK-31136
> URL: https://issues.apache.org/jira/browse/SPARK-31136
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Dongjoon Hyun
>Priority: Blocker
>
> We need to consider the behavior change of SPARK-30098.
> This is a placeholder to keep the discussion and the final decision.
> `CREATE TABLE` syntax changes its behavior silently.
> The following is one example of breaking existing user data pipelines.
> *Apache Spark 2.4.5*
> {code}
> spark-sql> CREATE TABLE t(a STRING);
> Time taken: 3.061 seconds
> spark-sql> LOAD DATA INPATH '/usr/local/spark/README.md' INTO TABLE t;
> Time taken: 0.383 seconds
> spark-sql> SELECT * FROM t LIMIT 1;
> # Apache Spark
> Time taken: 2.05 seconds, Fetched 1 row(s)
> {code}
> *Apache Spark 3.0.0-preview2*
> {code}
> spark-sql> CREATE TABLE t(a STRING);
> Time taken: 3.969 seconds
> spark-sql> LOAD DATA INPATH '/usr/local/spark/README.md' INTO TABLE t;
> Error in query: LOAD DATA is not supported for datasource tables: 
> `default`.`t`;
> {code}






[jira] [Commented] (SPARK-31136) Revert SPARK-30098 Use default datasource as provider for CREATE TABLE syntax

2020-03-12 Thread Michael Armbrust (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17058125#comment-17058125
 ] 

Michael Armbrust commented on SPARK-31136:
--

What was the default before, hive sequence files? That is pretty bad. They will 
get orders of magnitude better performance with parquet (I've seen users really 
burned by the performance of the old default here).

What operations are affected by this? What happens when you run a program without 
changing anything?
 - For new tables, I assume you just get better performance?
 - Are there any operations where this breaks things? Can you "corrupt" a hive 
table by accidentally writing parquet data into it?
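
For what it's worth, a small sketch of how a pipeline can avoid depending on the
default entirely by naming the provider explicitly (table names here are
placeholders):

{code}
// Illustrative only -- spelling out the provider removes any dependence on the default:
spark.sql("CREATE TABLE t_hive (a STRING) STORED AS TEXTFILE")  // Hive serde table; LOAD DATA INPATH works
spark.sql("CREATE TABLE t_ds (a STRING) USING parquet")         // native datasource table; LOAD DATA is rejected
{code}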

> Revert SPARK-30098 Use default datasource as provider for CREATE TABLE syntax
> -
>
> Key: SPARK-31136
> URL: https://issues.apache.org/jira/browse/SPARK-31136
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Dongjoon Hyun
>Priority: Major
>
> We need to consider the behavior change of SPARK-30098.
> This is a placeholder to keep the discussion and the final decision.






Re: [VOTE] Amend Spark's Semantic Versioning Policy

2020-03-11 Thread Michael Armbrust
Thank you for the discussion everyone! This vote passes. I'll work to get
this posted on the website.

+1
Michael Armbrust
Sean Owen
Jules Damji
大啊
Ismaël Mejía
Wenchen Fan
Matei Zaharia
Gengliang Wang
Takeshi Yamamuro
Denny Lee
Xiao Li
Xingbo Jiang
Takuya UESHIN
Michael Heuer
John Zhuge
Reynold Xin
Burak Yavuz
Holden Karau
Dongjoon Hyun

To respond to some of the questions on the interpretation of this policy:


> Also, can we expand on 'when' an API change can occur ?  Since we are
> proposing to diverge from semver.
> Patch release ? Minor release ? Only major release ? Based on 'impact' of
> API ? Stability guarantees ?


This is an addition to the existing semver policy. We still do not break
stable APIs at major versions.

This new policy has a good intention, but can we narrow down on the
> migration from Apache Spark 2.4.5 to Apache Spark 3.0+?


I do not think that we should apply this policy to the 3.0 release any
differently than we will for future releases. There is nothing special
about 3.0 that means unnecessary breakages will not be costly to our users.

If I had to summarize the policy in one sentence it would be "Think about
users before you break APIs!". As I mentioned in my original email, I think
in many cases this did not happen in the lead up to this release. Rather,
the reasoning in some cases was that "This is a major release, so we can
break things".

Given that we all agree major versions are not sufficient justification to
break APIs, I think it's reasonable to revisit and discuss, on a case-by-case
basis, some of the more commonly used broken APIs in the context of this
rubric.

We had better be more careful when we add a new policy and should aim not
> to mislead the users and 3rd party library developers to say "older is
> better".


Nothing in the policy says "older is better". It only says that age is one
factor to consider when trying to reason about usage. If an API has been
around for a long time, it's possible (but not always true) that it will
have more usage than a newer API. If usage is low, and the cost to keep it
is high, get rid of it even if it's very old.

The policy also explicitly calls out updating docs to recommend the
new/"correct" way of doing things. If you can convince all the users of
Spark to switch, then you can remove any API you want in the future :)

Is this only applying to stable apis?


This is not explicitly called out, but I would argue you should still think
about users, even when breaking experimental APIs. The bar is certainly
lower here; we explicitly called out that these APIs might change. That
said, I still would go through the exercise and decide if the benefits
outweigh the costs before doing it. (Very similar to the discussion before
our 1.0, before any promises of stability had been made).

the way I read this proposal isn't really saying we can't break api's on
> major releases, its just saying spend more time making sure its worth it.


I agree with this interpretation!

Michael

On Tue, Mar 10, 2020 at 10:59 AM Tom Graves  wrote:

> Overall makes sense to me, but have same questions as others on the thread.
>
> Is this only applying to stable apis?
> How are we going to apply to 3.0?
>
> the way I read this proposal isn't really saying we can't break api's on
> major releases, its just saying spend more time making sure its worth it.
> Tom
>
> On Friday, March 6, 2020, 08:59:03 PM CST, Michael Armbrust <
> mich...@databricks.com> wrote:
>
>
> I propose to add the following text to Spark's Semantic Versioning policy
> <https://spark.apache.org/versioning-policy.html> and adopt it as the
> rubric that should be used when deciding to break APIs (even at major
> versions such as 3.0).
>
>
> I'll leave the vote open until Tuesday, March 10th at 2pm. As this is a 
> procedural
> vote <https://www.apache.org/foundation/voting.html>, the measure will
> pass if there are more favourable votes than unfavourable ones. PMC votes
> are binding, but the community is encouraged to add their voice to the
> discussion.
>
>
> [ ] +1 - Spark should adopt this policy.
>
> [ ] -1  - Spark should not adopt this policy.
>
>
> 
>
>
> Considerations When Breaking APIs
>
> The Spark project strives to avoid breaking APIs or silently changing
> behavior, even at major versions. While this is not always possible, the
> balance of the following factors should be considered before choosing to
> break an API.
>
> Cost of Breaking an API
>
> Breaking an API almost always has a non-trivial cost to the users of
> Spark. A broken API means that Spark programs need to be rewritten before
> they can be upgraded. However, there are a few considerations when thinking
> about what the cost will be:
>
>-
>
> 

Re: [VOTE] Amend Spark's Semantic Versioning Policy

2020-03-06 Thread Michael Armbrust
I'll start off the vote with a strong +1 (binding).

On Fri, Mar 6, 2020 at 1:01 PM Michael Armbrust 
wrote:

> I propose to add the following text to Spark's Semantic Versioning policy
> <https://spark.apache.org/versioning-policy.html> and adopt it as the
> rubric that should be used when deciding to break APIs (even at major
> versions such as 3.0).
>
>
> I'll leave the vote open until Tuesday, March 10th at 2pm. As this is a 
> procedural
> vote <https://www.apache.org/foundation/voting.html>, the measure will
> pass if there are more favourable votes than unfavourable ones. PMC votes
> are binding, but the community is encouraged to add their voice to the
> discussion.
>
>
> [ ] +1 - Spark should adopt this policy.
>
> [ ] -1  - Spark should not adopt this policy.
>
>
> 
>
>
> Considerations When Breaking APIs
>
> The Spark project strives to avoid breaking APIs or silently changing
> behavior, even at major versions. While this is not always possible, the
> balance of the following factors should be considered before choosing to
> break an API.
>
> Cost of Breaking an API
>
> Breaking an API almost always has a non-trivial cost to the users of
> Spark. A broken API means that Spark programs need to be rewritten before
> they can be upgraded. However, there are a few considerations when thinking
> about what the cost will be:
>
>    - Usage - an API that is actively used in many different places is
>      always very costly to break. While it is hard to know usage for sure, there
>      are a bunch of ways that we can estimate:
>       - How long has the API been in Spark?
>       - Is the API common even for basic programs?
>       - How often do we see recent questions in JIRA or mailing lists?
>       - How often does it appear in StackOverflow or blogs?
>    - Behavior after the break - How will a program that works today work
>      after the break? The following are listed roughly in order of increasing
>      severity:
>       - Will there be a compiler or linker error?
>       - Will there be a runtime exception?
>       - Will that exception happen after significant processing has been done?
>       - Will we silently return different answers? (very hard to debug,
>         might not even notice!)
>
>
> Cost of Maintaining an API
>
> Of course, the above does not mean that we will never break any APIs. We
> must also consider the cost both to the project and to our users of keeping
> the API in question.
>
>    - Project Costs - Every API we have needs to be tested and needs to keep
>      working as other parts of the project change. These costs are
>      significantly exacerbated when external dependencies change (the JVM,
>      Scala, etc). In some cases, while not completely technically infeasible,
>      the cost of maintaining a particular API can become too high.
>    - User Costs - APIs also have a cognitive cost to users learning Spark
>      or trying to understand Spark programs. This cost becomes even higher when
>      the API in question has confusing or undefined semantics.
>
>
> Alternatives to Breaking an API
>
> In cases where there is a "Bad API", but where the cost of removal is also
> high, there are alternatives that should be considered that do not hurt
> existing users but do address some of the maintenance costs.
>
>    - Avoid Bad APIs - While this is a bit obvious, it is an important
>      point. Anytime we are adding a new interface to Spark we should consider
>      that we might be stuck with this API forever. Think deeply about how
>      new APIs relate to existing ones, as well as how you expect them to evolve
>      over time.
>    - Deprecation Warnings - All deprecation warnings should point to a
>      clear alternative and should never just say that an API is deprecated.
>    - Updated Docs - Documentation should point to the "best" recommended
>      way of performing a given task. In the cases where we maintain legacy
>      documentation, we should clearly point to newer APIs and suggest to users
>      the "right" way.
>    - Community Work - Many people learn Spark by reading blogs and other
>      sites such as StackOverflow. However, many of these resources are out of
>      date. Update them, to reduce the cost of eventually removing deprecated
>      APIs.
>
>
> 
>


[VOTE] Amend Spark's Semantic Versioning Policy

2020-03-06 Thread Michael Armbrust
I propose to add the following text to Spark's Semantic Versioning policy
<https://spark.apache.org/versioning-policy.html> and adopt it as the
rubric that should be used when deciding to break APIs (even at major
versions such as 3.0).


I'll leave the vote open until Tuesday, March 10th at 2pm. As this is
a procedural
vote <https://www.apache.org/foundation/voting.html>, the measure will pass
if there are more favourable votes than unfavourable ones. PMC votes are
binding, but the community is encouraged to add their voice to the
discussion.


[ ] +1 - Spark should adopt this policy.

[ ] -1  - Spark should not adopt this policy.





Considerations When Breaking APIs

The Spark project strives to avoid breaking APIs or silently changing
behavior, even at major versions. While this is not always possible, the
balance of the following factors should be considered before choosing to
break an API.

Cost of Breaking an API

Breaking an API almost always has a non-trivial cost to the users of Spark.
A broken API means that Spark programs need to be rewritten before they can
be upgraded. However, there are a few considerations when thinking about
what the cost will be:

   - Usage - an API that is actively used in many different places is always
     very costly to break. While it is hard to know usage for sure, there are a
     bunch of ways that we can estimate:
      - How long has the API been in Spark?
      - Is the API common even for basic programs?
      - How often do we see recent questions in JIRA or mailing lists?
      - How often does it appear in StackOverflow or blogs?
   - Behavior after the break - How will a program that works today work
     after the break? The following are listed roughly in order of increasing
     severity:
      - Will there be a compiler or linker error?
      - Will there be a runtime exception?
      - Will that exception happen after significant processing has been done?
      - Will we silently return different answers? (very hard to debug, might
        not even notice!)


Cost of Maintaining an API

Of course, the above does not mean that we will never break any APIs. We
must also consider the cost both to the project and to our users of keeping
the API in question.

   - Project Costs - Every API we have needs to be tested and needs to keep
     working as other parts of the project change. These costs are
     significantly exacerbated when external dependencies change (the JVM,
     Scala, etc). In some cases, while not completely technically infeasible,
     the cost of maintaining a particular API can become too high.
   - User Costs - APIs also have a cognitive cost to users learning Spark or
     trying to understand Spark programs. This cost becomes even higher when the
     API in question has confusing or undefined semantics.


Alternatives to Breaking an API

In cases where there is a "Bad API", but where the cost of removal is also
high, there are alternatives that should be considered that do not hurt
existing users but do address some of the maintenance costs.

   - Avoid Bad APIs - While this is a bit obvious, it is an important point.
     Anytime we are adding a new interface to Spark we should consider that we
     might be stuck with this API forever. Think deeply about how new APIs
     relate to existing ones, as well as how you expect them to evolve over time.
   - Deprecation Warnings - All deprecation warnings should point to a clear
     alternative and should never just say that an API is deprecated.
   - Updated Docs - Documentation should point to the "best" recommended way
     of performing a given task. In the cases where we maintain legacy
     documentation, we should clearly point to newer APIs and suggest to users
     the "right" way.
   - Community Work - Many people learn Spark by reading blogs and other
     sites such as StackOverflow. However, many of these resources are out of
     date. Update them, to reduce the cost of eventually removing deprecated
     APIs.





Re: [Proposal] Modification to Spark's Semantic Versioning Policy

2020-02-27 Thread Michael Armbrust
Thanks for the discussion! A few responses:

The decision needs to happen at api/config change time, otherwise the
> deprecated warning has no purpose if we are never going to remove them.
>

Even if we never remove an API, I think deprecation warnings (when done
right) can still serve a purpose. For new users, a deprecation can serve as
a pointer to newer, faster APIs or ones with less sharp edges. I would be
supportive of efforts that use them to clean up the docs. For example, we
could hide deprecated APIs after some time so they don't clutter scala/java
doc. We can and should audit things like the user guide and our own
examples to make sure they don't use deprecated APIs.


> That said we still need to be able to remove deprecated things and change
> APIs in major releases, otherwise why do a  major release in the first
> place.  Is it purely to support newer Scala/python/java versions.
>

I don't think major versions are purely for
Scala/Java/Python/Hive/Metastore, but they are a good chance to move the
project forward. Spark 3.0 has a lot of upgrades here, and I think we made
the right trade-offs, even though there are some API breaks.

Major versions are also a good time to land major changes (e.g. in 2.0 we
released whole-stage code gen).


> I think the hardest part listed here is what the impact is.  Who's call is
> that, it's hard to know how everyone is using things and I think it's been
> harder to get feedback on SPIPs and API changes in general as people are
> busy with other things.
>

This is the hardest part, and we won't always get it right. I think that
having the rubric though will help guide the conversation and help
reviewers ask the right questions.

One other thing I'll add: sometimes the users come to us, and we should
listen! I was very surprised by the response to Karen's email on this list
last week. An actual user was giving us feedback on the impact of the
changes in Spark 3.0, and rather than listen, there was a lot of push back.
Users are never wrong when they are telling you what matters to them!


> Like you mention, I think stackoverflow is unreliable, the posts could be
> many years old and no longer relevant.
>

While this is unfortunate, I think anything we can do to keep these
answers relevant (either by updating them or by not breaking them) is good
for the health of the Spark community.


Re: Clarification on the commit protocol

2020-02-27 Thread Michael Armbrust
No, it is not, although the commit protocol has mostly been superseded by Delta
Lake, which is available as a separate open source
project that works natively with Apache Spark. In contrast to the commit
protocol, Delta can guarantee full ACID (rather than just partition-level
atomicity). It also has better performance in many cases, as it reduces the
amount of metadata that needs to be retrieved from the storage system.

On Wed, Feb 26, 2020 at 9:36 PM rahul c  wrote:

> Hi team,
>
> Just wanted to understand.
> Is DBIO commit protocol available in open source spark version ?
>


[Proposal] Modification to Spark's Semantic Versioning Policy

2020-02-24 Thread Michael Armbrust
Hello Everyone,

As more users have started upgrading to Spark 3.0 preview (including
myself), there have been many discussions around APIs that have been broken
compared with Spark 2.x. In many of these discussions, one of the
rationales for breaking an API seems to be "Spark follows semantic
versioning, so this major
release is our chance to get it right [by breaking APIs]". Similarly, in
many cases the response to questions about why an API was completely
removed has been, "this API has been deprecated since x.x, so we have to
remove it".

As a long time contributor to and user of Spark this interpretation of the
policy is concerning to me. This reasoning misses the intention of the
original policy, and I am worried that it will hurt the long-term success
of the project.

I definitely understand that these are hard decisions, and I'm not
proposing that we never remove anything from Spark. However, I would like
to give some additional context and also propose a different rubric for
thinking about API breakage moving forward.

Spark adopted semantic versioning back in 2014 during the preparations for
the 1.0 release. As this was the first major release -- and as, up until
fairly recently, Spark had only been an academic project -- no real
promises had been made about API stability ever.

During the discussion, some committers suggested that this was an
opportunity to clean up cruft and give the Spark APIs a once-over, making
cosmetic changes to improve consistency. However, in the end, it was
decided that in many cases it was not in the best interests of the Spark
community to break things just because we could. Matei actually said it
pretty forcefully:

I know that some names are suboptimal, but I absolutely detest breaking
APIs, config names, etc. I’ve seen it happen way too often in other
projects (even things we depend on that are officially post-1.0, like Akka
or Protobuf or Hadoop), and it’s very painful. I think that we as fairly
cutting-edge users are okay with libraries occasionally changing, but many
others will consider it a show-stopper. Given this, I think that any
cosmetic change now, even though it might improve clarity slightly, is not
worth the tradeoff in terms of creating an update barrier for existing
users.

In the end, while some changes were made, most APIs remained the same and
users of Spark <= 0.9 were pretty easily able to upgrade to 1.0. I think
this served the project very well, as compatibility means users are able to
upgrade and we keep as many people on the latest versions of Spark (though
maybe not the latest APIs of Spark) as possible.

As Spark grows, I think compatibility actually becomes more important and
we should be more conservative rather than less. Today, there are very
likely more Spark programs running than there were at any other time in the
past. Spark is no longer a tool only used by advanced hackers, it is now
also running "traditional enterprise workloads." In many cases these jobs
are powering important processes long after the original author leaves.

Broken APIs can also affect libraries that extend Spark. This dependency
can be even harder for users, as if the library has not been upgraded to
use new APIs and they need that library, they are stuck.

Given all of this, I'd like to propose the following rubric as an addition
to our semantic versioning policy. After discussion and if people agree
this is a good idea, I'll call a vote of the PMC to ratify its inclusion in
the official policy.

Considerations When Breaking APIs

The Spark project strives to avoid breaking APIs or silently changing
behavior, even at major versions. While this is not always possible, the
balance of the following factors should be considered before choosing to
break an API.

Cost of Breaking an API

Breaking an API almost always has a non-trivial cost to the users of Spark.
A broken API means that Spark programs need to be rewritten before they can
be upgraded. However, there are a few considerations when thinking about
what the cost will be:

   - Usage - an API that is actively used in many different places is always
     very costly to break. While it is hard to know usage for sure, there are a
     bunch of ways that we can estimate:
      - How long has the API been in Spark?
      - Is the API common even for basic programs?
      - How often do we see recent questions in JIRA or mailing lists?
      - How often does it appear in StackOverflow or blogs?
   - Behavior after the break - How will a program that works today work
     after the break? The following are listed roughly in order of increasing
     severity:
      - Will there be a compiler or linker error?
      - Will there be a runtime exception?
      - Will that exception happen after significant processing has been done?

Re: [DISCUSSION] Esoteric Spark function `TRIM/LTRIM/RTRIM`

2020-02-21 Thread Michael Armbrust
This plan for evolving the TRIM function to be more standards compliant
sounds much better to me than the original change to just switch the order.
It pushes users in the right direction and cleans up our tech debt without
silently breaking existing workloads. It means that programs won't return
different results when run on Spark 2.x and Spark 3.x.
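
As a quick sketch, the one-argument and SQL-standard forms are unambiguous on
both 2.x and 3.x, since the standard syntax names its arguments instead of
relying on positional order:

    spark.sql("SELECT trim('  hi  ')").show()                // -> "hi"
    spark.sql("SELECT trim(BOTH 'x' FROM 'xxhixx')").show()  // -> "hi"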

One caveat:

> If we keep this situation in 3.0.0 release (a major release), it means
>>> Apache Spark will be forever
>>>
>>
I think this ship has already sailed. There is nothing special about 3.0
here. If the API is in a released version of Spark, then the mistake is
already made.

Major releases are an opportunity to break APIs when we *have to*. We
always strive to avoid breaking APIs even if they have not been in an X.0
release.


[jira] [Commented] (SPARK-27911) PySpark Packages should automatically choose correct scala version

2019-07-16 Thread Michael Armbrust (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16886479#comment-16886479
 ] 

Michael Armbrust commented on SPARK-27911:
--

You are right, there is nothing pyspark specific about this. I just used 
pyspark as an example as I think they are more likely to use {{pip}} and thus 
never even see the scala version they are using.

It would be great if we could make this easier for all Spark users, preferably 
by automatically correcting mismatches.
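
A very rough sketch of the kind of substitution being proposed; the helper name
and the package coordinate below are hypothetical:

{code:java}
// Hypothetical helper: expand "$scalaVersion" in a package coordinate using the
// Scala binary version Spark was built with (similar in spirit to sbt's "%%").
def resolveCoordinate(coordinate: String): String = {
  val scalaBinaryVersion = scala.util.Properties.versionNumberString
    .split('.').take(2).mkString(".")   // e.g. "2.12.10" -> "2.12"
  coordinate.replace("$scalaVersion", scalaBinaryVersion)
}

resolveCoordinate("io.delta:delta-core_$scalaVersion:0.6.1")
// -> "io.delta:delta-core_2.12:0.6.1" on a Scala 2.12 build
{code}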

> PySpark Packages should automatically choose correct scala version
> --
>
> Key: SPARK-27911
> URL: https://issues.apache.org/jira/browse/SPARK-27911
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 3.0.0
>Reporter: Michael Armbrust
>Priority: Major
>
> Today, users of pyspark (and Scala) need to manually specify the version of 
> Scala that their Spark installation is using when adding a Spark package to 
> their application. This extra configuration is confusing to users who may not 
> even know which version of Scala they are using (for example, if they 
> installed Spark using {{pip}}). The confusion here is exacerbated by releases 
> in Spark that have changed the default from {{2.11}} -> {{2.12}} -> {{2.11}}.
> https://spark.apache.org/releases/spark-release-2-4-2.html
> https://spark.apache.org/releases/spark-release-2-4-3.html
> Since Spark can know which version of Scala it was compiled for, we should 
> give users the option to automatically choose the correct version.  This 
> could be as simple as a substitution for {{$scalaVersion}} or something when 
> resolving a package (similar to SBTs support for automatically handling scala 
> dependencies).
> Here are some concrete examples of users getting it wrong and getting 
> confused:
> https://github.com/delta-io/delta/issues/6
> https://github.com/delta-io/delta/issues/63






Re: Announcing Delta Lake 0.2.0

2019-06-21 Thread Michael Armbrust
>
> Thanks for confirmation. We are using the workaround to create a separate
> Hive external table STORED AS PARQUET with the exact location of Delta
> table. Our use case is batch-driven and we are running VACUUM with 0
> retention after every batch is completed. Do you see any potential problem
> with this workaround, other than during the time when the batch is running
> the table can provide some wrong information?
>

This is a reasonable workaround to allow other systems to read Delta
tables. Another consideration is that if you are running on S3, eventual
consistency may increase the amount of time before external readers see a
consistent view. Also note that this prevents you from using time travel.
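
For reference, a bare-bones sketch of the workaround described above (table
name, schema, and path are placeholders; this needs a Hive-enabled session):

    // Expose the Delta table's current Parquet files to external readers:
    spark.sql("""
      CREATE EXTERNAL TABLE events_for_external_readers (id BIGINT, ts TIMESTAMP)
      STORED AS PARQUET
      LOCATION '/data/delta/events'
    """)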

In the near future, I think we should also support generating manifest
files that list the data files in the most recent version of the Delta
table (see #76 for details).
This will give support for Presto, though Hive would require some
additional modifications on the Hive side (if there are any Hive
contributors / committers on this list let me know!).

In the longer term, we are talking with authors of other engines to build
native support for reading the Delta transaction log (e.g. this
announcement from Starburst).
Please contact me if you are interested in contributing here!


[jira] [Updated] (SPARK-27911) PySpark Packages should automatically choose correct scala version

2019-05-31 Thread Michael Armbrust (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-27911:
-
Description: 
Today, users of pyspark (and Scala) need to manually specify the version of 
Scala that their Spark installation is using when adding a Spark package to 
their application. This extra configuration is confusing to users who may not 
even know which version of Scala they are using (for example, if they installed 
Spark using {{pip}}). The confusion here is exacerbated by releases in Spark 
that have changed the default from {{2.11}} -> {{2.12}} -> {{2.11}}.

https://spark.apache.org/releases/spark-release-2-4-2.html
https://spark.apache.org/releases/spark-release-2-4-3.html

Since Spark can know which version of Scala it was compiled for, we should give 
users the option to automatically choose the correct version.  This could be as 
simple as a substitution for {{$scalaVersion}} or something when resolving a 
package (similar to SBTs support for automatically handling scala dependencies).

Here are some concrete examples of users getting it wrong and getting confused:
https://github.com/delta-io/delta/issues/6
https://github.com/delta-io/delta/issues/63

  was:
Today, users of pyspark (and Scala) need to manually specify the version of 
Scala that their Spark installation is using when adding a Spark package to 
their application. This extra configuration confusing to users who may not even 
know which version of Scala they are using (for example, if they installed 
Spark using {{pip}}). The confusion here is exacerbated by releases in Spark 
that have changed the default from {{2.11}} -> {{2.12}} -> {{2.11}}.

https://spark.apache.org/releases/spark-release-2-4-2.html
https://spark.apache.org/releases/spark-release-2-4-3.html

Since Spark can know which version of Scala it was compiled for, we should give 
users the option to automatically choose the correct version.  This could be as 
simple as a substitution for {{$scalaVersion}} or something when resolving a 
package (similar to SBTs support for automatically handling scala dependencies).

Here are some concrete examples of users getting it wrong and getting confused:
https://github.com/delta-io/delta/issues/6
https://github.com/delta-io/delta/issues/63


> PySpark Packages should automatically choose correct scala version
> --
>
> Key: SPARK-27911
> URL: https://issues.apache.org/jira/browse/SPARK-27911
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.4.3
>Reporter: Michael Armbrust
>Priority: Major
>
> Today, users of pyspark (and Scala) need to manually specify the version of 
> Scala that their Spark installation is using when adding a Spark package to 
> their application. This extra configuration is confusing to users who may not 
> even know which version of Scala they are using (for example, if they 
> installed Spark using {{pip}}). The confusion here is exacerbated by releases 
> in Spark that have changed the default from {{2.11}} -> {{2.12}} -> {{2.11}}.
> https://spark.apache.org/releases/spark-release-2-4-2.html
> https://spark.apache.org/releases/spark-release-2-4-3.html
> Since Spark can know which version of Scala it was compiled for, we should 
> give users the option to automatically choose the correct version.  This 
> could be as simple as a substitution for {{$scalaVersion}} or something when 
> resolving a package (similar to SBTs support for automatically handling scala 
> dependencies).
> Here are some concrete examples of users getting it wrong and getting 
> confused:
> https://github.com/delta-io/delta/issues/6
> https://github.com/delta-io/delta/issues/63






[jira] [Created] (SPARK-27911) PySpark Packages should automatically choose correct scala version

2019-05-31 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-27911:


 Summary: PySpark Packages should automatically choose correct 
scala version
 Key: SPARK-27911
 URL: https://issues.apache.org/jira/browse/SPARK-27911
 Project: Spark
  Issue Type: New Feature
  Components: PySpark
Affects Versions: 2.4.3
Reporter: Michael Armbrust


Today, users of pyspark (and Scala) need to manually specify the version of 
Scala that their Spark installation is using when adding a Spark package to 
their application. This extra configuration confusing to users who may not even 
know which version of Scala they are using (for example, if they installed 
Spark using {{pip}}). The confusion here is exacerbated by releases in Spark 
that have changed the default from {{2.11}} -> {{2.12}} -> {{2.11}}.

https://spark.apache.org/releases/spark-release-2-4-2.html
https://spark.apache.org/releases/spark-release-2-4-3.html

Since Spark can know which version of Scala it was compiled for, we should give 
users the option to automatically choose the correct version.  This could be as 
simple as a substitution for {{$scalaVersion}} or something when resolving a 
package (similar to SBTs support for automatically handling scala dependencies).

Here are some concrete examples of users getting it wrong and getting confused:
https://github.com/delta-io/delta/issues/6
https://github.com/delta-io/delta/issues/63






[jira] [Commented] (SPARK-27676) InMemoryFileIndex should hard-fail on missing files instead of logging and continuing

2019-05-10 Thread Michael Armbrust (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837552#comment-16837552
 ] 

Michael Armbrust commented on SPARK-27676:
--

I tend to agree that all cases where we chose to ignore missing files should be 
hidden behind the existing {{spark.sql.files.ignoreMissingFiles}} flag.
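
For context, a minimal sketch of the existing flag being referred to (shown with
its default value):

{code:java}
// The SQL-level flag that already guards missing-file handling during scans; the
// suggestion is to route the InMemoryFileIndex listing behavior through it as well.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "false")  // false is the default: fail hard
spark.read.parquet("/data/date=1/").count()                    // throws if listed files have disappeared
{code}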

> InMemoryFileIndex should hard-fail on missing files instead of logging and 
> continuing
> -
>
> Key: SPARK-27676
> URL: https://issues.apache.org/jira/browse/SPARK-27676
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Josh Rosen
>Priority: Major
>
> Spark's {{InMemoryFileIndex}} contains two places where {{FileNotFound}} 
> exceptions are caught and logged as warnings (during [directory 
> listing|https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L274]
>  and [block location 
> lookup|https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L333]).
>  I think that this is a dangerous default behavior and would prefer that 
> Spark hard-fails by default (with the ignore-and-continue behavior guarded by 
> a SQL session configuration).
> In SPARK-17599 and SPARK-24364, logic was added to ignore missing files. 
> Quoting from the PR for SPARK-17599:
> {quote}The {{ListingFileCatalog}} lists files given a set of resolved paths. 
> If a folder is deleted at any time between the paths were resolved and the 
> file catalog can check for the folder, the Spark job fails. This may abruptly 
> stop long running StructuredStreaming jobs for example.
> Folders may be deleted by users or automatically by retention policies. These 
> cases should not prevent jobs from successfully completing.
> {quote}
> Let's say that I'm *not* expecting to ever delete input files for my job. In 
> that case, this behavior can mask bugs.
> One straightforward masked bug class is accidental file deletion: if I'm 
> never expecting to delete files then I'd prefer to fail my job if Spark sees 
> deleted files.
> A more subtle bug can occur when using a S3 filesystem. Say I'm running a 
> Spark job against a partitioned Parquet dataset which is laid out like this:
> {code:java}
> data/
>   date=1/
> region=west/
>0.parquet
>1.parquet
> region=east/
>0.parquet
>1.parquet{code}
> If I do {{spark.read.parquet("/data/date=1/")}} then Spark needs to perform 
> multiple rounds of file listing, first listing {{/data/date=1}} to discover 
> the partitions for that date, then listing within each partition to discover 
> the leaf files. Due to the eventual consistency of S3 ListObjects, it's 
> possible that the first listing will show the {{region=west}} and 
> {{region=east}} partitions existing and then the next-level listing fails to 
> return any for some of the directories (e.g. {{/data/date=1/}} returns files 
> but {{/data/date=1/region=west/}} throws a {{FileNotFoundException}} in S3A 
> due to ListObjects inconsistency).
> If Spark propagated the {{FileNotFoundException}} and hard-failed in this 
> case then I'd be able to fail the job in this case where we _definitely_ know 
> that the S3 listing is inconsistent (failing here doesn't guard against _all_ 
> potential S3 list inconsistency issues (e.g. back-to-back listings which both 
> return a subset of the true set of objects), but I think it's still an 
> improvement to fail for the subset of cases that we _can_ detect even if 
> that's not a surefire failsafe against the more general problem).
> Finally, I'm unsure if the original patch will have the desired effect: if a 
> file is deleted once a Spark job expects to read it then that can cause 
> problems at multiple layers, both in the driver (multiple rounds of file 
> listing) and in executors (if the deletion occurs after the construction of 
> the catalog but before the scheduling of the read tasks); I think the 
> original patch only resolved the problem for the driver (unless I'm missing 
> similar executor-side code specific to the original streaming use-case).
> Given all of these reasons, I think that the "ignore potentially deleted 
> files during file index listing" behavior should be guarded behind a feature 
> flag which defaults to {{false}}, consistent with the existing 
> {{spark.files.ignoreMissingFiles}} flag.

Re: [VOTE] Release Apache Spark 2.4.2

2019-04-19 Thread Michael Armbrust
+1 (binding), we've tested this and it LGTM.

On Thu, Apr 18, 2019 at 7:51 PM Wenchen Fan  wrote:

> Please vote on releasing the following candidate as Apache Spark version
> 2.4.2.
>
> The vote is open until April 23 PST and passes if a majority +1 PMC votes
> are cast, with
> a minimum of 3 +1 votes.
>
> [ ] +1 Release this package as Apache Spark 2.4.2
> [ ] -1 Do not release this package because ...
>
> To learn more about Apache Spark, please see http://spark.apache.org/
>
> The tag to be voted on is v2.4.2-rc1 (commit
> a44880ba74caab7a987128cb09c4bee41617770a):
> https://github.com/apache/spark/tree/v2.4.2-rc1
>
> The release files, including signatures, digests, etc. can be found at:
> https://dist.apache.org/repos/dist/dev/spark/v2.4.2-rc1-bin/
>
> Signatures used for Spark RCs can be found in this file:
> https://dist.apache.org/repos/dist/dev/spark/KEYS
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapachespark-1322/
>
> The documentation corresponding to this release can be found at:
> https://dist.apache.org/repos/dist/dev/spark/v2.4.2-rc1-docs/
>
> The list of bug fixes going into 2.4.2 can be found at the following URL:
> https://issues.apache.org/jira/projects/SPARK/versions/12344996
>
> FAQ
>
> =
> How can I help test this release?
> =
>
> If you are a Spark user, you can help us test this release by taking
> an existing Spark workload and running on this release candidate, then
> reporting any regressions.
>
> If you're working in PySpark you can set up a virtual env and install
> the current RC and see if anything important breaks; in Java/Scala
> you can add the staging repository to your project's resolvers and test
> with the RC (make sure to clean up the artifact cache before/after so
> you don't end up building with an out-of-date RC going forward).
>
> ===
> What should happen to JIRA tickets still targeting 2.4.2?
> ===
>
> The current list of open tickets targeted at 2.4.2 can be found at:
> https://issues.apache.org/jira/projects/SPARK and search for "Target
> Version/s" = 2.4.2
>
> Committers should look at those and triage. Extremely important bug
> fixes, documentation, and API tweaks that impact compatibility should
> be worked on immediately. Everything else please retarget to an
> appropriate release.
>
> ==
> But my bug isn't fixed?
> ==
>
> In order to make timely releases, we will typically not hold the
> release unless the bug in question is a regression from the previous
> release. That being said, if there is something which is a regression
> that has not been correctly targeted please ping me or a committer to
> help target the issue.
>


Re: Spark 2.4.2

2019-04-16 Thread Michael Armbrust
Thanks Ryan. To me the "test" for putting things in a maintenance release
is really a trade-off between benefit and risk (along with some caveats,
like user facing surface should not grow). The benefits here are fairly
large (now it is possible to plug in partition aware data sources) and the
risk is very low (no change in behavior by default).

And bugs aren't usually fixed with a configuration flag to turn on the fix.


Agree, this should be on by default in master. That would just tip the risk
balance for me in a maintenance release.

On Tue, Apr 16, 2019 at 4:55 PM Ryan Blue  wrote:

> Spark has a lot of strange behaviors already that we don't fix in patch
> releases. And bugs aren't usually fixed with a configuration flag to turn
> on the fix.
>
> That said, I don't have a problem with this commit making it into a patch
> release. This is a small change and looks safe enough to me. I was just a
> little surprised since I was expecting a correctness issue if this is
> prompting a release. I'm definitely on the side of case-by-case judgments
> on what to allow in patch releases and this looks fine.
>
> On Tue, Apr 16, 2019 at 4:27 PM Michael Armbrust 
> wrote:
>
>> I would argue that it's confusing enough to a user for options from
>> DataFrameWriter to be silently dropped when instantiating the data source
>> to consider this a bug.  They asked for partitioning to occur, and we are
>> doing nothing (not even telling them we can't).  I was certainly surprised
>> by this behavior.  Do you have a different proposal about how this should
>> be handled?
>>
>> On Tue, Apr 16, 2019 at 4:23 PM Ryan Blue  wrote:
>>
>>> Is this a bug fix? It looks like a new feature to me.
>>>
>>> On Tue, Apr 16, 2019 at 4:13 PM Michael Armbrust 
>>> wrote:
>>>
>>>> Hello All,
>>>>
>>>> I know we just released Spark 2.4.1, but in light of fixing SPARK-27453
>>>> <https://issues.apache.org/jira/browse/SPARK-27453> I was wondering if
>>>> it might make sense to follow up quickly with 2.4.2.  Without this fix its
>>>> very hard to build a datasource that correctly handles partitioning without
>>>> using unstable APIs.  There are also a few other fixes that have trickled
>>>> in since 2.4.1.
>>>>
>>>> If there are no objections, I'd like to start the process shortly.
>>>>
>>>> Michael
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: Spark 2.4.2

2019-04-16 Thread Michael Armbrust
I would argue that it's confusing enough to a user for options from
DataFrameWriter to be silently dropped when instantiating the data source
to consider this a bug.  They asked for partitioning to occur, and we are
doing nothing (not even telling them we can't).  I was certainly surprised
by this behavior.  Do you have a different proposal about how this should
be handled?

On Tue, Apr 16, 2019 at 4:23 PM Ryan Blue  wrote:

> Is this a bug fix? It looks like a new feature to me.
>
> On Tue, Apr 16, 2019 at 4:13 PM Michael Armbrust 
> wrote:
>
>> Hello All,
>>
>> I know we just released Spark 2.4.1, but in light of fixing SPARK-27453
>> <https://issues.apache.org/jira/browse/SPARK-27453> I was wondering if
>> it might make sense to follow up quickly with 2.4.2.  Without this fix its
>> very hard to build a datasource that correctly handles partitioning without
>> using unstable APIs.  There are also a few other fixes that have trickled
>> in since 2.4.1.
>>
>> If there are no objections, I'd like to start the process shortly.
>>
>> Michael
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Spark 2.4.2

2019-04-16 Thread Michael Armbrust
Hello All,

I know we just released Spark 2.4.1, but in light of fixing SPARK-27453
<https://issues.apache.org/jira/browse/SPARK-27453> I was wondering if it
might make sense to follow up quickly with 2.4.2.  Without this fix it's
very hard to build a datasource that correctly handles partitioning without
using unstable APIs.  There are also a few other fixes that have trickled
in since 2.4.1.

If there are no objections, I'd like to start the process shortly.

Michael


[jira] [Assigned] (SPARK-27453) DataFrameWriter.partitionBy is Silently Dropped by DSV1

2019-04-12 Thread Michael Armbrust (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust reassigned SPARK-27453:


Assignee: Liwen Sun

> DataFrameWriter.partitionBy is Silently Dropped by DSV1
> ---
>
> Key: SPARK-27453
> URL: https://issues.apache.org/jira/browse/SPARK-27453
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.1, 1.5.2, 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.4.1
>Reporter: Michael Armbrust
>Assignee: Liwen Sun
>Priority: Critical
>
> This is a long standing quirk of the interaction between {{DataFrameWriter}} 
> and {{CreatableRelationProvider}} (and the other forms of the DSV1 API).  
> Users can specify columns in {{partitionBy}} and our internal data sources 
> will use this information.  Unfortunately, for external systems, this data is 
> silently dropped with no feedback given to the user.
> In the long run, I think that DataSourceV2 is a better answer. However, I 
> don't think we should wait for that API to stabilize before offering some 
> kind of solution to developers of external data sources. I also do not think 
> we should break binary compatibility of this API, but I do think that a small 
> surgical fix could alleviate the issue.
> I would propose that we could propagate partitioning information (when 
> present) along with the other configuration options passed to the data source 
> in the {{String, String}} map.
> I think its very unlikely that there are both data sources that validate 
> extra options and users who are using (no-op) partitioning with them, but out 
> of an abundance of caution we should protect the behavior change behind a 
> {{legacy}} flag that can be turned off.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27453) DataFrameWriter.partitionBy is Silently Dropped by DSV1

2019-04-12 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-27453:


 Summary: DataFrameWriter.partitionBy is Silently Dropped by DSV1
 Key: SPARK-27453
 URL: https://issues.apache.org/jira/browse/SPARK-27453
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.1, 2.2.3, 2.1.3, 2.0.2, 1.6.3, 1.5.2, 1.4.1
Reporter: Michael Armbrust


This is a long-standing quirk of the interaction between {{DataFrameWriter}} 
and {{CreatableRelationProvider}} (and the other forms of the DSV1 API).  Users 
can specify columns in {{partitionBy}} and our internal data sources will use 
this information.  Unfortunately, for external systems, this data is silently 
dropped with no feedback given to the user.

In the long run, I think that DataSourceV2 is a better answer. However, I don't 
think we should wait for that API to stabilize before offering some kind of 
solution to developers of external data sources. I also do not think we should 
break binary compatibility of this API, but I do think that a small surgical fix 
could alleviate the issue.

I would propose that we could propagate partitioning information (when present) 
along with the other configuration options passed to the data source in the 
{{String, String}} map.

I think it's very unlikely that there are both data sources that validate extra 
options and users who are using (no-op) partitioning with them, but out of an 
abundance of caution we should protect the behavior change behind a {{legacy}} 
flag that can be turned off.
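
For illustration, a minimal sketch of how an external DSV1 sink could pick the 
proposed information up. This assumes a hypothetical option key 
("__partition_columns") and a placeholder write step; the actual key and 
encoding chosen by the eventual fix may differ.

{code:scala}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

class ExampleSinkProvider extends CreatableRelationProvider {
  override def createRelation(
      ctx: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    // With the proposed change, DataFrameWriter.partitionBy(...) would surface
    // here instead of being silently dropped.
    val partitionCols: Seq[String] = parameters
      .get("__partition_columns")                 // hypothetical key
      .map(_.split(",").map(_.trim).toSeq)
      .getOrElse(Seq.empty)

    // Placeholder for handing `data` and `partitionCols` to the external system.
    println(s"writing partitioned by $partitionCols in mode $mode")

    new BaseRelation {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType = data.schema
    }
  }
}
{code}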



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23831) Add org.apache.derby to IsolatedClientLoader

2018-11-13 Thread Michael Armbrust (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16685629#comment-16685629
 ] 

Michael Armbrust commented on SPARK-23831:
--

Why was it reverted?

> Add org.apache.derby to IsolatedClientLoader
> 
>
> Key: SPARK-23831
> URL: https://issues.apache.org/jira/browse/SPARK-23831
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Yuming Wang
>Priority: Major
>
> Add org.apache.derby to IsolatedClientLoader, otherwise it may throw an 
> exception:
> {noformat}
> [info] Cause: java.sql.SQLException: Failed to start database 'metastore_db' 
> with class loader 
> org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@2439ab23, see 
> the next exception for details.
> [info] at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
> [info] at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
> [info] at org.apache.derby.impl.jdbc.Util.seeNextException(Unknown Source)
> [info] at org.apache.derby.impl.jdbc.EmbedConnection.bootDatabase(Unknown 
> Source)
> [info] at org.apache.derby.impl.jdbc.EmbedConnection.<init>(Unknown Source)
> [info] at org.apache.derby.jdbc.InternalDriver$1.run(Unknown Source)
> {noformat}
> How to reproduce:
> {noformat}
> sed 's/HiveExternalCatalogSuite/HiveExternalCatalog2Suite/g' 
> sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
>  > 
> sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalog2Suite.scala
> build/sbt -Phive "hive/test-only *.HiveExternalCatalogSuite 
> *.HiveExternalCatalog2Suite"
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Plan on Structured Streaming in next major/minor release?

2018-10-30 Thread Michael Armbrust
>
> Agree. Just curious, could you explain what you mean by "negation"?
> Does it mean applying retraction on the aggregated result?
>

Yeah exactly.  Our current streaming aggregation assumes that the input is
in append-mode and multiple aggregations break this.


Re: Plan on Structured Streaming in next major/minor release?

2018-10-30 Thread Michael Armbrust
Thanks for bringing up some possible future directions for streaming. Here
are some thoughts:
 - I personally view all of the activity on Spark SQL also as activity on
Structured Streaming. The great thing about building streaming on catalyst
/ tungsten is that continued improvement to these components improves
streaming use cases as well.
 - I think the biggest on-going project is DataSourceV2, whose goal is to
provide a stable / performant API for streaming and batch data sources to
plug in.  I think connectivity to many different systems is one of the most
powerful aspects of Spark and right now there is no stable public API for
streaming. A lot of committer / PMC time is being spent here at the moment.
 - As you mention, 2.4.0 significantly improves the built in connectivity
for Kafka, giving us the ability to read exactly once from a topic being
written to transactional producers. I think projects to extend this
guarantee to the Kafka Sink and also to improve authentication with Kafka
are a great idea (and it seems like there is a lot of review activity on
the latter).

You bring up some other possible projects like session window support.
This is an interesting project, but as far as I can tell there is still a
lot of work that would need to be done before this feature could be
merged.  We'd need to understand how it works with update mode amongst
other things. Additionally, a 3000+ line patch is really time consuming to
review. This, coupled with the fact that all the users that I have
interacted with need "session windows + some custom business logic"
(usually implemented with flatMapGroupsWithState), means that I'm more
inclined to direct limited review bandwidth to incremental improvements in
that feature than to something large/new. This is not to say that this
feature isn't useful / shouldn't be merged, just a bit of explanation as to
why there might be less activity here than you would hope.

Similarly, multiple aggregations are an often-requested feature.  However,
fundamentally, this is going to be a fairly large investment (I think we'd
need to combine the unsupported operation checker and the query planner, and
also create a high-performance (i.e. whole-stage code-generated) aggregation
operator that understands negation).

Thanks again for starting the discussion, and looking forward to hearing
about what features are most requested!

On Tue, Oct 30, 2018 at 12:23 AM Jungtaek Lim  wrote:

> Adding more: again, it doesn't mean they're feasible to do. Just a kind of
> brainstorming.
>
> * SPARK-20568: Delete files after processing in structured streaming
>   * There hasn't been consensus regarding supporting this: there were
> voices for both YES and NO.
> * Support multiple levels of aggregations in structured streaming
>   * There're plenty of questions in SO regarding this. While I don't think
> it makes sense on structured streaming if it requires additional shuffle,
> there might be another case: group by keys, apply aggregation, apply
> aggregation on aggregated result (grouped keys don't change)
>
> On Mon, Oct 22, 2018 at 12:25 PM, Jungtaek Lim wrote:
>
>> Yeah, the main intention of this thread is to collect interest on
>> possible feature list for structured streaming. From what I can see in
>> Spark community, most of the discussions as well as contributions are for
>> SQL, and I'd wish to see similar activeness / efforts on structured
>> streaming.
>> (Unfortunately there's less effort to review others' works - design doc
>> as well as pull request - most of efforts looks like being spent to their
>> own works.)
>>
>> I respect the role of PMC member, so the final decision would be up to
>> PMC members, but contributors as well as end users could show the interest
>> as well as discuss about requirements on SPIP, which could be a good
>> background to persuade PMC members.
>>
>> Before going into the deep I guess we could use this thread to discuss
>> about possible use cases, and if we would like to move forward to
>> individual thread we could initiate (or resurrect) its discussion thread.
>>
>> For queryable state, at least there seems no workaround in Spark to
>> provide similar thing, especially state is getting bigger. I may have some
>> concerns on the details, but I'll add my thought on the discussion thread.
>>
>> - Jungtaek Lim (HeartSaVioR)
>>
>> On Mon, Oct 22, 2018 at 1:15 AM, Stavros Kontopoulos <
>> stavros.kontopou...@lightbend.com> wrote:
>>
>>> Hi Jungtaek,
>>>
>>> I just tried to start the discussion in the dev list a long time ago.
>>> I enumerated some use cases as Michael proposed here.
>>> The discussion didn't go further.
>>>
>>> If people find it useful we should start discussing it in detail again.
>>>
>>> Stavros
>>>
>>> On Sun, Oct 21, 2018 at 4:54 PM, Jungtaek Lim  wrote:
>>>
 Stavros, if my memory is right, you were 

[jira] [Commented] (SPARK-6459) Warn when Column API is constructing trivially true equality

2018-07-23 Thread Michael Armbrust (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-6459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553330#comment-16553330
 ] 

Michael Armbrust commented on SPARK-6459:
-

[~tenstriker] this will never happen from a SQL query.  This only happens when 
you take already resolved attributes from different parts of a DataFrame and 
manually construct an equality that can't be differentiated.
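
For concreteness, a minimal sketch of the pattern (assuming a SparkSession 
{{spark}} with implicits imported); the aliased version at the end is the 
usual way to avoid the trivially true condition:

{code:scala}
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

// Both sides of the condition reference the *same* resolved attribute df("id"),
// so the predicate is trivially true and the join degenerates.
val ambiguous = df.join(df, df("id") === df("id"))

// Aliasing the two sides gives the optimizer distinguishable attributes.
val left = df.as("l")
val right = df.as("r")
val ok = left.join(right, $"l.id" === $"r.id")
{code}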

> Warn when Column API is constructing trivially true equality
> 
>
> Key: SPARK-6459
> URL: https://issues.apache.org/jira/browse/SPARK-6459
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 1.3.0
>Reporter: Michael Armbrust
>    Assignee: Michael Armbrust
>Priority: Critical
> Fix For: 1.3.1, 1.4.0
>
>
> Right now it's pretty confusing when a user constructs an equality predicate 
> that is going to be used in a self-join, where the optimizer cannot 
> distinguish between the attributes in question (e.g., [SPARK-6231]).  Since 
> there is really no good reason to do this, let's print a warning.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Spark 2.3.0 Structured Streaming Kafka Timestamp

2018-05-11 Thread Michael Armbrust
Hmm yeah that does look wrong.  Would be great if someone opened a PR to
correct the docs :)

On Thu, May 10, 2018 at 5:13 PM Yuta Morisawa 
wrote:

> The problem is solved.
> The actual schema of Kafka message is different from documentation.
>
>
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>
> The documentation says the type of the "timestamp" column is Long,
> but the actual type is timestamp.
>
>
> The followings are my code and result to check schema.
>
> -code
> val df = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
> df.printSchema()
>
> -result
> root
>   |-- key: binary (nullable = true)
>   |-- value: binary (nullable = true)
>   |-- topic: string (nullable = true)
>   |-- partition: integer (nullable = true)
>   |-- offset: long (nullable = true)
>   |-- timestamp: timestamp (nullable = true)
>   |-- timestampType: integer (nullable = true)
>
>
> Regards,
> Yuta
>
> On 2018/05/09 16:14, Yuta Morisawa wrote:
> > Hi All
> >
> > I'm trying to extract Kafka-timestamp from Kafka topics.
> >
> > The timestamp does not contain milli-seconds information,
> > but it should contain because ConsumerRecord class of Kafka 0.10
> > supports milli-second timestamp.
> >
> > How can I get milli-second timestamp from Kafka topics?
> >
> >
> > These are websites I refer to.
> >
> >
> https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html
> >
> >
> >
> https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/processor/TimestampExtractor.html
> >
> >
> >
> > And this is my code.
> > 
> > val df = spark
> >.readStream
> >.format("kafka")
> >.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
> >.option("subscribe", "topic1,topic2")
> >.load()
> >.selectExpr("CAST(timestamp AS LONG)", "CAST(value AS STRING)")
> >.as[(Long, String)]
> > 
> >
> > Regards,
> > Yuta
> >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[jira] [Resolved] (SPARK-5517) Add input types for Java UDFs

2018-05-09 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-5517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-5517.
-
Resolution: Unresolved

> Add input types for Java UDFs
> -
>
> Key: SPARK-5517
> URL: https://issues.apache.org/jira/browse/SPARK-5517
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 1.3.0
>Reporter: Michael Armbrust
>Priority: Critical
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18165) Kinesis support in Structured Streaming

2018-05-07 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466410#comment-16466410
 ] 

Michael Armbrust commented on SPARK-18165:
--

This is great!  I'm glad there are more connectors for Structured Streaming!

A few high-level thoughts:
 - The current Source/Sink APIs are internal/unstable.  We are working on 
building public/stable APIs as part of DataSourceV2. It would be great to get 
feedback on those APIs if this is ported to them.
 - In general, as the Spark project scales, we are trying to move more of the 
connectors out of the core project.  I'd suggest looking at contributing this 
to Apache Bahir and/or Spark Packages.

> Kinesis support in Structured Streaming
> ---
>
> Key: SPARK-18165
> URL: https://issues.apache.org/jira/browse/SPARK-18165
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Lauren Moos
>Priority: Major
>
> Implement Kinesis based sources and sinks for Structured Streaming



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18165) Kinesis support in Structured Streaming

2018-05-07 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18165:
-
Component/s: (was: DStreams)
 Structured Streaming

> Kinesis support in Structured Streaming
> ---
>
> Key: SPARK-18165
> URL: https://issues.apache.org/jira/browse/SPARK-18165
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Lauren Moos
>Priority: Major
>
> Implement Kinesis based sources and sinks for Structured Streaming



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Sorting on a streaming dataframe

2018-04-30 Thread Michael Armbrust
Please open a JIRA then!

On Fri, Apr 27, 2018 at 3:59 AM Hemant Bhanawat <hemant9...@gmail.com>
wrote:

> I see.
>
> monotonically_increasing_id on streaming dataFrames will be really helpful
> to me and I believe to many more users. Adding this functionality in Spark
> would be efficient in terms of performance as compared to implementing this
> functionality inside the applications.
>
> Hemant
>
> On Thu, Apr 26, 2018 at 11:59 PM, Michael Armbrust <mich...@databricks.com
> > wrote:
>
>> The basic tenet of structured streaming is that a query should return the
>> same answer in streaming or batch mode. We support sorting in complete mode
>> because we have all the data and can sort it correctly and return the full
>> answer.  In update or append mode, sorting would only return a correct
>> answer if we could promise that records that sort lower are going to arrive
>> later (and we can't).  Therefore, it is disallowed.
>>
>> If you are just looking for a unique, stable id and you are already using
>> kafka as the source, you could just combine the partition id and the
>> offset. The structured streaming connector to Kafka
>> <https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>
>> exposes both of these in the schema of the streaming DataFrame. (similarly
>> for kinesis you can use the shard id and sequence number)
>>
>> If you need the IDs to be contiguous, then this is a somewhat
>> fundamentally hard problem.  I think the best we could do is add support
>> for monotonically_increasing_id() in streaming dataframes.
>>
>> On Tue, Apr 24, 2018 at 1:38 PM, Chayapan Khannabha <chaya...@gmail.com>
>> wrote:
>>
>>> Perhaps your use case fits to Apache Kafka better.
>>>
>>> More info at:
>>> https://kafka.apache.org/documentation/streams/
>>>
>>> Everything really comes down to the architecture design and algorithm
>>> spec. However, from my experience with Spark, there are many good reasons
>>> why this requirement is not supported ;)
>>>
>>> Best,
>>>
>>> Chayapan (A)
>>>
>>>
>>> On Apr 24, 2018, at 2:18 PM, Hemant Bhanawat <hemant9...@gmail.com>
>>> wrote:
>>>
>>> Thanks Chris. There are many ways in which I can solve this problem but
>>> they are cumbersome. The easiest way would have been to sort the streaming
>>> dataframe. The reason I asked this question is because I could not find a
>>> reason why sorting on streaming dataframe is disallowed.
>>>
>>> Hemant
>>>
>>> On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris <
>>> chris.bow...@microfocus.com> wrote:
>>>
>>>> You can happily sort the underlying RDD of InternalRow(s) inside a
>>>> sink, assuming you are willing to implement and maintain your own sink(s).
>>>> That is, just grabbing the parquet sink, etc. isn’t going to work out of
>>>> the box. Alternatively map/flatMapGroupsWithState is probably sufficient
>>>> and requires less working knowledge to make effective reuse of internals.
>>>> Just group by foo and then sort accordingly and assign ids. The id counter
>>>> can be stateful per group. Sometimes this problem may not need to be solved
>>>> at all. For example, if you are using kafka, a proper partitioning scheme
>>>> and message offsets may be “good enough”.
>>>> --
>>>> *From:* Hemant Bhanawat <hemant9...@gmail.com>
>>>> *Sent:* Thursday, April 12, 2018 11:42:59 PM
>>>> *To:* Reynold Xin
>>>> *Cc:* dev
>>>> *Subject:* Re: Sorting on a streaming dataframe
>>>>
>>>> Well, we want to assign snapshot ids (incrementing counters) to the
>>>> incoming records. For that, we are zipping the streaming rdds with that
>>>> counter using a modified version of ZippedWithIndexRDD. We are ok if the
>>>> records in the streaming dataframe gets counters in random order but the
>>>> counter should always be incrementing.
>>>>
>>>> This is working fine until we have a failure. When we have a failure,
>>>> we re-assign the records to snapshot ids  and this time same snapshot id
>>>> can get assigned to a different record. This is a problem because the
>>>> primary key in our storage engine is <recordid, snapshotid>. So we want to
>>>> sort the dataframe so that the records always get the same snapshot id.
>>>>
>>>>
>>>>
>>>> On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin <r...@databricks.com>
>>>> wrote:
>>>>
>>>> Can you describe your use case more?
>>>>
>>>> On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat <hemant9...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Guys,
>>>>
>>>> Why is sorting on streaming dataframes not supported(unless it is
>>>> complete mode)? My downstream needs me to sort the streaming dataframe.
>>>>
>>>> Hemant
>>>>
>>>>
>>>>
>>>
>>>
>>
>


Re: Sorting on a streaming dataframe

2018-04-26 Thread Michael Armbrust
The basic tenet of structured streaming is that a query should return the
same answer in streaming or batch mode. We support sorting in complete mode
because we have all the data and can sort it correctly and return the full
answer.  In update or append mode, sorting would only return a correct
answer if we could promise that records that sort lower are going to arrive
later (and we can't).  Therefore, it is disallowed.

If you are just looking for a unique, stable id and you are already using
kafka as the source, you could just combine the partition id and the
offset. The structured streaming connector to Kafka

exposes both of these in the schema of the streaming DataFrame. (similarly
for kinesis you can use the shard id and sequence number)

If you need the IDs to be contiguous, then this is a somewhat fundamentally
hard problem.  I think the best we could do is add support
for monotonically_increasing_id() in streaming dataframes.
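
A minimal sketch of the partition/offset idea (not exact production code; it
assumes a SparkSession named spark with implicits imported, and placeholder
broker/topic names; the column names follow the Kafka source schema shown
elsewhere in this thread):

import org.apache.spark.sql.functions._
import spark.implicits._

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")   // placeholder broker
  .option("subscribe", "events")                     // placeholder topic
  .load()

// partition + offset is unique and stable across restarts, unlike a counter
// assigned at processing time.
val withId = stream.withColumn("record_id", concat_ws("-", $"partition", $"offset"))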

On Tue, Apr 24, 2018 at 1:38 PM, Chayapan Khannabha 
wrote:

> Perhaps your use case fits to Apache Kafka better.
>
> More info at:
> https://kafka.apache.org/documentation/streams/
>
> Everything really comes down to the architecture design and algorithm
> spec. However, from my experience with Spark, there are many good reasons
> why this requirement is not supported ;)
>
> Best,
>
> Chayapan (A)
>
>
> On Apr 24, 2018, at 2:18 PM, Hemant Bhanawat  wrote:
>
> Thanks Chris. There are many ways in which I can solve this problem but
> they are cumbersome. The easiest way would have been to sort the streaming
> dataframe. The reason I asked this question is because I could not find a
> reason why sorting on streaming dataframe is disallowed.
>
> Hemant
>
> On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris <
> chris.bow...@microfocus.com> wrote:
>
>> You can happily sort the underlying RDD of InternalRow(s) inside a sink,
>> assuming you are willing to implement and maintain your own sink(s). That
>> is, just grabbing the parquet sink, etc. isn’t going to work out of the
>> box. Alternatively map/flatMapGroupsWithState is probably sufficient and
>> requires less working knowledge to make effective reuse of internals. Just
>> group by foo and then sort accordingly and assign ids. The id counter can
>> be stateful per group. Sometimes this problem may not need to be solved at
>> all. For example, if you are using kafka, a proper partitioning scheme and
>> message offsets may be “good enough”.
>> --
>> *From:* Hemant Bhanawat 
>> *Sent:* Thursday, April 12, 2018 11:42:59 PM
>> *To:* Reynold Xin
>> *Cc:* dev
>> *Subject:* Re: Sorting on a streaming dataframe
>>
>> Well, we want to assign snapshot ids (incrementing counters) to the
>> incoming records. For that, we are zipping the streaming rdds with that
>> counter using a modified version of ZippedWithIndexRDD. We are ok if the
>> records in the streaming dataframe gets counters in random order but the
>> counter should always be incrementing.
>>
>> This is working fine until we have a failure. When we have a failure, we
>> re-assign the records to snapshot ids  and this time same snapshot id can
>> get assigned to a different record. This is a problem because the primary
>> key in our storage engine is <recordid, snapshotid>. So we want to sort the
>> dataframe so that the records always get the same snapshot id.
>>
>>
>>
>> On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin 
>> wrote:
>>
>> Can you describe your use case more?
>>
>> On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat 
>> wrote:
>>
>> Hi Guys,
>>
>> Why is sorting on streaming dataframes not supported(unless it is
>> complete mode)? My downstream needs me to sort the streaming dataframe.
>>
>> Hemant
>>
>>
>>
>
>


Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Michael Armbrust
You can calculate argmax using a struct.

df.groupBy($"id").agg(max($"my_timestamp",
struct($"*").as("data")).getField("data").select($"data.*")

You could transcode this to SQL, it'll just be complicated nested queries.
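
Applied to the sample data quoted below, a rough sketch of the same trick
(assuming a SparkSession named spark with implicits imported; the streaming
variant needs update or complete output mode):

import org.apache.spark.sql.functions._
import spark.implicits._

val events = Seq(
  (1, 5,  "2018-04-01T01:00:00.000Z"),
  (1, 10, "2018-04-01T01:10:00.000Z"),
  (2, 30, "2018-04-01T01:25:00.000Z"),
  (2, 40, "2018-04-01T01:30:00.000Z")
).toDF("id", "amount", "my_timestamp")

// max over a struct compares fields left to right, so putting my_timestamp
// first selects the row with the latest timestamp for each id.
events
  .groupBy($"id")
  .agg(max(struct($"my_timestamp", $"amount")).as("data"))
  .select($"id", $"data.amount", $"data.my_timestamp")
  .show()
// id 1 -> amount 10, id 2 -> amount 40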


On Wed, Apr 18, 2018 at 3:40 PM, kant kodali  wrote:

> Hi Arun,
>
> I want to select the entire row with the max timestamp for each group. I
> have modified my data set below to avoid any confusion.
>
> *Input:*
>
> id | amount | my_timestamp
> ---
> 1  |  5 |  2018-04-01T01:00:00.000Z
> 1  | 10 |  2018-04-01T01:10:00.000Z
> 1  |  6 |  2018-04-01T01:20:00.000Z
> 2  | 30 |  2018-04-01T01:25:00.000Z
> 2  | 40 |  2018-04-01T01:30:00.000Z
>
> *Expected Output:*
>
> id | amount | my_timestamp
> ---
> 1  | 10 |  2018-04-01T01:10:00.000Z
> 2  | 40 |  2018-04-01T01:30:00.000Z
>
> Looking for a streaming solution using either raw sql like 
> sparkSession.sql("sql
> query") or similar to raw sql but not something like mapGroupWithState
>
> On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan  wrote:
>
>> Can't the “max” function be used here? Something like...
>>
>> stream.groupBy($"id").max("amount").writeStream.outputMode(“
>> complete”/“update")….
>>
>> Unless the “stream” is already a grouped stream, in which case the above
>> would not work since the support for multiple aggregate operations is not
>> there yet.
>>
>> Thanks,
>> Arun
>>
>> From: kant kodali 
>> Date: Tuesday, April 17, 2018 at 11:41 AM
>> To: Tathagata Das 
>> Cc: "user @spark" 
>> Subject: Re: can we use mapGroupsWithState in raw sql?
>>
>> Hi TD,
>>
>> Thanks for that. The only reason I ask is I don't see any alternative
>> solution to solve the problem below using raw sql.
>>
>>
>> How to select the max row for every group in spark structured streaming
>> 2.3.0 without using order by since it requires complete mode or
>> mapGroupWithState?
>>
>> *Input:*
>>
>> id | amount | my_timestamp
>> ---
>> 1  |  5 |  2018-04-01T01:00:00.000Z
>> 1  | 10 |  2018-04-01T01:10:00.000Z
>> 2  | 20 |  2018-04-01T01:20:00.000Z
>> 2  | 30 |  2018-04-01T01:25:00.000Z
>> 2  | 40 |  2018-04-01T01:30:00.000Z
>>
>> *Expected Output:*
>>
>> id | amount | my_timestamp
>> ---
>> 1  | 10 |  2018-04-01T01:10:00.000Z
>> 2  | 40 |  2018-04-01T01:30:00.000Z
>>
>> Looking for a streaming solution using either raw sql like 
>> sparkSession.sql("sql
>> query") or similar to raw sql but not something like mapGroupWithState
>>
>> On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Unfortunately no. Honestly it does not make sense as for type-aware
>>> operations like map, mapGroups, etc., you have to provide an actual JVM
>>> function. That does not fit in with the SQL language structure.
>>>
>>> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali  wrote:
>>>
 Hi All,

 can we use mapGroupsWithState in raw SQL? or is it in the roadmap?

 Thanks!



>>>
>>
>


[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects

2018-04-10 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16433222#comment-16433222
 ] 

Michael Armbrust commented on SPARK-23337:
--

The checkpoint will only grow if you are doing an aggregation; otherwise the 
watermark will not affect computation.

You can set a watermark on the nested column; you just need to project it to a 
top-level column using {{withColumn}}.
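
A minimal sketch of that workaround (Scala for brevity; it assumes a streaming 
DataFrame {{jsonRow}} with the nested {{_source.createTime}} field from this 
issue):

{code:scala}
import org.apache.spark.sql.functions.col

val withTopLevelTime = jsonRow
  .withColumn("createTime", col("_source.createTime"))  // promote nested field to a top-level column
  .withWatermark("createTime", "10 seconds")
{code}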

> withWatermark raises an exception on struct objects
> ---
>
> Key: SPARK-23337
> URL: https://issues.apache.org/jira/browse/SPARK-23337
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
> Environment: Linux Ubuntu, Spark on standalone mode
>Reporter: Aydin Kocas
>Priority: Major
>
> Hi,
>  
> when using a nested object (I mean an object within a struct, here concrete: 
> _source.createTime) from a json file as the parameter for the 
> withWatermark-method, I get an exception (see below).
> Anything else works flawlessly with the nested object.
>  
> +*{color:#14892c}works:{color}*+ 
> {code:java}
> Dataset<Row> jsonRow = 
> spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime",
>  "10 seconds").toDF();{code}
>  
> json structure:
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- myTime: timestamp (nullable = true)
> ..{code}
> +*{color:#d04437}does not work - nested json{color}:*+
> {code:java}
> Dataset<Row> jsonRow = 
> spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime",
>  "10 seconds").toDF();{code}
>  
> json structure:
>  
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- _source: struct (nullable = true)
>  | |-- createTime: timestamp (nullable = true)
> ..
>  
> Exception in thread "main" 
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, 
> tree:
> 'EventTimeWatermark '_source.createTime, interval 10 seconds
> +- Deduplicate [_id#0], true
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true),
>  StructField(_index,StringType,true), StructField(_score,LongType,true), 
> StructField(_source,StructType(StructField(additionalData,StringType,true), 
> StructField(client,StringType,true), 
> StructField(clientDomain,BooleanType,true), 
> StructField(clientVersion,StringType,true), 
> StructField(country,StringType,true), 
> StructField(countryName,StringType,true), 
> StructField(createTime,TimestampType,true), 
> StructField(externalIP,StringType,true), 
> StructField(hostname,StringType,true), 
> StructField(internalIP,StringType,true), 
> StructField(location,StringType,true), 
> StructField(locationDestination,StringType,true), 
> StructField(login,StringType,true), 
> StructField(originalRequestString,StringType,true), 
> StructField(password,StringType,true), 
> StructField(peerIdent,StringType,true), 
> StructField(peerType,StringType,true), 
> StructField(recievedTime,TimestampType,true), 
> StructField(sessionEnd,StringType,true), 
> StructField(sessionStart,StringType,true), 
> StructField(sourceEntryAS,StringType,true), 
> StructField(sourceEntryIp,StringType,true), 
> StructField(sourceEntryPort,StringType,true), 
> StructField(targetCountry,StringType,true), 
> StructField(targetCountryName,StringType,true), 
> StructField(targetEntryAS,StringType,true), 
> StructField(targetEntryIp,StringType,true), 
> StructField(targetEntryPort,StringType,true), 
> StructField(targetport,StringType,true), 
> StructField(username,StringType,true), 
> StructField(vulnid,StringType,true)),true), 
> StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), 
> FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.ap

[jira] [Commented] (SPARK-23835) When Dataset.as converts column from nullable to non-nullable type, null Doubles are converted silently to -1

2018-04-02 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423046#comment-16423046
 ] 

Michael Armbrust commented on SPARK-23835:
--

I believe the correct semantics are to throw a {{NullPointerException}} if the 
user tries to deserialize a null value into a type that doesn't support nulls. 
 See [this test case for an 
example|https://github.com/apache/spark/blob/b2edc30db1dcc6102687d20c158a2700965fdf51/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala#L1428-L1443].
 We must just be missing the assertion in this case.

> When Dataset.as converts column from nullable to non-nullable type, null 
> Doubles are converted silently to -1
> -
>
> Key: SPARK-23835
> URL: https://issues.apache.org/jira/browse/SPARK-23835
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Joseph K. Bradley
>Priority: Major
>
> I constructed a DataFrame with a nullable java.lang.Double column (and an 
> extra Double column).  I then converted it to a Dataset using ```as[(Double, 
> Double)]```.  When the Dataset is shown, it has a null.  When it is collected 
> and printed, the null is silently converted to a -1.
> Code snippet to reproduce this:
> {code}
> val localSpark = spark
> import localSpark.implicits._
> val df = Seq[(java.lang.Double, Double)](
>   (1.0, 2.0),
>   (3.0, 4.0),
>   (Double.NaN, 5.0),
>   (null, 6.0)
> ).toDF("a", "b")
> df.show()  // OUTPUT 1: has null
> df.printSchema()
> val data = df.as[(Double, Double)]
> data.show()  // OUTPUT 2: has null
> data.collect().foreach(println)  // OUTPUT 3: has -1
> {code}
> OUTPUT 1 and 2:
> {code}
> +----+---+
> |   a|  b|
> +----+---+
> | 1.0|2.0|
> | 3.0|4.0|
> | NaN|5.0|
> |null|6.0|
> +----+---+
> {code}
> OUTPUT 3:
> {code}
> (1.0,2.0)
> (3.0,4.0)
> (NaN,5.0)
> (-1.0,6.0)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23835) When Dataset.as converts column from nullable to non-nullable type, null Doubles are converted silently to -1

2018-03-30 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420809#comment-16420809
 ] 

Michael Armbrust commented on SPARK-23835:
--

/cc [~cloud_fan]

> When Dataset.as converts column from nullable to non-nullable type, null 
> Doubles are converted silently to -1
> -
>
> Key: SPARK-23835
> URL: https://issues.apache.org/jira/browse/SPARK-23835
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Joseph K. Bradley
>Priority: Major
>
> I constructed a DataFrame with a nullable java.lang.Double column (and an 
> extra Double column).  I then converted it to a Dataset using ```as[(Double, 
> Double)]```.  When the Dataset is shown, it has a null.  When it is collected 
> and printed, the null is silently converted to a -1.
> Code snippet to reproduce this:
> {code}
> val localSpark = spark
> import localSpark.implicits._
> val df = Seq[(java.lang.Double, Double)](
>   (1.0, 2.0),
>   (3.0, 4.0),
>   (Double.NaN, 5.0),
>   (null, 6.0)
> ).toDF("a", "b")
> df.show()  // OUTPUT 1: has null
> df.printSchema()
> val data = df.as[(Double, Double)]
> data.show()  // OUTPUT 2: has null
> data.collect().foreach(println)  // OUTPUT 3: has -1
> {code}
> OUTPUT 1 and 2:
> {code}
> +----+---+
> |   a|  b|
> +----+---+
> | 1.0|2.0|
> | 3.0|4.0|
> | NaN|5.0|
> |null|6.0|
> +----+---+
> {code}
> OUTPUT 3:
> {code}
> (1.0,2.0)
> (3.0,4.0)
> (NaN,5.0)
> (-1.0,6.0)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: select count * doesnt seem to respect update mode in Kafka Structured Streaming?

2018-03-20 Thread Michael Armbrust
Those options will not affect structured streaming.  You are looking for

.option("maxOffsetsPerTrigger", "1000")

We are working on improving this by building a generic mechanism into the
Streaming DataSource V2 so that the engine can do admission control on the
amount of data returned in a source-independent way.
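
For context, a rough sketch of where that option sits on the Kafka source
(assuming a SparkSession named spark and placeholder broker/topic names):

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")  // placeholder
  .option("subscribe", "topic1")                    // placeholder
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "1000")           // cap the records fetched per micro-batch
  .load()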

On Tue, Mar 20, 2018 at 2:58 PM, kant kodali  wrote:

> I am using spark 2.3.0 and Kafka 0.10.2.0 so I assume structured streaming
> using Direct API's although I am not sure? If it is direct API's the only
> parameters that are relevant are below according to this
> 
> article
>
>- spark.conf("spark.streaming.backpressure.enabled", "true")
>- spark.conf("spark.streaming.kafka.maxRatePerPartition", "1")
>
> I set both of these and I run select count * on my 10M records I still
> don't see any output until it finishes the initial batch of 10M and this
> takes a while. so I am wondering if I miss something here?
>
> On Tue, Mar 20, 2018 at 6:09 AM, Geoff Von Allmen 
> wrote:
>
>> The following
>>  
>> settings
>> may be what you’re looking for:
>>
>>- spark.streaming.backpressure.enabled
>>- spark.streaming.backpressure.initialRate
>>- spark.streaming.receiver.maxRate
>>- spark.streaming.kafka.maxRatePerPartition
>>
>> ​
>>
>> On Mon, Mar 19, 2018 at 5:27 PM, kant kodali  wrote:
>>
>>> Yes it indeed makes sense! Is there a way to get incremental counts when
>>> I start from 0 and go through 10M records? perhaps count for every micro
>>> batch or something?
>>>
>>> On Mon, Mar 19, 2018 at 1:57 PM, Geoff Von Allmen <
>>> ge...@ibleducation.com> wrote:
>>>
 Trigger does not mean report the current solution every 'trigger
 seconds'. It means it will attempt to fetch new data and process it no
 faster than trigger seconds intervals.

 If you're reading from the beginning and you've got 10M entries in
 kafka, it's likely pulling everything down then processing it completely
 and giving you an initial output. From here on out, it will check kafka
 every 1 second for new data and process it, showing you only the updated
 rows. So the initial read will give you the entire output since there is
 nothing to be 'updating' from. If you add data to kafka now that the
 streaming job has completed it's first batch (and leave it running), it
 will then show you the new/updated rows since the last batch every 1 second
 (assuming it can fetch + process in that time span).

 If the combined fetch + processing time is > the trigger time, you will
 notice warnings that it is 'falling behind' (I forget the exact verbiage,
 but something to the effect of the calculation took XX time and is falling
 behind). In that case, it will immediately check kafka for new messages and
 begin processing the next batch (if new messages exist).

 Hope that makes sense -


 On Mon, Mar 19, 2018 at 13:36 kant kodali  wrote:

> Hi All,
>
> I have 10 million records in my Kafka and I am just trying to
> spark.sql(select count(*) from kafka_view). I am reading from kafka and
> writing to kafka.
>
> My writeStream is set to "update" mode and trigger interval of one
> second (Trigger.ProcessingTime(1000)). I expect the counts to be
> printed every second but looks like it would print after going through all
> 10M. why?
>
> Also, it seems to take forever whereas Linux wc of 10M rows would take
> 30 seconds.
>
> Thanks!
>

>>>
>>
>


[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.

2018-03-08 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391861#comment-16391861
 ] 

Michael Armbrust commented on SPARK-23325:
--

It doesn't seem like it would be that hard to stabilize at least the generic 
form of InternalRow, or am I missing something?

> DataSourceV2 readers should always produce InternalRow.
> ---
>
> Key: SPARK-23325
> URL: https://issues.apache.org/jira/browse/SPARK-23325
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ryan Blue
>Priority: Major
>
> DataSourceV2 row-oriented implementations are limited to producing either 
> {{Row}} instances or {{UnsafeRow}} instances by implementing 
> {{SupportsScanUnsafeRow}}. Instead, I think that implementations should 
> always produce {{InternalRow}}.
> The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither 
> one is appropriate for implementers.
> File formats don't produce {{Row}} instances or the data values used by 
> {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation 
> that uses {{Row}} instances must produce data that is immediately translated 
> from the representation that was just produced by Spark. In my experience, it 
> made little sense to translate a timestamp in microseconds to a 
> (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass 
> that instance to Spark for immediate translation back.
> On the other hand, {{UnsafeRow}} is very difficult to produce unless data is 
> already held in memory. Even the Parquet support built into Spark 
> deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce 
> unsafe rows. When I went to build an implementation that deserializes Parquet 
> or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be 
> done without first deserializing into memory because the size of an array 
> must be known before any values are written.
> I ended up deciding to deserialize to {{InternalRow}} and use 
> {{UnsafeProjection}} to convert to unsafe. There are two problems with this: 
> first, this is Scala and was difficult to call from Java (it required 
> reflection), and second, this causes double projection in the physical plan 
> (a copy for unsafe to unsafe) if there is a projection that wasn't fully 
> pushed to the data source.
> I think the solution is to have a single interface for readers that expects 
> {{InternalRow}}. Then, a projection should be added in the Spark plan to 
> convert to unsafe and avoid projection in the plan and in the data source. If 
> the data source already produces unsafe rows by deserializing directly, this 
> still minimizes the number of copies because the unsafe projection will check 
> whether the incoming data is already {{UnsafeRow}}.
> Using {{InternalRow}} would also match the interface on the write side.
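
For readers less familiar with the internals, a rough sketch of the conversion 
being discussed (these are internal, unstable catalyst APIs, so exact 
signatures may vary by version):

{code:scala}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))

// A source deserializes directly into a generic InternalRow...
val row: InternalRow = new GenericInternalRow(Array[Any](1L, UTF8String.fromString("a")))

// ...and Spark, not the source, adds the projection to UnsafeRow when the plan needs it.
val toUnsafe = UnsafeProjection.create(schema)
val unsafeRow = toUnsafe(row)
{code}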



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: [VOTE] Spark 2.3.0 (RC5)

2018-02-26 Thread Michael Armbrust
+1 all our pipelines have been running the RC for several days now.

On Mon, Feb 26, 2018 at 10:33 AM, Dongjoon Hyun 
wrote:

> +1 (non-binding).
>
> Bests,
> Dongjoon.
>
>
>
> On Mon, Feb 26, 2018 at 9:14 AM, Ryan Blue 
> wrote:
>
>> +1 (non-binding)
>>
>> On Sat, Feb 24, 2018 at 4:17 PM, Xiao Li  wrote:
>>
>>> +1 (binding) in Spark SQL, Core and PySpark.
>>>
>>> Xiao
>>>
>>> 2018-02-24 14:49 GMT-08:00 Ricardo Almeida >> >:
>>>
 +1 (non-binding)

 same as previous RC

 On 24 February 2018 at 11:10, Hyukjin Kwon  wrote:

> +1
>
> 2018-02-24 16:57 GMT+09:00 Bryan Cutler :
>
>> +1
>> Tests passed and additionally ran Arrow related tests and did some
>> perf checks with python 2.7.14
>>
>> On Fri, Feb 23, 2018 at 6:18 PM, Holden Karau 
>> wrote:
>>
>>> Note: given the state of Jenkins I'd love to see Bryan Cutler or
>>> someone with Arrow experience sign off on this release.
>>>
>>> On Fri, Feb 23, 2018 at 6:13 PM, Cheng Lian 
>>> wrote:
>>>
 +1 (binding)

 Passed all the tests, looks good.

 Cheng

 On 2/23/18 15:00, Holden Karau wrote:

 +1 (binding)
 PySpark artifacts install in a fresh Py3 virtual env

 On Feb 23, 2018 7:55 AM, "Denny Lee"  wrote:

> +1 (non-binding)
>
> On Fri, Feb 23, 2018 at 07:08 Josh Goldsborough <
> joshgoldsboroughs...@gmail.com> wrote:
>
>> New to testing out Spark RCs for the community but I was able to
>> run some of the basic unit tests without error so for what it's 
>> worth, I'm
>> a +1.
>>
>> On Thu, Feb 22, 2018 at 4:23 PM, Sameer Agarwal <
>> samee...@apache.org> wrote:
>>
>>> Please vote on releasing the following candidate as Apache Spark
>>> version 2.3.0. The vote is open until Tuesday February 27, 2018 at 
>>> 8:00:00
>>> am UTC and passes if a majority of at least 3 PMC +1 votes are cast.
>>>
>>>
>>> [ ] +1 Release this package as Apache Spark 2.3.0
>>>
>>> [ ] -1 Do not release this package because ...
>>>
>>>
>>> To learn more about Apache Spark, please see
>>> https://spark.apache.org/
>>>
>>> The tag to be voted on is v2.3.0-rc5:
>>> https://github.com/apache/spark/tree/v2.3.0-rc5
>>> (992447fb30ee9ebb3cf794f2d06f4d63a2d792db)
>>>
>>> List of JIRA tickets resolved in this release can be found here:
>>> https://issues.apache.org/jira/projects/SPARK/versions/12339551
>>>
>>> The release files, including signatures, digests, etc. can be
>>> found at:
>>> https://dist.apache.org/repos/dist/dev/spark/v2.3.0-rc5-bin/
>>>
>>> Release artifacts are signed with the following key:
>>> https://dist.apache.org/repos/dist/dev/spark/KEYS
>>>
>>> The staging repository for this release can be found at:
>>> https://repository.apache.org/content/repositories/orgapache
>>> spark-1266/
>>>
>>> The documentation corresponding to this release can be found at:
>>> https://dist.apache.org/repos/dist/dev/spark/v2.3.0-rc5-docs
>>> /_site/index.html
>>>
>>>
>>> FAQ
>>>
>>> ===
>>> What are the unresolved issues targeted for 2.3.0?
>>> ===
>>>
>>> Please see https://s.apache.org/oXKi. At the time of writing,
>>> there are currently no known release blockers.
>>>
>>> =
>>> How can I help test this release?
>>> =
>>>
>>> If you are a Spark user, you can help us test this release by
>>> taking an existing Spark workload and running on this release 
>>> candidate,
>>> then reporting any regressions.
>>>
>>> If you're working in PySpark you can set up a virtual env and
>>> install the current RC and see if anything important breaks, in the
>>> Java/Scala you can add the staging repository to your projects 
>>> resolvers
>>> and test with the RC (make sure to clean up the artifact cache 
>>> before/after
>>> so you don't end up building with a out of date RC going forward).
>>>
>>> ===
>>> What should happen to JIRA tickets still targeting 2.3.0?
>>> ===
>>>

[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2018-02-22 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373611#comment-16373611
 ] 

Michael Armbrust commented on SPARK-18057:
--

My only concern is that it is stable and backwards compatible.  I'm fine with 
skipping / waiting.

Regarding the timeline, we can put this in master whenever, but we typically 
don't change dependencies in point releases so this will need to be targeted at 
Spark 2.4.

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2018-02-21 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372138#comment-16372138
 ] 

Michael Armbrust commented on SPARK-18057:
--

We generally tend towards "don't break things that are working for people" 
rather than "clean".  See the RDD API for an example :).

I'm increasingly pro just keeping the name and upgrading the client.  If they 
ever break compatibility again we can have yet another artifact name, but I 
hope it doesn't come to that.

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: [VOTE] Spark 2.3.0 (RC4)

2018-02-21 Thread Michael Armbrust
I'm -1 on any changes that aren't fixing major regressions from 2.2 at this
point. Also, in any case where it's possible, we should be flipping new
features off if they are still regressing, rather than continuing to
attempt to fix them.

Since its experimental, I would support backporting the DataSourceV2
patches into 2.3.1 so that there is more opportunity for feedback as the
API matures.

On Wed, Feb 21, 2018 at 11:32 AM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> FYI. I found two more blockers:
>
> https://issues.apache.org/jira/browse/SPARK-23475
> https://issues.apache.org/jira/browse/SPARK-23481
>
> On Wed, Feb 21, 2018 at 9:45 AM, Xiao Li  wrote:
>
>> Hi, Ryan,
>>
>> In this release, Data Source V2 is experimental. We are still collecting
>> the feedbacks from the community and will improve the related APIs and
>> implementation in the next 2.4 release.
>>
>> Thanks,
>>
>> Xiao
>>
>> 2018-02-21 9:43 GMT-08:00 Xiao Li :
>>
>>> Hi, Justin,
>>>
>>> Based on my understanding, SPARK-17147 is also not a regression. Thus,
>>> Spark 2.3.0 is unable to contain it. We have to wait for the committers who
>>> are familiar with Spark Streaming to make a decision whether we can fix the
>>> issue in Spark 2.3.1.
>>>
>>> Since this is open source, feel free to add the patch in your local
>>> build.
>>>
>>> Thanks for using Spark!
>>>
>>> Xiao
>>>
>>>
>>> 2018-02-21 9:36 GMT-08:00 Ryan Blue :
>>>
 No problem if we can't add them, this is experimental anyway so this
 release should be more about validating the API and the start of our
 implementation. I just don't think we can recommend that anyone actually
 use DataSourceV2 without these patches.

 On Wed, Feb 21, 2018 at 9:21 AM, Wenchen Fan 
 wrote:

> SPARK-23323 adds a new API, I'm not sure we can still do it at this
> stage of the release... Besides users can work around it by calling the
> spark output coordinator themselves in their data source.
>
> SPARK-23203 is non-trivial and didn't fix any known bugs, so it's hard
> to convince other people that it's safe to add it to the release during 
> the
> RC phase.
>
> SPARK-23418 depends on the above one.
>
> Generally they are good to have in Spark 2.3, if they were merged
> before the RC. I think this is a lesson we should learn from, that we
> should work on stuff we want in the release before the RC, instead of 
> after.
>
> On Thu, Feb 22, 2018 at 1:01 AM, Ryan Blue 
> wrote:
>
>> What does everyone think about getting some of the newer DataSourceV2
>> improvements in? It should be low risk because it is a new code path, and
>> v2 isn't very usable without things like support for using the output
>> commit coordinator to deconflict writes.
>>
>> The ones I'd like to get in are:
>> * Use the output commit coordinator: https://issues.ap
>> ache.org/jira/browse/SPARK-23323
>> * Use immutable trees and the same push-down logic as other read
>> paths: https://issues.apache.org/jira/browse/SPARK-23203
>> * Don't allow users to supply schemas when they aren't supported:
>> https://issues.apache.org/jira/browse/SPARK-23418
>>
>> I think it would make the 2.3.0 release more usable for anyone
>> interested in the v2 read and write paths.
>>
>> Thanks!
>>
>> On Tue, Feb 20, 2018 at 7:07 PM, Weichen Xu <
>> weichen...@databricks.com> wrote:
>>
>>> +1
>>>
>>> On Wed, Feb 21, 2018 at 10:07 AM, Marcelo Vanzin <
>>> van...@cloudera.com> wrote:
>>>
 Done, thanks!

 On Tue, Feb 20, 2018 at 6:05 PM, Sameer Agarwal <
 samee...@apache.org> wrote:
 > Sure, please feel free to backport.
 >
 > On 20 February 2018 at 18:02, Marcelo Vanzin 
 wrote:
 >>
 >> Hey Sameer,
 >>
 >> Mind including https://github.com/apache/spark/pull/20643
 >> (SPARK-23468)  in the new RC? It's a minor bug since I've only
 hit it
 >> with older shuffle services, but it's pretty safe.
 >>
 >> On Tue, Feb 20, 2018 at 5:58 PM, Sameer Agarwal <
 samee...@apache.org>
 >> wrote:
 >> > This RC has failed due to
 >> > https://issues.apache.org/jira/browse/SPARK-23470.
 >> > Now that the fix has been merged in 2.3 (thanks Marcelo!),
 I'll follow
 >> > up
 >> > with an RC5 soon.
 >> >
 >> > On 20 February 2018 at 16:49, Ryan Blue 
 wrote:
 >> >>
 >> >> +1
 >> >>
 >> >> Build & tests look fine, checked signature and checksums for
 src
 >> >> tarball.
 >> >>
 >> >> On Tue, Feb 20, 2018 at 12:54 PM, 

[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2018-02-20 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370780#comment-16370780
 ] 

Michael Armbrust commented on SPARK-18057:
--

+1 to upgrading and it would also be great to add support for any new features 
(i.e. starting a query based on the time index rather than a specific offset).

I personally don't think that fixing KAFKA-4897 is mandatory, but keeping our 
stress tests running without hanging or losing coverage is.

Regarding naming, I'd probably just stop changing the name and say that 
"kafka-0-10-sql" works with any broker that is 0.10.0+.  We could also get rid 
of it, but that seems like an unnecessary change to me that just causes 
unnecessary pain to existing users.

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects

2018-02-16 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16367933#comment-16367933
 ] 

Michael Armbrust commented on SPARK-23337:
--

This is essentially the same issue as SPARK-18084. We are taking a column name 
here, not an expression.  As such you can only reference top-level columns.  I 
agree this is an annoying aspect of the API, but changing it might have to 
happen at a major release since it would be a change in behavior.

> withWatermark raises an exception on struct objects
> ---
>
> Key: SPARK-23337
> URL: https://issues.apache.org/jira/browse/SPARK-23337
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
> Environment: Linux Ubuntu, Spark on standalone mode
>Reporter: Aydin Kocas
>Priority: Major
>
> Hi,
>  
> when using a nested object (I mean an object within a struct, here concrete: 
> _source.createTime) from a json file as the parameter for the 
> withWatermark-method, I get an exception (see below).
> Anything else works flawlessly with the nested object.
>  
> +*{color:#14892c}works:{color}*+ 
> {code:java}
> Dataset<Row> jsonRow = 
> spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime",
>  "10 seconds").toDF();{code}
>  
> json structure:
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- myTime: timestamp (nullable = true)
> ..{code}
> +*{color:#d04437}does not work - nested json{color}:*+
> {code:java}
> Dataset<Row> jsonRow = 
> spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime",
>  "10 seconds").toDF();{code}
>  
> json structure:
>  
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- _source: struct (nullable = true)
>  | |-- createTime: timestamp (nullable = true)
> ..
>  
> Exception in thread "main" 
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, 
> tree:
> 'EventTimeWatermark '_source.createTime, interval 10 seconds
> +- Deduplicate [_id#0], true
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true),
>  StructField(_index,StringType,true), StructField(_score,LongType,true), 
> StructField(_source,StructType(StructField(additionalData,StringType,true), 
> StructField(client,StringType,true), 
> StructField(clientDomain,BooleanType,true), 
> StructField(clientVersion,StringType,true), 
> StructField(country,StringType,true), 
> StructField(countryName,StringType,true), 
> StructField(createTime,TimestampType,true), 
> StructField(externalIP,StringType,true), 
> StructField(hostname,StringType,true), 
> StructField(internalIP,StringType,true), 
> StructField(location,StringType,true), 
> StructField(locationDestination,StringType,true), 
> StructField(login,StringType,true), 
> StructField(originalRequestString,StringType,true), 
> StructField(password,StringType,true), 
> StructField(peerIdent,StringType,true), 
> StructField(peerType,StringType,true), 
> StructField(recievedTime,TimestampType,true), 
> StructField(sessionEnd,StringType,true), 
> StructField(sessionStart,StringType,true), 
> StructField(sourceEntryAS,StringType,true), 
> StructField(sourceEntryIp,StringType,true), 
> StructField(sourceEntryPort,StringType,true), 
> StructField(targetCountry,StringType,true), 
> StructField(targetCountryName,StringType,true), 
> StructField(targetEntryAS,StringType,true), 
> StructField(targetEntryIp,StringType,true), 
> StructField(targetEntryPort,StringType,true), 
> StructField(targetport,StringType,true), 
> StructField(username,StringType,true), 
> StructField(vulnid,StringType,true)),true), 
> StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), 
> FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)

[jira] [Updated] (SPARK-23173) from_json can produce nulls for fields which are marked as non-nullable

2018-02-15 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-23173:
-
Labels: release-notes  (was: )

> from_json can produce nulls for fields which are marked as non-nullable
> ---
>
> Key: SPARK-23173
> URL: https://issues.apache.org/jira/browse/SPARK-23173
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Herman van Hovell
>Priority: Major
>  Labels: release-notes
>
> The {{from_json}} function uses a schema to convert a string into a Spark SQL 
> struct. This schema can contain non-nullable fields. The underlying 
> {{JsonToStructs}} expression does not check if a resulting struct respects 
> the nullability of the schema. This leads to very weird problems in consuming 
> expressions. In our case parquet writing would produce an illegal parquet 
> file.
> There are roughly two solutions here:
>  # Assume that each field in schema passed to {{from_json}} is nullable, and 
> ignore the nullability information set in the passed schema.
>  # Validate the object during runtime, and fail execution if the data is null 
> where we are not expecting this.
> I currently am slightly in favor of option 1, since this is the more 
> performant option and a lot easier to do.
> WDYT? cc [~rxin] [~marmbrus] [~hyukjin.kwon] [~brkyvz]
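
A minimal sketch of the behavior described above (illustrative names, assuming spark-shell implicits): the schema passed to from_json marks {{a}} as non-nullable, yet parsing input without that field still yields a null.

{code}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._

val schema = new StructType().add("a", IntegerType, nullable = false)

val parsed = Seq("""{"b": 1}""").toDF("json")
  .select(from_json($"json", schema).as("parsed"))

parsed.show()  // parsed.a comes back null even though the schema says non-nullable
{code}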






Re: [Structured Streaming] Deserializing avro messages from kafka source using schema registry

2018-02-09 Thread Michael Armbrust
This isn't supported yet, but there is ongoing work at spark-avro to enable this
use case.  Stay tuned.
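
In the meantime, one interim approach is a rough sketch like the following (not the spark-avro work referenced above, and in Scala rather than pyspark): wrap Confluent's KafkaAvroDeserializer in a UDF. The class, config key, and endpoints below are assumptions about the Confluent client library and your environment.

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.functions.udf
import scala.collection.JavaConverters._
import spark.implicits._

val schemaRegistryUrl = "http://schema-registry:8081"  // assumed endpoint

// Instantiating the deserializer inside the UDF avoids serializing it; this is
// not efficient, but keeps the sketch self-contained.
val deserialize = udf { (bytes: Array[Byte]) =>
  val deser = new KafkaAvroDeserializer()
  deser.configure(Map("schema.registry.url" -> schemaRegistryUrl).asJava, false)
  deser.deserialize("topic", bytes).toString  // Avro record rendered as a string
}

val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "topic")
  .load()
  .select(deserialize($"value").as("record"))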

On Fri, Feb 9, 2018 at 3:07 PM, Bram  wrote:

> Hi,
>
> I couldn't find any documentation about avro message deserialization using
> pyspark structured streaming. My aim is using confluent schema registry to
> get per topic schema then parse the avro messages with it.
>
> I found one but it was using DirectStream approach
> https://stackoverflow.com/questions/30339636/spark-
> python-avro-kafka-deserialiser
>
> Can anyone show me how?
>
> Thanks
>
> Regards,
>
> Abraham
>


Re: [Structured Streaming] Commit protocol to move temp files to dest path only when complete, with code

2018-02-09 Thread Michael Armbrust
We didn't go this way initially because it doesn't work on storage systems
that have weaker guarantees than HDFS with respect to rename.  That said,
I'm happy to look at other options if we want to make this configurable.
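
For reference, a rough sketch of how a custom protocol like Dave's could be plugged in today. This assumes the internal setting spark.sql.streaming.commitProtocolClass behaves as its name suggests; the class name below is just a placeholder for the implementation linked in the quoted message.

// Hypothetical class name standing in for the custom commit protocol.
spark.conf.set(
  "spark.sql.streaming.commitProtocolClass",
  "com.example.streaming.TempFileManifestCommitProtocol")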



On Fri, Feb 9, 2018 at 2:53 PM, Dave Cameron 
wrote:

> Hi
>
>
> I have a Spark structured streaming job that reads from Kafka and writes
> parquet files to Hive/HDFS. The files are not very large, but the Kafka
> source is noisy so each spark job takes a long time to complete. There is a
> significant window during which the parquet files are incomplete and other
> tools, including PrestoDB, encounter errors while trying to read them.
>
> I wrote this list and stackoverflow about the problem last summer:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-
> Structured-Streaming-truncated-Parquet-after-driver-crash-or-kill-tt29043.
> html
> https://stackoverflow.com/questions/47337030/not-a-
> parquet-file-too-small-from-presto-during-spark-structured-streaming-r/
> 47339805#47339805
>
> After hesitating for a while, I wrote a custom commit protocol to solve
> the problem. It combines HadoopMapReduceCommitProtocol's behavior of
> writing to a temp file first, with ManifestFileCommitProtocol. From what
> I can tell ManifestFileCommitProtocol is required for the normal Structured
> Streaming behavior of being able to resume streams from a known point.
>
> I think this commit protocol could be generally useful. Writing to a temp
> file and moving it to the final location is low cost on HDFS and is the
> standard behavior for non-streaming jobs, as implemented in
> HadoopMapReduceCommitProtocol. At the same time ManifestFileCommitProtocol
> is needed for structured streaming jobs. We have been running this for a
> few months in production without problems.
>
> Here is the code (at the moment not up to Spark standards, admittedly):
> https://github.com/davcamer/spark/commit/361f1c69851f0f94cfd974ce720c69
> 4407f9340b
>
> Did I miss a better approach? Does anyone else think this would be useful?
>
> Thanks for reading,
> Dave
>
>
>


Re: DataSourceV2: support for named tables

2018-02-02 Thread Michael Armbrust
I am definitely in favor of first-class / consistent support for tables and
data sources.

One thing that is not clear to me from this proposal is exactly what the
interfaces are between:
 - Spark
 - A (The?) metastore
 - A data source

If we pass in the table identifier, is the data source then responsible for
talking directly to the metastore? Is that what we want? (I'm not sure)

On Fri, Feb 2, 2018 at 10:39 AM, Ryan Blue 
wrote:

> There are two main ways to load tables in Spark: by name (db.table) and by
> a path. Unfortunately, the integration for DataSourceV2 has no support for
> identifying tables by name.
>
> I propose supporting the use of TableIdentifier, which is the standard
> way to pass around table names.
>
> The reason I think we should do this is to easily support more ways of
> working with DataSourceV2 tables. SQL statements and parts of the
> DataFrameReader and DataFrameWriter APIs that use table names create
> UnresolvedRelation instances that wrap an unresolved TableIdentifier.
>
> By adding support for passing TableIdentifier to a DataSourceV2Relation,
> then about all we need to enable these code paths is to add a resolution
> rule. For that rule, we could easily identify a default data source that
> handles named tables.
>
> This is what we’re doing in our Spark build, and we have DataSourceV2
> tables working great through SQL. (Part of this depends on the logical plan
> changes from my previous email to ensure inserts are properly resolved.)
>
> In the long term, I think we should update how we parse tables so that
> TableIdentifier can contain a source in addition to a database/context
> and a table name. That would allow us to integrate new sources fairly
> seamlessly, without needing a rather redundant SQL create statement like
> this:
>
> CREATE TABLE database.name USING source OPTIONS (table 'database.name')
>
> Also, I think we should pass TableIdentifier to DataSourceV2Relation,
> rather than going with Wenchen’s suggestion that we pass the table name as
> a string property, “table”. My rationale is that the new API shouldn’t leak
> its internal details to other parts of the planner.
>
> If we were to convert TableIdentifer to a “table” property wherever
> DataSourceV2Relation is created, we create several places that need to be
> in sync with the same convention. On the other hand, passing
> TableIdentifier to DataSourceV2Relation and relying on the relation to
> correctly set the options passed to readers and writers minimizes the
> number of places that conversion needs to happen.
>
> rb
> ​
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)

2018-02-02 Thread Michael Armbrust
>
> So here are my recommendations for moving forward, with DataSourceV2 as a
> starting point:
>
>1. Use well-defined logical plan nodes for all high-level operations:
>insert, create, CTAS, overwrite table, etc.
>2. Use rules that match on these high-level plan nodes, so that it
>isn’t necessary to create rules to match each eventual code path
>individually
>3. Define Spark’s behavior for these logical plan nodes. Physical
>nodes should implement that behavior, but all CREATE TABLE OVERWRITE should
>(eventually) make the same guarantees.
>4. Specialize implementation when creating a physical plan, not
>logical plans.
>
> I realize this is really long, but I’d like to hear thoughts about this.
> I’m sure I’ve left out some additional context, but I think the main idea
> here is solid: lets standardize logical plans for more consistent behavior
> and easier maintenance.
>
Context aside, I really like these rules! I think having query planning be
the boundary for specialization makes a lot of sense.

(RunnableCommand might also be my fault though sorry! :P)


Re: Prefer Structured Streaming over Spark Streaming (DStreams)?

2018-01-31 Thread Michael Armbrust
At this point I recommend that new applications are built using structured
streaming. The engine was GA-ed as of Spark 2.2 and I know of several very
large (trillions of records) production jobs that are running in Structured
Streaming.  All of our production pipelines at databricks are written using
structured streaming as well.

Regarding the comparison with RDDs: The situation here is different than
when thinking about batch DataFrames vs. RDDs.  DataFrames are "just" a
higher-level abstraction on RDDs.  Structured streaming is a completely new
implementation that does not use DStreams at all, but instead directly runs
jobs using RDDs.  The advantages over DStreams include:
 - The ability to start and stop individual queries (rather than needing to
start/stop a separate StreamingContext)
 - The ability to upgrade your stream and still start from an existing
checkpoint
 - Support for working with Spark SQL data sources (json, parquet, etc)
 - Higher level APIs (DataFrames and SQL) and lambda functions (Datasets)
 - Support for event time aggregation

At this point, with the addition of mapGroupsWithState and
flatMapGroupsWithState, I think we should be at feature parity with
DStreams (and the state store does incremental checkpoints that are more
efficient than the DStream store).  However if there are applications you
are having a hard time porting over, please let us know!
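
For anyone comparing side by side, a minimal sketch of a structured streaming query (using the built-in rate test source and console sink purely for illustration):

import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

val counts = spark.readStream
  .format("rate")                      // test source: columns `timestamp`, `value`
  .load()
  .groupBy(($"value" % 10).as("bucket"))
  .count()

val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

// query.stop() stops just this query, without tearing down the session.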

On Wed, Jan 31, 2018 at 5:42 AM, vijay.bvp  wrote:

> here is my two cents, experts please correct me if wrong
>
> it's important to understand why one over the other and for what kind of use
> case. There might be some time in the future where low-level APIs are abstracted
> and become legacy, but for now in Spark the RDD API is the core and low-level
> API; all higher APIs translate to RDDs ultimately, and RDDs are immutable.
>
> https://spark.apache.org/docs/latest/structured-streaming-
> programming-guide.html#unsupported-operations
> these are things that are not supported and this list needs to be validated
> with the use case you have.
>
> From my experience Structured Streaming is still new and DStreams API is a
> matured API.
> some things that are missing or need to explore more.
>
> watermarking/windowing based on the number of records in a particular window
>
> assuming you have watermarking and windowing on the event time of the data, the
> resultant dataframe is a grouped dataset, and the only thing you can do is run
> aggregate functions. You can't simply use that output as another dataframe
> and manipulate it. There is a custom aggregator, but I feel it's limited.
>
> https://spark.apache.org/docs/latest/structured-streaming-
> programming-guide.html#arbitrary-stateful-operations
> There is option to do stateful operations, using GroupState where the
> function gets iterator of events for that window. This is the closest
> access
> to StateStore a developer could get.
> This arbitrary state that the programmer can keep across invocations has its
> limitations, such as: how much state can we keep? Is that state stored in
> driver memory? What happens if the Spark job fails; is this checkpointed and
> restored?
>
> thanks
> Vijay
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Max number of streams supported ?

2018-01-31 Thread Michael Armbrust
-dev +user


> Similarly for structured streaming, Would there be any limit on number of
> of streaming sources I can have ?
>

There is no fundamental limit, but each stream will have a thread on the
driver that is doing coordination of execution.  We comfortably run 20+
streams on a single cluster in production, but I have not pushed the
limits.  You'd want to test with your specific application.
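
As a sketch of what "each stream has its own coordination thread" looks like in practice (sink and names are illustrative; the memory sink just keeps the example self-contained):

val queries = (1 to 20).map { i =>
  spark.readStream.format("rate").load()
    .writeStream
    .queryName(s"stream_$i")          // required by the memory sink
    .format("memory")
    .outputMode("append")
    .start()
}

// spark.streams.active.length == 20; shut them down with queries.foreach(_.stop())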



[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming

2018-01-18 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331141#comment-16331141
 ] 

Michael Armbrust commented on SPARK-20928:
--

There is more work to do so I might leave the umbrella open, but we are going 
to have an initial version that supports reading and writing from/to Kafka with 
very low latency in Spark 2.3!  Stay tuned for updates to docs and blog posts.
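
For readers following along, a short sketch of how the new mode is switched on from the user side (Kafka options and paths are illustrative; the trigger shipped in 2.3 as Trigger.Continuous):

{code}
import org.apache.spark.sql.streaming.Trigger

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "in")
  .load()
  .selectExpr("key", "value")            // only map-like operations in the initial version
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "out")
  .option("checkpointLocation", "/tmp/cp")
  .trigger(Trigger.Continuous("1 second"))  // checkpoint interval, not a batch interval
  .start()
{code}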

> SPIP: Continuous Processing Mode for Structured Streaming
> -
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>Priority: Major
>  Labels: SPIP
> Attachments: Continuous Processing in Structured Streaming Design 
> Sketch.pdf
>
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).






[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.

2018-01-12 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323883#comment-16323883
 ] 

Michael Armbrust commented on SPARK-23050:
--

[~zsxwing] is correct.  While it is possible for files to get written out more 
than once, the whole point of the log is to make sure that readers will only 
see exactly one copy. It is possible we will come up with a new streaming 
commit protocol as part of the migration to the V2 data source, but for now I 
would call this working as intended.

> Structured Streaming with S3 file source duplicates data because of eventual 
> consistency.
> -
>
> Key: SPARK-23050
> URL: https://issues.apache.org/jira/browse/SPARK-23050
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Yash Sharma
>
> Spark Structured streaming with S3 file source duplicates data because of 
> eventual consistency.
> Reproducing the scenario -
> - Structured streaming reading from S3 source. Writing back to S3.
> - Spark tries to commitTask on completion of a task, by verifying if all the 
> files have been written to Filesystem. 
> {{ManifestFileCommitProtocol.commitTask}}.
> - [Eventual consistency issue] Spark finds that the file is not present and 
> fails the task. {{org.apache.spark.SparkException: Task failed while writing 
> rows. No such file or directory 
> 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'}}
> - By this time S3 eventually gets the file.
> - Spark reruns the task and completes the task, but gets a new file name this 
> time. {{ManifestFileCommitProtocol.newTaskTempFile. 
> part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet.}}
> - Data duplicates in results and the same data is processed twice and written 
> to S3.
> - There is no data duplication if spark is able to list presence of all 
> committed files and all tasks succeed.
> Code:
> {code}
> query = selected_df.writeStream \
> .format("parquet") \
> .option("compression", "snappy") \
> .option("path", "s3://path/data/") \
> .option("checkpointLocation", "s3://path/checkpoint/") \
> .start()
> {code}
> Same sized duplicate S3 Files:
> {code}
> $ aws s3 ls s3://path/data/ | grep part-00256
> 2018-01-11 03:37:00  17070 
> part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet
> 2018-01-11 03:37:10  17070 
> part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet
> {code}
> Exception on S3 listing and task failure:
> {code}
> [Stage 5:>(277 + 100) / 
> 597]18/01/11 03:36:59 WARN TaskSetManager: Lost task 256.0 in stage 5.0 (TID  
> org.apache.spark.SparkException: Task failed while writing rows
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.io.FileNotFoundException: No such file or directory 
> 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'
>   at 
> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:816)
>   at 
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:509)
>   at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>   at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 

Re: Dataset API inconsistencies

2018-01-10 Thread Michael Armbrust
I wrote Datasets, and I'll say I only use them when I really need to (i.e.
when it would be very cumbersome to express what I am trying to do
relationally).  Dataset operations are almost always going to be slower
than their DataFrame equivalents since they usually require materializing
objects (whereas DataFrame operations usually generate code that operates
directly on binary encoded data).

We certainly could flesh out the API further (e.g. add orderBy that takes a
lambda function), but so far I have not seen a lot of demand for this, and
it would be strictly slower than the DataFrame version. I worry this
wouldn't actually be beneficial to users as it would give them a choice
that looks the same but has performance implications that are non-obvious.
If I'm in the minority though with this opinion, we should do it.

Regarding the concerns about type-safety, I haven't really found that to be
a major issue.  Even though you don't have type safety from the scala
compiler, the Spark SQL analyzer is checking your query before any
execution begins. This opinion is perhaps biased by the fact that I do a
lot of Spark SQL programming in notebooks where the difference between
"compile-time" and "runtime" is pretty minimal.

On Wed, Jan 10, 2018 at 1:45 AM, Alex Nastetsky 
wrote:

> I am finding the Dataset API very cumbersome to use, which is
> unfortunate, as I was looking forward to the type-safety after coming from
> a Dataframe codebase.
>
> This link summarizes my troubles: http://loicdescotte.
> github.io/posts/spark2-datasets-type-safety/
>
> The problem is having to continuously switch back and forth between typed
> and untyped semantics, which really kills productivity. In contrast, the
> RDD API is consistently typed and the Dataframe API is consistently
> untyped. I don't have to continuously stop and think about which one to use
> for each operation.
>
> I gave the Frameless framework (mentioned in the link) a shot, but
> eventually started running into oddities and lack of enough documentation
> and community support and did not want to sink too much time into it.
>
> At this point I'm considering just sticking with Dataframes, as I don't
> really consider Datasets to be usable. Has anyone had a similar experience
> or has had better luck?
>
> Alex.
>


[jira] [Commented] (SPARK-22947) SPIP: as-of join in Spark SQL

2018-01-03 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16310423#comment-16310423
 ] 

Michael Armbrust commented on SPARK-22947:
--

+1 to [~rxin]'s question.  This seems like it might just be sugar on top of 
[SPARK-8682].
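
As a very rough sketch of that idea, Use Case A from the description below can be expressed as a range join plus a "keep the latest match" step (this assumes time is a real timestamp column; column names follow the example):

{code}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{expr, row_number}
import spark.implicits._

// Candidate matches: every b row within the one-day lookback of each a row.
val candidates = dfA.as("a").join(dfB.as("b"),
  expr("b.time <= a.time AND b.time >= a.time - INTERVAL 1 DAY"), "left_outer")

// Keep only the most recent matching b row per a row.
val asOf = candidates
  .withColumn("rn", row_number().over(
    Window.partitionBy($"a.time", $"a.quantity").orderBy($"b.time".desc_nulls_last)))
  .where($"rn" === 1)
  .select($"a.time", $"a.quantity", $"b.price")
{code}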

> SPIP: as-of join in Spark SQL
> -
>
> Key: SPARK-22947
> URL: https://issues.apache.org/jira/browse/SPARK-22947
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Li Jin
> Attachments: SPIP_ as-of join in Spark SQL (1).pdf
>
>
> h2. Background and Motivation
> Time series analysis is one of the most common analysis on financial data. In 
> time series analysis, as-of join is a very common operation. Supporting as-of 
> join in Spark SQL will allow many use cases of using Spark SQL for time 
> series analysis.
> As-of join is “join on time” with inexact time matching criteria. Various 
> libraries have implemented as-of join or similar functionality:
> Kdb: https://code.kx.com/wiki/Reference/aj
> Pandas: 
> http://pandas.pydata.org/pandas-docs/version/0.19.0/merging.html#merging-merge-asof
> R: This functionality is called “Last Observation Carried Forward”
> https://www.rdocumentation.org/packages/zoo/versions/1.8-0/topics/na.locf
> JuliaDB: http://juliadb.org/latest/api/joins.html#IndexedTables.asofjoin
> Flint: https://github.com/twosigma/flint#temporal-join-functions
> This proposal advocates introducing new API in Spark SQL to support as-of 
> join.
> h2. Target Personas
> Data scientists, data engineers
> h2. Goals
> * New API in Spark SQL that allows as-of join
> * As-of join of multiple tables (>2) should be performant, because it’s very 
> common that users need to join multiple data sources together for further 
> analysis.
> * Define Distribution, Partitioning and shuffle strategy for ordered time 
> series data
> h2. Non-Goals
> These are out of scope for the existing SPIP, should be considered in future 
> SPIP as improvement to Spark’s time series analysis ability:
> * Utilize partition information from data source, i.e, begin/end of each 
> partition to reduce sorting/shuffling
> * Define API for user to implement asof join time spec in business calendar 
> (i.e. lookback one business day, this is very common in financial data 
> analysis because of market calendars)
> * Support broadcast join
> h2. Proposed API Changes
> h3. TimeContext
> TimeContext is an object that defines the time scope of the analysis, it has 
> begin time (inclusive) and end time (exclusive). User should be able to 
> change the time scope of the analysis (i.e, from one month to five year) by 
> just changing the TimeContext. 
> To Spark engine, TimeContext is a hint that:
> can be used to repartition data for join
> serve as a predicate that can be pushed down to storage layer
> Time context is similar to filtering time by begin/end, the main difference 
> is that time context can be expanded based on the operation taken (see 
> example in as-of join).
> Time context example:
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> {code}
> h3. asofJoin
> h4. User Case A (join without key)
> Join two DataFrames on time, with one day lookback:
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> dfA = ...
> dfB = ...
> JoinSpec joinSpec = JoinSpec(timeContext).on("time").tolerance("-1day")
> result = dfA.asofJoin(dfB, joinSpec)
> {code}
> Example input/output:
> {code:java}
> dfA:
> time, quantity
> 20160101, 100
> 20160102, 50
> 20160104, -50
> 20160105, 100
> dfB:
> time, price
> 20151231, 100.0
> 20160104, 105.0
> 20160105, 102.0
> output:
> time, quantity, price
> 20160101, 100, 100.0
> 20160102, 50, null
> 20160104, -50, 105.0
> 20160105, 100, 102.0
> {code}
> Note row (20160101, 100) of dfA is joined with (20151231, 100.0) of dfB. This 
> is an important illustration of the time context - it is able to expand the 
> context to 20151231 on dfB because of the 1 day lookback.
> h4. Use Case B (join with key)
> To join on time and another key (for instance, id), we use “by” to specify 
> the key.
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> dfA = ...
> dfB = ...
> JoinSpec joinSpec = 
> JoinSpec(timeContext).on("time").by("id").tolerance("-1day")
> result = dfA.asofJoin(dfB, joinSpec)
> {code}
> Example input/output

[jira] [Commented] (SPARK-22929) Short name for "kafka" doesn't work in pyspark with packages

2017-12-31 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307144#comment-16307144
 ] 

Michael Armbrust commented on SPARK-22929:
--

Haha, thanks [~sowen], you are right. Kafka is a hard word I guess :)
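
The short name itself does work once spelled correctly; a minimal sketch (Scala shown, options are illustrative):

{code}
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "topic")
  .load()
{code}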

> Short name for "kafka" doesn't work in pyspark with packages
> 
>
> Key: SPARK-22929
> URL: https://issues.apache.org/jira/browse/SPARK-22929
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>Priority: Critical
>
> When I start pyspark using the following command:
> {code}
> bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
> {code}
> The following throws an error:
> {code}
> spark.read.format("kakfa")...
> py4j.protocol.Py4JJavaError: An error occurred while calling o35.load.
> : java.lang.ClassNotFoundException: Failed to find data source: kakfa. Please 
> find packages at http://spark.apache.org/third-party-projects.html
> {code}
> The following does work:
> {code}
> spark.read.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")...
> {code}






[jira] [Created] (SPARK-22929) Short name for "kafka" doesn't work in pyspark with packages

2017-12-30 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-22929:


 Summary: Short name for "kafka" doesn't work in pyspark with 
packages
 Key: SPARK-22929
 URL: https://issues.apache.org/jira/browse/SPARK-22929
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.2.0
Reporter: Michael Armbrust
Priority: Critical


When I start pyspark using the following command:
{code}
bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
{code}

The following throws an error:
{code}
spark.read.format("kakfa")...

py4j.protocol.Py4JJavaError: An error occurred while calling o35.load.
: java.lang.ClassNotFoundException: Failed to find data source: kakfa. Please 
find packages at http://spark.apache.org/third-party-projects.html
{code}

The following does work:
{code}
spark.read.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")...
{code}






[jira] [Created] (SPARK-22862) Docs on lazy elimination of columns missing from an encoder.

2017-12-21 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-22862:


 Summary: Docs on lazy elimination of columns missing from an 
encoder.
 Key: SPARK-22862
 URL: https://issues.apache.org/jira/browse/SPARK-22862
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.1
Reporter: Michael Armbrust
Assignee: Michael Armbrust









[jira] [Commented] (SPARK-22739) Additional Expression Support for Objects

2017-12-20 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299232#comment-16299232
 ] 

Michael Armbrust commented on SPARK-22739:
--

Sounds good to me.  I'm happy to provide pointers on the PR, just "@marmbrus".

> Additional Expression Support for Objects
> -
>
> Key: SPARK-22739
> URL: https://issues.apache.org/jira/browse/SPARK-22739
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Aleksander Eskilson
>
> Some discussion in Spark-Avro [1] motivates additions and minor changes to 
> the {{Objects}} Expressions API [2]. The proposed changes include
> * a generalized form of {{initializeJavaBean}} taking a sequence of 
> initialization expressions that can be applied to instances of varying objects
> * an object cast that performs a simple Java type cast against a value
> * making {{ExternalMapToCatalyst}} public, for use in outside libraries
> These changes would facilitate the writing of custom encoders for varying 
> objects that cannot already be readily converted to a statically typed 
> dataset by a JavaBean encoder (e.g. Avro).
> [1] -- 
> https://github.com/databricks/spark-avro/pull/217#issuecomment-342599110
> [2] --
>  
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala






[jira] [Commented] (SPARK-22739) Additional Expression Support for Objects

2017-12-20 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299167#comment-16299167
 ] 

Michael Armbrust commented on SPARK-22739:
--

Any progress on this?  Branch cut is January 1st, and I'd love to include this.

> Additional Expression Support for Objects
> -
>
> Key: SPARK-22739
> URL: https://issues.apache.org/jira/browse/SPARK-22739
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Aleksander Eskilson
>
> Some discussion in Spark-Avro [1] motivates additions and minor changes to 
> the {{Objects}} Expressions API [2]. The proposed changes include
> * a generalized form of {{initializeJavaBean}} taking a sequence of 
> initialization expressions that can be applied to instances of varying objects
> * an object cast that performs a simple Java type cast against a value
> * making {{ExternalMapToCatalyst}} public, for use in outside libraries
> These changes would facilitate the writing of custom encoders for varying 
> objects that cannot already be readily converted to a statically typed 
> dataset by a JavaBean encoder (e.g. Avro).
> [1] -- 
> https://github.com/databricks/spark-avro/pull/217#issuecomment-342599110
> [2] --
>  
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala






[jira] [Updated] (SPARK-22739) Additional Expression Support for Objects

2017-12-20 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-22739:
-
Target Version/s: 2.3.0

> Additional Expression Support for Objects
> -
>
> Key: SPARK-22739
> URL: https://issues.apache.org/jira/browse/SPARK-22739
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Aleksander Eskilson
>
> Some discussion in Spark-Avro [1] motivates additions and minor changes to 
> the {{Objects}} Expressions API [2]. The proposed changes include
> * a generalized form of {{initializeJavaBean}} taking a sequence of 
> initialization expressions that can be applied to instances of varying objects
> * an object cast that performs a simple Java type cast against a value
> * making {{ExternalMapToCatalyst}} public, for use in outside libraries
> These changes would facilitate the writing of custom encoders for varying 
> objects that cannot already be readily converted to a statically typed 
> dataset by a JavaBean encoder (e.g. Avro).
> [1] -- 
> https://github.com/databricks/spark-avro/pull/217#issuecomment-342599110
> [2] --
>  
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Spark error while trying to spark.read.json()

2017-12-19 Thread Michael Armbrust
- dev

java.lang.AbstractMethodError almost always means that you have different
libraries on the classpath than at compilation time.  In this case I would
check to make sure you have the correct version of Scala (and only have one
version of scala) on the classpath.
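
Two quick checks that can help narrow this down (nothing here beyond what Scala and Spark already expose):

// Print the Scala and Spark versions actually on the runtime classpath and
// compare them with the versions the job was compiled against.
println(scala.util.Properties.versionString)   // e.g. "version 2.11.8"
println(org.apache.spark.SPARK_VERSION)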

On Tue, Dec 19, 2017 at 5:42 PM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi All,
>
> Can anyone help me with below error,
>
> Exception in thread "main" java.lang.AbstractMethodError
> at scala.collection.TraversableLike$class.filterNot(TraversableLike.
> scala:278)
> at org.apache.spark.sql.types.StructType.filterNot(StructType.scala:98)
> at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:386)
> at org.spark.jsonDF.StructStreamKafkaToDF$.getValueSchema(
> StructStreamKafkaToDF.scala:22)
> at org.spark.jsonDF.StructStreaming$.createRowDF(StructStreaming.scala:21)
> at SparkEntry$.delayedEndpoint$SparkEntry$1(SparkEntry.scala:22)
> at SparkEntry$delayedInit$body.apply(SparkEntry.scala:7)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
> at scala.runtime.AbstractFunction0.apply$mcV$
> sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.generic.TraversableForwarder$class.
> foreach(TraversableForwarder.scala:35)
> at scala.App$class.main(App.scala:76)
> at SparkEntry$.main(SparkEntry.scala:7)
> at SparkEntry.main(SparkEntry.scala)
>
> This happening, when i try to pass Dataset[String] containing jsons to
> spark.read.json(Records).
>
> Regards,
> Satyajit.
>



Re: Timeline for Spark 2.3

2017-12-19 Thread Michael Armbrust
Do people really need to be around for the branch cut (modulo the person
cutting the branch)?

1st or 2nd doesn't really matter to me, but I am +1 kicking this off as
soon as we enter the new year :)

Michael

On Tue, Dec 19, 2017 at 4:39 PM, Holden Karau  wrote:

> Sounds reasonable, although I'd choose the 2nd perhaps just since lots of
> folks are off on the 1st?
>
> On Tue, Dec 19, 2017 at 4:36 PM, Sameer Agarwal 
> wrote:
>
>> Let's aim for the 2.3 branch cut on 1st Jan and RC1 a week after that
>> (i.e., week of 8th Jan)?
>>
>>
>> On Fri, Dec 15, 2017 at 12:54 AM, Holden Karau 
>> wrote:
>>
>>> So personally I’d be in favour or pushing to early January, doing a
>>> release over the holidays is a little rough with herding all of people to
>>> vote.
>>>
>>> On Thu, Dec 14, 2017 at 11:49 PM Erik Erlandson 
>>> wrote:
>>>
 I wanted to check in on the state of the 2.3 freeze schedule.  Original
 proposal was "late Dec", which is a bit open to interpretation.

 We are working to get some refactoring done on the integration testing
 for the Kubernetes back-end in preparation for testing upcoming release
 candidates, however holiday vacation time is about to begin taking its toll
 both on upstream reviewing and on the "downstream" spark-on-kube fork.

 If the freeze pushed into January, that would take some of the pressure
 off the kube back-end upstreaming. However, regardless, I was wondering if
 the dates could be clarified.
 Cheers,
 Erik


 On Mon, Nov 13, 2017 at 5:13 PM, dji...@dataxu.com 
 wrote:

> Hi,
>
> What is the process to request an issue/fix to be included in the next
> release? Is there a place to vote for features?
> I am interested in https://issues.apache.org/jira/browse/SPARK-13127,
> to see
> if we can get Spark upgrade parquet to 1.9.0, which addresses the
> https://issues.apache.org/jira/browse/PARQUET-686.
> Can we include the fix in Spark 2.3 release?
>
> Thanks,
>
> Dong
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>
 --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>
>>
>> --
>> Sameer Agarwal
>> Software Engineer | Databricks Inc.
>> http://cs.berkeley.edu/~sameerag
>>
>
>
>
> --
> Twitter: https://twitter.com/holdenkarau
>


[jira] [Commented] (SPARK-22824) Spark Structured Streaming Source trait breaking change

2017-12-18 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295676#comment-16295676
 ] 

Michael Armbrust commented on SPARK-22824:
--

This is technically an internal API (as is all of 
[org.apache.spark.sql.execution|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala]).
  That said, I think we should do our best to preserve compatibility until 
there is a {{@Stable}} public alternative (which is the goal of the V2 project).

We'll open a PR to try and fix this.

> Spark Structured Streaming Source trait breaking change
> ---
>
> Key: SPARK-22824
> URL: https://issues.apache.org/jira/browse/SPARK-22824
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
> Environment: Azure Databricks 3.4
>Reporter: Matthieu Maitre
>
> org.apache.spark.sql.execution.streaming.Offset was moved to 
> org.apache.spark.sql.sources.v2.reader.Offset on the Source trait:
> https://github.com/apache/spark/commit/f8c7c1f21aa9d1fd38b584ca8c4adf397966e9f7#diff-56453fdb4dc2d7666c7ab237cb102189
> This broke our custom sources that are called in Spark jobs running on Azure 
> Databricks 3.4 since the Maven package does not match the hosted Spark bits. 
> We use the following Maven version to be able to deploy on Azure Databricks:
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.11</artifactId>
>   <version>2.3.0-SNAPSHOT</version>
> </dependency>






[jira] [Assigned] (SPARK-22824) Spark Structured Streaming Source trait breaking change

2017-12-18 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust reassigned SPARK-22824:


Assignee: Jose Torres

> Spark Structured Streaming Source trait breaking change
> ---
>
> Key: SPARK-22824
> URL: https://issues.apache.org/jira/browse/SPARK-22824
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
> Environment: Azure Databricks 3.4
>Reporter: Matthieu Maitre
>Assignee: Jose Torres
>
> org.apache.spark.sql.execution.streaming.Offset was moved to 
> org.apache.spark.sql.sources.v2.reader.Offset on the Source trait:
> https://github.com/apache/spark/commit/f8c7c1f21aa9d1fd38b584ca8c4adf397966e9f7#diff-56453fdb4dc2d7666c7ab237cb102189
> This broke our custom sources that are called in Spark jobs running on Azure 
> Databricks 3.4 since the Maven package does not match the hosted Spark bits. 
> We use the following Maven version to be able to deploy on Azure Databricks:
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.11</artifactId>
>   <version>2.3.0-SNAPSHOT</version>
> </dependency>






[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming

2017-12-12 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16288391#comment-16288391
 ] 

Michael Armbrust commented on SPARK-20928:
--

An update on this.  We've started to create subtasks to break down the process of 
adding this new execution mode, and we are targeting an alpha version in 2.3.  
The basics of the new API for Sources and Sinks (for both microbatch and 
continuous mode) have been up for a few days if people want to see more details. 
 We'll follow with PRs to add a continuous execution engine and an 
implementation of a continuous kafka connector.

Regarding some of the questions:
 - The version we are targeting for 2.3 will only support map operations and 
thus will not support shuffles / aggregating by windows (although the window() 
operator is just a projection so will work for window assignment).
 - I think the API is designed in such a way that we can build a streaming 
shuffle that aligns on epochIds in the future, allowing us to easily extend the 
continuous engine to handle stateful operations as well.

> SPIP: Continuous Processing Mode for Structured Streaming
> -
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>  Labels: SPIP
> Attachments: Continuous Processing in Structured Streaming Design 
> Sketch.pdf
>
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).






Re: queryable state & streaming

2017-12-08 Thread Michael Armbrust
https://issues.apache.org/jira/browse/SPARK-16738

I don't believe anyone is working on it yet.  I think the most useful thing
is to start enumerating requirements and use cases and then we can talk
about how to build it.

On Fri, Dec 8, 2017 at 10:47 AM, Stavros Kontopoulos <
st.kontopou...@gmail.com> wrote:

> Cool Burak do you have a pointer, should I take the initiative for a first
> design document or Databricks is working on it?
>
> Best,
> Stavros
>
> On Fri, Dec 8, 2017 at 8:40 PM, Burak Yavuz  wrote:
>
>> Hi Stavros,
>>
>> Queryable state is definitely on the roadmap! We will revamp the
>> StateStore API a bit, and a queryable StateStore is definitely one of the
>> things we are thinking about during that revamp.
>>
>> Best,
>> Burak
>>
>> On Dec 8, 2017 9:57 AM, "Stavros Kontopoulos" 
>> wrote:
>>
>>> Just to re-phrase my question: Would query-able state make a viable
>>> SPIP?
>>>
>>> Regards,
>>> Stavros
>>>
>>> On Thu, Dec 7, 2017 at 1:34 PM, Stavros Kontopoulos <
>>> st.kontopou...@gmail.com> wrote:
>>>
 Hi,

 Maybe this has been discussed before. Given the fact that many
 streaming apps out there use state extensively, could be a good idea to
 make Spark expose streaming state with an external API like other
 systems do (Kafka streams, Flink etc), in order to facilitate
 interactive queries?

 Regards,
 Stavros

>>>
>>>
>


Re: Kafka version support

2017-11-30 Thread Michael Armbrust
Oh good question.  I was saying that the stock structured streaming
connector should be able to talk to 0.11 or 1.0 brokers.

On Thu, Nov 30, 2017 at 1:12 PM, Cody Koeninger  wrote:

> Are you talking about the broker version, or the kafka-clients artifact
> version?
>
> On Thu, Nov 30, 2017 at 12:17 AM, Raghavendra Pandey
>  wrote:
> > Just wondering if anyone has tried spark structured streaming kafka
> > connector (2.2) with Kafka 0.11 or Kafka 1.0 version
> >
> > Thanks
> > Raghav
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Kafka version support

2017-11-30 Thread Michael Armbrust
I would expect that to work.

On Wed, Nov 29, 2017 at 10:17 PM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> Just wondering if anyone has tried spark structured streaming kafka
> connector (2.2) with Kafka 0.11 or Kafka 1.0 version
>
> Thanks
> Raghav
>


Re: Generate windows on processing time in Spark Structured Streaming

2017-11-10 Thread Michael Armbrust
Hmmm, we should allow that.  current_timestamp() is actually deterministic
within any given batch.  Could you open a JIRA ticket?
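
Until then, a possible workaround sketch (untested assumption that the analyzer accepts it): evaluate current_timestamp() inside a projection first, then group on the resulting column, so the nondeterministic expression itself lives in a Project. This uses the socketDF from the quoted code below:

import org.apache.spark.sql.functions.{current_timestamp, window}
import spark.implicits._

val words = socketDF.select($"value".as("word"), current_timestamp().as("time"))

val windowedCounts = words
  .groupBy(window($"time", "5 seconds"), $"word")
  .count()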

On Fri, Nov 10, 2017 at 1:52 AM, wangsan  wrote:

> Hi all,
>
> How can I use current processing time to generate windows in streaming
> processing?
> *window* function's Scala doc says "For a streaming query, you may use
> the function current_timestamp to generate windows on processing time.”
>  But when using current_timestamp as column in window function, exceptions
> occurred.
>
> Here are my code:
>
> val socketDF = spark.readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", )
>   .load()
>
> socketDF.createOrReplaceTempView("words")
> val windowedCounts = spark.sql(
>   """
> |SELECT value as word, current_timestamp() as time, count(1) as count 
> FROM words
> |   GROUP BY window(time, "5 seconds"), word
>   """.stripMargin)
>
> windowedCounts
>   .writeStream
>   .outputMode("complete")
>   .format("console")
>   .start()
>   .awaitTermination()
>
> And here is the exception info:
> Caused by: org.apache.spark.sql.AnalysisException: nondeterministic
> expressions are only allowed in
> Project, Filter, Aggregate or Window, found:
>
>
>
>


Timeline for Spark 2.3

2017-11-09 Thread Michael Armbrust
According to the timeline posted on the website, we are nearing branch cut
for Spark 2.3.  I'd like to propose pushing this out towards mid to late
December for a couple of reasons and would like to hear what people think.

1. I've done release management during the Thanksgiving / Christmas time
before and in my experience, we don't actually get a lot of testing during
this time due to vacations and other commitments. I think beginning the RC
process in early January would give us the best coverage in the shortest
amount of time.
2. There are several large initiatives in progress that given a little more
time would leave us with a much more exciting 2.3 release. Specifically,
the work on the history server, Kubernetes and continuous processing.
3. Given the actual release date of Spark 2.2, I think we'll still get
Spark 2.3 out roughly 6 months after.

Thoughts?

Michael


Re: [Spark Structured Streaming] Changing partitions of (flat)MapGroupsWithState

2017-11-08 Thread Michael Armbrust
The relevant config is spark.sql.shuffle.partitions.  Note that once you
start a query, this number is fixed.  The config will only affect queries
starting from an empty checkpoint.
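
A sketch of what that means in practice (paths are illustrative): set the config before the query is first started against a fresh checkpoint directory, because the value is then baked into that checkpoint and later changes are ignored.

spark.conf.set("spark.sql.shuffle.partitions", "2000")

val query = sessions.writeStream                 // `sessions` as in the quoted code below
  .format("parquet")
  .option("path", "/data/sessions-out")
  .option("checkpointLocation", "/data/sessions-checkpoint-v2")  // fresh checkpoint
  .start()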

On Wed, Nov 8, 2017 at 7:34 AM, Teemu Heikkilä  wrote:

> I have spark structured streaming job and I'm crunching through few
> terabytes of data.
>
> I'm using file stream reader and it works flawlessly, I can adjust the
> partitioning of that with spark.default.parallelism
>
> However I'm doing sessionization for the data after loading it and I'm
> currently locked with just 200 partitions for that stage. I've tried to
> repartition before and after the stateful map but it just adds new stage
> and so it's not very useful
>
> Changing spark.sql.shuffle.partitions doesn't affect the count either.
>
> val sessions = streamingSource // -> spark.default.parallelism defined
> amount of partitions/tasks (ie. 2000)
>  .repartition(n) // -> n partitions/tasks
>  .groupByKey(event => event.sessid) // -> stage opens / fixed 200 tasks
>  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.
> EventTimeTimeout())(SessionState.updateSessionEvents) // -> fixed 200
> tasks / stage closes
>
>
> I tried to grep through spark source code but couldn’t find that param
> anywhere.
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [Vote] SPIP: Continuous Processing Mode for Structured Streaming

2017-11-06 Thread Michael Armbrust
+1

On Sat, Nov 4, 2017 at 11:02 AM, Xiao Li  wrote:

> +1
>
> 2017-11-04 11:00 GMT-07:00 Burak Yavuz :
>
>> +1
>>
>> On Fri, Nov 3, 2017 at 10:02 PM, vaquar khan 
>> wrote:
>>
>>> +1
>>>
>>> On Fri, Nov 3, 2017 at 8:14 PM, Weichen Xu 
>>> wrote:
>>>
 +1.

 On Sat, Nov 4, 2017 at 8:04 AM, Matei Zaharia 
 wrote:

> +1 from me too.
>
> Matei
>
> > On Nov 3, 2017, at 4:59 PM, Wenchen Fan  wrote:
> >
> > +1.
> >
> > I think this architecture makes a lot of sense to let executors talk
> > to source/sink directly, and it brings very low latency.
> >
> > On Thu, Nov 2, 2017 at 9:01 AM, Sean Owen 
> wrote:
> > +0 simply because I don't feel I know enough to have an opinion. I
> have no reason to doubt the change though, from a skim through the doc.
> >
> >
> > On Wed, Nov 1, 2017 at 3:37 PM Reynold Xin 
> wrote:
> > Earlier I sent out a discussion thread for CP in Structured
> Streaming:
> >
> > https://issues.apache.org/jira/browse/SPARK-20928
> >
> > It is meant to be a very small, surgical change to Structured
> Streaming to enable ultra-low latency. This is great timing because we are
> also designing and implementing data source API v2. If designed properly,
> we can have the same data source API working for both streaming and batch.
> >
> >
> > Following the SPIP process, I'm putting this SPIP up for a vote.
> >
> > +1: Let's go ahead and design / implement the SPIP.
> > +0: Don't really care.
> > -1: I do not think this is a good idea for the following reasons.
> >
> >
> >
>
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

>>>
>>>
>>> --
>>> Regards,
>>> Vaquar Khan
>>> +1 224-436-0783
>>> Greater Chicago
>>>
>>
>>
>


Re: Structured Stream equivalent of reduceByKey

2017-11-06 Thread Michael Armbrust
Hmmm, I see.  You could output the delta using flatMapGroupsWithState
<https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#flatMapGroupsWithState-org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction-org.apache.spark.sql.streaming.OutputMode-org.apache.spark.sql.Encoder-org.apache.spark.sql.Encoder-org.apache.spark.sql.streaming.GroupStateTimeout->
probably.
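A rough sketch of that approach, with made-up Event/Delta types and a socket
source on an assumed port purely for illustration: the running total lives in
the state store, and only the change observed in the current micro-batch is
emitted downstream.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: String, value: Long)
case class Delta(key: String, deltaCount: Long)

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val events = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()
  .as[String]
  .map(line => Event(line, 1L))

// Keep the running count per key in state, but emit only the per-batch delta.
def emitDelta(key: String, rows: Iterator[Event], state: GroupState[Long]): Iterator[Delta] = {
  val batchCount = rows.size.toLong
  state.update(state.getOption.getOrElse(0L) + batchCount)
  Iterator(Delta(key, batchCount))
}

val deltas = events
  .groupByKey(_.key)
  .flatMapGroupsWithState[Long, Delta](OutputMode.Update, GroupStateTimeout.NoTimeout())(emitDelta)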

On Thu, Oct 26, 2017 at 10:11 PM, Piyush Mukati <piyush.muk...@gmail.com>
wrote:

> Thanks, Michael
> I have explored Aggregator
> <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>
>  with
> update mode. The problem is that it gives the overall aggregated value for
> the groups that changed, while I only want the delta change in each group,
> since we also do the aggregation at the sink level.
>
> Below is the plan generated with count Aggregator.
>
> *HashAggregate
> StateStoreSave
> *HashAggregate,
> StateStoreRestore
> *HashAggregate,
> Exchange
> *HashAggregate,
> *Project
> StreamingRelation
>
> We are looking for some aggregation that will avoid state-store
> interactions.
>
> Also, is anyone aware of any design doc or example of how we can add a new
> operation on Dataset and the corresponding physical plan?
>
>
>
> On Thu, Oct 26, 2017 at 5:54 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> - dev
>>
>> I think you should be able to write an Aggregator
>> <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>.
>> You probably want to run in update mode if you are looking for it to output
>> any group that has changed in the batch.
>>
>> On Wed, Oct 25, 2017 at 5:52 PM, Piyush Mukati <piyush.muk...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> we are migrating some jobs from Dstream to Structured Stream.
>>>
>>> Currently to handle aggregations we call map and reduceByKey on each RDD
>>> like
>>> rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))
>>>
>>> The final output of each RDD is merged to the sink with support for
>>> aggregation at the sink( Like co-processor at HBase ).
>>>
>>> In the new DataSet API, I am not finding any suitable API to aggregate
>>> over the micro-batch.
>>> Most of the aggregation API uses state-store and provide global
>>> aggregations. ( with append mode it does not give the change in existing
>>> buckets )
>>> Problems we are suspecting are:
>>>  1) The state store is tightly linked to the job definition, while in our
>>> case we may want to edit the job while keeping the older calculated
>>> aggregate as it is.
>>>
>>> The desired result can be achieved with below dataset APIs.
>>> dataset.groupByKey(a=>a._1).mapGroups( (key, valueItr) =>
>>> merge(valueItr))
>>> while on observing the physical plan it does not call any merge before
>>> sort.
>>>
>>>  Anyone aware of API or other workarounds to get the desired result?
>>>
>>
>>
>


Re: Structured Stream equivalent of reduceByKey

2017-10-26 Thread Michael Armbrust
- dev

I think you should be able to write an Aggregator
<https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>.
You probably want to run in update mode if you are looking for it to output
any group that has changed in the batch.
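A minimal sketch of such a typed Aggregator, using the built-in rate source so
it runs standalone; the Event/Total types and the way the columns are derived
are made up for illustration. With update output mode the sink receives a
refreshed row only for the groups that changed in each micro-batch.

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Event(id: String, amount: Long)
case class Total(count: Long, sum: Long)

// merge() plays the role that the reduceByKey merge function used to play.
object MergeEvents extends Aggregator[Event, Total, Total] {
  def zero: Total = Total(0L, 0L)
  def reduce(b: Total, e: Event): Total = Total(b.count + 1, b.sum + e.amount)
  def merge(b1: Total, b2: Total): Total = Total(b1.count + b2.count, b1.sum + b2.sum)
  def finish(r: Total): Total = r
  def bufferEncoder: Encoder[Total] = Encoders.product[Total]
  def outputEncoder: Encoder[Total] = Encoders.product[Total]
}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .select(($"value" % 10).cast("string").as("id"), $"value".as("amount"))
  .as[Event]

val totals = events.groupByKey(_.id).agg(MergeEvents.toColumn.name("total"))

totals.writeStream
  .outputMode("update") // only groups updated in the batch are emitted
  .format("console")
  .start()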

On Wed, Oct 25, 2017 at 5:52 PM, Piyush Mukati 
wrote:

> Hi,
> we are migrating some jobs from Dstream to Structured Stream.
>
> Currently to handle aggregations we call map and reduceByKey on each RDD
> like
> rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))
>
> The final output of each RDD is merged to the sink with support for
> aggregation at the sink( Like co-processor at HBase ).
>
> In the new DataSet API, I am not finding any suitable API to aggregate
> over the micro-batch.
> Most of the aggregation API uses state-store and provide global
> aggregations. ( with append mode it does not give the change in existing
> buckets )
> Problems we are suspecting are:
>  1) The state store is tightly linked to the job definition, while in our
> case we may want to edit the job while keeping the older calculated
> aggregate as it is.
>
> The desired result can be achieved with below dataset APIs.
> dataset.groupByKey(a=>a._1).mapGroups( (key, valueItr) => merge(valueItr))
> while on observing the physical plan it does not call any merge before
> sort.
>
>  Anyone aware of API or other workarounds to get the desired result?
>


Re: Implement Dataset reader from SEQ file with protobuf to Dataset

2017-10-08 Thread Michael Armbrust
spark-avro  would be a good
example to start with.
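If a full data source implementation is more than you need, a quick alternative
is to read the SequenceFile as raw bytes and decode each record with the
protobuf-generated parser. A rough sketch, where PersonProto, its accessors,
the NullWritable key type and the path are all assumptions:

import scala.collection.JavaConverters._
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.sql.SparkSession

case class Person(name: String, lastName: String, phones: List[String])

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val persons = spark.sparkContext
  .sequenceFile[NullWritable, BytesWritable]("/data/people.seq")
  .map { case (_, bytes) =>
    // PersonProto stands in for the protobuf-generated Java class.
    val p = PersonProto.parseFrom(bytes.copyBytes())
    Person(p.getName, p.getLastName, p.getPhonesList.asScala.toList)
  }
  .toDS()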

On Sun, Oct 8, 2017 at 3:00 AM, Serega Sheypak 
wrote:

> Hi, did anyone try to implement Spark SQL dataset reader from SEQ file
> with protobuf inside to Dataset?
>
> Imagine I have protobuf def
> Person
>  - name: String
>  - lastName: String
> - phones: List[String]
>
> and generated scala case class:
> case class Person(name:String, lastName: String, phones: List[String])
>
> I want to write some component that gives me Dataset with types schema.
>
> val personsDataset = spark.read
>   .option("inferSchema", "true")[Person]
>
> Where can I take a look at references?
>


Re: Chaining Spark Streaming Jobs

2017-09-18 Thread Michael Armbrust
You specify the schema when loading a dataframe by calling
spark.read.schema(...)...
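A minimal sketch of that pattern for a parquet directory (the path is
illustrative): capture the schema once with a static read, then hand it to
readStream, since file-based streaming sources require an explicit schema.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// One-time static read, used only to capture the schema.
val staticSchema = spark.read.parquet("/data/enriched").schema

val streamDf = spark.readStream
  .schema(staticSchema)
  .parquet("/data/enriched")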

On Tue, Sep 12, 2017 at 4:50 PM, Sunita Arvind <sunitarv...@gmail.com>
wrote:

> Hi Michael,
>
> I am wondering what I am doing wrong. I get error like:
>
> Exception in thread "main" java.lang.IllegalArgumentException: Schema
> must be specified when creating a streaming source DataFrame. If some files
> already exist in the directory, then depending on the file format you may
> be able to create a static DataFrame on that directory with
> 'spark.read.load(directory)' and infer schema from it.
> at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(
> DataSource.scala:223)
> at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$
> lzycompute(DataSource.scala:87)
> at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(
> DataSource.scala:87)
> at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(
> StreamingRelation.scala:30)
> at org.apache.spark.sql.streaming.DataStreamReader.
> load(DataStreamReader.scala:125)
> at org.apache.spark.sql.streaming.DataStreamReader.
> load(DataStreamReader.scala:134)
> at com.aol.persist.UplynkAggregates$.aggregator(
> UplynkAggregates.scala:23)
> at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
> at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(
> AppMain.java:144)
> 17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook
>
>
> I tried specifying the schema as well.
> Here is my code:
>
> object Aggregates {
>
>   val aggregation=
> """select sum(col1), sum(col2), id, first(name)
>   from enrichedtb
>   group by id
> """.stripMargin
>
>   def aggregator(conf:Config)={
> implicit val spark = 
> SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
> implicit val sqlctx = spark.sqlContext
> printf("Source path is" + conf.getString("source.path"))
> val schemadf = sqlctx.read.parquet(conf.getString("source.path")) // 
> Added this as it was complaining about schema.
> val df=spark.readStream.format("parquet").option("inferSchema", 
> true).schema(schemadf.schema).load(conf.getString("source.path"))
> df.createOrReplaceTempView("enrichedtb")
> val res = spark.sql(aggregation)
> 
> res.writeStream.format("parquet").outputMode("append").option("checkpointLocation",conf.getString("checkpointdir")).start(conf.getString("sink.outputpath"))
>   }
>
>   def main(args: Array[String]): Unit = {
> val mainconf = ConfigFactory.load()
> val conf = mainconf.getConfig(mainconf.getString("pipeline"))
> print(conf.toString)
> aggregator(conf)
>   }
>
> }
>
>
> I tried to extract the schema from a static read of the input path and
> provided it to the readStream API. With that, I get this error:
>
> at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>   at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
>   at 
> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>   at 
> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>   at 
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
>   at 
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)
>
> While running on the EMR cluster, all paths point to S3. On my laptop, they
> all point to the local filesystem.
>
> I am using Spark2.2.0
>
> Appreciate your help.
>
> regards
>
> Sunita
>
>
> On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> If you use structured streaming and the file sink, you can have a
>> subsequent stream read using the file source.  This will maintain exactly
>> once processing even if there are hiccups or failures

Re: [SS] Any way to optimize memory consumption of SS?

2017-09-14 Thread Michael Armbrust
How many UUIDs do you expect to have in a day?  That is likely where all
the memory is being used.  Does it work without that?

On Tue, Sep 12, 2017 at 8:42 PM, 张万新 <kevinzwx1...@gmail.com> wrote:

> *Yes, my code is shown below(I also post my code in another mail)*
> /**
> * input
> */
>   val logs = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", BROKER_SERVER)
> .option("subscribe", TOPIC)
> .option("startingOffset", "latest")
> .load()
>
>   /**
> * process
> */
>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>
>   val events = logValues
> .map(parseFunction)
> .select(
>   $"_1".alias("date").cast("timestamp"),
>   $"_2".alias("uuid").cast("string")
> )
>
>   val results = events
> .withWatermark("date", "1 day")
> .dropDuplicates("uuid", "date")
> .groupBy($"date")
> .count()
>
>   /**
> * output
> */
>   val query = results
> .writeStream
> .outputMode("update")
> .format("console")
> .option("truncate", "false")
> .trigger(Trigger.ProcessingTime("1 seconds"))
> .start()
>
>   query.awaitTermination()
>
> *and I use play json to parse input logs from kafka ,the parse function is
> like*
>
>   def parseFunction(str: String): (Long, String) = {
> val json = Json.parse(str)
> val timestamp = (json \ "time").get.toString().toLong
> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
> val uuid = (json \ "uuid").get.toString()
> (date, uuid)
>   }
>
> and the Java heap space looks like this (I've increased the executor memory to 15g):
>
> [image: image.png]
> Michael Armbrust <mich...@databricks.com> wrote on Wed, Sep 13, 2017 at 2:23 AM:
>
>> Can you show the full query you are running?
>>
>> On Tue, Sep 12, 2017 at 10:11 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm using structured streaming to count unique visits of our website. I
>>> use spark on yarn mode with 4 executor instances and from 2 cores * 5g
>>> memory to 4 cores * 10g memory for each executor, but there are frequent
>>> full gc, and once the count raises to about more than 4.5 millions the
>>> application will be blocked and finally crash in OOM. It's kind of
>>> unreasonable. So is there any suggestion to optimize the memory consumption
>>> of SS? Thanks.
>>>
>>
>>


Re: [SS]How to add a column with custom system time?

2017-09-14 Thread Michael Armbrust
Can you show the explain() for the version that doesn't work?  You might
just be hitting a bug.

On Tue, Sep 12, 2017 at 9:03 PM, 张万新 <kevinzwx1...@gmail.com> wrote:

> It seems current_timestamp() cannot be used directly in the window function,
> because after some attempts I found that using
>
> *df.count.withColumn("pTime", current_timestamp).select(window($"pTime",
> "15 minutes"), $"count")*
>
> instead of
>
> *df.count.withColumn("window", window(current_timestamp(), "15 minutes"))*
>
> throws no exception and works fine. I don't know if this is a problem that
> needs improvement.
>
>
> 张万新 <kevinzwx1...@gmail.com> wrote on Wed, Sep 13, 2017 at 11:43 AM:
>
>> and I use .withColumn("window", window(current_timestamp(), "15
>> minutes")) after count
>>
>> 张万新 <kevinzwx1...@gmail.com> wrote on Wed, Sep 13, 2017 at 11:32 AM:
>>
>>> *Yes, my code is shown below*
>>> /**
>>> * input
>>> */
>>>   val logs = spark
>>> .readStream
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers", BROKER_SERVER)
>>> .option("subscribe", TOPIC)
>>> .option("startingOffset", "latest")
>>> .load()
>>>
>>>   /**
>>> * process
>>> */
>>>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>>>
>>>   val events = logValues
>>> .map(parseFunction)
>>> .select(
>>>   $"_1".alias("date").cast("timestamp"),
>>>   $"_2".alias("uuid").cast("string")
>>> )
>>>
>>>   val results = events
>>> .withWatermark("date", "1 day")
>>> .dropDuplicates("uuid", "date")
>>> .groupBy($"date")
>>> .count()
>>> .withColumn("window", window(current_timestamp(), "15 minutes"))
>>>
>>>   /**
>>> * output
>>>     */
>>>   val query = results
>>> .writeStream
>>> .outputMode("update")
>>> .format("console")
>>> .option("truncate", "false")
>>> .trigger(Trigger.ProcessingTime("1 seconds"))
>>> .start()
>>>
>>>   query.awaitTermination()
>>>
>>> *and I use play json to parse input logs from kafka ,the parse function
>>> is like*
>>>
>>>   def parseFunction(str: String): (Long, String) = {
>>> val json = Json.parse(str)
>>> val timestamp = (json \ "time").get.toString().toLong
>>> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
>>> val uuid = (json \ "uuid").get.toString()
>>> (date, uuid)
>>>   }
>>>
>>> Michael Armbrust <mich...@databricks.com> wrote on Wed, Sep 13, 2017 at 2:36 AM:
>>>
>>>> Can you show all the code?  This works for me.
>>>>
>>>> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>
>>>>> The spark version is 2.2.0
>>>>>
>>>>> Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 12:32 PM:
>>>>>
>>>>>> Which version of spark?
>>>>>>
>>>>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for reply, but using this method I got an exception:
>>>>>>>
>>>>>>> "Exception in thread "main" 
>>>>>>> org.apache.spark.sql.streaming.StreamingQueryException:
>>>>>>> nondeterministic expressions are only allowed in
>>>>>>>
>>>>>>> Project, Filter, Aggregate or Window"
>>>>>>>
>>>>>>> Can you give more advice?
>>>>>>>
>>>>>>> Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 4:48 AM:
>>>>>>>
>>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>>
>>>>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>>>>
>>>>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> In structured streaming how can I add a column to a dataset with
>>>>>>>>> current system time aligned with 15 minutes?
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>


Re: [Structured Streaming] Multiple sources best practice/recommendation

2017-09-14 Thread Michael Armbrust
I would probably suggest that you partition by format (though you can get
the file name from the built-in function input_file_name()).  You can load
multiple streams from different directories and union them together as long
as the schema is the same after parsing.  Otherwise you can just run
multiple streams on the same cluster.
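A short sketch of the one-directory-per-format layout; the schema, paths and
formats are illustrative. Each directory gets its own file stream,
input_file_name() records the origin, and the branches are unioned once they
share a schema after parsing.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

val spark = SparkSession.builder().getOrCreate()

// Common schema after parsing; both branches must agree on it.
val schema = new StructType()
  .add("id", StringType)
  .add("value", DoubleType)

val csvStream = spark.readStream
  .schema(schema)
  .option("header", "true")
  .csv("s3://bucket/input/csv/")
  .withColumn("source_file", input_file_name())

val jsonStream = spark.readStream
  .schema(schema)
  .json("s3://bucket/input/json/")
  .withColumn("source_file", input_file_name())

val combined = csvStream.union(jsonStream)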

On Wed, Sep 13, 2017 at 7:56 AM, JG Perrin  wrote:

> Hi,
>
>
>
> I have different files being dumped on S3, I want to ingest them and join
> them.
>
>
>
> What sounds better to you? Have one “directory” for all, or one per
> file format?
>
>
>
> If I have one directory for all, can you get some metadata about the file,
> like its name?
>
>
>
> If I have multiple directories, how can I have multiple “listeners”?
>
>
>
> Thanks
>
>
>
> jg
> --
>
>


Re: Easy way to get offset metadata with Spark Streaming API

2017-09-14 Thread Michael Armbrust
Yep, that is correct.  You can also use the query ID which is a GUID that
is stored in the checkpoint and preserved across restarts if you want to
distinguish the batches from different streams.

sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)

This was added recently
<https://github.com/apache/spark/commit/2d968a07d211688a9c588deb859667dd8b653b27>
though.

On Thu, Sep 14, 2017 at 3:40 AM, Dmitry Naumenko <dm.naume...@gmail.com>
wrote:

> Ok. So since I can get repeated batch ids, I guess I can just store the
> last committed batch id in my storage (in the same transaction with the
> data) and initialize the custom sink with right batch id when application
> re-starts. After this just ignore batch if current batchId <=
> latestBatchId.
>
> Dmitry
>
>
> 2017-09-13 22:12 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>
>> I think the right way to look at this is the batchId is just a proxy for
>> offsets that is agnostic to what type of source you are reading from (or
>> how many sources there are).  We might call into a custom sink with the
>> same batchId more than once, but it will always contain the same data
>> (there is no race condition, since this is stored in a write-ahead log).
>> As long as you check/commit the batch id in the same transaction as the
>> data you will get exactly once.
>>
>> On Wed, Sep 13, 2017 at 1:25 AM, Dmitry Naumenko <dm.naume...@gmail.com>
>> wrote:
>>
>>> Thanks, I see.
>>>
>>> However, I guess reading from checkpoint directory might be less
>>> efficient comparing just preserving offsets in Dataset.
>>>
>>> I have one more question about operation idempotence (hope it help
>>> others to have a clear picture).
>>>
>>> If I read offsets on re-start from RDBMS and manually specify starting
>>> offsets on Kafka Source, is it still possible that in case of any failure I
>>> got a situation where the duplicate batch id will go to a Custom Sink?
>>>
>>> Previously on DStream, you will just read offsets from storage on start
>>> and just write them into DB in one transaction with data and it's was
>>> enough for "exactly-once". Please, correct me if I made a mistake here. So
>>> does the same strategy will work with Structured Streaming?
>>>
>>> I guess, that in case of Structured Streaming, Spark will commit batch
>>> offset to a checkpoint directory and there can be a race condition where
>>> you can commit your data with offsets into DB, but Spark will fail to
>>> commit the batch id, and some kind of automatic retry happen. If this is
>>> true, is it possible to disable this automatic re-try, so I can still use
>>> unified API for batch/streaming with my own re-try logic (which is
>>> basically, just ignore intermediate data, re-read from Kafka and re-try
>>> processing and load)?
>>>
>>> Dmitry
>>>
>>>
>>> 2017-09-12 22:43 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>
>>>> In the checkpoint directory there is a file /offsets/$batchId that
>>>> holds the offsets serialized as JSON.  I would not consider this a public
>>>> stable API though.
>>>>
>>>> Really the only important thing to get exactly once is that you must
>>>> ensure whatever operation you are doing downstream is idempotent with
>>>> respect to the batchId.  For example, if you are writing to an RDBMS you
>>>> could have a table that records the batch ID and update that in the same
>>>> transaction as you append the results of the batch.  Before trying to
>>>> append you should check that batch ID and make sure you have not already
>>>> committed.
>>>>
>>>> On Tue, Sep 12, 2017 at 11:48 AM, Dmitry Naumenko <
>>>> dm.naume...@gmail.com> wrote:
>>>>
>>>>> Thanks for response, Michael
>>>>>
>>>>> >  You should still be able to get exactly once processing by using
>>>>> the batchId that is passed to the Sink.
>>>>>
>>>>> Could you explain this in more detail, please? Is there some kind of
>>>>> offset manager API that works as get-offset by batch id lookup table?
>>>>>
>>>>> Dmitry
>>>>>
>>>>> 2017-09-12 20:29 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>>>
>>>>>> I think that we are going to have to change the Sink API as part of
>>>>>> SPARK-20928, which is why I linked these tickets together.

Re: Easy way to get offset metadata with Spark Streaming API

2017-09-13 Thread Michael Armbrust
I think the right way to look at this is the batchId is just a proxy for
offsets that is agnostic to what type of source you are reading from (or
how many sources there are).  We might call into a custom sink with the
same batchId more than once, but it will always contain the same data
(there is no race condition, since this is stored in a write-ahead log).
As long as you check/commit the batch id in the same transaction as the
data you will get exactly once.

On Wed, Sep 13, 2017 at 1:25 AM, Dmitry Naumenko <dm.naume...@gmail.com>
wrote:

> Thanks, I see.
>
> However, I guess reading from checkpoint directory might be less efficient
> comparing just preserving offsets in Dataset.
>
> I have one more question about operation idempotence (hope it help others
> to have a clear picture).
>
> If I read offsets on re-start from RDBMS and manually specify starting
> offsets on Kafka Source, is it still possible that in case of any failure I
> got a situation where the duplicate batch id will go to a Custom Sink?
>
> Previously on DStream, you will just read offsets from storage on start
> and just write them into DB in one transaction with data and it's was
> enough for "exactly-once". Please, correct me if I made a mistake here. So
> does the same strategy will work with Structured Streaming?
>
> I guess, that in case of Structured Streaming, Spark will commit batch
> offset to a checkpoint directory and there can be a race condition where
> you can commit your data with offsets into DB, but Spark will fail to
> commit the batch id, and some kind of automatic retry happen. If this is
> true, is it possible to disable this automatic re-try, so I can still use
> unified API for batch/streaming with my own re-try logic (which is
> basically, just ignore intermediate data, re-read from Kafka and re-try
> processing and load)?
>
> Dmitry
>
>
> 2017-09-12 22:43 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>
>> In the checkpoint directory there is a file /offsets/$batchId that holds
>> the offsets serialized as JSON.  I would not consider this a public stable
>> API though.
>>
>> Really the only important thing to get exactly once is that you must
>> ensure whatever operation you are doing downstream is idempotent with
>> respect to the batchId.  For example, if you are writing to an RDBMS you
>> could have a table that records the batch ID and update that in the same
>> transaction as you append the results of the batch.  Before trying to
>> append you should check that batch ID and make sure you have not already
>> committed.
>>
>> On Tue, Sep 12, 2017 at 11:48 AM, Dmitry Naumenko <dm.naume...@gmail.com>
>> wrote:
>>
>>> Thanks for response, Michael
>>>
>>> >  You should still be able to get exactly once processing by using the 
>>> > batchId
>>> that is passed to the Sink.
>>>
>>> Could you explain this in more detail, please? Is there some kind of
>>> offset manager API that works as get-offset by batch id lookup table?
>>>
>>> Dmitry
>>>
>>> 2017-09-12 20:29 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>
>>>> I think that we are going to have to change the Sink API as part of
>>>> SPARK-20928 <https://issues-test.apache.org/jira/browse/SPARK-20928>,
>>>> which is why I linked these tickets together.  I'm still targeting an
>>>> initial version for Spark 2.3 which should happen sometime towards the end
>>>> of the year.
>>>>
>>>> There are some misconceptions in that stack overflow answer that I can
>>>> correct.  Until we improve the Source API, You should still be able to get
>>>> exactly once processing by using the batchId that is passed to the Sink.
>>>> We guarantee that the offsets present at any given batch ID will be the
>>>> same across retries by recording this information in the checkpoint's WAL.
>>>> The checkpoint does not use java serialization (like DStreams does) and can
>>>> be used even after upgrading Spark.
>>>>
>>>>
>>>> On Tue, Sep 12, 2017 at 12:45 AM, Dmitry Naumenko <
>>>> dm.naume...@gmail.com> wrote:
>>>>
>>>>> Thanks, Cody
>>>>>
>>>>> Unfortunately, it seems to be there is no active development right
>>>>> now. Maybe I can step in and help with it somehow?
>>>>>
>>>>> Dmitry
>>>>>
>>>>> 2017-09-11 21:01 GMT+03:00 Cody Koeninger <c...@koeninger.org>:
>>>>>

Re: Easy way to get offset metadata with Spark Streaming API

2017-09-12 Thread Michael Armbrust
In the checkpoint directory there is a file /offsets/$batchId that holds
the offsets serialized as JSON.  I would not consider this a public stable
API though.

Really the only important thing to get exactly once is that you must ensure
whatever operation you are doing downstream is idempotent with respect to
the batchId.  For example, if you are writing to an RDBMS you could have a
table that records the batch ID and update that in the same transaction as
you append the results of the batch.  Before trying to append you should
check that batch ID and make sure you have not already committed.
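A rough sketch of that pattern against the (internal, non-public) Sink API; the
JDBC URL, table names and column layout are made up, error handling is omitted,
and wiring the sink in would additionally need a StreamSinkProvider:

import java.sql.DriverManager
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class IdempotentJdbcSink(url: String) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val rows = data.collect() // fine for small batches; illustration only
    val conn = DriverManager.getConnection(url)
    try {
      conn.setAutoCommit(false)
      val check = conn.prepareStatement("SELECT 1 FROM committed_batches WHERE batch_id = ?")
      check.setLong(1, batchId)
      if (!check.executeQuery().next()) { // skip a batch id we have already committed
        val insert = conn.prepareStatement("INSERT INTO results (word, cnt) VALUES (?, ?)")
        rows.foreach { r =>
          insert.setString(1, r.getString(0))
          insert.setLong(2, r.getLong(1))
          insert.addBatch()
        }
        insert.executeBatch()
        val mark = conn.prepareStatement("INSERT INTO committed_batches (batch_id) VALUES (?)")
        mark.setLong(1, batchId)
        mark.executeUpdate()
        conn.commit() // batch id and data become visible atomically
      }
    } finally {
      conn.close() // on most drivers, closing without commit discards uncommitted work
    }
  }
}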

On Tue, Sep 12, 2017 at 11:48 AM, Dmitry Naumenko <dm.naume...@gmail.com>
wrote:

> Thanks for response, Michael
>
> >  You should still be able to get exactly once processing by using the 
> > batchId
> that is passed to the Sink.
>
> Could you explain this in more detail, please? Is there some kind of
> offset manager API that works as get-offset by batch id lookup table?
>
> Dmitry
>
> 2017-09-12 20:29 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>
>> I think that we are going to have to change the Sink API as part of
>> SPARK-20928 <https://issues-test.apache.org/jira/browse/SPARK-20928>,
>> which is why I linked these tickets together.  I'm still targeting an
>> initial version for Spark 2.3 which should happen sometime towards the end
>> of the year.
>>
>> There are some misconceptions in that stack overflow answer that I can
>> correct.  Until we improve the Source API, You should still be able to get
>> exactly once processing by using the batchId that is passed to the Sink.
>> We guarantee that the offsets present at any given batch ID will be the
>> same across retries by recording this information in the checkpoint's WAL.
>> The checkpoint does not use java serialization (like DStreams does) and can
>> be used even after upgrading Spark.
>>
>>
>> On Tue, Sep 12, 2017 at 12:45 AM, Dmitry Naumenko <dm.naume...@gmail.com>
>> wrote:
>>
>>> Thanks, Cody
>>>
>>> Unfortunately, it seems to be there is no active development right now.
>>> Maybe I can step in and help with it somehow?
>>>
>>> Dmitry
>>>
>>> 2017-09-11 21:01 GMT+03:00 Cody Koeninger <c...@koeninger.org>:
>>>
>>>> https://issues-test.apache.org/jira/browse/SPARK-18258
>>>>
>>>> On Mon, Sep 11, 2017 at 7:15 AM, Dmitry Naumenko <dm.naume...@gmail.com>
>>>> wrote:
>>>> > Hi all,
>>>> >
>>>> > It started as a discussion in
>>>> > https://stackoverflow.com/questions/46153105/how-to-get-kafk
>>>> a-offsets-with-spark-structured-streaming-api.
>>>> >
>>>> > So the problem that there is no support in Public API to obtain the
>>>> Kafka
>>>> > (or Kineses) offsets. For example, if you want to save offsets in
>>>> external
>>>> > storage in Custom Sink, you should :
>>>> > 1) preserve topic, partition and offset across all transform
>>>> operations of
>>>> > Dataset (based on hard-coded Kafka schema)
>>>> > 2) make a manual group by partition/offset with aggregate max offset
>>>> >
>>>> > Structured Streaming doc says "Every streaming source is assumed to
>>>> have
>>>> > offsets", so why it's not a part of Public API? What do you think
>>>> about
>>>> > supporting it?
>>>> >
>>>> > Dmitry
>>>>
>>>
>>>
>>
>


Re: [SS]How to add a column with custom system time?

2017-09-12 Thread Michael Armbrust
Can you show all the code?  This works for me.

On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:

> The spark version is 2.2.0
>
> Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 12:32 PM:
>
>> Which version of spark?
>>
>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>
>>> Thanks for reply, but using this method I got an exception:
>>>
>>> "Exception in thread "main" 
>>> org.apache.spark.sql.streaming.StreamingQueryException:
>>> nondeterministic expressions are only allowed in
>>>
>>> Project, Filter, Aggregate or Window"
>>>
>>> Can you give more advice?
>>>
>>> Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 4:48 AM:
>>>
>>>> import org.apache.spark.sql.functions._
>>>>
>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>
>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> In structured streaming how can I add a column to a dataset with
>>>>> current system time aligned with 15 minutes?
>>>>>
>>>>> Thanks.
>>>>>
>>>>
>>>>
>>


Re: [SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread Michael Armbrust
Can you show the full query you are running?

On Tue, Sep 12, 2017 at 10:11 AM, 张万新  wrote:

> Hi,
>
> I'm using structured streaming to count unique visits of our website. I
> use spark on yarn mode with 4 executor instances and from 2 cores * 5g
> memory to 4 cores * 10g memory for each executor, but there are frequent
> full gc, and once the count raises to about more than 4.5 millions the
> application will be blocked and finally crash in OOM. It's kind of
> unreasonable. So is there any suggestion to optimize the memory consumption
> of SS? Thanks.
>


Re: Easy way to get offset metadata with Spark Streaming API

2017-09-12 Thread Michael Armbrust
I think that we are going to have to change the Sink API as part of
SPARK-20928 , which
is why I linked these tickets together.  I'm still targeting an initial
version for Spark 2.3 which should happen sometime towards the end of the
year.

There are some misconceptions in that stack overflow answer that I can
correct.  Until we improve the Source API, You should still be able to get
exactly once processing by using the batchId that is passed to the Sink. We
guarantee that the offsets present at any given batch ID will be the same
across retries by recording this information in the checkpoint's WAL. The
checkpoint does not use java serialization (like DStreams does) and can be
used even after upgrading Spark.


On Tue, Sep 12, 2017 at 12:45 AM, Dmitry Naumenko 
wrote:

> Thanks, Cody
>
> Unfortunately, it seems to be there is no active development right now.
> Maybe I can step in and help with it somehow?
>
> Dmitry
>
> 2017-09-11 21:01 GMT+03:00 Cody Koeninger :
>
>> https://issues-test.apache.org/jira/browse/SPARK-18258
>>
>> On Mon, Sep 11, 2017 at 7:15 AM, Dmitry Naumenko 
>> wrote:
>> > Hi all,
>> >
>> > It started as a discussion in
>> > https://stackoverflow.com/questions/46153105/how-to-get-kafk
>> a-offsets-with-spark-structured-streaming-api.
>> >
>> > So the problem that there is no support in Public API to obtain the
>> Kafka
>> > (or Kineses) offsets. For example, if you want to save offsets in
>> external
>> > storage in Custom Sink, you should :
>> > 1) preserve topic, partition and offset across all transform operations
>> of
>> > Dataset (based on hard-coded Kafka schema)
>> > 2) make a manual group by partition/offset with aggregate max offset
>> >
>> > Structured Streaming doc says "Every streaming source is assumed to have
>> > offsets", so why it's not a part of Public API? What do you think about
>> > supporting it?
>> >
>> > Dmitry
>>
>
>


Re: [SS]How to add a column with custom system time?

2017-09-11 Thread Michael Armbrust
Which version of spark?

On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:

> Thanks for reply, but using this method I got an exception:
>
> "Exception in thread "main" 
> org.apache.spark.sql.streaming.StreamingQueryException:
> nondeterministic expressions are only allowed in
>
> Project, Filter, Aggregate or Window"
>
> Can you give more advice?
>
> Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 4:48 AM:
>
>> import org.apache.spark.sql.functions._
>>
>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>
>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> In structured streaming how can I add a column to a dataset with current
>>> system time aligned with 15 minutes?
>>>
>>> Thanks.
>>>
>>
>>


Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-11 Thread Michael Armbrust
The following will convert the whole row to JSON.

import static org.apache.spark.sql.functions.*;

df.select(to_json(struct(col("*"))));

On Sat, Sep 9, 2017 at 6:27 PM, kant kodali  wrote:

> Thanks Ryan! In this case, I will have Dataset so is there a way to
> convert Row to Json string?
>
> Thanks
>
> On Sat, Sep 9, 2017 at 5:14 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> It's because "toJSON" doesn't support Structured Streaming. The current
>> implementation will convert the Dataset to an RDD, which is not supported
>> by streaming queries.
>>
>> On Sat, Sep 9, 2017 at 4:40 PM, kant kodali  wrote:
>>
>>> yes it is a streaming dataset. so what is the problem with following
>>> code?
>>>
>>> Dataset ds = dataset.toJSON().map(()->{some function that returns a 
>>> string});
>>>  StreamingQuery query = ds.writeStream().start();
>>>  query.awaitTermination();
>>>
>>>
>>> On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung 
>>> wrote:
>>>
 What is newDS?
 If it is a Streaming Dataset/DataFrame (since you have writeStream
 there) then there seems to be an issue preventing toJSON to work.

 --
 *From:* kant kodali 
 *Sent:* Saturday, September 9, 2017 4:04:33 PM
 *To:* user @spark
 *Subject:* Queries with streaming sources must be executed with
 writeStream.start()

 Hi All,

 I  have the following code and I am not sure what's wrong with it? I
 cannot call dataset.toJSON() (which returns a DataSet) ? I am using spark
 2.2.0 so I am wondering if there is any work around?

  Dataset ds = newDS.toJSON().map(()->{some function that returns a 
 string});
  StreamingQuery query = ds.writeStream().start();
  query.awaitTermination();


>>>
>>
>


Re: Need some Clarification on checkpointing w.r.t Spark Structured Streaming

2017-09-11 Thread Michael Armbrust
Checkpoints record what has been processed for a specific query, and as
such only need to be defined when writing (which is how you "start" a
query).

You can use the DataFrame created with readStream to start multiple
queries, so it wouldn't really make sense to have a single checkpoint there.
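A small sketch of that separation, assuming an existing SparkSession named
spark (broker, topic and paths are illustrative): the source itself carries no
checkpoint, and each query started from it gets its own checkpointLocation.

val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()

// Two independent queries over the same source, each with its own checkpoint.
val toParquet = input.writeStream
  .format("parquet")
  .option("path", "hdfs:///out/raw")
  .option("checkpointLocation", "hdfs:///checkpoints/raw")
  .start()

val toConsole = input.selectExpr("CAST(value AS STRING) AS value").writeStream
  .format("console")
  .option("checkpointLocation", "hdfs:///checkpoints/console")
  .start()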

On Mon, Sep 11, 2017 at 2:36 AM, kant kodali  wrote:

> Hi All,
>
> I was wondering if we need to checkpoint both read and write streams when
> reading from Kafka and inserting into a target store?
>
> for example
>
> sparkSession.readStream().option("checkpointLocation", "hdfsPath").load()
>
> vs
>
> dataSet.writeStream().option("checkpointLocation", "hdfsPath")
>
> Thanks!
>


Re: [SS]How to add a column with custom system time?

2017-09-11 Thread Michael Armbrust
import org.apache.spark.sql.functions._

df.withColumn("window", window(current_timestamp(), "15 minutes"))

On Mon, Sep 11, 2017 at 3:03 AM, 张万新  wrote:

> Hi,
>
> In structured streaming how can I add a column to a dataset with current
> system time aligned with 15 minutes?
>
> Thanks.
>


Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2 read path

2017-09-07 Thread Michael Armbrust
+1

On Thu, Sep 7, 2017 at 9:32 AM, Ryan Blue  wrote:

> +1 (non-binding)
>
> Thanks for making the updates reflected in the current PR. It would be
> great to see the doc updated before it is finally published though.
>
> Right now it feels like this SPIP is focused more on getting the basics
> right for what many datasources are already doing in API V1 combined with
> other private APIs, vs pushing forward state of the art for performance.
>
> I think that’s the right approach for this SPIP. We can add the support
> you’re talking about later with a more specific plan that doesn’t block
> fixing the problems that this addresses.
> ​
>
> On Thu, Sep 7, 2017 at 2:00 AM, Herman van Hövell tot Westerflier <
> hvanhov...@databricks.com> wrote:
>
>> +1 (binding)
>>
>> I personally believe that there is quite a big difference between having
>> a generic data source interface with a low surface area and pushing down a
>> significant part of query processing into a datasource. The latter has a much
>> wider surface area and will require us to stabilize most of the
>> internal catalyst API's which will be a significant burden on the community
>> to maintain and has the potential to slow development velocity
>> significantly. If you want to write such integrations then you should be
>> prepared to work with catalyst internals and own up to the fact that things
>> might change across minor versions (and in some cases even maintenance
>> releases). If you are willing to go down that road, then your best bet is
>> to use the already existing spark session extensions which will allow you
>> to write such integrations and can be used as an `escape hatch`.
>>
>>
>> On Thu, Sep 7, 2017 at 10:23 AM, Andrew Ash  wrote:
>>
>>> +0 (non-binding)
>>>
>>> I think there are benefits to unifying all the Spark-internal
>>> datasources into a common public API for sure.  It will serve as a forcing
>>> function to ensure that those internal datasources aren't advantaged vs
>>> datasources developed externally as plugins to Spark, and that all Spark
>>> features are available to all datasources.
>>>
>>> But I also think this read-path proposal avoids the more difficult
>>> questions around how to continue pushing datasource performance forwards.
>>> James Baker (my colleague) had a number of questions about advanced
>>> pushdowns (combined sorting and filtering), and Reynold also noted that
>>> pushdown of aggregates and joins are desirable on longer timeframes as
>>> well.  The Spark community saw similar requests, for aggregate pushdown in
>>> SPARK-12686, join pushdown in SPARK-20259, and arbitrary plan pushdown
>>> in SPARK-12449.  Clearly a number of people are interested in this kind of
>>> performance work for datasources.
>>>
>>> To leave enough space for datasource developers to continue
>>> experimenting with advanced interactions between Spark and their
>>> datasources, I'd propose we leave some sort of escape valve that enables
>>> these datasources to keep pushing the boundaries without forking Spark.
>>> Possibly that looks like an additional unsupported/unstable interface that
>>> pushes down an entire (unstable API) logical plan, which is expected to
>>> break API on every release.   (Spark attempts this full-plan pushdown, and
>>> if that fails Spark ignores it and continues on with the rest of the V2 API
>>> for compatibility).  Or maybe it looks like something else that we don't
>>> know of yet.  Possibly this falls outside of the desired goals for the V2
>>> API and instead should be a separate SPIP.
>>>
>>> If we had a plan for this kind of escape valve for advanced datasource
>>> developers I'd be an unequivocal +1.  Right now it feels like this SPIP is
>>> focused more on getting the basics right for what many datasources are
>>> already doing in API V1 combined with other private APIs, vs pushing
>>> forward state of the art for performance.
>>>
>>> Andrew
>>>
>>> On Wed, Sep 6, 2017 at 10:56 PM, Suresh Thalamati <
>>> suresh.thalam...@gmail.com> wrote:
>>>
 +1 (non-binding)


 On Sep 6, 2017, at 7:29 PM, Wenchen Fan  wrote:

 Hi all,

 In the previous discussion, we decided to split the read and write path
 of data source v2 into 2 SPIPs, and I'm sending this email to call a vote
 for Data Source V2 read path only.

 The full document of the Data Source API V2 is:
 https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ
 -Z8qU5Frf6WMQZ6jJVM/edit

 The ready-for-review PR that implements the basic infrastructure for
 the read path is:
 https://github.com/apache/spark/pull/19136

 The vote will be up for the next 72 hours. Please reply with your vote:

 +1: Yeah, let's go forward and implement the SPIP.
 +0: Don't really care.
 -1: I don't think this is a good idea because of the following
 technical reasons.

 Thanks!



[jira] [Comment Edited] (SPARK-20928) Continuous Processing Mode for Structured Streaming

2017-08-30 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16148227#comment-16148227
 ] 

Michael Armbrust edited comment on SPARK-20928 at 8/30/17 11:52 PM:


Hey everyone, thanks for your interest in this feature!  I'm still targeting 
Spark 2.3, but unfortunately have been busy with other things since the summit. 
Will post more details on the design as soon as we have them! The Spark summit 
demo just showed a hacked-together prototype, but we need to do more to figure 
out how to best integrate it into Spark.


was (Author: marmbrus):
Hey everyone, thanks for your interest in this feature!  I'm still targeting 
Spark 2.3, but unfortunately have been busy with other things since the summit. 
Will post more details on the design as soon as we have them!

> Continuous Processing Mode for Structured Streaming
> ---
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).






[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming

2017-08-30 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16148227#comment-16148227
 ] 

Michael Armbrust commented on SPARK-20928:
--

Hey everyone, thanks for your interest in this feature!  I'm still targeting 
Spark 2.3, but unfortunately have been busy with other things since the summit. 
Will post more details on the design as soon as we have them!

> Continuous Processing Mode for Structured Streaming
> ---
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).






Re: Increase Timeout or optimize Spark UT?

2017-08-23 Thread Michael Armbrust
I think we already set the number of partitions to 5 in tests?

On Tue, Aug 22, 2017 at 3:25 PM, Maciej Szymkiewicz 
wrote:

> Hi,
>
> From my experience it is possible to cut quite a lot by reducing
> spark.sql.shuffle.partitions to some reasonable value (let's say
> comparable to the number of cores). 200 is a serious overkill for most of
> the test cases anyway.
>
>
> Best,
> Maciej
>
>
>
> On 21 August 2017 at 03:00, Dong Joon Hyun  wrote:
>
>> +1 for any efforts to recover Jenkins!
>>
>>
>>
>> Thank you for the direction.
>>
>>
>>
>> Bests,
>>
>> Dongjoon.
>>
>>
>>
>> *From: *Reynold Xin 
>> *Date: *Sunday, August 20, 2017 at 5:53 PM
>> *To: *Dong Joon Hyun 
>> *Cc: *"dev@spark.apache.org" 
>> *Subject: *Re: Increase Timeout or optimize Spark UT?
>>
>>
>>
>> It seems like it's time to look into how to cut down some of the test
>> runtimes. Test runtimes will slowly go up given the way development
>> happens. 3 hr is already a very long time for tests to run.
>>
>>
>>
>>
>>
>> On Sun, Aug 20, 2017 at 5:45 PM, Dong Joon Hyun 
>> wrote:
>>
>> Hi, All.
>>
>>
>>
>> Recently, Apache Spark master branch test (SBT with hadoop-2.7 / 2.6) has
>> been hitting the build timeout.
>>
>>
>>
>> Please see the build time trend.
>>
>>
>>
>> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Tes
>> t%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7/buildTimeTrend
>>
>>
>>
>> All recent 22 builds fail due to timeout directly/indirectly. The last
>> success (SBT with Hadoop-2.7) is 15th August.
>>
>>
>>
>> We may do the followings.
>>
>>
>>
>>1. Increase Build Timeout (3 hr 30 min)
>>2. Optimize UTs (Scala/Java/Python/UT)
>>
>>
>>
>> But, Option 1 will be the immediate solution for now . Could you update
>> the Jenkins setup?
>>
>>
>>
>> Bests,
>>
>> Dongjoon.
>>
>>
>>
>
>


Re: Joining 2 dataframes, getting result as nested list/structure in dataframe

2017-08-23 Thread Michael Armbrust
You can create a nested struct that contains multiple columns using
struct().

Here's a pretty complete guide on working with nested data:
https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
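A sketch of that in Scala, assuming the joined DataFrame allDf from the
question below; c51 and c52 stand in for whatever extra right-side columns are
wanted. Wrapping the columns in struct() before collect_list() makes each list
element a nested row.

import org.apache.spark.sql.functions.{col, collect_list, struct}

val aggDf = allDf
  .groupBy(col("c1"), col("c2"))
  .agg(collect_list(struct(col("c50"), col("c51"), col("c52"))).as("rows"))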

On Wed, Aug 23, 2017 at 2:30 PM, JG Perrin  wrote:

> Hi folks,
>
>
>
> I am trying to join 2 dataframes, but I would like to have the result as a
> list of rows of the right dataframe (dDf in the example) in a column of the
> left dataframe (cDf in the example). I made it work with *one column*,
> but having issues adding more columns/creating a row(?).
>
> Seq joinColumns = new Set2<>("c1", "c2").toSeq();
>
> Dataset allDf = cDf.join(dDf, joinColumns, "inner");
>
> allDf.printSchema();
>
> allDf.show();
>
>
>
> Dataset aggDf = allDf.groupBy(cDf.col("c1"), cDf.col("c2"))
>
> .agg(collect_list(col("c50")));
>
> aggDf.show();
>
>
>
> Output:
>
> ++---+---+
>
> |c1  |c2 |collect_list(c50)  |
>
> ++---+---+
>
> |3744|1160242| [6, 5, 4, 3, 2, 1]|
>
> |3739|1150097|[1]|
>
> |3780|1159902|[5, 4, 3, 2, 1]|
>
> | 132|1200743|   [4, 3, 2, 1]|
>
> |3778|1183204|[1]|
>
> |3766|1132709|[1]|
>
> |3835|1146169|[1]|
>
> ++---+---+
>
>
>
> Thanks,
>
>
>
> jg
>
>
> --
>
>

