[GitHub] spark pull request #15702: [SPARK-18124] Observed-delay based Even Time Wate...

2016-10-31 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r85859357
  
--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java ---
@@ -252,6 +252,10 @@ public static long parseSecondNano(String secondNano) throws IllegalArgumentExce
   public final int months;
   public final long microseconds;
 
+  public final long milliseconds() {
+  return this.microseconds / MICROS_PER_MILLI;
--- End diff --

2 space indent


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15702: [SPARK-18124] Observed-delay based Even Time Wate...

2016-10-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15702#discussion_r85845880
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.AccumulatorV2
+
+class MaxLong(protected var currentValue: Long = 0)
+  extends AccumulatorV2[Long, Long]
+  with Serializable {
+
+  override def isZero: Boolean = value == 0
+  override def value: Long = currentValue
+  override def copy(): AccumulatorV2[Long, Long] = new MaxLong(currentValue)
+
+  override def reset(): Unit = {
+    currentValue = 0
+  }
+
+  override def add(v: Long): Unit = {
+    if (value < v) { currentValue = v }
+  }
+
+  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
+    if (currentValue < other.value) {
+      currentValue = other.value
+    }
+  }
+}
+
+/**
+ * Used to mark a column as the containing the event time for a given record. In addition to
+ * adding appropriate metadata to this column, this operator also tracks the maximum observed event
+ * time. Based on the maximum observed time and a user specified delay, we can calculate the
+ * `watermark` after which we assume we will no longer see late records for a particular time
+ * period.
+ */
+case class EventTimeWatermarkExec(
+    eventTime: Attribute,
+    delay: CalendarInterval,
+    child: SparkPlan) extends SparkPlan {
+
+  // TODO: Use Spark SQL Metrics?
+  val maxEventTime = new MaxLong
--- End diff --

@zsxwing am I doing this right?
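
For readers following the thread, the merge semantics of the `MaxLong` accumulator in the diff above can be tried out without Spark. The sketch below is a simplified stand-in, not the PR's code: `MaxAccumulatorSketch`, `MaxLongSketch`, and `maxAcrossPartitions` are hypothetical names, and nothing here uses `AccumulatorV2`. It only assumes that each partition keeps a local maximum and that the local values are merged afterwards.

```scala
object MaxAccumulatorSketch {
  // Hypothetical, Spark-free stand-in for a "max" accumulator.
  // Like the class in the diff, it starts from 0.
  final class MaxLongSketch {
    private var current: Long = 0L

    def value: Long = current

    // Keep the larger of the current value and the new observation.
    def add(v: Long): Unit = if (v > current) current = v

    // Merging another accumulator is the same as adding its value.
    def merge(other: MaxLongSketch): Unit = add(other.value)
  }

  // Fold each partition's event times into its own accumulator,
  // then merge the per-partition results into one.
  def maxAcrossPartitions(partitions: Seq[Seq[Long]]): Long = {
    val merged = new MaxLongSketch
    partitions.foreach { events =>
      val local = new MaxLongSketch
      events.foreach(v => local.add(v))
      merged.merge(local)
    }
    merged.value
  }
}
```

Because taking a maximum is commutative and associative, the order in which partitions are merged does not change the result. Note that with a zero value of 0, an empty or all-negative input yields 0 in this sketch.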





[GitHub] spark pull request #15702: [SPARK-18124] Observed-delay based Even Time Wate...

2016-10-31 Thread marmbrus
GitHub user marmbrus opened a pull request:

https://github.com/apache/spark/pull/15702

[SPARK-18124] Observed-delay based Event Time Watermarks

This PR adds a new method `withWatermark` to the `Dataset` API, which can
be used to specify an _event time watermark_.  An event time watermark allows
the streaming engine to reason about the point in time after which we no longer
expect to see late data.  This PR also augments `StreamExecution` to use
this watermark for several purposes:
  - To know when a given time window aggregation is finalized, so that
results can be emitted when using output modes that do not allow updates (e.g.
`Append` mode).
  - To minimize the amount of state that we need to keep for ongoing
aggregations, by evicting state for groups that are no longer expected to
change.  Note that we still maintain all state when required (i.e. when in
`Complete` mode).

The following example emits windowed counts of records, waiting up to
5 minutes for late data to arrive:
```scala
df.withWatermark($"eventTime", "5 minutes")
  .groupBy(window($"eventTime", "1 minute") as 'window)
  .count()
  .writeStream
  .format("console")
  .outputMode("append") // In append mode, we only output complete aggregations.
  .start()
```
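
To make the two uses of the watermark listed above more concrete, here is a minimal Spark-free sketch of how aggregation state might be split once a watermark is known. It is an illustration under stated assumptions, not the PR's implementation: `AppendModeSketch`, `WindowCount`, and `splitByWatermark` are hypothetical names, times are plain milliseconds, and a window is assumed to be final once its end is at or before the watermark.

```scala
object AppendModeSketch {
  // Hypothetical state entry: a window [startMs, endMs) and its running count.
  final case class WindowCount(startMs: Long, endMs: Long, count: Long)

  // Returns (finalized, retained):
  //  - finalized windows end at or before the watermark, so they can be
  //    emitted once in append mode and their state can be dropped;
  //  - retained windows may still receive late records and stay in state.
  def splitByWatermark(
      state: Seq[WindowCount],
      watermarkMs: Long): (Seq[WindowCount], Seq[WindowCount]) =
    state.partition(_.endMs <= watermarkMs)
}
```

For example, with one-minute windows ending at 60000 ms and 120000 ms and a watermark of 60000 ms, only the first window is emitted and evicted; the second stays in state until the watermark passes its end.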

### Calculating the watermark
The current watermark is computed by taking the `MAX(eventTime)` seen
this epoch across all of the partitions in the query and subtracting a
user-defined _delayThreshold_.  Note that since we must coordinate this value
across partitions occasionally, the actual watermark used is only guaranteed to
be at least `delay` behind the actual event time.  In some cases we may still
process records that arrive more than `delay` late.
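
The arithmetic described above can be sketched in a few lines of plain Scala. This is an illustration, not the code in this PR: `WatermarkSketch` and `nextWatermark` are hypothetical names, all times are milliseconds, and the rule that the watermark never moves backwards is an assumption of the sketch rather than something stated here.

```scala
object WatermarkSketch {
  // Compute the watermark to use after an epoch.
  //  previousMs               - watermark before this epoch
  //  partitionMaxEventTimesMs - max event time observed by each partition
  //  delayMs                  - user-specified delay threshold
  def nextWatermark(
      previousMs: Long,
      partitionMaxEventTimesMs: Seq[Long],
      delayMs: Long): Long = {
    if (partitionMaxEventTimesMs.isEmpty) {
      // No data this epoch: keep the old watermark.
      previousMs
    } else {
      // Global max across partitions minus the delay,
      // assumed (in this sketch) never to move backwards.
      math.max(previousMs, partitionMaxEventTimesMs.max - delayMs)
    }
  }
}
```

With a delay of 300000 ms (5 minutes) and partition maxima of 600000 ms and 900000 ms, the sketch yields a watermark of 600000 ms. Since the value is only recomputed between epochs, records inside an epoch are compared against the previous result, which is one way to read the note above about records more than `delay` late still being processed.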

This mechanism was chosen for the initial implementation over processing 
time for two reasons:   
  - it is robust to downtime that could affect processing delay
  - it does not require syncing of time or timezones across

### Other notable implementation details
 - A new trigger metric `eventTimeWatermark` outputs the current value of 
the watermark.
 - We mark the event time column in the `Attribute` metadata using the key 
`spark.watermarkDelay`.  This allows downstream operations to know which column 
holds the event time.  Operations like `window` propagate this metadata.
 - `explain()` marks the watermark with a suffix of `-T${delayMs}` to ease 
debugging of how this information is propagated.
 - Currently, we don't filter out late records, but instead rely on the 
state store to avoid emitting records that are both added and filtered in the 
same epoch.

### Remaining in this PR
 - [ ] The test for recovery is currently failing as we don't record the 
watermark used in the offset log.  We will need to do so to ensure determinism, 
but this is deferred until #15626 is merged.

### Other follow-ups
There are some natural additional features that we should consider for 
future work:
 - Ability to write records that arrive too late to some external store in 
case any out-of-band remediation is required.
 - `Update` mode so you can get partial results before a group is evicted.
 - Other mechanisms for calculating the watermark.  In particular a 
watermark based on quantiles would be more robust to outliers.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/marmbrus/spark watermarks

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15702.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15702


commit e6e3bbe9ca2d2081264b5bff68293572af7778a7
Author: Michael Armbrust 
Date:   2016-10-28T04:11:19Z

first test passing

commit 92320720492f192fe6791d0fea90495ea5db94a7
Author: Michael Armbrust 
Date:   2016-10-28T07:55:57Z

cleanup

commit 5b921323092c5730f816795193a6e0d985d7e430
Author: Michael Armbrust 
Date:   2016-10-31T22:00:32Z

Merge remote-tracking branch 'origin/master' into watermarks



