Re: Incorrect CAST to TIMESTAMP in Hive compatibility

2017-06-05 Thread Anton Okolnychyi
Hi,

I also noticed this issue. Actually, it has already been mentioned several
times. There is an existing JIRA (SPARK-17914).

I am going to submit a PR to fix this in a few days.

Best,
Anton

On Jun 5, 2017 21:42, "verbamour"  wrote:

> Greetings,
>
> I am using Hive compatibility in Spark 2.1.1, and it appears that CAST from
> string to TIMESTAMP improperly trims the sub-second value. In particular,
> leading zeros in the decimal portion appear to be dropped.
>
> Steps to reproduce:
> 1. From `spark-shell` issue:
>    `spark.sql("SELECT CAST('2017-04-05 16:00:48.0297580' AS TIMESTAMP)").show(100, false)`
>
> 2. Note erroneous result (i.e. ".0297580" becomes ".29758")
> ```
> +----------------------------------------------+
> |CAST(2017-04-05 16:00:48.0297580 AS TIMESTAMP)|
> +----------------------------------------------+
> |2017-04-05 16:00:48.29758                     |
> +----------------------------------------------+
> ```
>
> I am not currently plugged into the JIRA system for Spark, so if this is
> truly a bug please bring it to the attention of the appropriate
> authorities.
>
> Cheers,
>  -tom
>


Re: Aggregator mutate b1 in place in merge

2017-01-29 Thread Anton Okolnychyi
Hi,

I recently extended the Spark SQL programming guide to cover user-defined
aggregations, where I modified existing variables and returned them back in
reduce and merge. This approach worked and it was approved by people who
know the context.

Hope that helps.
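
For reference, a minimal sketch of that pattern (assuming the Spark 2.x Aggregator API; the buffer type and names are made up for illustration), mutating the buffer in both reduce and merge and returning it:

```
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Mutable buffer; fields are vars so reduce/merge can update them in place.
case class SumCount(var sum: Long, var count: Long)

object AverageAgg extends Aggregator[Long, SumCount, Double] {
  def zero: SumCount = SumCount(0L, 0L)

  // Mutate `b` and return it instead of allocating a new buffer per record.
  def reduce(b: SumCount, value: Long): SumCount = {
    b.sum += value
    b.count += 1
    b
  }

  // Same in-place pattern in merge: fold b2 into b1 and return b1.
  def merge(b1: SumCount, b2: SumCount): SumCount = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  def finish(r: SumCount): Double = r.sum.toDouble / r.count
  def bufferEncoder: Encoder[SumCount] = Encoders.product[SumCount]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
```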

2017-01-29 17:17 GMT+01:00 Koert Kuipers :

> anyone?
> if not I will follow the trail and try to deduce it myself
>
> On Mon, Jan 23, 2017 at 2:31 PM, Koert Kuipers  wrote:
>
>> looking at the docs for org.apache.spark.sql.expressions.Aggregator it
>> says for reduce method: "For performance, the function may modify `b` and
>> return it instead of constructing new object for b.".
>>
>> it makes no such comment for the merge method.
>>
>> this is surprising to me because i know that for
>> PairRDDFunctions.aggregateByKey mutation is allowed in both seqOp and
>> combOp (which are the equivalents of reduce and merge in Aggregator).
>>
>> is it safe to mutate b1 and return it in Aggregator.merge?
>>
>>
>


Re: Spark Aggregator for array of doubles

2017-01-04 Thread Anton Okolnychyi
Hi,

take a look at this pull request that is not merged yet:
https://github.com/apache/spark/pull/16329 . It contains examples in Java
and Scala that can be helpful.
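
Until that PR is merged, here is a rough Scala sketch of such an aggregator (Spark 2.x typed API; the Record case class, column layout, and parquet path are assumptions for illustration, not code from the PR):

```
import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical record type: the grouping key plus the array-of-doubles column.
case class Record(key: String, values: Array[Double])

// Element-wise sum of the arrays within each group.
object SumArrays extends Aggregator[Record, Array[Double], Array[Double]] {
  def zero: Array[Double] = Array.empty[Double]

  def reduce(buffer: Array[Double], record: Record): Array[Double] =
    merge(buffer, record.values)

  def merge(b1: Array[Double], b2: Array[Double]): Array[Double] =
    if (b1.isEmpty) b2
    else if (b2.isEmpty) b1
    else b1.zip(b2).map { case (x, y) => x + y }

  def finish(reduction: Array[Double]): Array[Double] = reduction

  def bufferEncoder: Encoder[Array[Double]] = ExpressionEncoder()
  def outputEncoder: Encoder[Array[Double]] = ExpressionEncoder()
}

// Usage sketch: read the parquet input as a typed Dataset and aggregate per key.
val spark = SparkSession.builder().appName("array-agg-sketch").master("local[4]").getOrCreate()
import spark.implicits._
val ds = spark.read.parquet("/path/to/input").as[Record]   // placeholder path
val summed = ds.groupByKey(_.key).agg(SumArrays.toColumn)
```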

Best regards,
Anton Okolnychyi

On Jan 4, 2017 23:23, "Anil Langote" <anillangote0...@gmail.com> wrote:

> Hi All,
>
> I have been working on a use case where I have a DF which has 25 columns:
> 24 columns are of type string and the last column is an array of doubles. For a
> given set of columns I have to apply a group by and add the arrays of doubles.
> I have implemented a UDAF which works fine, but it's expensive; in order to
> tune the solution I came across Aggregators, which can be implemented and
> used with the agg function. My question is how we can implement an aggregator
> which takes an array of doubles as input and returns an array of doubles.
>
> I learned that it's not possible to implement the aggregator in Java, only in
> Scala. How can I define the aggregator which takes an array of doubles as
> input? Note that I have a parquet file as my input.
>
> Any pointers are highly appreciated; I read that Spark UDAFs are slow and
> Aggregators are the way to go.
>
> Best Regards,
>
> Anil Langote
>
> +1-425-633-9747
>


Re: Spark Streaming with Kafka

2016-12-12 Thread Anton Okolnychyi
Thanks for all your replies, now I have a complete picture.
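
To summarize the thread below, a minimal sketch (Spark 2.0.2 with the Kafka 0.10 integration; the broker address, topic name, and window durations are placeholders) that runs two output operations over a single direct stream and caches the derived DStream so Kafka is not re-read for the same group id and topic-partitions:

```
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("multi-output-sketch").setMaster("local[4]")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",          // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// One direct stream, one consumer group. A second createDirectStream would
// need its own group.id, as the integration guide says.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("words"), kafkaParams))

// Cache the derived DStream so both output operations below reuse the fetched
// batches instead of triggering a second read from Kafka for the same group
// id and topic-partitions.
val wordCounts = stream.map(record => (record.value(), 1)).cache()

wordCounts.reduceByKeyAndWindow(_ + _, Seconds(30)).print()   // first output
wordCounts.count().print()                                    // second output on the same data

ssc.start()
ssc.awaitTermination()
```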



2016-12-12 16:49 GMT+01:00 Cody Koeninger <c...@koeninger.org>:

> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-a-direct-stream
>
> Use a separate group id for each stream, like the docs say.
>
> If you're doing multiple output operations, and aren't caching, spark
> is going to read from kafka again each time, and if some of those
> reads are happening for the same group and same topicpartition, it's
> not going to work.
>
> On Sun, Dec 11, 2016 at 2:36 PM, Oleksii Dukhno
> <oleksii.duk...@gmail.com> wrote:
> > Hi Anton,
> >
> > What is the command you run your spark app with? Why not work with data
> > instead of the stream in your second-stage operation? Can you provide logs
> > with the issue?
> >
> > ConcurrentModificationException is not a Spark issue; it means that you use
> > the same Kafka consumer instance from more than one thread.
> >
> > Additionally,
> >
> > 1) As I understand, a new Kafka consumer is created every time you call
> > KafkaUtils.createDirectStream.
> > 2) If you assign the same group id to several consumer instances, then all
> > the consumers will get different sets of messages on the same topic. This is
> > a kind of load balancing which Kafka provides with its Consumer API.
> >
> > Oleksii
> >
> > On 11 December 2016 at 18:46, Anton Okolnychyi <
> anton.okolnyc...@gmail.com>
> > wrote:
> >>
> >> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
> >> nothing custom.
> >>
> >>
> >> I will try to restate the initial question. Let's consider an example.
> >>
> >> 1. I create a stream and subscribe to a certain topic.
> >>
> >> val stream = KafkaUtils.createDirectStream(...)
> >>
> >> 2. I extract the actual data from the stream. For instance, word counts.
> >>
> >> val wordCounts = stream.map(record => (record.value(), 1))
> >>
> >> 3. Then I compute something and output the result to console.
> >>
> >> val firstResult = stream.reduceByWindow(...)
> >> firstResult.print()
> >>
> >> Once that is done, I would like to perform another computation on top of
> >> wordCounts and output that result again to console. In my current
> >> understanding, I cannot just reuse wordCounts from Step 2 and should create
> >> a new stream with another group id and then define the second computation.
> >> Am I correct that if I add the next part, then I can get
> >> "ConcurrentModificationException: KafkaConsumer is not safe for
> >> multi-threaded access"?
> >>
> >> // another computation on wordCounts
> >> val secondResult = wordCounts.reduceByKeyAndWindow(...)
> >> secondResult.output()
> >>
> >> Thanks,
> >> Anton
> >>
> >> 2016-12-11 17:11 GMT+01:00 Timur Shenkao <t...@timshenkao.su>:
> >>>
> >>> Hi,
> >>> Usual general questions are:
> >>> -- what is your Spark version?
> >>> -- what is your Kafka version?
> >>> -- do you use "standard" Kafka consumer or try to implement something
> >>> custom (your own multi-threaded consumer)?
> >>>
> >>> The freshest docs
> >>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
> >>>
> >>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
> >>> !!!)
> >>>>
> >>>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
> >>>
> >>>
> >>>
> >>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi
> >>> <anton.okolnyc...@gmail.com> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> I am experimenting with Spark Streaming and Kafka. I will appreciate
> if
> >>>> someone can say whether the following assumption is correct.
> >>>>
> >>>> If I have multiple computations (each with its own output) on one stream
> >>>> (created as KafkaUtils.createDirectStream), then there is a chance to have
> >>>> ConcurrentModificationException: KafkaConsumer is not safe for
> >>>> multi-threaded access.  To solve this problem, I should create a new stream
> >>>> with different "group.id" for each computation.
> >>>>
> >>>> Am I right?
> >>>>
> >>>> Best regards,
> >>>> Anton
> >>>
> >>>
> >>
> >
>


Re: Spark Streaming with Kafka

2016-12-11 Thread Anton Okolnychyi
sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
nothing custom.


I will try to restate the initial question. Let's consider an example.

1. I create a stream and subscribe to a certain topic.

val stream = KafkaUtils.createDirectStream(...)

2. I extract the actual data from the stream. For instance, word counts.

val wordCounts = stream.map(record => (record.value(), 1))

3. Then I compute something and output the result to console.

val firstResult = stream.reduceByWindow(...)
firstResult.print()

Once that is done, I would like to perform another computation on top of
wordCounts and output that result again to console. In my current
understanding, I cannot just reuse wordCounts from Step 2 and should create
a new stream with another group id and then define the second computation.
Am I correct that if I add the next part, then I can get "
ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access"?

// another computation on wordCounts
val secondResult = wordCounts.reduceByKeyAndWindow(...)
secondResult.output()

Thanks,
Anton

2016-12-11 17:11 GMT+01:00 Timur Shenkao <t...@timshenkao.su>:

> Hi,
> Usual general questions are:
> -- what is your Spark version?
> -- what is your Kafka version?
> -- do you use "standard" Kafka consumer or try to implement something
> custom (your own multi-threaded consumer)?
>
> The freshest docs https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10 !!!)
>
>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>
>>
>
> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <
> anton.okolnyc...@gmail.com> wrote:
>
>> Hi,
>>
>> I am experimenting with Spark Streaming and Kafka. I will appreciate if
>> someone can say whether the following assumption is correct.
>>
>> If I have multiple computations (each with its own output) on one stream
>> (created as KafkaUtils.createDirectStream), then there is a chance to
>> have ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access.  To solve this problem, I should create a new stream
>> with different "group.id" for each computation.
>>
>> Am I right?
>>
>> Best regards,
>> Anton
>>
>
>


Spark Streaming with Kafka

2016-12-11 Thread Anton Okolnychyi
Hi,

I am experimenting with Spark Streaming and Kafka. I will appreciate if
someone can say whether the following assumption is correct.

If I have multiple computations (each with its own output) on one stream
(created as KafkaUtils.createDirectStream), then there is a chance to have
ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access.  To solve this problem, I should create a new stream
with different "group.id" for each computation.

Am I right?

Best regards,
Anton


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Anton Okolnychyi
Hi guys,

I also experienced a situation when Spark 1.6.2 ignored my hint to do a
broadcast join (i.e. broadcast(df)) with a small dataset. However, this
happened only in 1 of 3 cases. Setting the
"spark.sql.autoBroadcastJoinThreshold" property did not have any impact
either. All 3 cases work fine in Spark 2.0.

Is there any chance that Spark can ignore a manually specified broadcast
operation? In other words, is Spark forced to perform a broadcast if one
specifies "df1.join(broadcast(df2), ...)"?

Best regards,
Anton



2016-11-26 21:04 GMT+01:00 Swapnil Shinde :

> I am using Spark 1.6.3 and below is the real plan (a, b, c above were
> just for illustration purposes)
>
> == Physical Plan ==
> Project [ltt#3800 AS ltt#3814,CASE WHEN isnull(etv_demo_id#3813) THEN mr_demo_id#3801 ELSE etv_demo_id#3813 AS etv_demo_id#3815]
> +- SortMergeOuterJoin [mr_demoname#3802,mr_demo_id#3801], [mr_demoname#3810,mr_demo_id#3811], LeftOuter, None
>    :- Sort [mr_demoname#3802 ASC,mr_demo_id#3801 ASC], false, 0
>    :  +- TungstenExchange hashpartitioning(mr_demoname#3802,mr_demo_id#3801,200), None
>    :     +- Project [_1#3797 AS ltt#3800,_2#3798 AS mr_demo_id#3801,_3#3799 AS mr_demoname#3802]
>    :        +- Scan ExistingRDD[_1#3797,_2#3798,_3#3799]
>    +- Sort [mr_demoname#3810 ASC,mr_demo_id#3811 ASC], false, 0
>       +- TungstenExchange hashpartitioning(mr_demoname#3810,mr_demo_id#3811,200), None
>          +- Project [mr_demoname#3810,mr_demo_id#3811,etv_demo_id#3813]
>             +- Project [demogroup#3803 AS mr_demoname#3810,demovalue#3804 AS mr_demo_id#3811,demoname#3805 AS mr_demo_value#3812,demovalue_etv_map#3806 AS etv_demo_id#3813]
>                +- Filter ((map_type#3809 = master_roster_to_etv) && NOT (demogroup#3803 = gender_age_id))
>                   +- Scan ExistingRDD[demogroup#3803,demovalue#3804,demoname#3805,demovalue_etv_map#3806,demoname_etv_map#3807,demovalue_old_map#3808,map_type#3809]
>
>
> Thanks
> Swapnil
>
> On Sat, Nov 26, 2016 at 2:32 PM, Benyi Wang  wrote:
>
>> Could you post the result of `c.explain`? If it is a broadcast
>> join, you will see it in the explain output.
>>
>> On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde <
>> swapnilushi...@gmail.com> wrote:
>>
>>> Hello
>>> I am trying a broadcast join on dataframes but it is still doing a
>>> SortMergeJoin. I even tried setting spark.sql.autoBroadcastJoinThreshold
>>> higher but still no luck.
>>>
>>> Related piece of code:
>>>   val c = a.join(broadcast(b), "id")
>>>
>>> On a side note, SizeEstimator.estimate(b) is really
>>> high (460956584 bytes) compared to the data it contains: b has just 85 rows and
>>> around 4964 bytes.
>>> Help is very much appreciated!!
>>>
>>> Thanks
>>> Swapnil
>>>
>>>
>>>
>>
>


Fwd:

2016-11-15 Thread Anton Okolnychyi
Hi,

I have experienced a problem using the Datasets API in Spark 1.6, while
almost identical code works fine in Spark 2.0.
The problem is related to encoders and custom aggregators.

*Spark 1.6 (the aggregation produces an empty map):*

  implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
  // implicit val intStringMapEncoder: Encoder[Map[Int, String]] = org.apache.spark.sql.Encoders.kryo[Map[Int, String]]

  val sparkConf = new SparkConf()
    .setAppName("IVU DS Spark 1.6 Test")
    .setMaster("local[4]")
  val sparkContext = new SparkContext(sparkConf)
  val sparkSqlContext = new SQLContext(sparkContext)

  import sparkSqlContext.implicits._

  val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2")).toDS()

  val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
    override def zero = Map[Int, String]()
    override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
      map.updated(stopPoint.sequenceNumber, stopPoint.id)
    }
    override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
      map ++ anotherMap
    }
    override def finish(reduction: Map[Int, String]) = reduction
  }.toColumn

  val resultMap = stopPointDS
    .groupBy(_.line)
    .agg(stopPointSequenceMap)
    .collect()
    .toMap

In spark.sql.execution.TypedAggregateExpression.scala, I see that the
reduce is done correctly, but Spark cannot read the reduced values in the
merge phase.
If I replace the ExpressionEncoder with the Kryo-based one (commented out in
the presented code), then it works fine.

*Spark 2.0 (works correctly):*

  val spark = SparkSession
    .builder()
    .appName("IVU DS Spark 2.0 Test")
    .config("spark.sql.warehouse.dir", "file:///D://sparkSql")
    .master("local[4]")
    .getOrCreate()

  import spark.implicits._

  val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2")).toDS()

  val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
    override def zero = Map[Int, String]()
    override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
      map.updated(stopPoint.sequenceNumber, stopPoint.id)
    }
    override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
      map ++ anotherMap
    }
    override def finish(reduction: Map[Int, String]) = reduction
    override def bufferEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
    override def outputEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
  }.toColumn

  val resultMap = stopPointDS
    .groupByKey(_.line)
    .agg(stopPointSequenceMap)
    .collect()
    .toMap

I know that Spark 1.6 has only a preview of the Datasets concept and a lot
changed in 2.0. However, I would like to know if I am doing anything wrong
in my 1.6 code.

Thanks in advance,
Anton


Expression Encoder for Map[Int, String] in a custom Aggregator on a Dataset

2016-10-20 Thread Anton Okolnychyi
Hi all,

I am trying to use my custom Aggregator on a GroupedDataset of case classes
to create a hash map using Spark SQL 1.6.2.
My Encoder[Map[Int, String]] is not capable of reconstructing the reduced
values if I define it via ExpressionEncoder().
However, everything works fine if I define it as Encoders.kryo[Map[Int,
String]].
I would like to know if I am doing anything wrong.

I have the following use case:

  implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()

  val sparkContext = ...
  val sparkSqlContext = new SQLContext(sparkContext)

  import sparkSqlContext.implicits._

  case class StopPoint(line: String, sequenceNumber: Int, id: String)

  val stopPointDS = Seq(StopPoint("33", 1, "1"), StopPoint("33", 2, "2")).toDS()

  val stopPointSequenceMap = new Aggregator[StopPoint, Map[Int, String], Map[Int, String]] {
    override def zero = Map[Int, String]()
    override def reduce(map: Map[Int, String], stopPoint: StopPoint) = {
      map.updated(stopPoint.sequenceNumber, stopPoint.id)
    }
    override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
      map ++ anotherMap
    }
    override def finish(reduction: Map[Int, String]) = reduction
  }.toColumn

  val resultMap = stopPointDS
    .groupBy(_.line)
    .agg(stopPointSequenceMap)
    .collect()
    .toMap

In spark.sql.execution.TypedAggregateExpression.scala, I see that each
entry is inserted into the initial map correctly (i.e. reduce() method
works properly).
However, my encoder cannot reconstruct the map from the reduce phase in the
merge phase and I get an empty Map as a result of the merge method.
If I replace my expression-based encoder with
org.apache.spark.sql.Encoders.kryo[Map[Int, String]], I will get the
correct result: (33, Map(1 -> 1, 2 -> 2)).
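
For reference, the workaround mentioned above amounts to swapping the implicit encoder for the Kryo-based one (a sketch; it sidesteps the problem rather than fixing the underlying 1.6 issue):

```
import org.apache.spark.sql.{Encoder, Encoders}

// Kryo-based encoder for the aggregation buffer; this avoids the
// ExpressionEncoder problem in Spark 1.6 at the cost of an opaque binary column.
implicit val intStringMapEncoder: Encoder[Map[Int, String]] = Encoders.kryo[Map[Int, String]]
```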

Any ideas/suggestions are more than welcome.

Sincerely,
Anton Okolnychyi


Re: Re: how to select first 50 value of each group after group by?

2016-07-07 Thread Anton Okolnychyi
Hi,

I can try to guess what is wrong, but I might be incorrect.

You should be careful with window frames (you define them via the
rowsBetween() method).

In my understanding, all window functions can be divided into 2 groups:
- functions defined by the
org.apache.spark.sql.catalyst.expressions.WindowFunction trait ("true"
window functions)
- all other supported functions that are marked as window functions by
providing a window specification.

The main distinction is that functions from the first group might have a
predefined internal frame. That's exactly your case.
Both row_number() and rank() functions are from the first group (i.e. they
have predefined internal frames).
To make your case work, you have 2 options (see the sketch below):
- remove your own frame specification (i.e. rowsBetween(0, 49)) and use only
Window.partitionBy(hivetable.col("location"))
- state explicitly correct window frames. For instance,
rowsBetween(Long.MinValue, 0) for rank() and row_number().
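
A sketch of both options (written against the Spark 2.0 DataFrame API; the same calls exist on a 1.6 HiveContext DataFrame, and the sample data is a stand-in for the hivetable from the quoted code):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val spark = SparkSession.builder().appName("top-n-per-group").master("local[4]").getOrCreate()
import spark.implicits._

// Tiny stand-in for the hivetable DataFrame from the thread (id, pv, location).
val hivetable = Seq((1, 100, 30), (2, 90, 30), (3, 80, 40)).toDF("id", "pv", "location")

// Option 1: no explicit frame, so row_number() uses its own internal frame.
val byLocation = Window.partitionBy(col("location")).orderBy(col("pv").desc)

val top50 = hivetable
  .withColumn("rowNumber", row_number().over(byLocation))
  .filter(col("rowNumber") <= 50)   // first 50 rows of each location
top50.show()

// Option 2: keep an explicit frame, but make it a running frame that
// rank()/row_number() accept:
//   Window.partitionBy(col("location")).orderBy(col("pv").desc).rowsBetween(Long.MinValue, 0)
```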

By the way, there is not much documentation on how Spark resolves window
frames. For that reason, I created a small pull request that can help:
https://github.com/apache/spark/pull/14050
It would be nice if someone experienced could take a look at it, since it is
based only on my own analysis.

2016-07-07 13:26 GMT+02:00 <luohui20...@sina.com>:

> hi Anton:
>   I checked the docs you mentioned and wrote code accordingly; however, I
> met an exception like "org.apache.spark.sql.AnalysisException: Window
> function row_number does not take a frame specification.;"
>   It seems that the row_number API is giving global row numbers for
> every row across all frames, by my understanding. If I am wrong, please correct
> me.
>   I checked all the window function APIs at
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$,
> and just found that maybe row_number() matches. I am not quite sure
> about it.
>
> here is my code:
> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
> val hivetable = hc.sql("select * from house_sale_pv_location")
> val overLocation = Window.partitionBy(hivetable.col("location")).rowsBetween(0, 49)
> val sortedDF = hivetable.withColumn("rowNumber", row_number().over(overLocation))
> sortedDF.registerTempTable("sortedDF")
> val top50 = hc.sql("select id,location from sortedDF where rowNumber<=50")
> top50.registerTempTable("top50")
> hc.sql("select * from top50 where location=30").collect.foreach(println)
>
>
> here, hivetable is a DF that I mentioned with 3 columns "id, pv,
> location", which is already sorted by pv in desc. So I didn't call orderBy
> in the 3rd line of my code. I just want the first 50 rows, based on
> physical location, of each frame.
>
> To Tal:
>   I tried the rank API, however this is not the API I want, because
> some rows that have the same pv are ranked with the same value, and the first 50 rows
> of each frame is what I'm expecting. The attached file shows what I got by
> using rank.
>   Thank you anyway, I learnt what rank could provide from your advice.
>
> 
>
> Thanks & Best regards!
> San.Luo
>
> - Original Message -
> From: Anton Okolnychyi <anton.okolnyc...@gmail.com>
> To: user <user@spark.apache.org>
> Cc: luohui20...@sina.com
> Subject: Re: how to select first 50 value of each group after group by?
> Date: 2016-07-06 23:22
>
> The following resources should be useful:
>
>
> https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
>
> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-windows.html
>
> The last link should have the exact solution
>
> 2016-07-06 16:55 GMT+02:00 Tal Grynbaum <tal.grynb...@gmail.com>:
>
> You can use the rank window function to rank each row in the group, and then
> filter the rows with rank < 50
>
> On Wed, Jul 6, 2016, 14:07 <luohui20...@sina.com> wrote:
>
> hi there
> I have a DF with 3 columns: id, pv, location (the rows are already
> grouped by location and sorted by pv in desc). I want to get the first 50 id
> values grouped by location. I checked the APIs of
> DataFrame, GroupedData, and pairRDD, and found no match.
>   is there a way to do this naturally?
>   any info will be appreciated.
>
>
>
> 
>
> Thanks & Best regards!
> San.Luo
>
>
>
>


Re: how to select first 50 value of each group after group by?

2016-07-06 Thread Anton Okolnychyi
The following resources should be useful:

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-windows.html

The last link should have the exact solution

2016-07-06 16:55 GMT+02:00 Tal Grynbaum :

> You can use the rank window function to rank each row in the group, and then
> filter the rows with rank < 50
>
> On Wed, Jul 6, 2016, 14:07  wrote:
>
>> hi there
>> I have a DF with 3 columns: id, pv, location (the rows are already
>> grouped by location and sorted by pv in desc). I want to get the first 50 id
>> values grouped by location. I checked the APIs of
>> DataFrame, GroupedData, and pairRDD, and found no match.
>>   is there a way to do this naturally?
>>   any info will be appreciated.
>>
>>
>>
>> 
>>
>> Thanks & Best regards!
>> San.Luo
>>
>


Last() Window Function

2016-06-27 Thread Anton Okolnychyi
Hi all!

I am learning Spark SQL and window functions.
The behavior of the last() window function was unexpected for me in one
case (for a person without any previous experience with window functions).

I define my window specification as follows:
Window.partitionBy('transportType, 'route).orderBy('eventTime).

So, I have neither rowsBetween nor rangeBetween boundaries.
In this scenario, I expect to get the latest event (by time) in a group if
I apply the last('eventTime) window function over this window
specification.
However, this does not happen.

Looking at the code, I was able to figure out that if there are no
range/rows boundaries, the UnspecifiedFrame is assigned. Later, in
ResolveWindowFrame for the last() function, Spark assigns a default window
frame. The default frame depends on the presence of any order specification
(if one has an order specification, the default frame is RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW). That's why the last() window function
does not work as I expected in my case. There is a very helpful comment in
SpecifiedWindowFrame. I wish I could find it in the documentation.
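
A small sketch of the difference (the events DataFrame and its values are hypothetical):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last}

val spark = SparkSession.builder().appName("last-window-frame").master("local[4]").getOrCreate()
import spark.implicits._

// Hypothetical events, mirroring the columns described above.
val events = Seq(
  ("bus",  "42", "2016-06-27 10:00:00"),
  ("bus",  "42", "2016-06-27 10:05:00"),
  ("tram", "7",  "2016-06-27 10:03:00")
).toDF("transportType", "route", "eventTime")

// Only an ORDER BY: the default frame becomes
// RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
// so last("eventTime") simply echoes the current row.
val defaultFrame = Window.partitionBy("transportType", "route").orderBy("eventTime")

// Extending the frame to the end of the partition gives the expected
// "latest event per group" (Long.MinValue/MaxValue denote unbounded bounds).
val fullFrame = Window
  .partitionBy("transportType", "route")
  .orderBy("eventTime")
  .rowsBetween(Long.MinValue, Long.MaxValue)

events
  .withColumn("lastWithDefaultFrame", last(col("eventTime")).over(defaultFrame))
  .withColumn("lastWithFullFrame", last(col("eventTime")).over(fullFrame))
  .show(false)
```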

That's why I have 2 questions:
- Did I miss the place in the documentation where this behavior is
described? If not, would it be appropriate for me to try to find where
this could be documented?
- Would it be appropriate/useful to add some window function examples to
spark/examples? There are none so far.

Sincerely,
Anton Okolnychyi