[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89233925

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -92,8 +116,8 @@ class StreamExecution(
       /** The current batchId or -1 if execution has not yet been initialized. */
       private var currentBatchId: Long = -1
    -  /** The current eventTime watermark, used to bound the lateness of data that will processed. */
    -  private var currentEventTimeWatermark: Long = 0
    +  /** stream execution metadata */
    --- End diff --

nit: s -> S

---

If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89230473

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -25,19 +25,43 @@ import scala.collection.mutable.ArrayBuffer
     import scala.util.control.NonFatal
     import org.apache.hadoop.fs.Path
    +import org.json4s.NoTypeHints
    +import org.json4s.jackson.Serialization
     import org.apache.spark.internal.Logging
     import org.apache.spark.sql._
     import org.apache.spark.sql.catalyst.encoders.RowEncoder
    -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
     import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
     import org.apache.spark.sql.catalyst.util._
     import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
     import org.apache.spark.sql.execution.command.ExplainCommand
     import org.apache.spark.sql.streaming._
    +import org.apache.spark.sql.types.{DateType, TimestampType}
     import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
     /**
    + * Contains metadata associated with a stream execution. This information is
    + * persisted to the offset log via the OffsetSeq metadata field. Current
    + * information contained in this object includes:
    + *
    + * 1. currentEventTimeWatermark: The current eventTime watermark, used to
    + * bound the lateness of data that will processed. Time unit: milliseconds
    + * 2. currentBatchTimestamp: The current batch processing timestamp.
    --- End diff --

nit: Use Scaladoc: `@param currentEventTimeWatermarkMillis ...`
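For reference, the `@param` style suggested above might look like the sketch below. `currentEventTimeWatermarkMillis` is the rename tcondie mentions later in the thread; `currentBatchTimestampMillis` is a hypothetical name chosen for illustration, and the millisecond unit for it is an assumption, not something the merged code is confirmed to use.

```scala
/**
 * Contains metadata associated with a stream execution. This information is
 * persisted to the offset log via the OffsetSeq metadata field.
 *
 * @param currentEventTimeWatermarkMillis the current event-time watermark, used to
 *                                        bound the lateness of data that will be processed
 * @param currentBatchTimestampMillis     the processing timestamp of the current batch
 *                                        (hypothetical name; unit assumed to be milliseconds)
 */
case class StreamExecutionMetadata(
    var currentEventTimeWatermarkMillis: Long = 0,
    var currentBatchTimestampMillis: Long = 0)
```

Scaladoc attaches each `@param` tag to the constructor parameter of the same name, so the unit is documented next to the field instead of in a free-form numbered list.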
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89230294

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala ---
    @@ -72,6 +71,30 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
     }
     /**
    + * Expression representing the current batch time, which is used by StreamExecution to
    + * 1. prevent optimizer from pushing this expression below a stateful operator
    + * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
    + *
    + * There is no code generation since this expression should be replaced with a literal.
    + */
    +case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType)
    +  extends LeafExpression with Nondeterministic with CodegenFallback {
    +
    +  override def nullable: Boolean = false
    +
    +  override def prettyName: String = "current_batch_timestamp"
    +
    +  override protected def initializeInternal(partitionIndex: Int): Unit = {}
    +
    +  override protected def evalInternal(input: InternalRow): Any = timestampMs
    +
    +  def toLiteral: Literal = dataType match {
    +    case _: TimestampType => Literal(timestampMs * 1000L, TimestampType)
    --- End diff --

nit: Use `Literal(DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(timestampMs)), TimestampType)` instead. This is not a big deal anyway, since `timestampMs` is always positive here.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89230158

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala ---
    @@ -72,6 +71,30 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
     }
     /**
    + * Expression representing the current batch time, which is used by StreamExecution to
    + * 1. prevent optimizer from pushing this expression below a stateful operator
    + * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
    + *
    + * There is no code generation since this expression should be replaced with a literal.
    + */
    +case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType)
    +  extends LeafExpression with Nondeterministic with CodegenFallback {
    +
    +  override def nullable: Boolean = false
    +
    +  override def prettyName: String = "current_batch_timestamp"
    +
    +  override protected def initializeInternal(partitionIndex: Int): Unit = {}
    +
    +  override protected def evalInternal(input: InternalRow): Any = timestampMs
    --- End diff --

Could you throw an exception like `throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")` here?
Github user tcondie commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89208121

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala ---
    @@ -72,6 +72,26 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
     }
     /**
    + * Expression representing the current batch time, which is used by StreamExecution to
    + * 1. prevent optimizer from pushing this expression below a stateful operator
    + * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
    + *
    + * There is no code generation since this expression should be replaced with a literal.
    + */
    +case class CurrentBatchTimestamp(timestamp: SQLTimestamp) extends LeafExpression
    --- End diff --

Both Nondeterministic and Unevaluable have final `eval` methods, so they cannot both be mixed in, and I need Nondeterministic to avoid optimizer pushdown.
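The constraint described above can be modelled with simplified stand-in traits. This is a sketch, not Catalyst's API: `Expr`, `NondeterministicLike`, and `BatchTimestampSketch` are hypothetical names. It shows a trait that seals `eval` as `final` and delegates to `evalInternal` (the hooks visible in the diff), plus a leaf expression that, following zsxwing's suggestion, throws from `evalInternal` because it is expected to be replaced by a literal before execution.

```scala
// Hypothetical, heavily simplified stand-ins for Catalyst's expression traits.
trait Expr {
  def eval(): Any
}

// Seals `eval` and delegates to a hook, in the spirit of Nondeterministic.
// A second trait that also declared `final def eval` could not be mixed into
// the same class, which is the conflict with Unevaluable described above.
trait NondeterministicLike extends Expr {
  private var initialized = false

  final def initialize(partitionIndex: Int): Unit = {
    initializeInternal(partitionIndex)
    initialized = true
  }

  final override def eval(): Any = {
    require(initialized, s"$this must be initialized before eval()")
    evalInternal()
  }

  protected def initializeInternal(partitionIndex: Int): Unit
  protected def evalInternal(): Any
}

// Marker expression that should be swapped for a literal before it is ever evaluated.
final case class BatchTimestampSketch(timestampMs: Long) extends NondeterministicLike {
  override protected def initializeInternal(partitionIndex: Int): Unit = {}

  override protected def evalInternal(): Any =
    throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
}
```

Throwing from `evalInternal` gives roughly the "never evaluated" guarantee of `Unevaluable` while keeping the nondeterministic marker that blocks pushdown.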
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89025767

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -344,8 +370,11 @@ class StreamExecution(
             }
           }
           if (hasNewData) {
    +        // Current batch timestamp in seconds
    --- End diff --

Or actually, I would prefer using milliseconds everywhere.
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89025593

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala ---
    @@ -72,6 +72,26 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
     }
     /**
    + * Expression representing the current batch time, which is used by StreamExecution to
    + * 1. prevent optimizer from pushing this expression below a stateful operator
    + * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
    + *
    + * There is no code generation since this expression should be replaced with a literal.
    + */
    +case class CurrentBatchTimestamp(timestamp: SQLTimestamp) extends LeafExpression
    --- End diff --

Feel free to push back on this... Since we're going to support both `CurrentTimestamp` and `CurrentDate`, I would recommend the following signature:

```scala
case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType) {
  def toLiteral: Literal = dataType match {
    // milliseconds -> microseconds
    case _: TimestampType => Literal(timestampMs * 1000, TimestampType)
    // milliseconds -> days since epoch
    case _: DateType => Literal(DateTimeUtils.millisToDays(timestampMs), DateType)
  }
}
```
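The suggested `toLiteral` turns one millisecond timestamp into either a microsecond timestamp value or a day count. The sketch below reproduces that dispatch with plain Scala stand-ins; `SketchType`, `SketchLiteral`, and `millisToDaysUtc` are hypothetical names. It assumes UTC and floors toward negative infinity, whereas Spark's own `DateTimeUtils.millisToDays` may take a time zone into account.

```scala
// Hypothetical stand-ins for Catalyst's DataType and Literal.
sealed trait SketchType
case object TimestampSketchType extends SketchType // value: microseconds since epoch
case object DateSketchType extends SketchType      // value: days since epoch

final case class SketchLiteral(value: Any, dataType: SketchType)

object BatchTime {
  private val MillisPerDay: Long = 24L * 60 * 60 * 1000

  // Assumption: UTC only. floorDiv keeps pre-1970 instants on the correct day.
  def millisToDaysUtc(millis: Long): Int = Math.floorDiv(millis, MillisPerDay).toInt

  def toLiteral(timestampMs: Long, dataType: SketchType): SketchLiteral = dataType match {
    case TimestampSketchType => SketchLiteral(timestampMs * 1000L, TimestampSketchType) // ms -> micros
    case DateSketchType      => SketchLiteral(millisToDaysUtc(timestampMs), DateSketchType)
  }
}
```

Keeping a single millisecond field and converting only at literal-creation time means `CurrentTimestamp` and `CurrentDate` in the same batch are derived from the same instant.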
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89025088

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala ---
    @@ -72,6 +72,26 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
     }
     /**
    + * Expression representing the current batch time, which is used by StreamExecution to
    + * 1. prevent optimizer from pushing this expression below a stateful operator
    + * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
    + *
    + * There is no code generation since this expression should be replaced with a literal.
    + */
    +case class CurrentBatchTimestamp(timestamp: SQLTimestamp) extends LeafExpression
    --- End diff --

Since this class will never be evaluated, only rewritten, we can also extend `Unevaluable`.
Github user tcondie commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89022903

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -38,6 +40,26 @@ import org.apache.spark.sql.streaming._
     import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
     /**
    + * Contains metadata associated with a stream execution. This information is
    + * persisted to the offset log via the OffsetSeq metadata field. Current
    + * information contained in this object includes:
    + *
    + * 1. currentEventTimeWatermark: The current eventTime watermark, used to
    + * bound the lateness of data that will processed.
    + * 2. currentBatchTimestamp: The current batch processing timestamp
    + */
    +case class StreamExecutionMetadata(
    +    var currentEventTimeWatermark: Long = 0,
    +    var currentBatchTimestamp: Long = 0) {
    --- End diff --

Ryan indicated we use SQLTimestamp to indicate microseconds. I changed the name of the watermark to currentEventTimeWatermarkMillis based on your suggestion.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89017887

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -344,8 +370,11 @@ class StreamExecution(
             }
           }
           if (hasNewData) {
    +        // Current batch timestamp in seconds
    +        streamExecutionMetadata.currentBatchTimestamp = triggerClock.getTimeMillis() / 1000L
    --- End diff --

`triggerClock.getTimeMillis() / 1000L` -> `triggerClock.getTimeMillis() * 1000L`. SQLTimestamp uses microseconds.
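The unit mix-up is easy to see with concrete numbers. A minimal sketch, assuming a clock reading of 2016-11-22T00:00:00.123Z chosen purely for illustration: dividing by 1000 yields seconds, and if those seconds are then read as a microsecond timestamp they land less than half an hour after the epoch. The `SQLTimestamp` alias below mirrors the name used in the thread and is assumed here to be a plain `Long` of microseconds.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

object BatchTimeUnits {
  // Assumed alias: a plain Long holding microseconds since the epoch.
  type SQLTimestamp = Long

  val clockMillis: Long = Instant.parse("2016-11-22T00:00:00.123Z").toEpochMilli

  // What the line under review computes: whole seconds.
  val asSeconds: Long = clockMillis / 1000L
  // What the suggested correction computes: microseconds.
  val asMicros: SQLTimestamp = clockMillis * 1000L

  // Reads a microsecond value back as an Instant.
  def microsToInstant(micros: SQLTimestamp): Instant =
    Instant.EPOCH.plus(micros, ChronoUnit.MICROS)
}
```

Because a type alias does not stop a seconds value from flowing into a microseconds slot, a unit suffix in the field name (as discussed in this thread) is what makes the mistake visible at the call site.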
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89017675

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -38,6 +40,26 @@ import org.apache.spark.sql.streaming._
     import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
     /**
    + * Contains metadata associated with a stream execution. This information is
    + * persisted to the offset log via the OffsetSeq metadata field. Current
    + * information contained in this object includes:
    + *
    + * 1. currentEventTimeWatermark: The current eventTime watermark, used to
    + * bound the lateness of data that will processed.
    + * 2. currentBatchTimestamp: The current batch processing timestamp
    + */
    +case class StreamExecutionMetadata(
    +    var currentEventTimeWatermark: Long = 0,
    +    var currentBatchTimestamp: Long = 0) {
    --- End diff --

Shall we name this `currentBatchTimestampSecs` to clarify that it is in seconds?
Github user tcondie commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89017673

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -288,7 +310,11 @@ class StreamExecution(
             logInfo(s"Resuming streaming query, starting with batch $batchId")
             currentBatchId = batchId
             availableOffsets = nextOffsets.toStreamProgress(sources)
    -        logDebug(s"Found possibly uncommitted offsets $availableOffsets")
    +        streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse(
    +          throw new IllegalStateException("OffsetLog does not contain current batch timestamp!")
    --- End diff --

Adding `{}` doesn't yield code that compiles/typechecks.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89017665

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala ---
    @@ -72,6 +72,28 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
     }
     /**
    + * Expression representing the current batch time, which is used by StreamExecution to
    + * 1. prevent optimizer from pushing this expression below a stateful operator
    + * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
    + *
    + * There is no code generation since this expression should be replaced with a literal.
    + */
    +@ExpressionDescription(
    +  usage = "_FUNC_() - Returns the current timestamp at the start of batch evaluation.")
    +case class CurrentBatchTimestamp(timestamp: Long) extends LeafExpression
    --- End diff --

nit: `Long` -> `SQLTimestamp`
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89017544

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -422,6 +451,8 @@ class StreamExecution(
         val replacementMap = AttributeMap(replacements)
         val triggerLogicalPlan = withNewSources transformAllExpressions {
           case a: Attribute if replacementMap.contains(a) => replacementMap(a)
    +      case t: CurrentTimestamp =>
    --- End diff --

Shall we also support `CurrentDate`?
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89017366

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala ---
    @@ -72,6 +72,28 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
     }
     /**
    + * Expression representing the current batch time, which is used by StreamExecution to
    + * 1. prevent optimizer from pushing this expression below a stateful operator
    + * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
    + *
    + * There is no code generation since this expression should be replaced with a literal.
    + */
    +@ExpressionDescription(
    --- End diff --

nit: This class won't be user-facing and won't have SQL APIs, etc., so I think we can remove this.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89017238

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -288,7 +310,11 @@ class StreamExecution(
             logInfo(s"Resuming streaming query, starting with batch $batchId")
             currentBatchId = batchId
             availableOffsets = nextOffsets.toStreamProgress(sources)
    -        logDebug(s"Found possibly uncommitted offsets $availableOffsets")
    +        streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse(
    +          throw new IllegalStateException("OffsetLog does not contain current batch timestamp!")
    --- End diff --

Instead of throwing an exception, we can just use `{}` for backward compatibility.
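One plausible reading of this exchange is that the suggested fallback is an empty JSON object, the string `"{}"`, whereas a bare `{}` in Scala is an empty block of type `Unit`. The sketch below uses a hypothetical `MetadataFallback` helper standing in for the offset log's `Option[String]` metadata field to show why the bare block does not typecheck where a `String` is expected, and how the string fallback gives older offset-log entries a default.

```scala
object MetadataFallback {
  // Hypothetical: the offset log's optional metadata, absent in entries written by older versions.
  def resolve(storedMetadata: Option[String]): String =
    // An empty JSON object lets the parser fill in a default for every field.
    storedMetadata.getOrElse("{}")

  // With a bare block, getOrElse widens to the least upper bound of String and Unit,
  // which is Any, so the result cannot be passed where a String is required:
  //   val json: String = storedMetadata.getOrElse({})   // does not compile
  def widened(storedMetadata: Option[String]): Any = storedMetadata.getOrElse({})
}
```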
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89017095

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -734,6 +766,13 @@ class StreamExecution(
         case object TERMINATED extends State
       }
    +object StreamExecutionMetadata {
    +  private implicit val formats = Serialization.formats(NoTypeHints)
    +
    +  def apply(json: String): StreamExecutionMetadata =
    --- End diff --

Could you add some unit tests for this? E.g.,

```
assert(StreamExecutionMetadata(0, 0) === StreamExecutionMetadata("""{}"""))
assert(StreamExecutionMetadata(1, 0) === StreamExecutionMetadata("""{"currentEventTimeWatermark":1}"""))
assert(StreamExecutionMetadata(0, 2) === StreamExecutionMetadata("""{"currentBatchTimestamp":2}"""))
assert(StreamExecutionMetadata(1, 2) === StreamExecutionMetadata("""{"currentEventTimeWatermark":1,"currentBatchTimestamp":2}"""))
```

Then it's easy for us to find any incompatible changes in the future.
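A self-contained sketch of the backward-compatibility behaviour those tests pin down: fields missing from the JSON fall back to defaults. The PR itself uses json4s `Serialization`; to keep this example dependency-free, the hypothetical `fromJson` below (named differently from the PR's `apply(json: String)`) uses a regex that only understands a flat object of integer fields. That is enough for the four cases above but is not a general JSON parser.

```scala
final case class StreamExecutionMetadata(
    currentEventTimeWatermark: Long = 0L,
    currentBatchTimestamp: Long = 0L) {

  // Flat JSON with a fixed key order, using the field names from the tests above.
  def json: String =
    s"""{"currentEventTimeWatermark":$currentEventTimeWatermark,"currentBatchTimestamp":$currentBatchTimestamp}"""
}

object StreamExecutionMetadata {
  // Matches "key":<integer> pairs; assumption: no nesting, strings, or escapes.
  private val Field = """"(\w+)"\s*:\s*(-?\d+)""".r

  def fromJson(json: String): StreamExecutionMetadata = {
    val fields: Map[String, Long] =
      Field.findAllMatchIn(json).map(m => m.group(1) -> m.group(2).toLong).toMap
    StreamExecutionMetadata(
      currentEventTimeWatermark = fields.getOrElse("currentEventTimeWatermark", 0L),
      currentBatchTimestamp = fields.getOrElse("currentBatchTimestamp", 0L))
  }
}
```

A round trip through `json` and `fromJson` is a cheap extra check that the writer and reader agree on field names.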
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r89011400

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
    @@ -50,6 +52,19 @@ class IncrementalExecution(
           stateStrategy)
       /**
    +   * See [SPARK-18339]
    +   * Walk the optimized logical plan and replace CurrentBatchTimestamp
    +   * with the desired literal
    +   */
    +  override lazy val optimizedPlan: LogicalPlan = {
    +    sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
    --- End diff --

Never mind. Since we cannot convert expressions in StreamingRelationStrategy, it's fine to add the code here.
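The rewrite in this `optimizedPlan` override can be pictured with a toy expression tree. In the sketch below, `Expr`, `Lit`, `Col`, `GtEq`, `Minus`, `BatchTs`, and `transformUp` are hypothetical simplifications, not Catalyst classes. The optimizer is assumed to have already run, so the marker has stayed wherever the plan put it, and a bottom-up rewrite then swaps each `BatchTs` marker for a microsecond literal.

```scala
// Hypothetical toy expression tree; not Catalyst's classes.
sealed trait Expr {
  // Rewrite the children first, then apply the rule to this node if it matches.
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val withNewChildren = this match {
      case GtEq(l, r)  => GtEq(l.transformUp(rule), r.transformUp(rule))
      case Minus(l, r) => Minus(l.transformUp(rule), r.transformUp(rule))
      case leaf        => leaf
    }
    rule.applyOrElse(withNewChildren, identity[Expr])
  }
}
final case class Lit(value: Long) extends Expr
final case class Col(name: String) extends Expr
final case class GtEq(left: Expr, right: Expr) extends Expr
final case class Minus(left: Expr, right: Expr) extends Expr
// Marker for "current batch timestamp", carrying the batch time in milliseconds.
final case class BatchTs(timestampMs: Long) extends Expr

object ReplaceBatchTimestamp {
  // Substitute the marker with a literal, converting milliseconds to microseconds.
  def apply(e: Expr): Expr = e.transformUp { case BatchTs(ms) => Lit(ms * 1000L) }
}
```

Doing the substitution after optimization means the optimizer only ever sees the nondeterministic marker, never a literal it would be free to push down.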
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r88982854

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
    @@ -50,6 +52,19 @@ class IncrementalExecution(
           stateStrategy)
       /**
    +   * See [SPARK-18339]
    +   * Walk the optimized logical plan and replace CurrentBatchTimestamp
    +   * with the desired literal
    +   */
    +  override lazy val optimizedPlan: LogicalPlan = {
    +    sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
    --- End diff --

StreamingRelationStrategy converts a LogicalPlan to a SparkPlan, which happens after optimizedPlan is created, so there is less risk.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r88965945

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -344,8 +351,11 @@ class StreamExecution(
             }
           }
           if (hasNewData) {
    +        // Current batch timestamp in seconds
    +        currentBatchTimestamp = triggerClock.getTimeMillis() / 1000L
             reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
    -          assert(offsetLog.add(currentBatchId, availableOffsets.toOffsetSeq(sources)),
    +          assert(offsetLog.add(currentBatchId,
    +            availableOffsets.toOffsetSeq(sources, Some(currentBatchTimestamp.toString))),
    --- End diff --

It's better to have a new class like `StreamExecutionMetadata` to store all fields that need to be checkpointed, e.g., `currentBatchTimestamp`, `currentEventTimeWatermark`, and others that we may add in the future.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r88965885

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
    @@ -50,6 +52,19 @@ class IncrementalExecution(
           stateStrategy)
       /**
    +   * See [SPARK-18339]
    +   * Walk the optimized logical plan and replace CurrentBatchTimestamp
    +   * with the desired literal
    +   */
    +  override lazy val optimizedPlan: LogicalPlan = {
    +    sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
    --- End diff --

Any reason not to define this in `org.apache.spark.sql.execution.SparkStrategies.StreamingRelationStrategy`?
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r88942966

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -422,6 +432,7 @@ class StreamExecution(
         val replacementMap = AttributeMap(replacements)
         val triggerLogicalPlan = withNewSources transformAllExpressions {
           case a: Attribute if replacementMap.contains(a) => replacementMap(a)
    +      case t: CurrentTimestamp => CurrentBatchTimestamp(currentBatchTimestamp)
    --- End diff --

Never mind, I just read the JIRA, and I think I understand what is going on. To me, the `time` column in `df.groupBy('time)` is different from the `time` column which is (apparently) in the source. I personally think it is better to have the stateful aggregate produce a new time column than to restrict pushdown.
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/15949#discussion_r88941686

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -422,6 +432,7 @@ class StreamExecution(
         val replacementMap = AttributeMap(replacements)
         val triggerLogicalPlan = withNewSources transformAllExpressions {
           case a: Attribute if replacementMap.contains(a) => replacementMap(a)
    +      case t: CurrentTimestamp => CurrentBatchTimestamp(currentBatchTimestamp)
    --- End diff --

@hvanhovell Timestamp is "non-deterministic" when it comes to streaming aggregations, so it shouldn't be pushed down below stateful aggregations. Otherwise, the resulting table you get is different from what you asked for. Example:

```
df.groupBy('time).agg(count("*")).where('time >= current_timestamp().cast("long") - 30 * 60)
```

What you get: all aggregates from all of time, but the aggregates older than 30 minutes don't get updated, because their incoming rows get filtered out before the aggregation.

What I wanted: just the last 30 minutes of aggregates. I don't care about anything older.
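The difference can be reproduced with plain collections. The sketch below is a hypothetical model, not Spark's execution: two micro-batches feed a running count keyed by `time` (in minutes, for readability), with the filter `time >= now - 30`. `runFilterAbove` applies the filter to the aggregate's output, as the query asks; `runFilterPushedDown` applies it to each batch's input before the stateful aggregation, which is what pushing `current_timestamp` below the aggregate amounts to.

```scala
object PushdownSketch {
  // One micro-batch: the batch's "current time" and the event times it contains (minutes).
  final case class Batch(now: Long, times: Seq[Long])

  private def addCounts(state: Map[Long, Long], times: Seq[Long]): Map[Long, Long] =
    times.foldLeft(state)((acc, t) => acc.updated(t, acc.getOrElse(t, 0L) + 1L))

  // Filter evaluated after the stateful aggregation: only recent groups are emitted.
  def runFilterAbove(batches: Seq[Batch], windowMin: Long): Map[Long, Long] = {
    var state = Map.empty[Long, Long]
    var output = Map.empty[Long, Long]
    for (b <- batches) {
      state = addCounts(state, b.times)
      output = state.filter { case (t, _) => t >= b.now - windowMin }
    }
    output
  }

  // Filter pushed below the aggregation: old groups stay in state and are still emitted.
  def runFilterPushedDown(batches: Seq[Batch], windowMin: Long): Map[Long, Long] = {
    var state = Map.empty[Long, Long]
    for (b <- batches) {
      state = addCounts(state, b.times.filter(_ >= b.now - windowMin))
    }
    state
  }
}
```

With batches `Batch(10, Seq(5, 5, 10))` and `Batch(100, Seq(90, 5))`, the first strategy emits only group 90, while the pushed-down one still emits groups 5 and 10, and group 5 stays frozen at a count of 2 because its late row was dropped before reaching the state.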
[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/15949#discussion_r88940421

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -422,6 +432,7 @@ class StreamExecution(
val replacementMap = AttributeMap(replacements)
val triggerLogicalPlan = withNewSources transformAllExpressions {
case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+ case t: CurrentTimestamp => CurrentBatchTimestamp(currentBatchTimestamp)
--- End diff --

Why would that be a problem? I thought the problem was that the `current_timestamp` was set to the current system time, instead of the timestamp of the `current` batch.
[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15949#discussion_r88905225

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -422,6 +432,7 @@ class StreamExecution(
val replacementMap = AttributeMap(replacements)
val triggerLogicalPlan = withNewSources transformAllExpressions {
case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+ case t: CurrentTimestamp => CurrentBatchTimestamp(currentBatchTimestamp)
--- End diff --

Good question. The purpose of the dedicated expression is to avoid having it pushed down below a stateful operator (e.g., aggregation) during optimization.
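To illustrate tcondie's point, here is a toy optimizer rule in Scala. The expression and plan classes are hypothetical stand-ins, not Catalyst's classes. The assumption, in the spirit of brkyvz's "non-deterministic" remark, is that the rule only pushes a filter below an aggregation when the filter's condition is deterministic, and that the per-batch timestamp node reports itself as non-deterministic. In this toy model a literal carries no such marker, so a filter comparing against it is pushed down like any other constant comparison. A real rule would also check that the condition references only grouping columns; that check is omitted here.

```scala
// Toy expression and plan trees; these are NOT Catalyst's classes.
sealed trait Expr { def deterministic: Boolean = true }
final case class Col(name: String) extends Expr
final case class Lit(value: Long) extends Expr
// Stand-in for a per-batch timestamp node that the optimizer must not move.
final case class BatchTimestamp(ms: Long) extends Expr {
  override def deterministic: Boolean = false
}
final case class GtEq(left: Expr, right: Expr) extends Expr {
  override def deterministic: Boolean = left.deterministic && right.deterministic
}

sealed trait Plan
final case class Source(name: String) extends Plan
final case class Aggregate(groupBy: String, child: Plan) extends Plan
final case class Filter(cond: Expr, child: Plan) extends Plan

object PushDown {
  // Push a filter below an aggregation only when its condition is deterministic;
  // otherwise leave the plan unchanged.
  def apply(plan: Plan): Plan = plan match {
    case Filter(cond, Aggregate(g, child)) if cond.deterministic =>
      Aggregate(g, Filter(cond, child))
    case other => other
  }
}
```

Under these assumptions, `Filter(GtEq(Col("time"), Lit(100L)), agg)` moves below the `Aggregate`, while the same filter against `BatchTimestamp(100L)` stays on top of it.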
[GitHub] spark pull request #15949: [Spark-18339] [SQL] Don't push down current_times...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/15949#discussion_r88873003

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -95,6 +95,9 @@ class StreamExecution(
/** The current eventTime watermark, used to bound the lateness of data that will processed. */
private var currentEventTimeWatermark: Long = 0
+ /** The current batch processing timestamp */
+ private var currentBatchTimestamp: Long = 0
--- End diff --

Do we need some form of locking here? Same goes for `currentEventTimeWatermark` BTW.
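One common answer to the locking question, sketched under assumptions the thread does not confirm: if these fields are written only by the stream execution thread and merely read from other threads, a `@volatile` field is enough for visibility, and grouping the values into one immutable object lets a reader see the watermark and the batch timestamp from the same batch. `BatchMetadata` and `ToyStreamExecution` are hypothetical names, not the PR's code.

```scala
// Hypothetical names; not the PR's actual code.
final case class BatchMetadata(eventTimeWatermarkMs: Long, batchTimestampMs: Long)

class ToyStreamExecution {
  // Assumed single writer (the stream thread). @volatile makes each newly
  // assigned reference visible to reader threads without taking a lock.
  @volatile private var metadata = BatchMetadata(0L, 0L)

  // Called by the stream thread at the start of each batch; replaces the
  // whole snapshot with one reference write.
  def startBatch(watermarkMs: Long, nowMs: Long): Unit =
    metadata = BatchMetadata(watermarkMs, nowMs)

  // Safe to call from any thread; both values come from the same snapshot.
  def snapshot: BatchMetadata = metadata
}
```

If more than one thread could write, or if an update depended on the previous value, a lock (for example `synchronized`) would still be needed.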
[GitHub] spark pull request #15949: [Spark-18339] [SQL] Don't push down current_times...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/15949#discussion_r88872904

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -422,6 +432,7 @@ class StreamExecution(
val replacementMap = AttributeMap(replacements)
val triggerLogicalPlan = withNewSources transformAllExpressions {
case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+ case t: CurrentTimestamp => CurrentBatchTimestamp(currentBatchTimestamp)
--- End diff --

Maybe an incredibly stupid comment: Why not use a Literal instead of this dedicated expression? You are going to replace it with a Literal anyway.
[GitHub] spark pull request #15949: [Spark-18339] [SQL] Don't push down current_times...
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/15949

[Spark-18339] [SQL] Don't push down current_timestamp for filters in StructuredStreaming

## What changes were proposed in this pull request?

For the following workflow:
1. I have a column called `time` which is at minute-level precision in a streaming DataFrame.
2. I want to perform groupBy time, count.
3. Then I want my MemorySink to only have the last 30 minutes of counts, and I perform this by `.where('time >= current_timestamp().cast("long") - 30 * 60)`.

What happens is that the `filter` gets pushed down before the aggregation, and the filter is applied to the source data of the aggregation instead of to the result of the aggregation (where I actually want to filter).

I guess the main issue here is that `current_timestamp` is non-deterministic in the streaming context, so a filter that uses it shouldn't be pushed down. Whether this requires us to store the `current_timestamp` for each trigger of the streaming job is something to discuss.

@brkyvz @zsxwing @tdas

## How was this patch tested?

A test was added to StreamingAggregationSuite ensuring the above use case is handled. The test injects a stream of time values (in seconds) into a query that runs in complete mode and only outputs the (count) aggregation results for the past 10 seconds.
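On the open question of storing `current_timestamp` per trigger, a minimal sketch of one option: read the clock once when a batch first runs, record the value by batch id, and reuse it if that batch is executed again (for example after a restart), so every evaluation within a batch sees the same time. The class name `BatchTimestamps` and the in-memory map standing in for a durable log are hypothetical; the thread does not show this code.

```scala
import scala.collection.mutable

// Hypothetical sketch: one timestamp per batch id, recorded the first time
// the batch runs and reused whenever the same batch id is requested again.
class BatchTimestamps(clock: () => Long) {
  // Stands in for a durable per-batch log; an in-memory map does not survive restarts.
  private val log = mutable.Map.empty[Long, Long]

  // Returns the recorded timestamp for batchId, reading the clock only on first use.
  def timestampFor(batchId: Long): Long =
    log.getOrElseUpdate(batchId, clock())
}
```

Because the clock is read once per batch id, a replayed batch evaluates `current_timestamp` to the same value it had the first time, while the next batch picks up a fresh reading.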
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark SPARK-18339

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15949.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #15949

commit 729ecedab4b61804a9717cadbd4f2c7b6aa50176
Author: Tyson Condie
Date: 2016-11-18T23:26:25Z

added CurrentBatchTimestamp

commit b8a1f71bef2aa4c11c08178b2250bd995e952601
Author: Tyson Condie
Date: 2016-11-18T23:35:27Z

update comment

commit 8f0a27329ea711d1936c2df11a310129e22eb9b5
Author: Tyson Condie
Date: 2016-11-20T20:41:49Z

add test for filtering time-based aggregation