Re: custom rdd - do I need a hadoop input format?

2019-09-17 Thread Arun Mahadevan
You can do it with a custom RDD implementation.
You will mainly implement "getPartitions" - the logic to split your input
into partitions - and "compute" to compute and return the values from the
executors.
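
A minimal sketch (the class and field names are just placeholders, and the
block offsets are assumed to be computed up front on the driver):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition describing one block of the custom file format.
case class BlockPartition(index: Int, path: String, start: Long, length: Long)
  extends Partition

class BlockRDD(sc: SparkContext, path: String, blockOffsets: Seq[(Long, Long)])
  extends RDD[String](sc, Nil) {

  // One partition per block of the input.
  override protected def getPartitions: Array[Partition] =
    blockOffsets.zipWithIndex.map { case ((start, len), i) =>
      BlockPartition(i, path, start, len)
    }.toArray

  // Runs on the executors: open the file, seek to the block and parse it.
  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val block = split.asInstanceOf[BlockPartition]
    // plug in your block-reading library here, reading block.length bytes
    // starting at block.start in block.path
    Iterator.empty
  }
}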

On Tue, 17 Sep 2019 at 08:47, Marcelo Valle  wrote:

> Just to be more clear about my requirements, what I have is actually a
> custom format, with a header, a summary and multi-line blocks. I want to
> create tasks per block and not per line. I already have a library that reads
> an InputStream and outputs an Iterator of Block, but now I need to integrate
> this with Spark.
>
> On Tue, 17 Sep 2019 at 16:28, Marcelo Valle 
> wrote:
>
>> Hi,
>>
>> I want to create a custom RDD which will read n lines in sequence from a
>> file, which I call a block, and each block should be converted to a Spark
>> dataframe to be processed in parallel.
>>
>> Question - do I have to implement a custom Hadoop InputFormat to achieve
>> this? Or is it possible to do it only with the RDD APIs?
>>
>> Thanks,
>> Marcelo.
>>
>
>


Re: how to get spark-sql lineage

2019-05-16 Thread Arun Mahadevan
You can check out
https://github.com/hortonworks-spark/spark-atlas-connector/

On Wed, 15 May 2019 at 19:44, lk_spark  wrote:

> hi, all:
> When I use Spark, if I run some SQL to do ETL, how can I get
> lineage info? I found that CDH Spark has some config about lineage:
> spark.lineage.enabled=true
> spark.lineage.log.dir=/var/log/spark2/lineage
> Do these also work for Apache Spark?
>
> 2019-05-16
> --
> lk_spark
>


Re: JvmPauseMonitor

2019-04-15 Thread Arun Mahadevan
Spark TaskMetrics[1] has a "jvmGCTime" metric that captures the amount of
time spent in GC. This is also available via the listener I guess.
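
For example, a rough sketch of a listener that flags GC-heavy tasks (the
threshold and the logging are just placeholders):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Log tasks whose GC time exceeds a configurable threshold (in ms).
class GcTimeListener(thresholdMs: Long) extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null && metrics.jvmGCTime > thresholdMs) {
      println(s"Task ${taskEnd.taskInfo.taskId} spent ${metrics.jvmGCTime} ms in GC")
    }
  }
}

// register it: spark.sparkContext.addSparkListener(new GcTimeListener(5000))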

Thanks,
Arun

[1]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L89


On Mon, 15 Apr 2019 at 09:52, Eugene Koifman 
wrote:

> Hi,
>
> A number of projects in the Hadoop ecosystem use
> org.apache.hadoop.util.JvmPauseMonitor (or clones of it) to log long GC
> pauses.
>
> Is there something like that for a Spark Executor, that can make a log
> entry based on GC time exceeding a configured limit?
>
>
>
> Thank you,
>
> Eugene
>
>


Re: Structured Streaming & Query Planning

2019-03-18 Thread Arun Mahadevan
I don't think it's feasible with the current logic. Typically the query
planning time should be a tiny fraction unless you are processing tiny
micro-batches very frequently. You might want to consider adjusting the
trigger interval to process more data per micro-batch and see if it
helps. The tiny micro-batch use cases should ideally be solved using
continuous mode (once it matures), which would not have this overhead.
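
For example, something like this (a sketch; the console sink and the
30-second interval are arbitrary):

import org.apache.spark.sql.streaming.Trigger

// A longer trigger interval means more data per micro-batch, so the fixed
// per-batch planning cost is amortized over more records.
val query = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()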

Thanks,
Arun

On Mon, 18 Mar 2019 at 00:39, Jungtaek Lim  wrote:

> Almost everything is coupled with the logical plan right now, including the
> updated range for the source in each new batch, the updated watermark for
> stateful operations, and the random seed in each batch. Please refer to the code below:
>
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
>
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
>
> We might try replacing these things in the physical plan so that the logical
> plan doesn't need to be re-evaluated, but I'm not sure it's feasible.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 2019년 3월 18일 (월) 오후 4:03, Paolo Platter 님이 작성:
>
>> I can understand that if you involve columns with variable distribution
>> in join operations, it may change your execution plan, but most of the time
>> this is not going to happen. In streaming the most used operations are map,
>> filter, grouping and stateful operations, and in all these cases I can't see
>> how dynamic query planning could help.
>>
>> It could be useful to have a parameter to force a streaming query to
>> calculate the query plan just once.
>>
>> Paolo
>>
>>
>>
>>
>> --
>> *From:* Alessandro Solimando 
>> *Sent:* Thursday, March 14, 2019 6:59:50 PM
>> *To:* Paolo Platter
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Structured Streaming & Query Planning
>>
>> Hello Paolo,
>> generally speaking, query planning is mostly based on statistics and
>> distributions of data values for the involved columns, which might
>> significantly change over time in a streaming context, so for me it makes a
>> lot of sense that it is run at every schedule, even though I understand
>> your concern.
>>
>> For the second question I don't know how to (or if you even can) cache
>> the computed query plan.
>>
>> If possible, would you mind sharing your findings afterwards? (query
>> planning on streaming is a very interesting and not yet sufficiently explored
>> topic IMO)
>>
>> Best regards,
>> Alessandro
>>
>> On Thu, 14 Mar 2019 at 16:51, Paolo Platter 
>> wrote:
>>
>>> Hi All,
>>>
>>>
>>>
>>> I would like to understand why in a streaming query (which should not be
>>> able to change its behaviour across iterations) there is a
>>> queryPlanning-Duration effort (in my case it is 33% of the trigger interval) at
>>> every schedule. I don't understand why this is needed and whether it is possible
>>> to disable or cache it.
>>>
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>>
>>>
>>> *Paolo Platter*
>>>
>>> *CTO*
>>>
>>> E-mail:paolo.plat...@agilelab.it
>>>
>>> Web Site:   www.agilelab.it
>>>
>>>
>>>
>>>
>>>
>>


Re: use rocksdb for spark structured streaming (SSS)

2019-03-10 Thread Arun Mahadevan
Read the link carefully:

This solution is available (*only*) in Databricks Runtime.

You can enable RocksDB-based state management by setting the following
configuration in the SparkSession before starting the streaming query.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")


On Sun, 10 Mar 2019 at 11:54, Lian Jiang  wrote:

> Hi,
>
> I have a very simple SSS pipeline which does:
>
> val query = df
>   .dropDuplicates(Array("Id", "receivedAt"))
>   .withColumn(timePartitionCol, timestamp_udfnc(col("receivedAt")))
>   .writeStream
>   .format("parquet")
>   .partitionBy("availabilityDomain", timePartitionCol)
>   .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
>   .option("path", "/data")
>   .option("checkpointLocation", "/data_checkpoint")
>   .start()
>
> After ingesting 2T records, the state under the checkpoint folder on HDFS
> (replication factor 2) grows to 2T bytes.
> My cluster has only 2T bytes, which means the cluster can barely handle
> further data growth.
>
> The online Spark documents
> (https://docs.databricks.com/spark/latest/structured-streaming/production.html)
> say that using RocksDB helps an SSS job reduce JVM memory overhead. But I cannot
> find any documentation on how to set up RocksDB for SSS. The Spark class
> CheckpointReader seems to only handle HDFS.
>
> Any suggestions? Thanks!
>
>
>
>


Re: Question about RDD pipe

2019-01-17 Thread Arun Mahadevan
Yes, the script should be present on all the executor nodes.

You can pass your script via spark-submit (e.g. --files script.sh) and then
you should be able to refer to it (e.g. "./script.sh") in rdd.pipe.
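
A rough sketch of what this could look like (the script, jar and class names
are just examples):

// Submit with the script shipped to the executors, e.g. (hypothetical jar/class):
//   spark-submit --files script.sh --class com.example.Main app.jar
// Then reference the script by its local name inside the job:
val piped = rdd.pipe("./script.sh")
piped.collect().foreach(println)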

- Arun

On Thu, 17 Jan 2019 at 14:18, Mkal  wrote:

> Hi, I'm trying to run an external script on Spark using rdd.pipe() and
> although it runs successfully in standalone mode, it throws an error on the
> cluster. The error comes from the executors and it's: "Cannot run program
> "path/to/program": error=2, No such file or directory".
>
> Does the external script need to be available on all nodes in the cluster
> when using rdd.pipe()?
>
> What if I don't have permission to install anything on the nodes of the
> cluster? Is there any other way to make the script available to the worker
> nodes?
>
> (The external script is loaded in HDFS and is passed to the driver class
> through args)
>
>
>
>
>


Re: Error - Dropping SparkListenerEvent because no remaining room in event queue

2018-10-24 Thread Arun Mahadevan
Maybe you have Spark listeners that are not processing the events fast
enough?
Do you have Spark event logging enabled?
You might have to profile the built-in and your custom listeners to see
what's going on.
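
As a rough sketch of that kind of profiling (the wrapped listener and the
10 ms threshold are hypothetical), you could time individual callbacks:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Wrap an existing listener and log callbacks that are too slow to keep up
// with the event rate (shown here only for onTaskEnd).
class TimedListener(delegate: SparkListener) extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val start = System.nanoTime()
    delegate.onTaskEnd(taskEnd)
    val elapsedMs = (System.nanoTime() - start) / 1e6
    if (elapsedMs > 10) println(f"slow onTaskEnd: $elapsedMs%.1f ms")
  }
}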

- Arun

On Wed, 24 Oct 2018 at 16:08, karan alang  wrote:

>
> Pls note - Spark version is 2.2.0
>
> On Wed, Oct 24, 2018 at 3:57 PM karan alang  wrote:
>
>> Hello -
>> we are running a Spark job, and getting the following error -
>>
>> "LiveListenerBus: Dropping SparkListenerEvent because no remaining room
>> in event queue"
>>
>> As per the recommendation in the Spark Docs -
>>
>> I've increased the value of the property
>> spark.scheduler.listenerbus.eventqueue.capacity to 9 (from the
>> default 1)
>> and also increased the driver memory.
>>
>> That seems to have mitigated the issue.
>>
>> The question is - is there any code optimization (or any other change) that
>> can be done to resolve this problem?
>> Pls note - we are primarily using functions like reduce(),
>> collectAsList() and persist() as part of the job.
>>
>


Re: Kafka backlog - spark structured streaming

2018-07-30 Thread Arun Mahadevan
Here's a proposal to add one - https://github.com/apache/spark/pull/21819

It's always good to set "maxOffsetsPerTrigger" unless you want Spark to
process till the end of the stream in each micro-batch. Even without
"maxOffsetsPerTrigger", the lag can be non-zero by the time the micro-batch
completes.
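
For example (a sketch; the broker, topic and limit are placeholders):

// Cap how many Kafka records each micro-batch reads.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "mytopic")
  .option("maxOffsetsPerTrigger", "10000")
  .load()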

On 30 July 2018 at 08:50, Burak Yavuz  wrote:

> If you don't set rate limiting through `maxOffsetsPerTrigger`, Structured
> Streaming will always process until the end of the stream. So number of
> records waiting to be processed should be 0 at the start of each trigger.
>
> On Mon, Jul 30, 2018 at 8:03 AM, Kailash Kalahasti <
> kailash.kalaha...@gmail.com> wrote:
>
>> Is there any way to find out the backlog on a Kafka topic while using Spark
>> Structured Streaming? I checked a few consumer APIs but that requires
>> enabling a group id for streaming, which seems not to be allowed.
>>
>> Basically I want to know the number of records waiting to be processed.
>>
>> Any suggestions ?
>>
>
>


Re: Question of spark streaming

2018-07-27 Thread Arun Mahadevan
“activityQuery.awaitTermination()” is a blocking call.

 

 You can just skip this line and run other commands in the same shell to query 
the stream.

 

Running the query from a different shell won't help since the memory sink where
the results are stored is not shared between the two shells.
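
For example, assuming the query from the message below (started with
.queryName("activity_counts") and the memory sink), you could do this in the
same shell after start():

// Inspect the running queries and query the in-memory table directly.
spark.streams.active.foreach(q => println(s"${q.name}: ${q.status}"))
spark.sql("SELECT * FROM activity_counts").show()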

 

Thanks,

Arun

 

From: utkarsh rathor 
Date: Friday, July 27, 2018 at 5:15 AM
To: "user@spark.apache.org" 
Subject: Question of spark streaming

 

 

I am following the book "Spark: The Definitive Guide". The following code is
executed locally using spark-shell.

Procedure: Started the spark-shell without any other options
val static = spark.read
  .json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")

val dataSchema = static.schema

val streaming = spark.readStream
  .schema(dataSchema)
  .option("maxFilesPerTrigger", 1)
  .json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")

val activityCounts = streaming.groupBy("gt").count()

val activityQuery = activityCounts.writeStream
  .queryName("activity_counts")
  .format("memory")
  .outputMode("complete")
  .start()

activityQuery.awaitTermination()
The book says that "After this code is executed the streaming computation will
have started in the background" and "Now that this stream is running, we can
experiment with the result by querying".

MY OBSERVATION:

When this code is executed it does not free the shell for me to type in
commands such as spark.streams.active.

Hence I cannot query this stream.

My research

I tried to open a new spark-shell, but querying in that shell does not return
any results. Are the streams started from one shell accessible from another
instance of the shell?

I want the table in memory so that I can query it using:
for (i <- 1 to 5) {
  spark.sql("SELECT * FROM activity_counts").show()
  Thread.sleep(1000)
}



Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-12 Thread Arun Mahadevan
What I meant was that the number of partitions cannot be varied with ForeachWriter,
versus if you were to write to each sink using independent queries. Maybe this is
obvious.

I am not sure about the difference you highlight regarding the performance part.
The commit happens once per micro-batch and "close(null)" is invoked. You can
batch your writes in the process and/or in the close. I guess the writes can
still be atomic and decided by whether "close" returns successfully or throws an
exception.
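
A rough sketch of such a routing writer (the "sinkType" column and the sink
connections are hypothetical):

import org.apache.spark.sql.{ForeachWriter, Row}

class RoutingWriter extends ForeachWriter[Row] {
  // Called once per partition/epoch: open connections to all target sinks here.
  override def open(partitionId: Long, version: Long): Boolean = true

  // Route each record to a sink based on a marker column.
  override def process(row: Row): Unit = row.getAs[String]("sinkType") match {
    case "a" => () // buffer/write the row to sink A
    case _   => () // buffer/write the row to sink B
  }

  // Flush buffered writes here so the micro-batch commits (or fails) as a whole.
  override def close(errorOrNull: Throwable): Unit = ()
}

// usage: df.writeStream.foreach(new RoutingWriter).start()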

Thanks,
Arun

From:  chandan prakash 
Date:  Thursday, July 12, 2018 at 10:37 AM
To:  Arun Iyer 
Cc:  Tathagata Das , "ymaha...@snappydata.io" 
, "priy...@asperasoft.com" , 
"user @spark" 
Subject:  Re: [Structured Streaming] Avoiding multiple streaming queries

Thanks a lot Arun for your response.
I got your point that existing sink plugins like Kafka, etc. cannot be used.
However I could not get the part: "you cannot scale the partitions for the
sinks independently".
Can you please rephrase that part?

Also,
I guess:
using ForeachWriter for multiple sinks will affect performance because writes
will happen to a sink on a per-record basis (after deciding which particular
sink a record belongs to), whereas in the current implementation all data
under an RDD partition gets committed to the sink atomically in one go. Please
correct me if I am wrong here.



Regards,
Chandan

On Thu, Jul 12, 2018 at 10:53 PM Arun Mahadevan  wrote:
Yes ForeachWriter [1] could be an option if you want to write to different
sinks. You can put your custom logic to split the data into different sinks.

The drawback here is that you cannot plugin existing sinks like Kafka and you 
need to write the custom logic yourself and you cannot scale the partitions for 
the sinks independently.

[1] 
https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html

From: chandan prakash 
Date: Thursday, July 12, 2018 at 2:38 AM
To: Tathagata Das , "ymaha...@snappydata.io" 
, "priy...@asperasoft.com" , 
"user @spark" 
Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

Hi,
Did any of you think about writing a custom foreach sink writer which can
decide which record should go to which sink (based on some marker in the record,
which we can possibly annotate during transformation) and then accordingly
write to the specific sink?
This will mean that:
1. every custom sink writer will have connections to as many sinks as there
are types of sink where records can go.
2. every record will be read once in the single query but can be written to
multiple sinks.

Do you guys see any drawback in this approach?
One drawback of course is that a sink is supposed to write the records as
they are, but we are introducing some intelligence here in the sink.
Apart from that, do you see any other issues with this approach?

Regards,
Chandan


On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das  
wrote:
Of course, you can write to multiple Kafka topics from a single query. If your 
dataframe that you want to write has a column named "topic" (along with "key", 
and "value" columns), it will write the contents of a row to the topic in that 
row. This automatically works. So the only thing you need to figure out is how 
to generate the value of that column. 

This is documented - 
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

Or am i misunderstanding the problem?

TD




On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan  wrote:
I had a similar issue and I think that's where the structured streaming design
falls short.
It seems like Question #2 in your email is a viable workaround for you.

In my case, I have a custom Sink backed by an efficient in-memory column store 
suited for fast ingestion. 

I have a Kafka stream coming from one topic, and I need to classify the stream 
based on schema. 
For example, a Kafka topic can have three different types of schema messages 
and I would like to ingest into the three different column tables(having 
different schema) using my custom Sink implementation.

Right now the only(?) option I have is to create three streaming queries reading
the same topic and ingesting into the respective column tables using their Sink
implementations.
These three streaming queries create three underlying IncrementalExecutions and
three KafkaSources, with three queries reading the same data from the same Kafka
topic.
Even with CachedKafkaConsumers at partition level, this is not an efficient way 
to handle a simple streaming use case.

One workaround to overcome this limitation is to have the same schema for all the
messages in a Kafka partition; unfortunately, this is not in our control and
customers cannot change it due to their dependencies on other subsystems.

Thanks,
http://www.snappydata.io/blog

On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava  

Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-12 Thread Arun Mahadevan
Yes ForeachWriter [1] could be an option if you want to write to different
sinks. You can put your custom logic to split the data into different sinks.

The drawback here is that you cannot plugin existing sinks like Kafka and you 
need to write the custom logic yourself and you cannot scale the partitions for 
the sinks independently.

[1] 
https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html

From:  chandan prakash 
Date:  Thursday, July 12, 2018 at 2:38 AM
To:  Tathagata Das , "ymaha...@snappydata.io" 
, "priy...@asperasoft.com" , 
"user @spark" 
Subject:  Re: [Structured Streaming] Avoiding multiple streaming queries

Hi,
Did any of you think about writing a custom foreach sink writer which can
decide which record should go to which sink (based on some marker in the record,
which we can possibly annotate during transformation) and then accordingly
write to the specific sink?
This will mean that:
1. every custom sink writer will have connections to as many sinks as there
are types of sink where records can go.
2. every record will be read once in the single query but can be written to
multiple sinks.

Do you guys see any drawback in this approach?
One drawback of course is that a sink is supposed to write the records as
they are, but we are introducing some intelligence here in the sink.
Apart from that, do you see any other issues with this approach?

Regards,
Chandan


On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das  
wrote:
Of course, you can write to multiple Kafka topics from a single query. If your 
dataframe that you want to write has a column named "topic" (along with "key", 
and "value" columns), it will write the contents of a row to the topic in that 
row. This automatically works. So the only thing you need to figure out is how 
to generate the value of that column. 

This is documented - 
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

Or am i misunderstanding the problem?

TD




On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan  wrote:
I had a similar issue and I think that's where the structured streaming design
falls short.
It seems like Question #2 in your email is a viable workaround for you.

In my case, I have a custom Sink backed by an efficient in-memory column store 
suited for fast ingestion. 

I have a Kafka stream coming from one topic, and I need to classify the stream 
based on schema. 
For example, a Kafka topic can have three different types of schema messages 
and I would like to ingest into the three different column tables(having 
different schema) using my custom Sink implementation.

Right now the only(?) option I have is to create three streaming queries reading
the same topic and ingesting into the respective column tables using their Sink
implementations.
These three streaming queries create three underlying IncrementalExecutions and
three KafkaSources, with three queries reading the same data from the same Kafka
topic.
Even with CachedKafkaConsumers at partition level, this is not an efficient way 
to handle a simple streaming use case.

One workaround to overcome this limitation is to have the same schema for all the
messages in a Kafka partition; unfortunately, this is not in our control and
customers cannot change it due to their dependencies on other subsystems.

Thanks,
http://www.snappydata.io/blog

On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava  
wrote:
I have a structured streaming query which sinks to Kafka.  This query has a 
complex aggregation logic.



I would like to sink the output DF of this query to multiple Kafka topics each 
partitioned on a different ‘key’ column.  I don’t want to have multiple Kafka 
sinks for each of the different Kafka topics because that would mean running 
multiple streaming queries - one for each Kafka topic, especially since my 
aggregation logic is complex.



Questions:

1.  Is there a way to output the results of a structured streaming query to 
multiple Kafka topics each with a different key column but without having to 
execute multiple streaming queries? 



2.  If not,  would it be efficient to cascade the multiple queries such that 
the first query does the complex aggregation and writes output to Kafka and 
then the other queries just read the output of the first query and write their 
topics to Kafka thus avoiding doing the complex aggregation again?



Thanks in advance for any help.



Priyank







-- 
Chandan Prakash




Re: question on collect_list or say aggregations in general in structured streaming 2.3.0

2018-05-03 Thread Arun Mahadevan
I think you need to group by a window (tumbling) and define watermarks (put a 
very low watermark or even 0) to discard the state. Here the window duration 
becomes your logical batch.
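
A minimal sketch, assuming an event-time column named "eventTime" and a value
column named "value" (both hypothetical), with a 5-second window as the
logical batch:

import org.apache.spark.sql.functions.{col, collect_list, window}

// Group by a short tumbling window and keep the watermark at (near) zero so
// the state of past windows is discarded as soon as possible.
val result = df
  .withWatermark("eventTime", "0 seconds")
  .groupBy(window(col("eventTime"), "5 seconds"))
  .agg(collect_list(col("value")).as("values"))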

- Arun

From:  kant kodali 
Date:  Thursday, May 3, 2018 at 1:52 AM
To:  "user @spark" 
Subject:  Re: question on collect_list or say aggregations in general in 
structured streaming 2.3.0

After doing some more research using Google, it's clear that aggregations are
stateful by default in Structured Streaming. So the question now is how to do
stateless aggregations (not storing the result from previous batches) using
Structured Streaming 2.3.0? I am trying to do it using raw Spark SQL, so not
using flatMapGroupsWithState. And if that is not available, then is it fair to
say there is no declarative way to do stateless aggregations?

On Thu, May 3, 2018 at 1:24 AM, kant kodali  wrote:
Hi All, 

I was under the assumption that one needs to run groupBy(window(...)) to run any
stateful operations, but it looks like that is not the case, since an aggregation
query like

"select count(*) from some_view" is also stateful, since it stores the result
of the count from the previous batch. Likewise, if I do

"select collect_list(*) from some_view" with, say, maxOffsetsPerTrigger set to 1, I
can see the rows from the previous batch at every trigger.

So is it fair to say aggregations are stateful by default?

I am looking for more of a DStream-like approach (stateless) where I collect a
bunch of records in each batch, do some aggregation like, say, count, emit the
result, and the next batch should only count records from that batch, not from
the previous batch.

So if I run "select collect_list(*) from some_view" I want to collect whatever
rows are available at each batch/trigger but not from the previous batch. How
do I do that?

Thanks!




Re: [Structured Streaming] Restarting streaming query on exception/termination

2018-04-24 Thread Arun Mahadevan
I guess you can wait for the termination, catch exception and then restart the 
query in a loop. Something like…

while (true) {
  try {
    val query = df.writeStream
      …
      .start()
    query.awaitTermination()
  } catch {
    case e: StreamingQueryException => // log it
  }
}

Thanks,
Arun

From:  Priyank Shrivastava 
Date:  Monday, April 23, 2018 at 11:27 AM
To:  formice <51296...@qq.com>, "user@spark.apache.org" 
Subject:  Re: [Structured Streaming] Restarting streaming query on 
exception/termination

Thanks for the reply formice.  I think that --supervise param helps to restart 
the whole spark application - what I want to be able to do is to only restart 
the structured streaming query which terminated due to error. Also, I am 
running my app in client mode. 

Thanks,
Priyank

On Sun, Apr 22, 2018 at 8:52 PM, formice <51296...@qq.com> wrote:
Standalone:
  add config: (1) --deploy-mode cluster (2) --supervise
  example: spark-submit --master spark://master:7077 --deploy-mode
cluster --supervise ..


-- Original Message --
From: "Priyank Shrivastava";
Sent: Saturday, April 21, 2018, 5:45 AM
To: "user";
Subject: [Structured Streaming] Restarting streaming query on exception/termination

What's the right way of programmatically restarting a structured streaming 
query which has terminated due to an exception? Example code or reference would 
be appreciated.

Could it be done from within the onQueryTerminated() event handler of 
StreamingQueryListener class?

Priyank





Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Arun Mahadevan
I assume it's going to compare by the first column and, if equal, compare the
second column, and so on.

From:  kant kodali <kanth...@gmail.com>
Date:  Wednesday, April 18, 2018 at 6:26 PM
To:  Jungtaek Lim <kabh...@gmail.com>
Cc:  Arun Iyer <ar...@apache.org>, Michael Armbrust <mich...@databricks.com>, 
Tathagata Das <tathagata.das1...@gmail.com>, "user @spark" 
<user@spark.apache.org>
Subject:  Re: can we use mapGroupsWithState in raw sql?

This is cool! Looks to me like this works too:

select data.* from (SELECT max(struct(my_timestamp,*)) as data from view1 group 
by id)

but I have a naive question again: what does max of a struct mean? Does it always
take the max of the first column and ignore the rest of the fields in the
struct?

On Wed, Apr 18, 2018 at 6:06 PM, Jungtaek Lim <kabh...@gmail.com> wrote:
Thanks Arun, I modified a bit to try my best to avoid enumerating fields: 

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.*")
  .groupBy($"ID")
  .agg(max(struct($"AMOUNT", $"*")).as("data"))
  .select($"data.*")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .start()

It still has a minor issue: the column "AMOUNT" shows up twice in the result
table, but everything works like a charm.

-Jungtaek Lim (HeartSaVioR)

2018년 4월 19일 (목) 오전 9:43, Arun Mahadevan <ar...@apache.org>님이 작성:
The below expr might work:

df.groupBy($"id").agg(max(struct($"amount", 
$"my_timestamp")).as("data")).select($"id", $"data.*")

Thanks,
Arun

From: Jungtaek Lim <kabh...@gmail.com>
Date: Wednesday, April 18, 2018 at 4:54 PM
To: Michael Armbrust <mich...@databricks.com>
Cc: kant kodali <kanth...@gmail.com>, Arun Iyer <ar...@apache.org>, Tathagata 
Das <tathagata.das1...@gmail.com>, "user @spark" <user@spark.apache.org>

Subject: Re: can we use mapGroupsWithState in raw sql?

Thanks Michael for providing a great solution. Great to remove the UDAF and any
need to provide fields manually.

Btw, your code has a compilation error: ')' is missing, and after I fix it, it
complains again with another issue.

:66: error: overloaded method value max with alternatives:
  (columnName: String)org.apache.spark.sql.Column 
  (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
 cannot be applied to (org.apache.spark.sql.ColumnName, 
org.apache.spark.sql.Column)

Could you check your code to see it works with Spark 2.3 (via spark-shell or 
whatever)?

Thanks!
Jungtaek Lim (HeartSaVioR)

2018년 4월 19일 (목) 오전 8:34, Michael Armbrust <mich...@databricks.com>님이 작성:
You can calculate argmax using a struct.

df.groupBy($"id").agg(max($"my_timestamp", 
struct($"*").as("data")).getField("data").select($"data.*")

You could transcode this to SQL, it'll just be complicated nested queries.


On Wed, Apr 18, 2018 at 3:40 PM, kant kodali <kanth...@gmail.com> wrote:
Hi Arun, 

I want to select the entire row with the max timestamp for each group. I have 
modified my data set below to avoid any confusion.

Input:
id | amount | my_timestamp
---
1  |  5 |  2018-04-01T01:00:00.000Z
1  | 10 |  2018-04-01T01:10:00.000Z
1  |  6 |  2018-04-01T01:20:00.000Z
2  | 30 |  2018-04-01T01:25:00.000Z
2  | 40 |  2018-04-01T01:30:00.000Z
Expected Output:
id | amount | my_timestamp
---
1  | 10 |  2018-04-01T01:10:00.000Z
2  | 40 |  2018-04-01T01:30:00.000Z
Looking for a streaming solution using either raw sql like 
sparkSession.sql("sql query") or similar to raw sql but not something like 
mapGroupWithState


On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan <ar...@apache.org> wrote:
Can't the "max" function be used here? Something like..

stream.groupBy($"id").max("amount").writeStream.outputMode("complete"/"update")….
 

Unless the “stream” is already a grouped stream, in which case the above would 
not work since the support for multiple aggregate operations is not there yet.

Thanks,
Arun

From: kant kodali <kanth...@gmail.com>
Date: Tuesday, April 17, 2018 at 11:41 AM
To: Tathagata Das <tathagata.das1...@gmail.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: can we use mapGroupsWithState in raw sql?

Hi TD, 

Thanks for that. The only reason I ask is I don't see any alternative solution 
to solve the problem below using raw sql.


How to select the max row for every group in spark structured streaming 2.3.0
without using order by since it requires complete mode or mapGroupWithState?

Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Arun Mahadevan
The below expr might work:

df.groupBy($"id").agg(max(struct($"amount", 
$"my_timestamp")).as("data")).select($"id", $"data.*")

Thanks,
Arun

From:  Jungtaek Lim <kabh...@gmail.com>
Date:  Wednesday, April 18, 2018 at 4:54 PM
To:  Michael Armbrust <mich...@databricks.com>
Cc:  kant kodali <kanth...@gmail.com>, Arun Iyer <ar...@apache.org>, Tathagata 
Das <tathagata.das1...@gmail.com>, "user @spark" <user@spark.apache.org>
Subject:  Re: can we use mapGroupsWithState in raw sql?

Thanks Michael for providing a great solution. Great to remove the UDAF and any
need to provide fields manually.

Btw, your code has a compilation error: ')' is missing, and after I fix it, it
complains again with another issue.

:66: error: overloaded method value max with alternatives:
  (columnName: String)org.apache.spark.sql.Column 
  (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
 cannot be applied to (org.apache.spark.sql.ColumnName, 
org.apache.spark.sql.Column)

Could you check your code to see it works with Spark 2.3 (via spark-shell or 
whatever)?

Thanks!
Jungtaek Lim (HeartSaVioR)

2018년 4월 19일 (목) 오전 8:34, Michael Armbrust <mich...@databricks.com>님이 작성:
You can calculate argmax using a struct.

df.groupBy($"id").agg(max($"my_timestamp", 
struct($"*").as("data")).getField("data").select($"data.*")

You could transcode this to SQL, it'll just be complicated nested queries.


On Wed, Apr 18, 2018 at 3:40 PM, kant kodali <kanth...@gmail.com> wrote:
Hi Arun, 

I want to select the entire row with the max timestamp for each group. I have 
modified my data set below to avoid any confusion.

Input:
id | amount | my_timestamp
---
1  |  5 |  2018-04-01T01:00:00.000Z
1  | 10 |  2018-04-01T01:10:00.000Z
1  |  6 |  2018-04-01T01:20:00.000Z
2  | 30 |  2018-04-01T01:25:00.000Z
2  | 40 |  2018-04-01T01:30:00.000Z
Expected Output:
id | amount | my_timestamp
---
1  | 10 |  2018-04-01T01:10:00.000Z
2  | 40 |  2018-04-01T01:30:00.000Z
Looking for a streaming solution using either raw sql like 
sparkSession.sql("sql query") or similar to raw sql but not something like 
mapGroupWithState


On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan <ar...@apache.org> wrote:
Can't the "max" function be used here? Something like..

stream.groupBy($"id").max("amount").writeStream.outputMode("complete"/"update")….
 

Unless the “stream” is already a grouped stream, in which case the above would 
not work since the support for multiple aggregate operations is not there yet.

Thanks,
Arun

From: kant kodali <kanth...@gmail.com>
Date: Tuesday, April 17, 2018 at 11:41 AM
To: Tathagata Das <tathagata.das1...@gmail.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: can we use mapGroupsWithState in raw sql?

Hi TD, 

Thanks for that. The only reason I ask is I don't see any alternative solution 
to solve the problem below using raw sql.


How to select the max row for every group in spark structured streaming 2.3.0 
without using order by since it requires complete mode or mapGroupWithState?

Input:
id | amount | my_timestamp
---
1  |  5 |  2018-04-01T01:00:00.000Z
1  | 10 |  2018-04-01T01:10:00.000Z
2  | 20 |  2018-04-01T01:20:00.000Z
2  | 30 |  2018-04-01T01:25:00.000Z
2  | 40 |  2018-04-01T01:30:00.000Z
Expected Output:
id | amount | my_timestamp
---
1  | 10 |  2018-04-01T01:10:00.000Z
2  | 40 |  2018-04-01T01:30:00.000Z
Looking for a streaming solution using either raw sql like 
sparkSession.sql("sql query") or similar to raw sql but not something like 
mapGroupWithState


On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <tathagata.das1...@gmail.com> 
wrote:
Unfortunately no. Honestly it does not make sense as for type-aware operations 
like map, mapGroups, etc., you have to provide an actual JVM function. That 
does not fit in with the SQL language structure.

On Mon, Apr 16, 2018 at 7:34 PM, kant kodali <kanth...@gmail.com> wrote:
Hi All, 

can we use mapGroupsWithState in raw SQL? or is it in the roadmap?

Thanks!









Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Arun Mahadevan
Can't the "max" function be used here? Something like..

stream.groupBy($"id").max("amount").writeStream.outputMode("complete"/"update")….
 

Unless the “stream” is already a grouped stream, in which case the above would 
not work since the support for multiple aggregate operations is not there yet.

Thanks,
Arun

From:  kant kodali 
Date:  Tuesday, April 17, 2018 at 11:41 AM
To:  Tathagata Das 
Cc:  "user @spark" 
Subject:  Re: can we use mapGroupsWithState in raw sql?

Hi TD, 

Thanks for that. The only reason I ask is I don't see any alternative solution 
to solve the problem below using raw sql.


How to select the max row for every group in spark structured streaming 2.3.0 
without using order by since it requires complete mode or mapGroupWithState?

Input:
id | amount | my_timestamp
---
1  |  5 |  2018-04-01T01:00:00.000Z
1  | 10 |  2018-04-01T01:10:00.000Z
2  | 20 |  2018-04-01T01:20:00.000Z
2  | 30 |  2018-04-01T01:25:00.000Z
2  | 40 |  2018-04-01T01:30:00.000Z
Expected Output:
id | amount | my_timestamp
---
1  | 10 |  2018-04-01T01:10:00.000Z
2  | 40 |  2018-04-01T01:30:00.000Z
Looking for a streaming solution using either raw sql like 
sparkSession.sql("sql query") or similar to raw sql but not something like 
mapGroupWithState


On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das  
wrote:
Unfortunately no. Honestly it does not make sense as for type-aware operations 
like map, mapGroups, etc., you have to provide an actual JVM function. That 
does not fit in with the SQL language structure.

On Mon, Apr 16, 2018 at 7:34 PM, kant kodali  wrote:
Hi All, 

can we use mapGroupsWithState in raw SQL? or is it in the roadmap?

Thanks!