[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89233925
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -92,8 +116,8 @@ class StreamExecution(
   /** The current batchId or -1 if execution has not yet been initialized. */
   private var currentBatchId: Long = -1
 
-  /** The current eventTime watermark, used to bound the lateness of data that will processed. */
-  private var currentEventTimeWatermark: Long = 0
+  /** stream execution metadata */
--- End diff --

nit: s -> S


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89230473
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -25,19 +25,43 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.types.{DateType, TimestampType}
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
 /**
+ * Contains metadata associated with a stream execution. This information is
+ * persisted to the offset log via the OffsetSeq metadata field. Current
+ * information contained in this object includes:
+ *
+ * 1. currentEventTimeWatermark: The current eventTime watermark, used to
+ * bound the lateness of data that will processed. Time unit: milliseconds
+ * 2. currentBatchTimestamp: The current batch processing timestamp.
--- End diff --

nit: Use Scaladoc `@param` tags here, e.g. `@param currentEventTimeWatermarkMillis ...`
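The sketch below shows one way to rewrite the doc comment with Scaladoc `@param` tags. The field `currentEventTimeWatermarkMillis` follows the rename mentioned elsewhere in this thread. The field `currentBatchTimestampMillis` is a hypothetical name, because the thread has not yet settled the unit of the batch timestamp (seconds, milliseconds, or microseconds).

```scala
/**
 * Contains metadata associated with a stream execution. This information is
 * persisted to the offset log via the OffsetSeq metadata field.
 *
 * @param currentEventTimeWatermarkMillis The current event-time watermark, used to
 *                                        bound the lateness of data that will be
 *                                        processed. Time unit: milliseconds.
 * @param currentBatchTimestampMillis     The current batch processing timestamp
 *                                        (hypothetical name). Time unit: milliseconds.
 */
case class StreamExecutionMetadata(
    var currentEventTimeWatermarkMillis: Long = 0,
    var currentBatchTimestampMillis: Long = 0)
```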





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89230294
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 ---
@@ -72,6 +71,30 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
 }
 
 /**
+ * Expression representing the current batch time, which is used by StreamExecution to
+ * 1. prevent optimizer from pushing this expression below a stateful operator
+ * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced with a literal.
+ */
+case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType)
+  extends LeafExpression with Nondeterministic with CodegenFallback {
+
+  override def nullable: Boolean = false
+
+  override def prettyName: String = "current_batch_timestamp"
+
+  override protected def initializeInternal(partitionIndex: Int): Unit = {}
+
+  override protected def evalInternal(input: InternalRow): Any = timestampMs
+
+  def toLiteral: Literal = dataType match {
+    case _: TimestampType => Literal(timestampMs * 1000L, TimestampType)
--- End diff --

nit: Use `Literal(DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(timestampMs)), TimestampType)` instead. This is not a big deal anyway, since `timestampMs` is always positive here.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89230158
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 ---
@@ -72,6 +71,30 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
 }
 
 /**
+ * Expression representing the current batch time, which is used by StreamExecution to
+ * 1. prevent optimizer from pushing this expression below a stateful operator
+ * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced with a literal.
+ */
+case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType)
+  extends LeafExpression with Nondeterministic with CodegenFallback {
+
+  override def nullable: Boolean = false
+
+  override def prettyName: String = "current_batch_timestamp"
+
+  override protected def initializeInternal(partitionIndex: Int): Unit = {}
+
+  override protected def evalInternal(input: InternalRow): Any = timestampMs
--- End diff --

Could you throw an exception like `throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")` here?
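The sketch below shows the requested pattern without any Spark dependency. A placeholder expression throws from `eval`, so a code path that forgets to substitute it fails loudly instead of silently returning a raw number. The types `Expr`, `Lit`, and `BatchTimestampPlaceholder` are hypothetical stand-ins, not Catalyst types. In the real class, `Nondeterministic` makes `eval` final, so the throw would go in `evalInternal` instead.

```scala
object PlaceholderEval {
  // Hypothetical miniature expression types, not Catalyst's.
  sealed trait Expr {
    def eval(): Any
  }

  final case class Lit(value: Any) extends Expr {
    def eval(): Any = value
  }

  // Stands in for CurrentBatchTimestamp: it must be rewritten into a Lit
  // before evaluation, so evaluating it directly is treated as a bug.
  final case class BatchTimestampPlaceholder(timestampMs: Long) extends Expr {
    def eval(): Any =
      throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")

    // SQLTimestamp values are microseconds, hence the * 1000L.
    def toLiteral: Lit = Lit(timestampMs * 1000L)
  }
}
```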





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-22 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89208121
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 ---
@@ -72,6 +72,26 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
 }
 
 /**
+ * Expression representing the current batch time, which is used by StreamExecution to
+ * 1. prevent optimizer from pushing this expression below a stateful operator
+ * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced with a literal.
+ */
+case class CurrentBatchTimestamp(timestamp: SQLTimestamp) extends LeafExpression
--- End diff --

Both `Nondeterministic` and `Unevaluable` define a final `eval` method, so they cannot both be mixed in, and I need `Nondeterministic` to prevent optimizer pushdown.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89025767
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -344,8 +370,11 @@ class StreamExecution(
   }
 }
 if (hasNewData) {
+  // Current batch timestamp in seconds
--- End diff --

or I would actually prefer using milliseconds all the time





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89025593
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 ---
@@ -72,6 +72,26 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
 }
 
 /**
+ * Expression representing the current batch time, which is used by StreamExecution to
+ * 1. prevent optimizer from pushing this expression below a stateful operator
+ * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced with a literal.
+ */
+case class CurrentBatchTimestamp(timestamp: SQLTimestamp) extends LeafExpression
--- End diff --

Feel free to push back on this...
Since we're going to support both `CurrentTimestamp` and `CurrentDate`, I would recommend the following signature:
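```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.{DataType, DateType, TimestampType}

case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType) {

  def toLiteral: Literal = dataType match {
    // TimestampType stores microseconds since the epoch
    case _: TimestampType => Literal(timestampMs * 1000, TimestampType)
    // DateType stores days since the epoch
    case _: DateType => Literal(DateTimeUtils.millisToDays(timestampMs), DateType)
  }
}
```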
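The two conversions in `toLiteral` can be checked without Spark. The sketch below is a standalone version built on stated assumptions. `millisToMicros` and `millisToDaysUtc` are hypothetical helpers. The day computation here runs in UTC, while Spark's `DateTimeUtils.millisToDays` (in the 2.x line) applies a time zone offset before dividing, so the two can differ near midnight.

```scala
import java.util.concurrent.TimeUnit

object BatchTimestampUnits {
  private val MillisPerDay: Long = TimeUnit.DAYS.toMillis(1)

  // TimestampType values are microseconds since the epoch.
  def millisToMicros(timestampMs: Long): Long = timestampMs * 1000L

  // DateType values are days since the epoch. Simplification: computed in UTC.
  // floorDiv rounds toward negative infinity, so an instant just before 1970
  // maps to day -1 rather than day 0.
  def millisToDaysUtc(timestampMs: Long): Int =
    Math.floorDiv(timestampMs, MillisPerDay).toInt
}
```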






[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89025088
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 ---
@@ -72,6 +72,26 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
 }
 
 /**
+ * Expression representing the current batch time, which is used by StreamExecution to
+ * 1. prevent optimizer from pushing this expression below a stateful operator
+ * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced with a literal.
+ */
+case class CurrentBatchTimestamp(timestamp: SQLTimestamp) extends LeafExpression
--- End diff --

Since this class will never be evaluated, only rewritten, we can also extend `Unevaluable`.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89022903
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -38,6 +40,26 @@ import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
 /**
+ * Contains metadata associated with a stream execution. This information is
+ * persisted to the offset log via the OffsetSeq metadata field. Current
+ * information contained in this object includes:
+ *
+ * 1. currentEventTimeWatermark: The current eventTime watermark, used to
+ * bound the lateness of data that will processed.
+ * 2. currentBatchTimestamp: The current batch processing timestamp
+ */
+case class StreamExecutionMetadata(
+    var currentEventTimeWatermark: Long = 0,
+    var currentBatchTimestamp: Long = 0) {
--- End diff --

Ryan pointed out that we use `SQLTimestamp` to denote microseconds. Based on your suggestion, I renamed the watermark to `currentEventTimeWatermarkMillis`.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89017887
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -344,8 +370,11 @@ class StreamExecution(
   }
 }
 if (hasNewData) {
+  // Current batch timestamp in seconds
+  streamExecutionMetadata.currentBatchTimestamp = triggerClock.getTimeMillis() / 1000L
--- End diff --

`triggerClock.getTimeMillis() / 1000L` -> `triggerClock.getTimeMillis() * 1000L`. SQLTimestamp uses microseconds.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89017675
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -38,6 +40,26 @@ import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
 /**
+ * Contains metadata associated with a stream execution. This information is
+ * persisted to the offset log via the OffsetSeq metadata field. Current
+ * information contained in this object includes:
+ *
+ * 1. currentEventTimeWatermark: The current eventTime watermark, used to
+ * bound the lateness of data that will processed.
+ * 2. currentBatchTimestamp: The current batch processing timestamp
+ */
+case class StreamExecutionMetadata(
+    var currentEventTimeWatermark: Long = 0,
+    var currentBatchTimestamp: Long = 0) {
--- End diff --

Shall we name this as `currentBatchTimestampSecs` to clarify that this is 
in seconds?





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89017673
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -288,7 +310,11 @@ class StreamExecution(
 logInfo(s"Resuming streaming query, starting with batch $batchId")
 currentBatchId = batchId
 availableOffsets = nextOffsets.toStreamProgress(sources)
-logDebug(s"Found possibly uncommitted offsets $availableOffsets")
+streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse(
+  throw new IllegalStateException("OffsetLog does not contain current batch timestamp!")
--- End diff --

Adding `{}` doesn't yield code that compiles/typechecks.
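The type error likely has a simple cause, shown here without Spark. A bare `{}` is an empty block of type `Unit`, not the JSON text for an empty object. `Option[String].getOrElse({})` therefore cannot produce a `String`. Passing the string literal `"{}"` presumably matches the intent of the backward-compatibility suggestion. `MetadataDefaulting`, `missing`, and `json` are hypothetical names.

```scala
object MetadataDefaulting {
  // Stand-in for nextOffsets.metadata: an offset log written by an older
  // version may carry no metadata at all.
  val missing: Option[String] = None

  // A bare `{}` block is Unit, so missing.getOrElse({}) would widen to Any
  // (the least upper bound of String and Unit) and could not be passed where
  // a String is expected. The JSON text for an empty object stays a String:
  val json: String = missing.getOrElse("{}")
}
```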





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89017665
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 ---
@@ -72,6 +72,28 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
 }
 
 /**
+ * Expression representing the current batch time, which is used by StreamExecution to
+ * 1. prevent optimizer from pushing this expression below a stateful operator
+ * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced with a literal.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_() - Returns the current timestamp at the start of batch evaluation.")
+case class CurrentBatchTimestamp(timestamp: Long) extends LeafExpression
--- End diff --

nit: `Long` -> `SQLTimestamp`





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89017544
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -422,6 +451,8 @@ class StreamExecution(
 val replacementMap = AttributeMap(replacements)
 val triggerLogicalPlan = withNewSources transformAllExpressions {
   case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+  case t: CurrentTimestamp =>
--- End diff --

Shall we also support `CurrentDate`?
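One way for the substitution to cover both functions is to record the requested type on the placeholder, as later revisions in this thread do with `CurrentBatchTimestamp(timestampMs, dataType)`. The toy tree below shows that rewrite in isolation. `CurrentTimestampExpr`, `CurrentDateExpr`, `BatchTimestamp`, and the other types are hypothetical, not Catalyst classes.

```scala
object CurrentTimeRewrite {
  // Hypothetical miniature expression tree; not Catalyst's real classes.
  sealed trait DataKind
  case object TimestampKind extends DataKind
  case object DateKind extends DataKind

  sealed trait Expr
  case object CurrentTimestampExpr extends Expr
  case object CurrentDateExpr extends Expr
  final case class Col(name: String) extends Expr
  final case class GreaterOrEqual(left: Expr, right: Expr) extends Expr
  // Placeholder that remembers which function it replaced.
  final case class BatchTimestamp(timestampMs: Long, kind: DataKind) extends Expr

  // Recursive rewrite, in the spirit of transformAllExpressions.
  def rewrite(e: Expr, batchTimestampMs: Long): Expr = e match {
    case CurrentTimestampExpr => BatchTimestamp(batchTimestampMs, TimestampKind)
    case CurrentDateExpr      => BatchTimestamp(batchTimestampMs, DateKind)
    case GreaterOrEqual(l, r) =>
      GreaterOrEqual(rewrite(l, batchTimestampMs), rewrite(r, batchTimestampMs))
    case other => other
  }
}
```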





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89017366
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 ---
@@ -72,6 +72,28 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
 }
 
 /**
+ * Expression representing the current batch time, which is used by StreamExecution to
+ * 1. prevent optimizer from pushing this expression below a stateful operator
+ * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced with a literal.
+ */
+@ExpressionDescription(
--- End diff --

nit: This class won't be user-facing and won't have SQL APIs, etc., so I think we can remove this.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89017238
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -288,7 +310,11 @@ class StreamExecution(
 logInfo(s"Resuming streaming query, starting with batch $batchId")
 currentBatchId = batchId
 availableOffsets = nextOffsets.toStreamProgress(sources)
-logDebug(s"Found possibly uncommitted offsets $availableOffsets")
+streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse(
+  throw new IllegalStateException("OffsetLog does not contain current batch timestamp!")
--- End diff --

Instead of throwing an exception, we can just use `{}` for backward 
compatibility.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89017095
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -734,6 +766,13 @@ class StreamExecution(
   case object TERMINATED extends State
 }
 
+object StreamExecutionMetadata {
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  def apply(json: String): StreamExecutionMetadata =
--- End diff --

Could you add some unit tests for this? E.g.,
```scala
assert(StreamExecutionMetadata(0, 0) ===
  StreamExecutionMetadata("""{}"""))
assert(StreamExecutionMetadata(1, 0) ===
  StreamExecutionMetadata("""{"currentEventTimeWatermark":1}"""))
assert(StreamExecutionMetadata(0, 2) ===
  StreamExecutionMetadata("""{"currentBatchTimestamp":2}"""))
assert(StreamExecutionMetadata(1, 2) ===
  StreamExecutionMetadata("""{"currentEventTimeWatermark":1,"currentBatchTimestamp":2}"""))
```
Then it's easy for us to find any incompatible changes in the future.
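The tests above pin down one behavior: missing fields fall back to defaults. That behavior can be sketched without json4s. This is a toy, because the regex-based `fromJson` only understands flat objects with integer values. `Metadata`, `toJson`, and `fromJson` are hypothetical names; the code under review uses json4s `Serialization` instead.

```scala
object StreamExecutionMetadataJson {
  final case class Metadata(
      currentEventTimeWatermark: Long = 0,
      currentBatchTimestamp: Long = 0)

  // Matches "key":123 pairs. Toy only: no nesting, strings, or escapes.
  private val Field = """"(\w+)"\s*:\s*(-?\d+)""".r

  def toJson(m: Metadata): String =
    s"""{"currentEventTimeWatermark":${m.currentEventTimeWatermark},""" +
      s""""currentBatchTimestamp":${m.currentBatchTimestamp}}"""

  // Fields absent from the JSON fall back to the case-class defaults, which
  // is what lets "{}" (or an older log entry) still be read.
  def fromJson(json: String): Metadata = {
    val fields = Field.findAllMatchIn(json).map(x => x.group(1) -> x.group(2).toLong).toMap
    val d = Metadata()
    Metadata(
      fields.getOrElse("currentEventTimeWatermark", d.currentEventTimeWatermark),
      fields.getOrElse("currentBatchTimestamp", d.currentBatchTimestamp))
  }
}
```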





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89011400
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -50,6 +52,19 @@ class IncrementalExecution(
   stateStrategy)
 
   /**
+   * See [SPARK-18339]
+   * Walk the optimized logical plan and replace CurrentBatchTimestamp
+   * with the desired literal
+   */
+  override lazy val optimizedPlan: LogicalPlan = {
+    sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
--- End diff --

Never mind. Since we cannot convert expressions in `StreamingRelationStrategy`, it's fine to add the code here.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r88982854
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -50,6 +52,19 @@ class IncrementalExecution(
   stateStrategy)
 
   /**
+   * See [SPARK-18339]
+   * Walk the optimized logical plan and replace CurrentBatchTimestamp
+   * with the desired literal
+   */
+  override lazy val optimizedPlan: LogicalPlan = {
+    sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
--- End diff --

`StreamingRelationStrategy` converts a `LogicalPlan` to a `SparkPlan`, which happens after `optimizedPlan` is created, so there is less risk.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r88965945
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -344,8 +351,11 @@ class StreamExecution(
   }
 }
 if (hasNewData) {
+  // Current batch timestamp in seconds
+  currentBatchTimestamp = triggerClock.getTimeMillis() / 1000L
   reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
-assert(offsetLog.add(currentBatchId, availableOffsets.toOffsetSeq(sources)),
+assert(offsetLog.add(currentBatchId,
+  availableOffsets.toOffsetSeq(sources, Some(currentBatchTimestamp.toString))),
--- End diff --

It's better to have a new class like `StreamExecutionMetadata` to store all fields that need to be checkpointed, e.g., `currentBatchTimestamp`, `currentEventTimeWatermark`, and others that we may add in the future.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r88965885
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -50,6 +52,19 @@ class IncrementalExecution(
   stateStrategy)
 
   /**
+   * See [SPARK-18339]
+   * Walk the optimized logical plan and replace CurrentBatchTimestamp
+   * with the desired literal
+   */
+  override lazy val optimizedPlan: LogicalPlan = {
+    sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
--- End diff --

Any reason not to define this in `org.apache.spark.sql.execution.SparkStrategies.StreamingRelationStrategy`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r88942966
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -422,6 +432,7 @@ class StreamExecution(
 val replacementMap = AttributeMap(replacements)
 val triggerLogicalPlan = withNewSources transformAllExpressions {
   case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+  case t: CurrentTimestamp => CurrentBatchTimestamp(currentBatchTimestamp)
--- End diff --

Never mind, I just read the JIRA, and I think I understand what is going on.

To me, the `time` column in `df.groupBy('time)` is different from the `time` column that is (apparently) in the source. I personally think it is better to have the stateful aggregate produce a new time column than to restrict pushdown.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r88941686
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -422,6 +432,7 @@ class StreamExecution(
 val replacementMap = AttributeMap(replacements)
 val triggerLogicalPlan = withNewSources transformAllExpressions {
   case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+  case t: CurrentTimestamp => CurrentBatchTimestamp(currentBatchTimestamp)
--- End diff --

@hvanhovell Timestamp is "non-deterministic" when it comes to streaming 
aggregations, so a filter on it shouldn't be pushed down below stateful 
aggregations. Otherwise the resulting table is different from what you asked for.
Example: 
```
import org.apache.spark.sql.functions.{count, current_timestamp}
import spark.implicits._ // assumes a SparkSession named `spark`; enables 'time
df.groupBy('time).agg(count("*"))
  .where('time >= current_timestamp().cast("long") - 30 * 60)
```
What you get:
All aggregates from all of time, but the aggregates older than 30 minutes 
don't get updated, because their input rows are filtered out before the aggregation.
What I wanted:
Just the last 30 minutes of aggregates. I don't care about anything older.
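The difference between the two results can be reproduced with a small simulation, assuming complete output mode and a `time` column in epoch seconds. `PushdownSimulation` and its methods are hypothetical and use plain Scala collections rather than Spark; the sketch only models what each filter placement does to the emitted table.

```scala
// Toy model of a complete-mode streaming count grouped by `time` (epoch seconds).
// Plain Scala collections; this is not how Spark implements stateful aggregation.
object PushdownSimulation {
  type Counts = Map[Long, Long]
  // (state, batch, triggerTimeSec, windowSec) => (newState, emittedResult)
  type Step = (Counts, Seq[Long], Long, Long) => (Counts, Counts)

  // Fold one micro-batch of `time` values into the running per-key counts.
  def update(state: Counts, batch: Seq[Long]): Counts =
    batch.foldLeft(state) { (acc, t) => acc.updated(t, acc.getOrElse(t, 0L) + 1L) }

  // Filter pushed below the aggregation: old input rows are dropped, but groups already
  // in the state are never removed, so complete mode keeps emitting them (stale).
  def filterBeforeAgg(state: Counts, batch: Seq[Long], now: Long, window: Long): (Counts, Counts) = {
    val next = update(state, batch.filter(_ >= now - window))
    (next, next)
  }

  // Filter applied to the aggregation result: only groups inside the window are emitted.
  def filterAfterAgg(state: Counts, batch: Seq[Long], now: Long, window: Long): (Counts, Counts) = {
    val next = update(state, batch)
    (next, next.filter { case (t, _) => t >= now - window })
  }

  // Feed (triggerTimeSec, batch) pairs through one strategy; return the last emitted result.
  def run(step: Step, batches: Seq[(Long, Seq[Long])], window: Long): Counts =
    batches.foldLeft((Map.empty[Long, Long], Map.empty[Long, Long])) {
      case ((state, _), (now, batch)) => step(state, batch, now, window)
    }._2
}
```

With a 30-minute (1800 s) window, feeding a batch `Seq(900, 1000)` at t=1000 and then `Seq(4000, 4000)` at t=4000 leaves the stale groups 900 and 1000 in the pushed-down output, while the post-aggregation filter returns only the group at 4000.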






[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r88940421
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -422,6 +432,7 @@ class StreamExecution(
 val replacementMap = AttributeMap(replacements)
 val triggerLogicalPlan = withNewSources transformAllExpressions {
   case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+  case t: CurrentTimestamp => CurrentBatchTimestamp(currentBatchTimestamp)
--- End diff --

Why would that be a problem? I thought the problem was that 
`current_timestamp` was set to the current system time instead of the 
timestamp of the current batch.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r88905225
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -422,6 +432,7 @@ class StreamExecution(
 val replacementMap = AttributeMap(replacements)
 val triggerLogicalPlan = withNewSources transformAllExpressions {
   case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+  case t: CurrentTimestamp => CurrentBatchTimestamp(currentBatchTimestamp)
--- End diff --

Good question. The purpose of the dedicated expression is to keep it from 
being pushed down below a stateful operator (e.g., an aggregation) during 
optimization.
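A simplified model of how a dedicated expression can block the pushdown, assuming (as Catalyst's predicate-pushdown rule does for aggregates) that only deterministic predicates on grouping columns are moved below an `Aggregate`, and assuming the placeholder is treated as non-deterministic. `PushdownRule` and all of its types are hypothetical stand-ins, not Catalyst classes.

```scala
// Toy logical plan and predicates (hypothetical; not Catalyst classes).
object PushdownRule {
  sealed trait Pred {
    def deterministic: Boolean
    def references: Set[String]
  }
  // `column >= bound`, where the bound has already been folded into a literal.
  case class AtLeastLiteral(column: String, bound: Long) extends Pred {
    val deterministic: Boolean = true
    val references: Set[String] = Set(column)
  }
  // `column >= batchTimestamp - windowSec`, with the batch timestamp still an opaque placeholder.
  case class AtLeastBatchTimestamp(column: String, windowSec: Long) extends Pred {
    val deterministic: Boolean = false // treated like a non-deterministic expression
    val references: Set[String] = Set(column)
  }

  sealed trait Plan
  case class Source(name: String) extends Plan
  case class Aggregate(groupBy: Set[String], child: Plan) extends Plan
  case class Filter(pred: Pred, child: Plan) extends Plan

  // Move a filter below an aggregate only if the predicate is deterministic and
  // references grouping columns only; otherwise leave the plan unchanged.
  def pushDown(plan: Plan): Plan = plan match {
    case Filter(p, Aggregate(keys, child)) if p.deterministic && p.references.subsetOf(keys) =>
      Aggregate(keys, Filter(p, child))
    case other => other
  }
}
```

Under this model, a predicate whose bound is already a plain literal is deterministic and gets pushed below the aggregate; keeping the placeholder opaque until after optimization is what keeps the filter above it.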





[GitHub] spark pull request #15949: [Spark-18339] [SQL] Don't push down current_times...

2016-11-21 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r88873003
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -95,6 +95,9 @@ class StreamExecution(
   /** The current eventTime watermark, used to bound the lateness of data that will processed. */
   private var currentEventTimeWatermark: Long = 0
 
+  /** The current batch processing timestamp */
+  private var currentBatchTimestamp: Long = 0
--- End diff --

Do we need some form of locking here? Same goes for 
currentEventTimeWatermark BTW.
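One lightweight option, assuming these fields are written only by the single stream-execution thread and merely read from other threads, is to publish them together through one `@volatile` reference to an immutable value, so readers never see a timestamp from one trigger paired with a watermark from another. `BatchMetadata` and `BatchMetadataHolder` are hypothetical names used only for this sketch; with more than one writer, a lock or an atomic reference would still be needed.

```scala
// Immutable snapshot of per-trigger metadata (hypothetical type, for illustration).
case class BatchMetadata(batchTimestampMs: Long, eventTimeWatermarkMs: Long)

class BatchMetadataHolder {
  // A single @volatile reference: readers on other threads see a complete snapshot,
  // because both values are published together in one write.
  @volatile private var current: BatchMetadata = BatchMetadata(0L, 0L)

  // Intended to be called only by the single stream-execution thread, once per trigger.
  def startBatch(timestampMs: Long, watermarkMs: Long): Unit =
    current = BatchMetadata(timestampMs, watermarkMs)

  // Safe to call from any thread without additional locking.
  def snapshot: BatchMetadata = current
}
```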





[GitHub] spark pull request #15949: [Spark-18339] [SQL] Don't push down current_times...

2016-11-21 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r88872904
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -422,6 +432,7 @@ class StreamExecution(
 val replacementMap = AttributeMap(replacements)
 val triggerLogicalPlan = withNewSources transformAllExpressions {
   case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+  case t: CurrentTimestamp => CurrentBatchTimestamp(currentBatchTimestamp)
--- End diff --

Maybe an incredibly stupid comment: Why not use a Literal instead of this 
dedicated expression? You are going to replace it with a Literal anyway.





[GitHub] spark pull request #15949: [Spark-18339] [SQL] Don't push down current_times...

2016-11-20 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/15949

[Spark-18339] [SQL] Don't push down current_timestamp for filters in StructuredStreaming

## What changes were proposed in this pull request?

For the following workflow:
1. I have a column called `time`, at minute-level precision, in a streaming 
DataFrame.
2. I want to group by `time` and count.
3. Then I want my MemorySink to hold only the last 30 minutes of counts, so I 
apply
   .where('time >= current_timestamp().cast("long") - 30 * 60)

What happens is that the `filter` gets pushed down before the aggregation, 
so the filter is applied to the source data of the aggregation instead of to the 
result of the aggregation (which is where I actually want to filter).
I think the main issue is that `current_timestamp` is non-deterministic in 
the streaming context, so a filter on it shouldn't be pushed down.
Whether this requires us to store the `current_timestamp` for each trigger of 
the streaming job is something to discuss.

@brkyvz @zsxwing @tdas 

## How was this patch tested?

A test was added to StreamingAggregationSuite to ensure the above use case 
is handled. The test feeds a stream of time values (in seconds) into a query 
that runs in complete mode and outputs only the (count) aggregation results for 
the past 10 seconds.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark SPARK-18339

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15949.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15949


commit 729ecedab4b61804a9717cadbd4f2c7b6aa50176
Author: Tyson Condie 
Date:   2016-11-18T23:26:25Z

added CurrentBatchTimestamp

commit b8a1f71bef2aa4c11c08178b2250bd995e952601
Author: Tyson Condie 
Date:   2016-11-18T23:35:27Z

update comment

commit 8f0a27329ea711d1936c2df11a310129e22eb9b5
Author: Tyson Condie 
Date:   2016-11-20T20:41:49Z

add test for filtering time-based aggregation



