Hi,

We have a use case where we need to *sessionize* our data and, for each
*session*, emit some *metrics*. We need to handle *repeated sessions* and
*session timeouts*. We have come up with the following code structure and
would like to check whether we understand the concepts of *watermark* and
*flatMapGroupsWithState* correctly.

Can someone help me understand the following:

1. Will my memory consumption keep increasing?
2. Is my understanding correct that the *aggMetrics* data frame is a bounded
data frame and will always contain only the last 30 minutes' worth of data?
3. When I do the aggregation in Step 2, will Spark only use the last 30
minutes of data for the aggregation?

Here is the Spark streaming code:
Step 1:
// 1. I am getting the data from Kafka, around 50k events per second.
// 2. I am using a 30-minute watermark to filter out events that arrive
//    late.
// 3. I am using EventTimeTimeout.
// 4. My `updateSessionState` func returns Iterator[Metric] (at minute
//    granularity).
val metrics: Dataset[Metric] = kafkaEvents
  .withWatermark("timestamp", "30 minutes")
  .groupByKey(e => e._2.sessionId)
  .flatMapGroupsWithState(
    OutputMode.Update(),
    GroupStateTimeout.EventTimeTimeout())(updateSessionState(broadcastWrapper))
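
To make the shape of `updateSessionState` concrete, here is a minimal, Spark-free sketch of the kind of per-session bookkeeping such a function might do. It is not our actual code: the `Event`, `SessionState`, and `Metric` definitions, the metric name, and the `foldEvents` helper are all hypothetical, and the comments only indicate where the real `GroupState` calls would go.

```scala
// Hypothetical types; the real Event, SessionState and Metric are not shown in this post.
final case class Event(sessionId: String, timestamp: Long)
final case class SessionState(eventCount: Long, lastEventMs: Long)
final case class Metric(name: String, timestamp: Long, count: Double)

val MinuteMs = 60000L

// Pure core of a session update: fold the new events into the previous state
// and emit one metric per minute touched by this batch of events.
def foldEvents(previous: Option[SessionState], events: Seq[Event]): (SessionState, Seq[Metric]) = {
  val start = previous.getOrElse(SessionState(eventCount = 0L, lastEventMs = 0L))
  val updated = SessionState(
    eventCount = start.eventCount + events.size,
    lastEventMs = (start.lastEventMs +: events.map(_.timestamp)).max
  )
  // Each metric carries only the events seen in this call (a delta), so that
  // summing the metrics downstream does not count the same event twice.
  val metrics = events
    .groupBy(e => e.timestamp / MinuteMs * MinuteMs) // truncate to the minute
    .toSeq
    .sortBy(_._1)
    .map { case (minute, es) => Metric("session.events", minute, es.size.toDouble) }
  (updated, metrics)
}

// Inside the real function, `updated` would be stored with state.update(...),
// the timeout would be pushed forward with state.setTimeoutTimestamp(...),
// and a call with state.hasTimedOut == true would end in state.remove().
val (sessionState, emitted) =
  foldEvents(None, Seq(Event("a", 60000L), Event("a", 61000L), Event("a", 120000L)))
```

With this shape, the state kept per session is small and is dropped only when the session times out, which is the part the memory question below is about.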

Step 2:
// 1. I am aggregating the data by the metric name and the minute.
// 2. I am using a 30-minute watermark here again, assuming that results
//    in *metrics* older than 30 minutes will be removed from memory.
//    Is my assumption correct?
// 3. Is *aggMetrics* a bounded data frame that will only hold the last 30
//    minutes of data?
val aggMetrics: Dataset[(String, Double)] = metrics
  .map(metric => (long2timestamp(metric.timestamp), metric))
  .toDF("timestamp", "metric")
  .as[(Timestamp, Metric)]
  .withWatermark("timestamp", "30 minutes")
  .map {
    case (_, m) => (s"${m.name}.${m.timestamp}", m.count)
  }
  .groupByKey(_._1)
  .agg(typed.sum(_._2))
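
To make the Step 2 question concrete, this is what the aggregation computes, written with plain Scala collections instead of a streaming Dataset. The `Metric` shape is an assumption inferred from the fields used above (`name`, `timestamp`, `count`), and `aggregate` and the sample values are made up for illustration. It says nothing about how long Spark keeps the corresponding state, which is exactly what I am asking about.

```scala
// Hypothetical shape of Metric, inferred from the fields used in the code above.
final case class Metric(name: String, timestamp: Long, count: Double)

// Plain-collections counterpart of
//   .map { case (_, m) => (s"${m.name}.${m.timestamp}", m.count) }
//   .groupByKey(_._1)
//   .agg(typed.sum(_._2))
def aggregate(metrics: Seq[Metric]): Map[String, Double] =
  metrics
    .map(m => (s"${m.name}.${m.timestamp}", m.count))
    .groupBy(_._1)
    .map { case (key, rows) => key -> rows.map(_._2).sum }

// Two metrics for the same name and minute collapse into one key;
// a metric for the next minute gets its own key.
val sample = Seq(
  Metric("page.views", 1500000000000L, 2.0),
  Metric("page.views", 1500000000000L, 3.0),
  Metric("page.views", 1500000060000L, 1.0)
)
val result = aggregate(sample)
```

Because the key embeds the minute timestamp, every new minute creates a new key, so the number of distinct keys grows over time unless old ones are evicted.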

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

Step 3:
// 1. I am emitting all the metrics that got updated to KairosDB.
aggMetrics
  .writeStream
  .format("com.walmart.cxtools.expo.kairos.KairosSinkProvider")
  .option("checkpointLocation", checkpointLocation)
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime(60.seconds))
  .start()
  .awaitTermination()



