[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15702





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-13 Thread amitsela
Github user amitsela commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r87712995
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -104,85 +110,105 @@ case class StateStoreSaveExec(
 
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
-assert(returnAllStates.nonEmpty,
-  "Incorrect planning in IncrementalExecution, returnAllStates have 
not been set")
-val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ 
else saveAndReturnUpdated _
+assert(outputMode.nonEmpty,
+  "Incorrect planning in IncrementalExecution, outputMode has not been 
set")
+
 child.execute().mapPartitionsWithStateStore(
   getStateId.checkpointLocation,
   operatorId = getStateId.operatorId,
   storeVersion = getStateId.batchId,
   keyExpressions.toStructType,
   child.output.toStructType,
   sqlContext.sessionState,
-  Some(sqlContext.streams.stateStoreCoordinator)
-)(saveAndReturnFunc)
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+val numOutputRows = longMetric("numOutputRows")
+val numTotalStateRows = longMetric("numTotalStateRows")
+val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+outputMode match {
+  // Update and output all rows in the StateStore.
+  case Some(Complete) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+store.commit()
+numTotalStateRows += store.numKeys()
+store.iterator().map { case (k, v) =>
+  numOutputRows += 1
+  v.asInstanceOf[InternalRow]
+}
+
+  // Update and output only rows being evicted from the StateStore
+  case Some(Append) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+
+val watermarkAttribute =
+  
keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
+// If we are evicting based on a window, use the end of the 
window.  Otherwise just
+// use the attribute itself.
+val evictionExpression =
+  if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+LessThanOrEqual(
+  GetStructField(watermarkAttribute, 1),
+  Literal(eventTimeWatermark.get * 1000))
+  } else {
+LessThanOrEqual(
+  watermarkAttribute,
+  Literal(eventTimeWatermark.get * 1000))
+  }
+
+logInfo(s"Filtering state store on: $evictionExpression")
+val predicate = newPredicate(evictionExpression, 
keyExpressions)
+store.remove(predicate)
+
+store.commit()
+
+numTotalStateRows += store.numKeys()
+store.updates().filter(_.isInstanceOf[ValueRemoved]).map { 
removed =>
+  numOutputRows += 1
+  removed.value.asInstanceOf[InternalRow]
+}
+
+  // Update and output modified rows from the StateStore.
+  case Some(Update) =>
--- End diff --

Generally, updates should be able to take late arrivals (with respect to the watermark) into account and allow acting on a user-defined strategy, such as: `update for each following element`.





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86269248
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
UnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.AccumulatorV2
+
+/** Tracks the maximum positive long seen. */
+class MaxLong(protected var currentValue: Long = 0)
+  extends AccumulatorV2[Long, Long] {
+
+  override def isZero: Boolean = value == 0
+  override def value: Long = currentValue
+  override def copy(): AccumulatorV2[Long, Long] = new 
MaxLong(currentValue)
+
+  override def reset(): Unit = {
+currentValue = 0
+  }
+
+  override def add(v: Long): Unit = {
+if (value < v) { currentValue = v }
+  }
+
+  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
+if (currentValue < other.value) {
--- End diff --

nit: same as above, why not use math.max?





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86268434
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -155,6 +155,16 @@ trait CheckAnalysis extends PredicateHelper {
 }
 
 operator match {
+  case etw: EventTimeWatermark =>
+etw.eventTime.dataType match {
+  case s: StructType
--- End diff --

Which high-level case is caught by this condition?





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86269217
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
UnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.AccumulatorV2
+
+/** Tracks the maximum positive long seen. */
+class MaxLong(protected var currentValue: Long = 0)
+  extends AccumulatorV2[Long, Long] {
+
+  override def isZero: Boolean = value == 0
+  override def value: Long = currentValue
+  override def copy(): AccumulatorV2[Long, Long] = new 
MaxLong(currentValue)
+
+  override def reset(): Unit = {
+currentValue = 0
+  }
+
+  override def add(v: Long): Unit = {
+if (value < v) { currentValue = v }
--- End diff --

nit: less confusing to read if `if (currentValue < v) { currentValue = v }`. In fact, why not use math.max?
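For reference, a minimal sketch of that suggestion, keeping the `AccumulatorV2[Long, Long]` contract shown in the quoted diff (illustrative only, not the merged implementation):

    import org.apache.spark.util.AccumulatorV2

    /** Tracks the maximum positive long seen (illustrative sketch). */
    class MaxLong(private var currentValue: Long = 0)
      extends AccumulatorV2[Long, Long] {

      override def isZero: Boolean = currentValue == 0
      override def value: Long = currentValue
      override def copy(): AccumulatorV2[Long, Long] = new MaxLong(currentValue)
      override def reset(): Unit = { currentValue = 0 }

      // math.max states the intent directly and avoids mixing `value` and
      // `currentValue` in the comparison.
      override def add(v: Long): Unit = {
        currentValue = math.max(currentValue, v)
      }
      override def merge(other: AccumulatorV2[Long, Long]): Unit = {
        currentValue = math.max(currentValue, other.value)
      }
    }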





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86260124
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -155,6 +155,16 @@ trait CheckAnalysis extends PredicateHelper {
 }
 
 operator match {
+  case etw: EventTimeWatermark =>
+etw.eventTime.dataType match {
+  case s: StructType
+if s.find(_.name == 
"start").map(_.dataType).contains(TimestampType) =>
--- End diff --

Oh... it should also check the end of the window, not the start...





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86259808
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -155,6 +155,16 @@ trait CheckAnalysis extends PredicateHelper {
 }
 
 operator match {
+  case etw: EventTimeWatermark =>
+etw.eventTime.dataType match {
+  case s: StructType
+if s.find(_.name == 
"start").map(_.dataType).contains(TimestampType) =>
--- End diff --

nit: `Option.contains` is not in Scala 2.10.
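A small sketch of how the guard could address both review points here (compare against the window's `end` field, as noted in the other comment, and avoid `Option.contains` for Scala 2.10 compatibility); illustrative only:

    case s: StructType
      if s.find(_.name == "end").exists(_.dataType == TimestampType) =>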





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86259192
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
UnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.AccumulatorV2
+
+class MaxLong(protected var currentValue: Long = 0)
+  extends AccumulatorV2[Long, Long]
+  with Serializable {
+
+  override def isZero: Boolean = value == 0
+  override def value: Long = currentValue
+  override def copy(): AccumulatorV2[Long, Long] = new 
MaxLong(currentValue)
+
+  override def reset(): Unit = {
+currentValue = 0
+  }
+
+  override def add(v: Long): Unit = {
+if (value < v) { currentValue = v }
+  }
+
+  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
+if (currentValue < other.value) {
+  currentValue = other.value
+}
+  }
+}
+
+/**
+ * Used to mark a column as the containing the event time for a given 
record. In addition to
+ * adding appropriate metadata to this column, this operator also tracks 
the maximum observed event
+ * time. Based on the maximum observed time and a user specified delay, we 
can calculate the
+ * `watermark` after which we assume we will no longer see late records 
for a particular time
+ * period.
+ */
+case class EventTimeWatermarkExec(
+eventTime: Attribute,
+delay: CalendarInterval,
+child: SparkPlan) extends SparkPlan {
+
+  // TODO: Use Spark SQL Metrics?
+  val maxEventTime = new MaxLong
+  sparkContext.register(maxEventTime)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+child.execute().mapPartitions { iter =>
+  val getEventTime = UnsafeProjection.create(eventTime :: Nil, 
child.output)
+  iter.map { row =>
+maxEventTime.add(getEventTime(row).getLong(0))
--- End diff --

Added to checkAnalysis.





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86258503
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -437,6 +441,22 @@ class StreamExecution(
 sink.addBatch(currentBatchId, nextBatch)
 reportNumRows(executedPlan, triggerLogicalPlan, newData)
 
+// Update the eventTime watermark if we find one in the plan.
+// TODO: Does this need to be an AttributeMap?
+lastExecution.executedPlan.collect {
+  case e: EventTimeWatermarkExec =>
+logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
+(e.maxEventTime.value / 1000) - e.delay.milliseconds()
+}.headOption.foreach { newWatermark =>
+  if (newWatermark > currentEventTimeWatermark) {
+logInfo(s"Updating eventTime watermark to: $newWatermark ms")
+streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, 
newWatermark)
--- End diff --

I see, that makes sense. I actually just moved it out so we only report if it's non-zero.





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86254543
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -437,6 +441,22 @@ class StreamExecution(
 sink.addBatch(currentBatchId, nextBatch)
 reportNumRows(executedPlan, triggerLogicalPlan, newData)
 
+// Update the eventTime watermark if we find one in the plan.
+// TODO: Does this need to be an AttributeMap?
+lastExecution.executedPlan.collect {
+  case e: EventTimeWatermarkExec =>
+logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
+(e.maxEventTime.value / 1000) - e.delay.milliseconds()
+}.headOption.foreach { newWatermark =>
+  if (newWatermark > currentEventTimeWatermark) {
+logInfo(s"Updating eventTime watermark to: $newWatermark ms")
+streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, 
newWatermark)
--- End diff --

I suggest just fixing it since it's pretty easy. Just `if (newWatermark == 0) "-" else newWatermark.toString`.
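Roughly, at the point where the trigger detail is reported (assuming `reportTriggerDetail` accepts a string value; names follow the quoted StreamExecution block):

    // Render an unset watermark (0) as "-" so it is distinguishable in the
    // trigger details; otherwise report the numeric value.
    val watermarkDetail = if (newWatermark == 0) "-" else newWatermark.toString
    streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, watermarkDetail)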





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86252932
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -437,6 +441,22 @@ class StreamExecution(
 sink.addBatch(currentBatchId, nextBatch)
 reportNumRows(executedPlan, triggerLogicalPlan, newData)
 
+// Update the eventTime watermark if we find one in the plan.
+// TODO: Does this need to be an AttributeMap?
+lastExecution.executedPlan.collect {
+  case e: EventTimeWatermarkExec =>
+logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
+(e.maxEventTime.value / 1000) - e.delay.milliseconds()
+}.headOption.foreach { newWatermark =>
+  if (newWatermark > currentEventTimeWatermark) {
+logInfo(s"Updating eventTime watermark to: $newWatermark ms")
+streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, 
newWatermark)
--- End diff --

I think that's okay?





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86249567
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 ---
@@ -99,13 +99,16 @@ trait StateStoreProvider {
 
 
 /** Trait representing updates made to a [[StateStore]]. */
-sealed trait StoreUpdate
+sealed trait StoreUpdate {
+  def key: UnsafeRow
+  def value: UnsafeRow
+}
 
 case class ValueAdded(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
 
 case class ValueUpdated(key: UnsafeRow, value: UnsafeRow) extends 
StoreUpdate
 
-case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
+case class ValueRemoved(key: UnsafeRow, value: UnsafeRow) extends 
StoreUpdate
--- End diff --

It is used.  We need the value to emit the result upon eviction.





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86235177
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions.{count, window}
+
+class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("error on bad column") {
+val inputData = MemoryStream[Int].toDF()
+val e = intercept[AnalysisException] {
+  inputData.withWatermark("badColumn", "1 minute")
+}
+assert(e.getMessage contains "badColumn")
+  }
+
+  test("watermark metric") {
+val inputData = MemoryStream[Int]
+
+val windowedAggregation = inputData.toDF()
+.withColumn("eventTime", $"value".cast("timestamp"))
+.withWatermark("eventTime", "10 seconds")
+.groupBy(window($"eventTime", "5 seconds") as 'window)
+.agg(count("*") as 'count)
+.select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+
+testStream(windowedAggregation)(
+  AddData(inputData, 15),
+  AssertOnLastQueryStatus { status =>
+status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === 
"5000"
+  },
+  AddData(inputData, 15),
+  AssertOnLastQueryStatus { status =>
+status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === 
"5000"
+  },
+  AddData(inputData, 25),
+  AssertOnLastQueryStatus { status =>
+status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === 
"15000"
+  }
+)
+  }
+
+  test("append-mode watermark aggregation") {
+val inputData = MemoryStream[Int]
+
+val windowedAggregation = inputData.toDF()
+  .withColumn("eventTime", $"value".cast("timestamp"))
+  .withWatermark("eventTime", "10 seconds")
+  .groupBy(window($"eventTime", "5 seconds") as 'window)
+  .agg(count("*") as 'count)
+  .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+
+testStream(windowedAggregation)(
+  AddData(inputData, 10, 11, 12, 13, 14, 15),
+  CheckAnswer(),
+  AddData(inputData, 25), // Advance watermark to 15 seconds
+  CheckAnswer(),
+  AddData(inputData, 25), // Evict items less than previous watermark.
+  CheckAnswer((10, 5))
+)
+  }
+
+  ignore("recovery") {
+val inputData = MemoryStream[Int]
+
+val windowedAggregation = inputData.toDF()
+.withColumn("eventTime", $"value".cast("timestamp"))
+.withWatermark("eventTime", "10 seconds")
+.groupBy(window($"eventTime", "5 seconds") as 'window)
+.agg(count("*") as 'count)
+.select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+
+testStream(windowedAggregation)(
+  AddData(inputData, 10, 11, 12, 13, 14, 15),
+  CheckAnswer(),
+  AddData(inputData, 25), // Advance watermark to 15 seconds
+  StopStream,
+  StartStream(),
+  CheckAnswer(),
+  AddData(inputData, 25), // Evict items less than previous watermark.
+  StopStream,
+  StartStream(),
+  CheckAnswer((10, 5))
+)
+  }
+
+  test("dropping old data") {
+val inputData = MemoryStream[Int]
+
+val windowedAggregation = inputData.toDF()
+.withColumn("eventTime", $"value".cast("timestamp"))
+.withWatermark("eventTime", "10 

[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86231716
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 ---
@@ -99,13 +99,16 @@ trait StateStoreProvider {
 
 
 /** Trait representing updates made to a [[StateStore]]. */
-sealed trait StoreUpdate
+sealed trait StoreUpdate {
+  def key: UnsafeRow
+  def value: UnsafeRow
+}
 
 case class ValueAdded(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
 
 case class ValueUpdated(key: UnsafeRow, value: UnsafeRow) extends 
StoreUpdate
 
-case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
+case class ValueRemoved(key: UnsafeRow, value: UnsafeRow) extends 
StoreUpdate
--- End diff --

Any special reason to change this? It seems weird to add an unused field `value`.





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86230783
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
UnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.AccumulatorV2
+
+class MaxLong(protected var currentValue: Long = 0)
+  extends AccumulatorV2[Long, Long]
+  with Serializable {
--- End diff --

nit: not needed. `AccumulatorV2` is already `Serializable`.





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86233196
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
UnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.AccumulatorV2
+
+class MaxLong(protected var currentValue: Long = 0)
--- End diff --

nit: Could you document that this one only supports positive longs?





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86230697
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
UnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.AccumulatorV2
+
+class MaxLong(protected var currentValue: Long = 0)
--- End diff --

nit: `protected` -> `private`





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86232610
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -437,6 +441,22 @@ class StreamExecution(
 sink.addBatch(currentBatchId, nextBatch)
 reportNumRows(executedPlan, triggerLogicalPlan, newData)
 
+// Update the eventTime watermark if we find one in the plan.
+// TODO: Does this need to be an AttributeMap?
+lastExecution.executedPlan.collect {
+  case e: EventTimeWatermarkExec =>
+logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
+(e.maxEventTime.value / 1000) - e.delay.milliseconds()
+}.headOption.foreach { newWatermark =>
+  if (newWatermark > currentEventTimeWatermark) {
+logInfo(s"Updating eventTime watermark to: $newWatermark ms")
+streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, 
newWatermark)
+currentEventTimeWatermark = newWatermark
+  } else {
+logTrace(s"Event time didn't move: $newWatermark < 
$currentEventTimeWatermark")
--- End diff --

We need to call `streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, newWatermark)` here. Otherwise, the trigger details won't have `EVENT_TIME_WATERMARK` for this batch.
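A sketch of that suggestion against the quoted block, with the report moved out of the `if` so every batch carries the detail (illustrative; not necessarily the final change):

    lastExecution.executedPlan.collect {
      case e: EventTimeWatermarkExec =>
        logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
        (e.maxEventTime.value / 1000) - e.delay.milliseconds()
    }.headOption.foreach { newWatermark =>
      if (newWatermark > currentEventTimeWatermark) {
        logInfo(s"Updating eventTime watermark to: $newWatermark ms")
        currentEventTimeWatermark = newWatermark
      } else {
        logTrace(s"Event time didn't move: $newWatermark < $currentEventTimeWatermark")
      }
      // Reported on every trigger, not only when the watermark advances.
      streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, newWatermark)
    }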





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86232913
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -437,6 +441,22 @@ class StreamExecution(
 sink.addBatch(currentBatchId, nextBatch)
 reportNumRows(executedPlan, triggerLogicalPlan, newData)
 
+// Update the eventTime watermark if we find one in the plan.
+// TODO: Does this need to be an AttributeMap?
+lastExecution.executedPlan.collect {
+  case e: EventTimeWatermarkExec =>
+logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
+(e.maxEventTime.value / 1000) - e.delay.milliseconds()
+}.headOption.foreach { newWatermark =>
+  if (newWatermark > currentEventTimeWatermark) {
+logInfo(s"Updating eventTime watermark to: $newWatermark ms")
+streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, 
newWatermark)
--- End diff --

Is it fine to just set `EVENT_TIME_WATERMARK` to `0` if the first batch doesn't have any data (e.g., the filter specified by the user drops all data)?





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86234567
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
UnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.AccumulatorV2
+
+class MaxLong(protected var currentValue: Long = 0)
+  extends AccumulatorV2[Long, Long]
+  with Serializable {
+
+  override def isZero: Boolean = value == 0
+  override def value: Long = currentValue
+  override def copy(): AccumulatorV2[Long, Long] = new 
MaxLong(currentValue)
+
+  override def reset(): Unit = {
+currentValue = 0
+  }
+
+  override def add(v: Long): Unit = {
+if (value < v) { currentValue = v }
+  }
+
+  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
+if (currentValue < other.value) {
+  currentValue = other.value
+}
+  }
+}
+
+/**
+ * Used to mark a column as the containing the event time for a given 
record. In addition to
+ * adding appropriate metadata to this column, this operator also tracks 
the maximum observed event
+ * time. Based on the maximum observed time and a user specified delay, we 
can calculate the
+ * `watermark` after which we assume we will no longer see late records 
for a particular time
+ * period.
+ */
+case class EventTimeWatermarkExec(
+eventTime: Attribute,
+delay: CalendarInterval,
+child: SparkPlan) extends SparkPlan {
+
+  // TODO: Use Spark SQL Metrics?
+  val maxEventTime = new MaxLong
+  sparkContext.register(maxEventTime)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+child.execute().mapPartitions { iter =>
+  val getEventTime = UnsafeProjection.create(eventTime :: Nil, 
child.output)
+  iter.map { row =>
+maxEventTime.add(getEventTime(row).getLong(0))
--- End diff --

Just a small question: where is the `eventTime` type checked? I guess `getLong` just throws an exception if the format is wrong. Can we fail it before starting the Spark job?





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86203860
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -104,85 +110,105 @@ case class StateStoreSaveExec(
 
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
-assert(returnAllStates.nonEmpty,
-  "Incorrect planning in IncrementalExecution, returnAllStates have 
not been set")
-val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ 
else saveAndReturnUpdated _
+assert(outputMode.nonEmpty,
+  "Incorrect planning in IncrementalExecution, outputMode has not been 
set")
+
 child.execute().mapPartitionsWithStateStore(
   getStateId.checkpointLocation,
   operatorId = getStateId.operatorId,
   storeVersion = getStateId.batchId,
   keyExpressions.toStructType,
   child.output.toStructType,
   sqlContext.sessionState,
-  Some(sqlContext.streams.stateStoreCoordinator)
-)(saveAndReturnFunc)
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+val numOutputRows = longMetric("numOutputRows")
+val numTotalStateRows = longMetric("numTotalStateRows")
+val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+outputMode match {
+  // Update and output all rows in the StateStore.
+  case Some(Complete) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+store.commit()
+numTotalStateRows += store.numKeys()
+store.iterator().map { case (k, v) =>
+  numOutputRows += 1
+  v.asInstanceOf[InternalRow]
+}
+
+  // Update and output only rows being evicted from the StateStore
+  case Some(Append) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+
+val watermarkAttribute =
+  
keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
+// If we are evicting based on a window, use the end of the 
window.  Otherwise just
+// use the attribute itself.
+val evictionExpression =
+  if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+LessThanOrEqual(
+  GetStructField(watermarkAttribute, 1),
+  Literal(eventTimeWatermark.get * 1000))
+  } else {
+LessThanOrEqual(
+  watermarkAttribute,
+  Literal(eventTimeWatermark.get * 1000))
+  }
+
+logInfo(s"Filtering state store on: $evictionExpression")
+val predicate = newPredicate(evictionExpression, 
keyExpressions)
+store.remove(predicate)
+
+store.commit()
+
+numTotalStateRows += store.numKeys()
+store.updates().filter(_.isInstanceOf[ValueRemoved]).map { 
removed =>
+  numOutputRows += 1
+  removed.value.asInstanceOf[InternalRow]
+}
+
+  // Update and output modified rows from the StateStore.
+  case Some(Update) =>
--- End diff --

That is correct.





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86203400
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -536,6 +535,41 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
+   * Defines an event time watermark for this [[Dataset]]. A watermark 
tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes:
+   *  - To know when a given time window aggregation can be finalized and 
thus can be emitted when
+   *using output modes that do not allow updates.
+   *  - To minimize the amount of state that we need to keep for on-going 
aggregations.
+   *
+   *  The current event time is computed by looking at the 
`MAX(eventTime)` seen in an epoch across
--- End diff --

Changed to watermark. For epoch, I really just mean "during some period of time where we decide to coordinate across the partitions". This happens at batch boundaries now, but that is not part of the contract we are promising. I just removed that word to avoid confusion.
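A tiny worked example of that arithmetic, using the values from the WatermarkSuite quoted earlier in this thread (event times in seconds, purely for illustration):

    // Max observed event time across partitions is 25s and the user asked for
    // a 10-second delayThreshold, so the coordinated watermark becomes 15s.
    val maxObservedEventTimeSec = 25L
    val delayThresholdSec = 10L
    val watermarkSec = maxObservedEventTimeSec - delayThresholdSec  // 15
    // Windows that end at or before 15s can then be finalized and emitted in
    // append mode; the suite's "Advance watermark to 15 seconds" comment
    // corresponds to exactly this step.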





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86203412
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -536,6 +535,41 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
+   * Defines an event time watermark for this [[Dataset]]. A watermark 
tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes:
+   *  - To know when a given time window aggregation can be finalized and 
thus can be emitted when
+   *using output modes that do not allow updates.
+   *  - To minimize the amount of state that we need to keep for on-going 
aggregations.
+   *
+   *  The current event time is computed by looking at the 
`MAX(eventTime)` seen in an epoch across
+   *  all of the partitions in the query minus a user specified 
`delayThreshold`.  Due to the cost
+   *  of coordinating this value across partitions, the actual watermark 
used is only guaranteed
+   *  to be at least `delayThreshold` behind the actual event time.  In 
some cases we may still
+   *  process records that arrive more than `delayThreshold` late.
+   *
+   * @param eventTime the name of the column that contains the event time 
of the row.
+   * @param delayThreshold the minimum delay to wait to data to arrive 
late, relative to the latest
+   *   record that has been processed in the form of 
an interval
+   *   (e.g. "1 minute" or "5 hours").
--- End diff --

That seems like more of an implementation detail than a contract of the API. The real contract is stated above as `the actual watermark used is only guaranteed to be at least 'delayThreshold' behind the actual event time`. There aren't really any bounds we can promise without knowing more about the query (not even milliseconds).
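For context, the end-to-end usage this contract covers, as exercised by the WatermarkSuite quoted earlier in this thread (`inputData` is a `MemoryStream[Int]` there):

    import org.apache.spark.sql.functions.{count, window}

    // Mark eventTime as the event-time column with a 10-second delay
    // threshold, then aggregate over 5-second windows.
    val windowedAggregation = inputData.toDF()
      .withColumn("eventTime", $"value".cast("timestamp"))
      .withWatermark("eventTime", "10 seconds")
      .groupBy(window($"eventTime", "5 seconds") as 'window)
      .agg(count("*") as 'count)
      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])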





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86203226
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -104,85 +110,105 @@ case class StateStoreSaveExec(
 
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
-assert(returnAllStates.nonEmpty,
-  "Incorrect planning in IncrementalExecution, returnAllStates have 
not been set")
-val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ 
else saveAndReturnUpdated _
+assert(outputMode.nonEmpty,
+  "Incorrect planning in IncrementalExecution, outputMode has not been 
set")
+
 child.execute().mapPartitionsWithStateStore(
   getStateId.checkpointLocation,
   operatorId = getStateId.operatorId,
   storeVersion = getStateId.batchId,
   keyExpressions.toStructType,
   child.output.toStructType,
   sqlContext.sessionState,
-  Some(sqlContext.streams.stateStoreCoordinator)
-)(saveAndReturnFunc)
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+val numOutputRows = longMetric("numOutputRows")
+val numTotalStateRows = longMetric("numTotalStateRows")
+val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+outputMode match {
+  // Update and output all rows in the StateStore.
+  case Some(Complete) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+store.commit()
+numTotalStateRows += store.numKeys()
+store.iterator().map { case (k, v) =>
+  numOutputRows += 1
+  v.asInstanceOf[InternalRow]
+}
+
+  // Update and output only rows being evicted from the StateStore
+  case Some(Append) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+
+val watermarkAttribute =
+  
keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
+// If we are evicting based on a window, use the end of the 
window.  Otherwise just
+// use the attribute itself.
+val evictionExpression =
+  if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+LessThanOrEqual(
+  GetStructField(watermarkAttribute, 1),
+  Literal(eventTimeWatermark.get * 1000))
+  } else {
+LessThanOrEqual(
+  watermarkAttribute,
+  Literal(eventTimeWatermark.get * 1000))
+  }
+
+logInfo(s"Filtering state store on: $evictionExpression")
+val predicate = newPredicate(evictionExpression, 
keyExpressions)
+store.remove(predicate)
+
+store.commit()
+
+numTotalStateRows += store.numKeys()
+store.updates().filter(_.isInstanceOf[ValueRemoved]).map { 
removed =>
+  numOutputRows += 1
+  removed.value.asInstanceOf[InternalRow]
+}
+
+  // Update and output modified rows from the StateStore.
+  case Some(Update) =>
--- End diff --

Yes, I think it's a good idea to explicitly say for each output mode whether watermarks affect emit and evict. Just so I'm clear, the intention is:

Append: affects emit, affects evict
Update: doesn't affect emit, affects evict
Complete: doesn't affect emit, no eviction

Is that right?






[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-02 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86200392
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -104,85 +110,105 @@ case class StateStoreSaveExec(
 
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
-assert(returnAllStates.nonEmpty,
-  "Incorrect planning in IncrementalExecution, returnAllStates have 
not been set")
-val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ 
else saveAndReturnUpdated _
+assert(outputMode.nonEmpty,
+  "Incorrect planning in IncrementalExecution, outputMode has not been 
set")
+
 child.execute().mapPartitionsWithStateStore(
   getStateId.checkpointLocation,
   operatorId = getStateId.operatorId,
   storeVersion = getStateId.batchId,
   keyExpressions.toStructType,
   child.output.toStructType,
   sqlContext.sessionState,
-  Some(sqlContext.streams.stateStoreCoordinator)
-)(saveAndReturnFunc)
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+val numOutputRows = longMetric("numOutputRows")
+val numTotalStateRows = longMetric("numTotalStateRows")
+val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+outputMode match {
+  // Update and output all rows in the StateStore.
+  case Some(Complete) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+store.commit()
+numTotalStateRows += store.numKeys()
+store.iterator().map { case (k, v) =>
+  numOutputRows += 1
+  v.asInstanceOf[InternalRow]
+}
+
+  // Update and output only rows being evicted from the StateStore
+  case Some(Append) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+
+val watermarkAttribute =
+  
keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
+// If we are evicting based on a window, use the end of the 
window.  Otherwise just
+// use the attribute itself.
+val evictionExpression =
+  if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+LessThanOrEqual(
+  GetStructField(watermarkAttribute, 1),
+  Literal(eventTimeWatermark.get * 1000))
+  } else {
+LessThanOrEqual(
+  watermarkAttribute,
+  Literal(eventTimeWatermark.get * 1000))
+  }
+
+logInfo(s"Filtering state store on: $evictionExpression")
+val predicate = newPredicate(evictionExpression, 
keyExpressions)
+store.remove(predicate)
+
+store.commit()
+
+numTotalStateRows += store.numKeys()
+store.updates().filter(_.isInstanceOf[ValueRemoved]).map { 
removed =>
+  numOutputRows += 1
+  removed.value.asInstanceOf[InternalRow]
+}
+
+  // Update and output modified rows from the StateStore.
+  case Some(Update) =>
--- End diff --

The only output modes that are supported publicly are `Complete` and 
`Append` (`Update` is only available internally, for tests).  When we add support 
for `Update` (I'd like to do this soon), it should also evict tuples that can 
no longer be updated because their group has fallen beneath the watermark.  I 
thought it was fairly clear that `Complete` would need to retain the 
complete set of aggregate state, but I'm happy to make this more explicit if 
others are confused by it.
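
To make that concrete, here is a minimal, self-contained sketch of the intended `Update` behavior (plain Scala collections stand in for the `StateStore`; the window logic and names are illustrative, not the PR's code, and the sketch glosses over dropping input that is already below the watermark): modified groups are re-emitted, and any group whose window end has fallen at or below the watermark is evicted because it can no longer change.

```scala
// Sketch only: a mutable.Map stands in for the StateStore; timestamps are epoch millis.
import scala.collection.mutable

object UpdateModeSketch {
  final case class Agg(windowEndMs: Long, count: Long)

  /** Apply a batch of event times, emit the groups modified in this batch,
    * then evict groups whose window end is at or below the watermark. */
  def processBatch(
      state: mutable.Map[Long, Agg],
      eventTimesMs: Seq[Long],
      watermarkMs: Long): Seq[Agg] = {
    val touched = mutable.LinkedHashSet.empty[Long]
    eventTimesMs.foreach { t =>
      val windowEnd = (t / 10000L) * 10000L + 10000L   // 10-second tumbling window
      val prev = state.getOrElse(windowEnd, Agg(windowEnd, 0L))
      state(windowEnd) = prev.copy(count = prev.count + 1)
      touched += windowEnd
    }
    // Update semantics: emit only the rows that changed in this batch.
    val updated = touched.toSeq.flatMap(state.get)
    // Eviction: groups at or below the watermark can no longer be updated, so drop their state.
    state.keys.filter(_ <= watermarkMs).toList.foreach(state.remove)
    updated
  }

  def main(args: Array[String]): Unit = {
    val state = mutable.Map.empty[Long, Agg]
    // Events at 1s and 12s; the watermark has already advanced to 15s,
    // so the [0s, 10s) window is emitted one last time and then evicted.
    println(processBatch(state, Seq(1000L, 12000L), watermarkMs = 15000L))
    println(s"state after eviction: $state")
  }
}
```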


[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-01 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86075063
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -104,85 +110,105 @@ case class StateStoreSaveExec(
 
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
-assert(returnAllStates.nonEmpty,
-  "Incorrect planning in IncrementalExecution, returnAllStates have 
not been set")
-val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ 
else saveAndReturnUpdated _
+assert(outputMode.nonEmpty,
+  "Incorrect planning in IncrementalExecution, outputMode has not been 
set")
+
 child.execute().mapPartitionsWithStateStore(
   getStateId.checkpointLocation,
   operatorId = getStateId.operatorId,
   storeVersion = getStateId.batchId,
   keyExpressions.toStructType,
   child.output.toStructType,
   sqlContext.sessionState,
-  Some(sqlContext.streams.stateStoreCoordinator)
-)(saveAndReturnFunc)
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+val numOutputRows = longMetric("numOutputRows")
+val numTotalStateRows = longMetric("numTotalStateRows")
+val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+outputMode match {
+  // Update and output all rows in the StateStore.
+  case Some(Complete) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+store.commit()
+numTotalStateRows += store.numKeys()
+store.iterator().map { case (k, v) =>
+  numOutputRows += 1
+  v.asInstanceOf[InternalRow]
+}
+
+  // Update and output only rows being evicted from the StateStore
+  case Some(Append) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+
+val watermarkAttribute =
+  
keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
+// If we are evicting based on a window, use the end of the 
window.  Otherwise just
+// use the attribute itself.
+val evictionExpression =
+  if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+LessThanOrEqual(
+  GetStructField(watermarkAttribute, 1),
+  Literal(eventTimeWatermark.get * 1000))
+  } else {
+LessThanOrEqual(
+  watermarkAttribute,
+  Literal(eventTimeWatermark.get * 1000))
+  }
+
+logInfo(s"Filtering state store on: $evictionExpression")
+val predicate = newPredicate(evictionExpression, 
keyExpressions)
+store.remove(predicate)
+
+store.commit()
+
+numTotalStateRows += store.numKeys()
+store.updates().filter(_.isInstanceOf[ValueRemoved]).map { 
removed =>
+  numOutputRows += 1
+  removed.value.asInstanceOf[InternalRow]
+}
+
+  // Update and output modified rows from the StateStore.
+  case Some(Update) =>
--- End diff --

@koeninger, Update should allow late data to correct previously emitted results 
even when it arrives later than the threshold; a similar approach is described in 
http://cdn.oreillystatic.com/en/assets/1/event/160/Triggers%20in%20Apache%20Beam%20_incubating_%20Presentation.pdf
 (search for 'elementCountAtLeast')... correct me if I'm wrong





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86066774
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -536,6 +535,41 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
+   * Defines an event time watermark for this [[Dataset]]. A watermark 
tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes:
+   *  - To know when a given time window aggregation can be finalized and 
thus can be emitted when
+   *using output modes that do not allow updates.
+   *  - To minimize the amount of state that we need to keep for on-going 
aggregations.
+   *
+   *  The current event time is computed by looking at the 
`MAX(eventTime)` seen in an epoch across
+   *  all of the partitions in the query minus a user specified 
`delayThreshold`.  Due to the cost
+   *  of coordinating this value across partitions, the actual watermark 
used is only guaranteed
+   *  to be at least `delayThreshold` behind the actual event time.  In 
some cases we may still
+   *  process records that arrive more than `delayThreshold` late.
+   *
+   * @param eventTime the name of the column that contains the event time 
of the row.
+   * @param delayThreshold the minimum delay to wait to data to arrive 
late, relative to the latest
+   *   record that has been processed in the form of 
an interval
+   *   (e.g. "1 minute" or "5 hours").
--- End diff --

Should this make it clear what the minimum useful granularity is (ms)?
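
For reference, a tiny sketch of how interval strings reduce to milliseconds, which is why ms is the finest useful granularity here; this assumes the Spark 2.x `CalendarInterval.fromString` parser together with the `milliseconds` accessor already used in this diff.

```scala
import org.apache.spark.unsafe.types.CalendarInterval

object DelayGranularitySketch {
  def main(args: Array[String]): Unit = {
    // delayThreshold strings end up as whole milliseconds in the column metadata.
    Seq("interval 1 minute", "interval 5 hours", "interval 250 milliseconds").foreach { s =>
      val interval = CalendarInterval.fromString(s)   // returns null if the string does not parse
      println(s"$s -> ${interval.milliseconds} ms")
    }
  }
}
```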





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86067616
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -104,85 +110,105 @@ case class StateStoreSaveExec(
 
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
-assert(returnAllStates.nonEmpty,
-  "Incorrect planning in IncrementalExecution, returnAllStates have 
not been set")
-val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ 
else saveAndReturnUpdated _
+assert(outputMode.nonEmpty,
+  "Incorrect planning in IncrementalExecution, outputMode has not been 
set")
+
 child.execute().mapPartitionsWithStateStore(
   getStateId.checkpointLocation,
   operatorId = getStateId.operatorId,
   storeVersion = getStateId.batchId,
   keyExpressions.toStructType,
   child.output.toStructType,
   sqlContext.sessionState,
-  Some(sqlContext.streams.stateStoreCoordinator)
-)(saveAndReturnFunc)
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+val numOutputRows = longMetric("numOutputRows")
+val numTotalStateRows = longMetric("numTotalStateRows")
+val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+outputMode match {
+  // Update and output all rows in the StateStore.
+  case Some(Complete) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+store.commit()
+numTotalStateRows += store.numKeys()
+store.iterator().map { case (k, v) =>
+  numOutputRows += 1
+  v.asInstanceOf[InternalRow]
+}
+
+  // Update and output only rows being evicted from the StateStore
+  case Some(Append) =>
+while (iter.hasNext) {
+  val row = iter.next().asInstanceOf[UnsafeRow]
+  val key = getKey(row)
+  store.put(key.copy(), row.copy())
+  numUpdatedStateRows += 1
+}
+
+val watermarkAttribute =
+  
keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
+// If we are evicting based on a window, use the end of the 
window.  Otherwise just
+// use the attribute itself.
+val evictionExpression =
+  if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+LessThanOrEqual(
+  GetStructField(watermarkAttribute, 1),
+  Literal(eventTimeWatermark.get * 1000))
+  } else {
+LessThanOrEqual(
+  watermarkAttribute,
+  Literal(eventTimeWatermark.get * 1000))
+  }
+
+logInfo(s"Filtering state store on: $evictionExpression")
+val predicate = newPredicate(evictionExpression, 
keyExpressions)
+store.remove(predicate)
+
+store.commit()
+
+numTotalStateRows += store.numKeys()
+store.updates().filter(_.isInstanceOf[ValueRemoved]).map { 
removed =>
+  numOutputRows += 1
+  removed.value.asInstanceOf[InternalRow]
+}
+
+  // Update and output modified rows from the StateStore.
+  case Some(Update) =>
--- End diff --

I'm not clear on why the semantics of Update mean that watermarks shouldn't 
be used to remove state.





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86066376
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -536,6 +535,41 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
+   * Defines an event time watermark for this [[Dataset]]. A watermark 
tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes:
+   *  - To know when a given time window aggregation can be finalized and 
thus can be emitted when
+   *using output modes that do not allow updates.
+   *  - To minimize the amount of state that we need to keep for on-going 
aggregations.
--- End diff --

For append, this sounds like the intention is to emit only once the watermark has 
passed, and then drop state.
But for the other output modes, it's not clear from reading this what effect the 
watermark has on emission and on dropping state.
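
For Append, the reading above can be modeled like this (a toy sketch in plain Scala, not the PR's code): a window's aggregate stays in state and is emitted exactly once, only after the watermark passes the window end, at which point its state is dropped.

```scala
import scala.collection.mutable

object AppendModeSketch {
  final case class WindowAgg(windowEndMs: Long, count: Long)

  /** Emit (once) every window the watermark has passed, and drop its state. */
  def advance(state: mutable.Map[Long, WindowAgg], watermarkMs: Long): Seq[WindowAgg] = {
    val finalized = state.values.filter(_.windowEndMs <= watermarkMs).toSeq.sortBy(_.windowEndMs)
    finalized.foreach(agg => state.remove(agg.windowEndMs))
    finalized
  }

  def main(args: Array[String]): Unit = {
    val state = mutable.Map(
      10000L -> WindowAgg(10000L, 3L),   // [0s, 10s): closed once the watermark reaches 10s
      20000L -> WindowAgg(20000L, 1L))   // [10s, 20s): still open
    println(advance(state, watermarkMs = 15000L))  // emits only the first window
    println(s"remaining state: $state")
  }
}
```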





[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86066082
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -536,6 +535,41 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
+   * Defines an event time watermark for this [[Dataset]]. A watermark 
tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes:
+   *  - To know when a given time window aggregation can be finalized and 
thus can be emitted when
+   *using output modes that do not allow updates.
+   *  - To minimize the amount of state that we need to keep for on-going 
aggregations.
+   *
+   *  The current event time is computed by looking at the 
`MAX(eventTime)` seen in an epoch across
--- End diff --

- Should this be "The current watermark is computed..."?
- What is an epoch? It isn't mentioned in the docs or elsewhere in the PR (a rough 
model of the computation as I read it is sketched below).
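
Since "epoch" isn't defined anywhere, here is a rough model of how I read the computation (my assumptions: an epoch is one trigger/batch, the max event time is taken across all partitions, and the watermark never moves backwards):

```scala
object WatermarkComputationSketch {
  /** watermark = max(previous watermark, max event time observed in this epoch - delayThreshold) */
  def nextWatermark(prevWatermarkMs: Long, partitionMaxEventTimesMs: Seq[Long], delayMs: Long): Long = {
    if (partitionMaxEventTimesMs.isEmpty) prevWatermarkMs
    else math.max(prevWatermarkMs, partitionMaxEventTimesMs.max - delayMs)
  }

  def main(args: Array[String]): Unit = {
    // Two partitions saw max event times of 20s and 35s in this batch; delayThreshold = 10s.
    val wm = nextWatermark(prevWatermarkMs = 0L, Seq(20000L, 35000L), delayMs = 10000L)
    println(s"watermark = $wm ms")   // 25000: groups ending at or before 25s can be finalized
  }
}
```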






[GitHub] spark pull request #15702: [SPARK-18124] Observed delay based Event Time Wat...

2016-11-01 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r86053925
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.unsafe.types.CalendarInterval
+
+object EventTimeWatermark {
+  /** The [[org.apache.spark.sql.types.Metadata]] key used to hold the 
eventTime watermark delay. */
+  val delayKey = "spark.watermarkDelay"
+}
+
+/**
+ * Used to mark a user specified column as holding the event time for a 
row.
+ */
+case class EventTimeWatermark(
+eventTime: Attribute,
+delay: CalendarInterval,
+child: LogicalPlan) extends LogicalPlan {
+
+  // Update the metadata on the eventTime column to include the desired 
delay.
+  override val output: Seq[Attribute] = child.output.map { a =>
+if (a semanticEquals eventTime) {
+  val updatedMetadata = new MetadataBuilder()
+.withMetadata(a.metadata)
+.putLong(EventTimeWatermark.delayKey, delay.milliseconds)
--- End diff --

Updating the key to include `Ms`
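
A quick sketch of the resulting round-trip through column metadata; the exact renamed key string (`spark.watermarkDelayMs`) is my guess, not taken from the diff.

```scala
import org.apache.spark.sql.types.MetadataBuilder

object DelayMetadataSketch {
  val delayKey = "spark.watermarkDelayMs"   // assumed post-rename key name

  def main(args: Array[String]): Unit = {
    // Write the delay (in milliseconds) into attribute metadata, as EventTimeWatermark does...
    val metadata = new MetadataBuilder().putLong(delayKey, 60000L).build()
    // ...and read it back the way StateStoreSaveExec locates the watermark attribute.
    if (metadata.contains(delayKey)) {
      println(s"watermark delay = ${metadata.getLong(delayKey)} ms")
    }
  }
}
```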





[GitHub] spark pull request #15702: [SPARK-18124] Observed-delay based Event Time Wat...

2016-10-31 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r85859683
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -536,6 +535,37 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * Defines an event time watermark for this [[Dataset]]. This watermark 
tracks a point in time
--- End diff --

need an `:: Experimental ::` tag here
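
Something along these lines, presumably; the method name and body below are placeholders, and only the `:: Experimental ::` scaladoc marker and the `@Experimental` annotation are the point (assuming `org.apache.spark.annotation.Experimental` is the right annotation here).

```scala
import org.apache.spark.annotation.Experimental

class WatermarkApiSketch {
  /**
   * :: Experimental ::
   * Defines an event time watermark for this Dataset.
   */
  @Experimental
  def withEventTimeWatermark(eventTime: String, delayThreshold: String): WatermarkApiSketch = this
}
```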

