Re: Unmarking most things as experimental, evolving for 3.0?

2019-08-21 Thread Dongjoon Hyun
+1 for unmarking old ones (made in `2.3.x` and before).
Thank you, Sean.

Bests,
Dongjoon.

On Wed, Aug 21, 2019 at 6:46 PM Sean Owen  wrote:

> There are currently about 130 things marked as 'experimental' in
> Spark, and some have been around since Spark 1.x. A few may be
> legitimately still experimental (e.g. barrier mode), but, would it be
> safe to say most of these annotations should be removed for 3.0?
>
> What's the theory for evolving vs experimental -- would almost all of
> these items from, say, 2.3 and before be considered stable now, de
> facto? Meaning, if we wouldn't take a breaking change for them after
> 3.0, seems like they're stable.
>
> I can open a PR that removes most of it and see if anything looks
> wrong, if that's an easy way forward.
>


Unmarking most things as experimental, evolving for 3.0?

2019-08-21 Thread Sean Owen
There are currently about 130 things marked as 'experimental' in
Spark, and some have been around since Spark 1.x. A few may be
legitimately still experimental (e.g. barrier mode), but, would it be
safe to say most of these annotations should be removed for 3.0?

What's the theory for evolving vs experimental -- would almost all of
these items from, say, 2.3 and before be considered stable now, de
facto? Meaning, if we wouldn't take a breaking change for them after
3.0, seems like they're stable.

I can open a PR that removes most of it and see if anything looks
wrong, if that's an easy way forward.
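
For concreteness, a minimal sketch of what one of these markers looks like and
what "unmarking" amounts to (ExampleFeature and its methods are made up; I'm
assuming the annotations live in org.apache.spark.annotation):

```
import org.apache.spark.annotation.{Evolving, Experimental}

object ExampleFeature {

  /**
   * :: Experimental ::
   * An API added back in the 2.x line and never revisited.
   */
  @Experimental
  def oldExperimentalApi(): Unit = ()

  // "Unmarking" would just drop the annotation and the ":: Experimental ::"
  // doc tag, so the method carries the usual stability guarantees:
  def unmarkedApi(): Unit = ()

  /** Something genuinely unstable (e.g. barrier mode) could stay annotated. */
  @Evolving
  def stillEvolvingApi(): Unit = ()
}
```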




Fwd: Custom aggregations: modular and lightweight solutions?

2019-08-21 Thread Andrew Leverentz
Hi All,

Apologies for cross-posting this, but I'm wondering if the dev list might
be a better place for my questions below.  For now, I'm developing a set of
utilities for my own use, but if I can get these utilities working, I'd
like to see if it might be worth contributing them to the Spark project.

To summarize, I'm hoping to come up with a cleaner & more
functional-programming oriented way of defining custom grouping
calculations on Datasets / DataFrames, as described in my first email, from
August 12.  My second email (from Aug 13) provides a smaller and more
self-contained example that I think illustrates the core stumbling block
I'm running into.

Thanks,
~ Andrew


-- Forwarded message -
From: Andrew Leverentz 
Date: Tue, Aug 13, 2019 at 12:59 PM
Subject: Re: Custom aggregations: modular and lightweight solutions?
To: 


Here's a simpler example that I think gets at the heart of what I'm trying
to do: DynamicSchemaExample.scala.  Here,
I'm dynamically creating a sequence of Rows and also dynamically creating a
corresponding schema (StructType), but the RowEncoder derived from the
schema doesn't seem to handle the nested structure of the Rows.  This
example fails with a similar error (in this case, "scala.Tuple2$mcDD$sp is
not a valid external type for schema of struct<_1:double,_2:double>").

If I could find a way to get this example working (for arbitrary values of
rowSize), I suspect that it would also give me a solution to the
custom-aggregation issue I outlined in my previous email.  Any suggestions
would be much appreciated.
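
For what it's worth, wrapping nested values in Row(...) instead of tuples seems
like it might sidestep the error, since the encoder's external type for a nested
StructType appears to be Row rather than a Scala tuple. Here is a self-contained
sketch of that idea (this is not the linked DynamicSchemaExample.scala, just a
reduction of it):

```
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object NestedRowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("nested-rows").getOrCreate()

    // Dynamically built schema with a nested struct field.
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("pair", StructType(Seq(
        StructField("_1", DoubleType, nullable = false),
        StructField("_2", DoubleType, nullable = false))), nullable = false)))

    // Row(1, (0.1, 0.2)) fails with "scala.Tuple2 ... is not a valid external type
    // for schema of struct<_1:double,_2:double>"; nesting a Row instead of a tuple
    // matches the external type the schema expects.
    val rows = Seq(Row(1, Row(0.1, 0.2)), Row(2, Row(0.3, 0.4)))

    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
    df.show(truncate = false)
    spark.stop()
  }
}
```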

Thanks,
~ Andrew



On Mon, Aug 12, 2019 at 5:31 PM Andrew Leverentz <
andrew.levere...@empiricotx.com> wrote:

> Hi All,
>
> I'm attempting to clean up some Spark code which performs groupByKey /
> mapGroups to compute custom aggregations, and I could use some help
> understanding the Spark API's necessary to make my code more modular and
> maintainable.
>
> In particular, my current approach is as follows:
>
>- Start with a Dataset[CaseClass1]
>- Apply groupByKey(f), where f is a function that extracts a tuple of
>keys
>- Apply mapGroups(g), where g computes multiple custom aggregations:
>   - Iterate through the rows in each group, updating a large, mutable
>   CustomState object.
>   - At the end of each group, transform the current key and the
>   CustomState into an instance of CaseClass2.
>
> In other words, we start with a dataset of CaseClass1 objects and end up
> with a dataset of CaseClass2 objects, using instances of a complex
> CustomState class to store the intermediate state during the aggregation.
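>
> Roughly, the shape of the current code is something like the following sketch
> (CaseClass1, CaseClass2, and CustomState stand in for the real classes, and
> the fields are made up):
>
> import org.apache.spark.sql.{Dataset, SparkSession}
>
> case class CaseClass1(key: String, value: Double)
> case class CaseClass2(key: String, total: Double, count: Long)
>
> // Sketch only: the real state object is much larger.
> class CustomState {
>   var total: Double = 0.0
>   var count: Long = 0L
>   def update(row: CaseClass1): Unit = { total += row.value; count += 1 }
> }
>
> def aggregate(spark: SparkSession, ds: Dataset[CaseClass1]): Dataset[CaseClass2] = {
>   import spark.implicits._
>   ds.groupByKey(_.key).mapGroups { (key, rows) =>
>     val state = new CustomState
>     rows.foreach(state.update)
>     CaseClass2(key, state.total, state.count)
>   }
> }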
>
> We have dozens of custom aggregation calculations to perform on this data,
> and I'd like to be able to streamline the process of introducing new
> aggregations and comparing multiple parameterized variations of the same
> aggregations side-by-side.  The current approach requires us to touch
> several tightly coupled pieces of code in order to add simple variations to
> existing aggregate functions.
>
> The UDAF API seems to be designed for this use case, but I've found it to
> be just as cumbersome to create new UDAF's as it is to maintain my current
> code.
>
> To address this, I've tried a couple of approaches (described below),
> although I've run into problems with both of them.
>
> At a high level, both of my approaches require a Dataset[T], a key
> extractor function (T => K), and a collection of instances of a custom
> class GroupingCalculation[T, S, R].  Here, T is the data type of each row
> in the dataset, K is the type of the key by which the rows should be
> grouped, S is the type of the intermediate state during aggregation, and R
> is the result type of each aggregation.  In this context, the data types T
> and K are fixed, but the state and result types (S and R) may vary among
> the GroupingCalculation instances.  The resulting DataFrame will have Rows
> which are basically concatenations of {K, R1, R2, ..., Rn}, where R1, ...,
> Rn are the result types for the GroupingCalculation instances.
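>
> For reference, the kind of interface I have in mind for GroupingCalculation is
> roughly this (a sketch only; the method names are illustrative, not final):
>
> // Sketch: T = row type, S = intermediate state, R = result; K is fixed elsewhere.
> trait GroupingCalculation[T, S, R] {
>   def init(): S                    // fresh state for a new group
>   def update(state: S, row: T): S  // fold one row into the state
>   def result(state: S): R          // finish the group
> }
>
> // Example instance: mean of a Double field, with (sum, count) as the state.
> case class MeanOf[T](extract: T => Double)
>     extends GroupingCalculation[T, (Double, Long), Double] {
>   def init(): (Double, Long) = (0.0, 0L)
>   def update(state: (Double, Long), row: T): (Double, Long) =
>     (state._1 + extract(row), state._2 + 1L)
>   def result(state: (Double, Long)): Double =
>     if (state._2 == 0L) Double.NaN else state._1 / state._2
> }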
>
> (1) My first approach operates by constructing a
> UserDefinedAggregateFunction by applying ScalaReflection.schemaFor[...] to
> T, S, and R.  After some digging and experimentation, I found a way to use
> CatalystTypeConverters and ExpressionEncoders to populate the
> MutableAggregationBuffer.  Unfortunately, once I finally got it running,
> this approach yielded a runtime 10x slower than the original approach
> described above. I suspect that adding an extra encoding/decoding layer on
> top of the UDAF was what slowed it down.  Because of this, I'm setting
> aside this approach for now.
>
> (2) Using a similar API to (1), I replaced the implementation with one
> that uses groupByKey and mapGroups.  This bypasses the need for creating a
> wrapper around UDAF.  Also, the internal state, rather than being encoded
> in a DataFrame, is simply sto

Re: Data Property Accumulators

2019-08-21 Thread Erik Erlandson
I'm wondering whether keeping track of accumulation in "consistent mode" is
really a case for mapping straight to the Try value, so that parsedData has
type RDD[Try[...]] and counting failures is
parsedData.filter(_.isFailure).count, etc.

Put another way: consistent-mode accumulation seems (to me) like it is
trying to obey Spark's RDD compute model, in contrast to legacy
accumulators, which subvert that model. I think the fact that your "option
3" sends information about accumulators down through the mapping-function
API, as well as passing through an "Option" stage, is also hinting at that
idea.

That might mean the idiomatic way to do consistent mode is via the existing
Spark API, using constructs like Try, Either, Option, Tuple, or just a
new column carrying additional accumulator channels.
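
For example, something like this rough sketch (parse and the Double payload
are placeholders for whatever the real job does):

```
import scala.util.{Success, Try}
import org.apache.spark.rdd.RDD

// Sketch only: the parsing step returns Try, so failures travel with the data.
def parse(line: String): Try[Double] = Try(line.toDouble)

def process(lines: RDD[String]): (RDD[Double], Long) = {
  val parsedData: RDD[Try[Double]] = lines.map(parse).cache()
  // Counting failures via a normal RDD action stays consistent under retries
  // and recomputation, unlike a side-channel accumulator.
  val failureCount = parsedData.filter(_.isFailure).count()
  val values = parsedData.collect { case Success(v) => v }
  (values, failureCount)
}
```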


On Fri, Aug 16, 2019 at 5:48 PM Holden Karau  wrote:

> Are folks interested in seeing data property accumulators for RDDs? I made
> a proposal for this back in 2016 (
> https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit
>  ) but
> ABI compatibility was a stumbling block I couldn't design around. I can
> look at reviving it for Spark 3 or just go ahead and close out this idea.
>
> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9  
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>


Please help with a question on repartition for a dataset from a partitioned Hive table

2019-08-21 Thread zhangliyun




Hi All,
I have a question about the repartition API and Spark SQL partitioning. I have a
table whose partition key is day:
```
./bin/spark-sql -e "CREATE TABLE t_original_partitioned_spark (cust_id int, loss double)
  PARTITIONED BY (day STRING)
  LOCATION 'hdfs://localhost:9000/t_original_partitioned_spark'"


```
I insert several rows, and now there are 2 partitions for the two days (2019-05-30
and 2019-05-20):
```
sqlContext.sql("insert into  t_original_partitioned_spark values 
(30,'0.3','2019-05-30'))
sqlContext.sql("insert into  t_original_partitioned_spark values 
(20,'0.2','2019-05-20'))




```


Now I want to repartition the data into 1 partition, because in the actual case
there may be too many partitions and I want to make fewer of them.


I call the repartition API and overwrite the table. I expected there to be 1
partition afterwards, but there are still two partitions when I query with "show
partitions default.t_original_partitioned_spark":
```


val df = sqlContext.sql("select * from t_original_partitioned_spark")
val df1 = df.repartition(1)
df1.write.mode(org.apache.spark.sql.SaveMode.Overwrite).format("seq").insertInto("default.t_original_partitioned_spark")


```


My question is: if I use both of them, is the actual number of partitions decided
by the $num in repartition($num) or by the Hive table partitions?
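
For reference, a minimal sketch of the two numbers being compared (this assumes
the table created above already exists):

```
val df = sqlContext.sql("select * from default.t_original_partitioned_spark")

// Number of Spark partitions of the DataFrame -- this is what repartition($num) changes.
println(df.repartition(1).rdd.getNumPartitions)   // 1

// Hive table partitions as reported by the metastore (one entry per day value).
sqlContext.sql("show partitions default.t_original_partitioned_spark").show()
```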
Best Regards
Kelly Zhang