Hi,

We have a use case where we need to *sessionize* our data and emit some *metrics* for each *session*. We also need to handle *repeated sessions* and *session timeouts*. We have come up with the following code structure and would like to check whether we have understood the concepts of *watermark* and *flatMapGroupsWithState* correctly.

Can someone help me understand the following:

1. Will my memory consumption keep increasing?
2. Is my understanding correct that the *aggMetrics* data frame is a bounded data frame and will always contain the last 30 minutes' worth of data?
3. When I do the aggregation in Step 2, will Spark only use the last 30 minutes of data for the aggregation?

Here is the Spark streaming code:

Step 1:

    // 1. I am getting the data from Kafka, around 50k events per second.
    // 2. I am using a 30-minute watermark to filter out events that arrive late.
    // 3. I am using EventTimeTimeout.
    // 4. My `updateSessionState` func returns Iterator[Metric] (at minute granularity).
    val metrics: Dataset[Metric] = kafkaEvents
      .withWatermark("timestamp", "30 minutes")
      .groupByKey(e => e._2.sessionId)
      .flatMapGroupsWithState(
        OutputMode.Update(),
        GroupStateTimeout.EventTimeTimeout())(updateSessionState(broadcastWrapper))

Step 2:

    // 1. I am aggregating the data by the metric name and the minute.
    // 2. I am applying a 30-minute watermark here again, assuming that results
    //    in *metrics* that are more than 30 minutes old will be removed from
    //    memory. Is my assumption correct?
    // 3. Is *aggMetrics* a bounded data frame that will only hold the last
    //    30 minutes of data?
    val aggMetrics: Dataset[(String, Double)] = metrics
      .map(metric => (long2timestamp(metric.timestamp), metric))
      .toDF("timestamp", "metric")
      .as[(Timestamp, Metric)]
      .withWatermark("timestamp", "30 minutes")
      .map { case (_, m) => (s"${m.name}.${m.timestamp}", m.count) }
      .groupByKey(_._1)
      .agg(typed.sum(_._2))

    import org.apache.spark.sql.streaming.Trigger
    import scala.concurrent.duration._

Step 3:

    // 1. I am emitting all the metrics that got updated to KairosDB.
    aggMetrics
      .writeStream
      .format("com.walmart.cxtools.expo.kairos.KairosSinkProvider")
      .option("checkpointLocation", checkpointLocation)
      .outputMode(OutputMode.Update())
      .trigger(Trigger.ProcessingTime(60.seconds))
      .start()
      .awaitTermination()
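
For context on Step 1, `updateSessionState` itself is not shown above. The sketch below is only an illustration of the shape such a function could take with EventTimeTimeout. The `Event`, `SessionState` and `Metric` case classes, the `SessionGapMs` value, the metric names, and the un-curried signature (no `broadcastWrapper`) are all assumptions made for the example, not our actual code. It is meant to show the one point that matters for the memory question: per-session state stays in the state store until the function calls `state.remove()`, typically when the timeout fires.

```scala
import org.apache.spark.sql.streaming.GroupState

// Hypothetical types: the real Event, Metric and state classes are not shown in this post.
final case class Event(sessionId: String, timestamp: Long)            // event time, epoch millis
final case class Metric(name: String, timestamp: Long, count: Double) // timestamp at minute granularity
final case class SessionState(startMs: Long, lastSeenMs: Long, events: Long)

object SessionSketch {
  // Assumed inactivity gap after which a session is treated as finished.
  val SessionGapMs: Long = 30 * 60 * 1000L

  // Truncate an epoch-millis timestamp down to the start of its minute.
  def toMinute(ms: Long): Long = (ms / 60000L) * 60000L

  def updateSessionState(
      sessionId: String,
      events: Iterator[Event],
      state: GroupState[SessionState]): Iterator[Metric] = {
    if (state.hasTimedOut) {
      // Timeout invocation: the watermark has passed the timeout timestamp.
      // Emit a closing metric and remove the state so it no longer occupies memory.
      val closed = state.get
      state.remove()
      Iterator(Metric("session.closed", toMinute(closed.lastSeenMs), 1.0))
    } else {
      val batch = events.toSeq
      if (batch.isEmpty) {
        Iterator.empty
      } else {
        val batchMin = batch.map(_.timestamp).min
        val batchMax = batch.map(_.timestamp).max
        // Merge the new events into the existing session, or start a new one.
        val updated = state.getOption match {
          case Some(s) =>
            SessionState(
              math.min(s.startMs, batchMin),
              math.max(s.lastSeenMs, batchMax),
              s.events + batch.size.toLong)
          case None =>
            SessionState(batchMin, batchMax, batch.size.toLong)
        }
        state.update(updated)
        // With EventTimeTimeout the timeout is an event-time timestamp; the
        // function is called with hasTimedOut = true once the watermark passes it.
        state.setTimeoutTimestamp(updated.lastSeenMs + SessionGapMs)
        Iterator(Metric("session.events", toMinute(batchMax), batch.size.toDouble))
      }
    }
  }
}
```

Under these assumptions, and with `spark.implicits._` in scope for the encoders, the function would be passed as `flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.EventTimeTimeout())(SessionSketch.updateSessionState _)` on a stream grouped by `sessionId`.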