[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17361


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107309220
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -147,49 +147,68 @@ object UnsupportedOperationChecker {
           throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
             "streaming DataFrames/Datasets")
 
-        // mapGroupsWithState: Allowed only when no aggregation + Update output mode
-        case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
-          if (collectStreamingAggregates(plan).isEmpty) {
-            if (outputMode != InternalOutputModes.Update) {
-              throwError("mapGroupsWithState is not supported with " +
-                s"$outputMode output mode on a streaming DataFrame/Dataset")
-            } else {
-              // Allowed when no aggregation + Update output mode
-            }
-          } else {
-            throwError("mapGroupsWithState is not supported with aggregation " +
-              "on a streaming DataFrame/Dataset")
-          }
-
-        // flatMapGroupsWithState without aggregation
-        case m: FlatMapGroupsWithState
-          if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
-          m.outputMode match {
-            case InternalOutputModes.Update =>
-              if (outputMode != InternalOutputModes.Update) {
-                throwError("flatMapGroupsWithState in update mode is not supported with " +
+        // mapGroupsWithState and flatMapGroupsWithState
+        case m: FlatMapGroupsWithState if m.isStreaming =>
+
+          // Check compatibility with output modes and aggregations in query
+          val aggsAfterFlatMapGroups = collectStreamingAggregates(plan)
+
+          if (m.isMapGroupsWithState) {                       // check mapGroupsWithState
+            // allowed only in update query output mode and without aggregation
+            if (aggsAfterFlatMapGroups.nonEmpty) {
+              throwError(
+                "mapGroupsWithState is not supported with aggregation " +
+                  "on a streaming DataFrame/Dataset")
+            } else if (outputMode != InternalOutputModes.Update) {
+              throwError(
+                "mapGroupsWithState is not supported with " +
                   s"$outputMode output mode on a streaming DataFrame/Dataset")
+            }
+          } else {                                            // check flatMapGroupsWithState
+            if (aggsAfterFlatMapGroups.isEmpty) {
+              // flatMapGroupsWithState without aggregation: operation's output mode must
+              // match query output mode
+              m.outputMode match {
+                case InternalOutputModes.Update if outputMode != InternalOutputModes.Update =>
+                  throwError(
+                    "flatMapGroupsWithState in update mode is not supported with " +
+                      s"$outputMode output mode on a streaming DataFrame/Dataset")
+
+                case InternalOutputModes.Append if outputMode != InternalOutputModes.Append =>
+                  throwError(
+                    "flatMapGroupsWithState in append mode is not supported with " +
+                      s"$outputMode output mode on a streaming DataFrame/Dataset")
+
+                case _ =>
               }
-            case InternalOutputModes.Append =>
-              if (outputMode != InternalOutputModes.Append) {
-                throwError("flatMapGroupsWithState in append mode is not supported with " +
-                  s"$outputMode output mode on a streaming DataFrame/Dataset")
+            } else {
+              // flatMapGroupsWithState with aggregation: update operation mode not allowed, and
+              // *groupsWithState after aggregation not allowed
+              if (m.outputMode == InternalOutputModes.Update) {
+                throwError(
+                  "flatMapGroupsWithState in update mode is not supported with " +
+                    "aggregation on a streaming DataFrame/Dataset")
+              } else if (collectStreamingAggregates(m).nonEmpty) {
+                throwError(
+                  "flatMapGroupsWithState in append mode is not supported after " +
+                    s"aggregation on a streaming DataFrame/Dataset")
               }
+            }
           }
 
-        // flatMapGroupsWithState(Update) with aggregation
-        case m: FlatMapGroupsWithState
-          if m.isStreaming && m.outputMode == InternalOutputModes.Update
-            && colle
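
Read as a decision table, the visible part of this hunk appears to enforce the rules sketched below. This is an illustration only, not the Spark implementation: `Mode`, `Op`, `check`, and the two boolean flags are hypothetical stand-ins for `InternalOutputModes`, the `FlatMapGroupsWithState` node, and the two `collectStreamingAggregates` calls, and the error strings are shortened. Anything the truncated diff does after this point is not covered.

```scala
object StatefulOpCheckSketch {
  // Hypothetical stand-ins for InternalOutputModes.
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode

  // Hypothetical, flattened view of one streaming *GroupsWithState operator.
  final case class Op(
      isMapGroupsWithState: Boolean,
      opMode: Mode,          // m.outputMode in the diff
      aggsInQuery: Boolean,  // collectStreamingAggregates(plan).nonEmpty
      aggsBelowOp: Boolean)  // collectStreamingAggregates(m).nonEmpty

  // Returns Some(reason) when the combination is rejected, None when it passes.
  def check(op: Op, queryMode: Mode): Option[String] =
    if (op.isMapGroupsWithState) {
      // mapGroupsWithState: no aggregation anywhere, and the query must be in Update mode.
      if (op.aggsInQuery) Some("mapGroupsWithState with aggregation")
      else if (queryMode != Update) Some(s"mapGroupsWithState with $queryMode output mode")
      else None
    } else if (!op.aggsInQuery) {
      // flatMapGroupsWithState without aggregation: operator mode must match query mode.
      op.opMode match {
        case Update if queryMode != Update =>
          Some(s"flatMapGroupsWithState in update mode with $queryMode output mode")
        case Append if queryMode != Append =>
          Some(s"flatMapGroupsWithState in append mode with $queryMode output mode")
        case _ => None
      }
    } else if (op.opMode == Update) {
      // flatMapGroupsWithState with aggregation: Update operator mode is rejected.
      Some("flatMapGroupsWithState in update mode with aggregation")
    } else if (op.aggsBelowOp) {
      // Append operator mode is rejected when an aggregation sits below the operator.
      Some("flatMapGroupsWithState in append mode after aggregation")
    } else None
}
```

Under these assumptions, an append-mode `flatMapGroupsWithState` followed by an aggregation passes, while the same operator placed after an aggregation is rejected.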

[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107307806
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java
 ---
@@ -34,9 +32,20 @@
 @InterfaceStability.Evolving
 public class KeyedStateTimeout {
 
-  /** Timeout based on processing time.  */
+  /**
+   * Timeout based on processing time. The duration of timeout can be set 
for each group in
+   * `map/flatMapGroupsWithState` by calling 
`KeyedState.setTimeoutDuration()`.
+   */
   public static KeyedStateTimeout ProcessingTimeTimeout() { return 
ProcessingTimeTimeout$.MODULE$; }
--- End diff --

I'd probably still remove it.





[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107307722
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---

[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107307617
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 ---
@@ -519,6 +588,52 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
 )
   }
 
+  test("flatMapGroupsWithState - streaming with event time timeout") {
+// Function to maintain the max event time
+// Returns the max event time in the state, or -1 if the state was 
removed by timeout
+val stateFunc = (
+key: String,
+values: Iterator[(String, Long)],
+state: KeyedState[Long]) => {
+  val timeoutDelay = 5
+  if (key != "a") {
+Iterator.empty
+  } else {
+if (state.hasTimedOut) {
+  state.remove()
+  Iterator((key, -1))
+} else {
+  val valuesSeq = values.toSeq
+  val maxEventTime = math.max(valuesSeq.map(_._2).max, 
state.getOption.getOrElse(0L))
+  val timeoutTimestampMs = maxEventTime + timeoutDelay
+  state.update(maxEventTime)
+  state.setTimeoutTimestamp(timeoutTimestampMs * 1000)
+  Iterator((key, maxEventTime.toInt))
+}
+  }
+}
+val inputData = MemoryStream[(String, Int)]
+val result =
+  inputData.toDS
+.select($"_1".as("key"), $"_2".cast("timestamp").as("eventTime"))
+.withWatermark("eventTime", "10 seconds")
+.as[(String, Long)]
+.groupByKey[String]((x: (String, Long)) => x._1)
+.flatMapGroupsWithState[Long, (String, Int)](Update, 
EventTimeTimeout)(stateFunc)
--- End diff --

As long as they aren't required, it's okay.
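
For readers following the quoted test, the per-key logic of `stateFunc` can be tried without Spark. The sketch below is an illustration under stated assumptions: `FakeState` is a hypothetical stand-in for `KeyedState[Long]`, `hasTimedOut` is passed in by hand rather than derived from a watermark, and event times are treated as seconds, which is what the `* 1000` conversion in the test suggests.

```scala
object EventTimeTimeoutSketch {
  // Hypothetical stand-in for KeyedState[Long]; not the Spark class.
  final class FakeState(var value: Option[Long], val hasTimedOut: Boolean) {
    var timeoutTimestampMs: Option[Long] = None
    def update(v: Long): Unit = { value = Some(v) }
    def remove(): Unit = { value = None }
    def setTimeoutTimestamp(ms: Long): Unit = { timeoutTimestampMs = Some(ms) }
  }

  val timeoutDelaySec = 5L

  // Tracks the max event time for key "a"; emits -1 once the state has timed out.
  def stateFunc(
      key: String,
      values: Iterator[(String, Long)],
      state: FakeState): Iterator[(String, Int)] = {
    if (key != "a") {
      Iterator.empty
    } else if (state.hasTimedOut) {
      state.remove()
      Iterator((key, -1))
    } else {
      val maxEventTimeSec = math.max(values.map(_._2).max, state.value.getOrElse(0L))
      state.update(maxEventTimeSec)
      // Seconds to milliseconds, mirroring the `* 1000` in the test.
      state.setTimeoutTimestamp((maxEventTimeSec + timeoutDelaySec) * 1000)
      Iterator((key, maxEventTimeSec.toInt))
    }
  }
}
```

With events at 11 and 13 for key "a" and no prior state, this function stores 13 and requests a timeout at 18000 ms. A later call with `hasTimedOut` set clears the state and emits -1.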





[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107307589
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java
 ---
@@ -34,9 +32,20 @@
 @InterfaceStability.Evolving
 public class KeyedStateTimeout {
 
-  /** Timeout based on processing time.  */
+  /**
+   * Timeout based on processing time. The duration of timeout can be set 
for each group in
+   * `map/flatMapGroupsWithState` by calling 
`KeyedState.setTimeoutDuration()`.
+   */
   public static KeyedStateTimeout ProcessingTimeTimeout() { return 
ProcessingTimeTimeout$.MODULE$; }
--- End diff --

It's just that if someone does `import KeyedStateTimeout._`, the code 
boils down to 
`flatMapGroupsWithState(Update, ProcessingTime) { ... }` with no reference 
to timeout. 

Fine either way.
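
The readability point can be seen at a call site. The sketch below is a hypothetical illustration, not the Spark API: `OutputMode`, `Timeout`, `Timeouts`, and this `flatMapGroupsWithState` are stand-ins that only render the call as a string, so the two spellings can be compared side by side after a wildcard import.

```scala
object TimeoutNamingSketch {
  // Hypothetical stand-ins; none of these are the Spark classes.
  sealed trait OutputMode
  case object Update extends OutputMode

  sealed trait Timeout
  object Timeouts {
    case object ProcessingTimeTimeout extends Timeout // spelling that keeps "Timeout"
    case object ProcessingTime extends Timeout        // shorter spelling from the comment
  }

  // Renders the call site as text so the two spellings can be compared.
  def flatMapGroupsWithState(mode: OutputMode, timeout: Timeout): String =
    s"flatMapGroupsWithState($mode, $timeout)"

  import Timeouts._
  val longSpelling = flatMapGroupsWithState(Update, ProcessingTimeTimeout)
  val shortSpelling = flatMapGroupsWithState(Update, ProcessingTime)
}
```

With the shorter spelling, nothing at the call site says that the second argument configures a timeout.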





[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107307445
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 ---
@@ -519,6 +588,52 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
 )
   }
 
+  test("flatMapGroupsWithState - streaming with event time timeout") {
+// Function to maintain the max event time
+// Returns the max event time in the state, or -1 if the state was 
removed by timeout
+val stateFunc = (
+key: String,
+values: Iterator[(String, Long)],
+state: KeyedState[Long]) => {
+  val timeoutDelay = 5
+  if (key != "a") {
+Iterator.empty
+  } else {
+if (state.hasTimedOut) {
+  state.remove()
+  Iterator((key, -1))
+} else {
+  val valuesSeq = values.toSeq
+  val maxEventTime = math.max(valuesSeq.map(_._2).max, 
state.getOption.getOrElse(0L))
+  val timeoutTimestampMs = maxEventTime + timeoutDelay
+  state.update(maxEventTime)
+  state.setTimeoutTimestamp(timeoutTimestampMs * 1000)
+  Iterator((key, maxEventTime.toInt))
+}
+  }
+}
+val inputData = MemoryStream[(String, Int)]
+val result =
+  inputData.toDS
+.select($"_1".as("key"), $"_2".cast("timestamp").as("eventTime"))
+.withWatermark("eventTime", "10 seconds")
+.as[(String, Long)]
+.groupByKey[String]((x: (String, Long)) => x._1)
+.flatMapGroupsWithState[Long, (String, Int)](Update, 
EventTimeTimeout)(stateFunc)
--- End diff --

I was debugging and left them there, thinking they help the readability of 
the tests. I can remove them. 





[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107307367
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala
 ---
@@ -17,37 +17,45 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.sql.Date
+
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.spark.sql.streaming.KeyedState
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, 
ProcessingTimeTimeout}
+import org.apache.spark.sql.execution.streaming.KeyedStateImpl._
+import org.apache.spark.sql.streaming.{KeyedState, KeyedStateTimeout}
 import org.apache.spark.unsafe.types.CalendarInterval
 
+
 /**
  * Internal implementation of the [[KeyedState]] interface. Methods are 
not thread-safe.
  * @param optionalValue Optional value of the state
  * @param batchProcessingTimeMs Processing time of current batch, used to 
calculate timestamp
  *  for processing time timeouts
- * @param isTimeoutEnabled Whether timeout is enabled. This will be used 
to check whether the user
- * is allowed to configure timeouts.
+ * @param timeoutConf   Type of timeout configured. Based on this, 
different operations will
+ *be supported.
--- End diff --

nit: indent is inconsistent





[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107307253
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---

[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107307283
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---

[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107304893
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 ---
@@ -519,6 +588,52 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest with BeforeAndAf
 )
   }
 
+  test("flatMapGroupsWithState - streaming with event time timeout") {
+// Function to maintain the max event time
+// Returns the max event time in the state, or -1 if the state was 
removed by timeout
+val stateFunc = (
+key: String,
+values: Iterator[(String, Long)],
+state: KeyedState[Long]) => {
+  val timeoutDelay = 5
+  if (key != "a") {
+Iterator.empty
+  } else {
+if (state.hasTimedOut) {
+  state.remove()
+  Iterator((key, -1))
+} else {
+  val valuesSeq = values.toSeq
+  val maxEventTime = math.max(valuesSeq.map(_._2).max, 
state.getOption.getOrElse(0L))
+  val timeoutTimestampMs = maxEventTime + timeoutDelay
+  state.update(maxEventTime)
+  state.setTimeoutTimestamp(timeoutTimestampMs * 1000)
+  Iterator((key, maxEventTime.toInt))
+}
+  }
+}
+val inputData = MemoryStream[(String, Int)]
+val result =
+  inputData.toDS
+.select($"_1".as("key"), $"_2".cast("timestamp").as("eventTime"))
+.withWatermark("eventTime", "10 seconds")
+.as[(String, Long)]
+.groupByKey[String]((x: (String, Long)) => x._1)
+.flatMapGroupsWithState[Long, (String, Int)](Update, 
EventTimeTimeout)(stateFunc)
--- End diff --

These types are just here for testing? (i.e., we didn't break inference, 
right?)
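
The inference question can be reproduced in miniature. The sketch below is an assumption-laden illustration rather than the Spark API: `StateBox` and `Grouped` are hypothetical stand-ins for `KeyedState` and the grouped Dataset, and the output-mode and timeout arguments are left out. The state and output types appear only in the function parameter, so the compiler has to recover them from a fully annotated function value like the one in the test.

```scala
object InferenceSketch {
  // Hypothetical stand-in for KeyedState[S]; a mutable holder, invariant in S.
  final class StateBox[S](var value: Option[S])

  // Hypothetical stand-in for a grouped dataset with String keys.
  final class Grouped[V](groups: Map[String, Seq[V]]) {
    // S and U appear only in the function type, so they are inferred from `func`.
    def flatMapGroupsWithState[S, U](
        func: (String, Iterator[V], StateBox[S]) => Iterator[U]): Seq[U] =
      groups.toSeq.sortBy(_._1).flatMap { case (k, vs) =>
        func(k, vs.iterator, new StateBox[S](None))
      }
  }

  // Fully annotated function value, in the style of the quoted test.
  val stateFunc = (key: String, values: Iterator[Long], state: StateBox[Long]) => {
    val maxSeen = math.max(values.max, state.value.getOrElse(0L))
    state.value = Some(maxSeen)
    Iterator((key, maxSeen.toInt))
  }

  val grouped = new Grouped[Long](Map("a" -> Seq(1L, 5L, 3L), "b" -> Seq(2L)))

  // Explicit type arguments, as written in the test under review.
  val explicit = grouped.flatMapGroupsWithState[Long, (String, Int)](stateFunc)

  // Same call with the type arguments left to the compiler.
  val inferred = grouped.flatMapGroupsWithState(stateFunc)
}
```

Both calls compile and produce the same result in this sketch, which matches the reading that the explicit type arguments in the test are optional.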





[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107304618
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---

[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107304531
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -147,49 +147,68 @@ object UnsupportedOperationChecker {
   throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
 "streaming DataFrames/Datasets")
 
-// mapGroupsWithState: Allowed only when no aggregation + Update output mode
-case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
-  if (collectStreamingAggregates(plan).isEmpty) {
-if (outputMode != InternalOutputModes.Update) {
-  throwError("mapGroupsWithState is not supported with " +
-s"$outputMode output mode on a streaming DataFrame/Dataset")
-} else {
-  // Allowed when no aggregation + Update output mode
-}
-  } else {
-throwError("mapGroupsWithState is not supported with aggregation " +
-  "on a streaming DataFrame/Dataset")
-  }
-
-// flatMapGroupsWithState without aggregation
-case m: FlatMapGroupsWithState
-  if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
-  m.outputMode match {
-case InternalOutputModes.Update =>
-  if (outputMode != InternalOutputModes.Update) {
-throwError("flatMapGroupsWithState in update mode is not supported with " +
+// mapGroupsWithState and flatMapGroupsWithState
+case m: FlatMapGroupsWithState if m.isStreaming =>
+
+  // Check compatibility with output modes and aggregations in query
+  val aggsAfterFlatMapGroups = collectStreamingAggregates(plan)
+
+  if (m.isMapGroupsWithState) {   // check mapGroupsWithState
+// allowed only in update query output mode and without aggregation
+if (aggsAfterFlatMapGroups.nonEmpty) {
+  throwError(
+"mapGroupsWithState is not supported with aggregation " +
+  "on a streaming DataFrame/Dataset")
+} else if (outputMode != InternalOutputModes.Update) {
+  throwError(
+"mapGroupsWithState is not supported with " +
   s"$outputMode output mode on a streaming DataFrame/Dataset")
+}
+  } else {   // check flatMapGroupsWithState
+if (aggsAfterFlatMapGroups.isEmpty) {
+  // flatMapGroupsWithState without aggregation: operation's output mode must
+  // match query output mode
+  m.outputMode match {
+case InternalOutputModes.Update if outputMode != InternalOutputModes.Update =>
+  throwError(
+"flatMapGroupsWithState in update mode is not supported with " +
+  s"$outputMode output mode on a streaming DataFrame/Dataset")
+
+case InternalOutputModes.Append if outputMode != InternalOutputModes.Append =>
+  throwError(
+"flatMapGroupsWithState in append mode is not supported with " +
+  s"$outputMode output mode on a streaming DataFrame/Dataset")
+
+case _ =>
   }
-case InternalOutputModes.Append =>
-  if (outputMode != InternalOutputModes.Append) {
-throwError("flatMapGroupsWithState in append mode is not supported with " +
-  s"$outputMode output mode on a streaming DataFrame/Dataset")
+} else {
+  // flatMapGroupsWithState with aggregation: update operation mode not allowed, and
+  // *groupsWithState after aggregation not allowed
+  if (m.outputMode == InternalOutputModes.Update) {
+throwError(
+  "flatMapGroupsWithState in update mode is not supported with " +
+"aggregation on a streaming DataFrame/Dataset")
+  } else if (collectStreamingAggregates(m).nonEmpty) {
+throwError(
+  "flatMapGroupsWithState in append mode is not supported after " +
+s"aggregation on a streaming DataFrame/Dataset")
   }
+}
   }
 
-// flatMapGroupsWithState(Update) with aggregation
-case m: FlatMapGroupsWithState
-  if m.isStreaming && m.outputMode == InternalOutputModes.Update
-&& c

[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107304196
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -147,49 +147,68 @@ object UnsupportedOperationChecker {
   throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
 "streaming DataFrames/Datasets")
 
-// mapGroupsWithState: Allowed only when no aggregation + Update output mode
-case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
-  if (collectStreamingAggregates(plan).isEmpty) {
-if (outputMode != InternalOutputModes.Update) {
-  throwError("mapGroupsWithState is not supported with " +
-s"$outputMode output mode on a streaming DataFrame/Dataset")
-} else {
-  // Allowed when no aggregation + Update output mode
-}
-  } else {
-throwError("mapGroupsWithState is not supported with aggregation " +
-  "on a streaming DataFrame/Dataset")
-  }
-
-// flatMapGroupsWithState without aggregation
-case m: FlatMapGroupsWithState
-  if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
-  m.outputMode match {
-case InternalOutputModes.Update =>
-  if (outputMode != InternalOutputModes.Update) {
-throwError("flatMapGroupsWithState in update mode is not supported with " +
+// mapGroupsWithState and flatMapGroupsWithState
+case m: FlatMapGroupsWithState if m.isStreaming =>
--- End diff --

Wow, this is getting complicated...
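
For readers following the thread, the branching in the hunk quoted above can be restated as one pure function. This is only a sketch of the rules as they appear in the diff, not the actual checker: `OutputMode`, `GroupsWithStateOp`, `GroupsWithStateRules`, and `validate` are hypothetical names, the error messages are abbreviated, and the real code calls `throwError` instead of returning a value.

```scala
// Hypothetical stand-ins for InternalOutputModes; these are not Spark classes.
sealed trait OutputMode
case object Append extends OutputMode
case object Update extends OutputMode
case object Complete extends OutputMode

// Hypothetical summary of what the check reads from the logical plan.
final case class GroupsWithStateOp(
    isMapGroupsWithState: Boolean, // m.isMapGroupsWithState in the diff
    opOutputMode: OutputMode,      // m.outputMode in the diff
    aggsInQuery: Boolean,          // collectStreamingAggregates(plan).nonEmpty
    aggsBelowOp: Boolean)          // collectStreamingAggregates(m).nonEmpty

object GroupsWithStateRules {
  // Returns Some(message) when the combination is rejected, None when it is allowed.
  def validate(op: GroupsWithStateOp, queryMode: OutputMode): Option[String] = {
    if (op.isMapGroupsWithState) {
      // mapGroupsWithState: no aggregation, and the query must be in update mode.
      if (op.aggsInQuery) Some("mapGroupsWithState is not supported with aggregation")
      else if (queryMode != Update) Some(s"mapGroupsWithState is not supported with $queryMode output mode")
      else None
    } else if (!op.aggsInQuery) {
      // flatMapGroupsWithState without aggregation: operation mode must match query mode.
      op.opOutputMode match {
        case Update if queryMode != Update =>
          Some(s"flatMapGroupsWithState in update mode is not supported with $queryMode output mode")
        case Append if queryMode != Append =>
          Some(s"flatMapGroupsWithState in append mode is not supported with $queryMode output mode")
        case _ => None
      }
    } else {
      // flatMapGroupsWithState with aggregation: update operation mode is rejected,
      // and so is an append-mode operation that sits after an aggregation.
      if (op.opOutputMode == Update)
        Some("flatMapGroupsWithState in update mode is not supported with aggregation")
      else if (op.aggsBelowOp)
        Some("flatMapGroupsWithState in append mode is not supported after aggregation")
      else None
    }
  }
}
```

Returning `Option[String]` keeps the sketch easy to exercise in isolation; it says nothing about how the checker itself should be structured.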





[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

2017-03-21 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/17361#discussion_r107304133
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java
 ---
@@ -34,9 +32,20 @@
 @InterfaceStability.Evolving
 public class KeyedStateTimeout {
 
-  /** Timeout based on processing time.  */
+  /**
+   * Timeout based on processing time. The duration of timeout can be set for each group in
+   * `map/flatMapGroupsWithState` by calling `KeyedState.setTimeoutDuration()`.
+   */
   public static KeyedStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
--- End diff --

Nit: I'd consider removing the `Timeout` here, as it's kind of redundant.
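
As background on the Javadoc in the hunk above: a per-group processing-time timeout can be pictured as each group carrying its own deadline. The toy model below illustrates that idea only and is not Spark's implementation. `TimeoutTracker` and `expire` are hypothetical names, the method name `setTimeoutDuration` merely echoes the `KeyedState.setTimeoutDuration()` call mentioned in the Javadoc, and measuring the deadline from the time the duration is set is an assumption.

```scala
import scala.collection.mutable

// Toy model only: tracks one processing-time deadline per group key.
final class TimeoutTracker[K] {
  private val deadlines = mutable.Map.empty[K, Long]

  // Record that `key` times out `durationMs` after `nowMs` (assumed semantics).
  def setTimeoutDuration(key: K, durationMs: Long, nowMs: Long): Unit = {
    require(durationMs > 0, "timeout duration must be positive")
    deadlines(key) = nowMs + durationMs
  }

  // Remove and return the keys whose deadline has been reached at `nowMs`.
  def expire(nowMs: Long): Set[K] = {
    val timedOut = deadlines.iterator
      .filter { case (_, deadline) => deadline <= nowMs }
      .map { case (key, _) => key }
      .toSet
    deadlines --= timedOut
    timedOut
  }
}
```

Each key is reported at most once, because `expire` drops a key as soon as its deadline has passed; setting a new duration for the same key simply overwrites the earlier deadline.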

