Re: Re: About the question of Spark Structured Streaming window output

2018-08-26 Thread z...@zjdex.com
Hi Jungtaek Lim & Gerard Maas:
Thanks very much.
When I send three batches of data as follows:

batch 0:
2018-08-27 09:53:00,1
2018-08-27 09:53:01,1

batch 1:
2018-08-27 11:04:00,1
2018-08-27 11:04:01,1

batch 2:
2018-08-27 11:17:00,1
2018-08-27 11:17:01,1

the aggregated result for time "2018-08-27 09:53:00" is output as follows:
Batch: 2
-------------------------------------------
+-------------------+-------------------+--------+
|              start|                end|sumvalue|
+-------------------+-------------------+--------+
|2018-08-27 09:50:00|2018-08-27 09:55:00|       2|
+-------------------+-------------------+--------+

Regarding this result, I would like to know:
1. Why is the start time "2018-08-27 09:50:00" and not "2018-08-27 09:53:00"?
When I define the window, the start time is not set.
2. Why is the aggregated result for time "2018-08-27 09:53:00" not output when
the batch 1 data arrives?
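
A minimal sketch of the alignment in question 1 (assuming the `words` Dataset
from the code quoted further down the thread): tumbling windows produced by
window($"time", "5 minutes") are aligned to fixed 5-minute boundaries relative
to the Unix epoch, not to the first event, which is why 09:53:00 lands in the
[09:50:00, 09:55:00) bucket. The optional fourth startTime argument of
functions.window shifts that alignment.

// Default alignment: windows start on multiples of 5 minutes from the epoch,
// e.g. [09:50:00, 09:55:00), [09:55:00, 10:00:00), ...
val defaultWindows = words
  .groupBy(window($"time", "5 minutes"))
  .agg(sum("value") as "sumvalue")

// Shifted alignment (illustrative): a 3-minute startTime offset yields windows
// such as [09:53:00, 09:58:00), [09:58:00, 10:03:00), ...
val shiftedWindows = words
  .groupBy(window($"time", "5 minutes", "5 minutes", "3 minutes"))
  .agg(sum("value") as "sumvalue")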

Thanks a lot!





z...@zjdex.com
 
From: Jungtaek Lim
Date: 2018-08-27 11:01
To: z...@zjdex.com
CC: Gerard Maas; user; 王程浩
Subject: Re: Re: About the question of Spark Structured Streaming window output
You may want to add a streaming listener to your query and see when/how the 
watermark is updated. In short, the watermark is calculated from the previous 
batch, and the calculated value is applied to the current batch. So the result 
may appear later than you expect, possibly by one batch.
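
A minimal sketch of such a listener (the fields printed here come from the
StreamingQueryProgress reported for each micro-batch; everything else is
illustrative):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Log the watermark reported by each micro-batch's progress event.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // eventTime holds entries such as "watermark", "min", "max", "avg"
    println(s"batch ${event.progress.batchId}: " +
      s"watermark = ${event.progress.eventTime.get("watermark")}")
  }
})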

On Mon, Aug 27, 2018 at 11:56 AM z...@zjdex.com wrote:
Hi Gerard Maas:
Thanks a lot for your reply. 
When I use "append" mode, I send the following data:
2018-08-27 09:53:00,1
2018-08-27 09:53:01,1
The result (which contains only the schema, like the following) was received 
after the batch ended. But when the window end + watermark time has passed, 
there is no result output. Is there something I miss? Thanks in advance.





z...@zjdex.com
 
From: Gerard Maas
Date: 2018-08-27 05:00
To: zrc
CC: spark users; wch
Subject: Re: About the question of Spark Structured Streaming window output
Hi,

When you use a window in append mode, you need to wait until the end of the 
window + watermark has passed before the final record for that window is 
emitted.
This is your query over time; note the timestamp at the right side of the cell 
and the data present in it.

val windowedCounts = dataSrc
  .withWatermark("timestamp", "1 minutes")
  .groupBy(window($"timestamp", "5 minutes"))
  .agg(sum("value") as "sumvalue")
  .select("window.start", "window.end","sumvalue")




Going back to your questions:
1. When I set the append output mode and send input data, there is no result 
output. How do I use append mode in a windowed aggregation case?
Wait for the window + watermark to expire and you'll see the append record 
output.

2. When I set the update output mode and send input data, the result is output 
every batch. But I want to output the result only once, when the window ends. 
How can I do that?
Use `append` mode.
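
For instance, a minimal sketch based on the query quoted below, with only the
output mode changed:

val query = windowedCounts.writeStream
  .outputMode("append")  // each window is emitted once, after window end + watermark
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()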

kr, Gerard.

On Thu, Aug 23, 2018 at 4:31 AM z...@zjdex.com  wrote:
Hi:
   I have some questions about Spark Structured Streaming window output in 
Spark 2.3.1. I wrote the application code as follows:

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

case class DataType(time: Timestamp, value: Long)

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .master("local[1]")
  .getOrCreate()

import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to 
localhost:
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", )
  .load()

val words = lines.as[String].map(l => {
  val tmp = l.split(",")
  DataType(Timestamp.valueOf(tmp(0)), tmp(1).toLong)
}).as[DataType]

val windowedCounts = words
  .withWatermark("time", "1 minutes")
  .groupBy(window($"time", "5 minutes"))
  .agg(sum("value") as "sumvalue")
  .select("window.start", "window.end","sumvalue")

val query = windowedCounts.writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()

The input data format is:
2018-08-20 12:01:00,1
2018-08-20 12:02:01,1

My questions are:
1. When I set the append output mode and send input data, there is no result 
output. How do I use append mode in a windowed aggregation case?
2. When I set the update output mode and send input data, the result is output 
every batch. But I want to output the result only once, when the window ends. 
How can I do that?

Thanks in advance!




z...@zjdex.com


java.io.NotSerializableException: org.apache.spark.sql.TypedColumn

2018-08-26 Thread zzcclp
Hi dev:
  I am using spark-shell to run the example in the section
'http://spark.apache.org/docs/2.2.2/sql-programming-guide.html#type-safe-user-defined-aggregate-functions',
and I get the following error: 

Caused by: java.io.NotSerializableException:
org.apache.spark.sql.TypedColumn
Serialization stack:
- object not serializable (class: org.apache.spark.sql.TypedColumn, 
value:
myaverage() AS `average_salary`)
- field (class: $iw, name: averageSalary, type: class
org.apache.spark.sql.TypedColumn)
- object (class $iw, $iw@4b2f8ae9)
- field (class: MyAverage$, name: $outer, type: class $iw)
- object (class MyAverage$, MyAverage$@2be41d90)
- field (class:
org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression,
name: aggregator, type: class org.apache.spark.sql.expressions.Aggregator)
- object (class
org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression,
MyAverage(Employee))
- field (class:
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression,
name: aggregateFunction, type: class
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction)
- object (class
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression,
partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class Employee)),
Some(class Employee), Some(StructType(StructField(name,StringType,true),
StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0,
Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0,
Average, true])).count AS count#26L, newInstance(class Average), input[0,
double, false] AS value#24, DoubleType, false, 0, 0))
- writeObject data (class:
scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy,
scala.collection.immutable.List$SerializationProxy@5e92c46f)
- writeReplace data (class:
scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon,
List(partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class
Employee)), Some(class Employee),
Some(StructType(StructField(name,StringType,true),
StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0,
Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0,
Average, true])).count AS count#26L, newInstance(class Average), input[0,
double, false] AS value#24, DoubleType, false, 0, 0)))
- field (class:
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec, name:
aggregateExpressions, type: interface scala.collection.Seq)
- object (class
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec,
ObjectHashAggregate(keys=[],
functions=[partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class
Employee)), Some(class Employee),
Some(StructType(StructField(name,StringType,true),
StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0,
Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0,
Average, true])).count AS count#26L, newInstance(class Average), input[0,
double, false] AS value#24, DoubleType, false, 0, 0)], output=[buf#37])
+- *FileScan json [name#8,salary#9L] Batched: false, Format: JSON, Location:
InMemoryFileIndex[file:/opt/spark2/examples/src/main/resources/employees.json],
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct
)
- field (class:
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1,
name: $outer, type: class
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec)
- object (class
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1,
)
- field (class:
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2,
name: $outer, type: class
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1)
- object (class
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2,
)
- field (class: 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1,
name: f$23, type: interface scala.Function1)
- object (class 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1,
)
- field (class:
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25,
name: $outer, type: class
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
- object (class
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25,
)
- field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type:
interface scala.Function3)
- object (class org.apache.spark.rdd.MapPartitionsRDD, 
MapPartitionsRDD[9]
at show at :62)
- field (class: org.apache.spark.NarrowDependency, name: _rdd, type: 
class
org.apache.spark.rdd.RDD)
- object (class 
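
For reference, the type-safe aggregation example from that section of the
guide looks roughly like this (a sketch of the documented pattern; the
averageSalary TypedColumn defined in the REPL is the value the serialization
stack above complains about):

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._   // assuming a `spark` SparkSession, e.g. in spark-shell

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  def zero: Average = Average(0L, 0L)
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  def bufferEncoder: Encoder[Average] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
val averageSalary = MyAverage.toColumn.name("average_salary")  // the TypedColumn in the stack trace
ds.select(averageSalary).show()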

Re: How to deal with context dependent computing?

2018-08-26 Thread JF Chen
Thanks Sonal.
For example, I have data as following:
login 2018/8/27 10:00
logout 2018/8/27 10:05
login 2018/8/27 10:08
logout 2018/8/27 10:15
login 2018/8/27 11:08
logout 2018/8/27 11:32

Now I want to calculate the time between each login and logout. For
example, I should get 5 min, 7 min, 24 min from the above sample data.
I know I can calculate it with foreach, but then all the data seems to run on
the Spark driver node rather than on multiple executors.
Is there a good way to solve this problem? Thanks!
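
One way to do the pairing inside Spark rather than in driver-side code is a
lag window over the ordered events; a minimal sketch, assuming columns named
event and ts and the timestamp format shown above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._   // assuming a `spark` SparkSession, e.g. in spark-shell

// Sample data as in the message above.
val events = Seq(
  ("login",  "2018/8/27 10:00"), ("logout", "2018/8/27 10:05"),
  ("login",  "2018/8/27 10:08"), ("logout", "2018/8/27 10:15"),
  ("login",  "2018/8/27 11:08"), ("logout", "2018/8/27 11:32")
).toDF("event", "ts")
  .withColumn("ts", to_timestamp($"ts", "yyyy/M/d HH:mm"))

// Pair each row with the previous event/time, then keep logout rows whose
// immediate predecessor was a login. Note: an unpartitioned orderBy pulls the
// data into one task; with a user or session id, add partitionBy(...) to keep
// the work distributed.
val w = Window.orderBy("ts")
val durations = events
  .withColumn("prevEvent", lag($"event", 1).over(w))
  .withColumn("prevTs",    lag($"ts", 1).over(w))
  .where($"event" === "logout" && $"prevEvent" === "login")
  .select((unix_timestamp($"ts") - unix_timestamp($"prevTs")) / 60 as "minutes")

durations.show()  // expected: 5.0, 7.0, 24.0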

Regard,
Junfeng Chen


On Thu, Aug 23, 2018 at 6:15 PM Sonal Goyal  wrote:

> Hi Junfeng,
>
> Can you please show by means of an example what you are trying to achieve?
>
> Thanks,
> Sonal
> Nube Technologies 
>
> 
>
>
>
> On Thu, Aug 23, 2018 at 8:22 AM, JF Chen  wrote:
>
>> For example, I have some data with a timestamp, marked as category A or B,
>> and ordered by time. Now I want to calculate each duration from A to B. In a
>> normal program, I can use a flag bit to record whether the previous record is
>> A or B, and then calculate the duration. But how do I do this with a Spark
>> DataFrame?
>>
>> Thanks!
>>
>> Regard,
>> Junfeng Chen
>>
>
>


Re: Spark Structured Streaming using S3 as data source

2018-08-26 Thread Burak Yavuz
Yes, the checkpoint makes sure that you start off from where you left off.
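
For example (a minimal sketch with placeholder bucket names and schema): as
long as the restarted query uses the same checkpointLocation, the file-source
offsets recorded there let it resume instead of reprocessing everything.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("S3StreamResume").getOrCreate()

// Hypothetical schema and S3 paths, for illustration only.
val eventSchema = new StructType()
  .add("id", StringType)
  .add("ts", TimestampType)

val events = spark.readStream
  .schema(eventSchema)                 // a file source needs an explicit schema
  .json("s3a://my-bucket/events/")

val query = events.writeStream
  .format("parquet")
  .option("path", "s3a://my-bucket/output/")
  // The checkpoint records which input files have been processed; restarting
  // with the same checkpointLocation resumes from where the query left off.
  .option("checkpointLocation", "s3a://my-bucket/checkpoints/events/")
  .start()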

On Sun, Aug 26, 2018 at 2:22 AM sherif98 
wrote:

> I have data that is continuously pushed to multiple S3 buckets. I want to
> set
> up a structured streaming application that uses the S3 buckets as the data
> source and does stream-stream joins.
>
> My question is: if the application is down for some reason, will restarting
> the application continue processing data from S3 where it left
> off?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark Structured Streaming using S3 as data source

2018-08-26 Thread sherif98
I have data that is continuously pushed to multiple S3 buckets. I want to set
up a Structured Streaming application that uses the S3 buckets as the data
source and does stream-stream joins.

My question is: if the application is down for some reason, will restarting
the application continue processing data from S3 where it left
off?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org