Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/15949#discussion_r89230294 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala --- @@ -72,6 +71,30 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback { } /** + * Expression representing the current batch time, which is used by StreamExecution to + * 1. prevent optimizer from pushing this expression below a stateful operator + * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp) + * + * There is no code generation since this expression should be replaced with a literal. + */ +case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType) + extends LeafExpression with Nondeterministic with CodegenFallback { + + override def nullable: Boolean = false + + override def prettyName: String = "current_batch_timestamp" + + override protected def initializeInternal(partitionIndex: Int): Unit = {} + + override protected def evalInternal(input: InternalRow): Any = timestampMs + + def toLiteral: Literal = dataType match { + case _: TimestampType => Literal(timestampMs * 1000L, TimestampType) --- End diff -- nit: Use `Literal(DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(timestampMs)), TimestampType)` instead. This is not a big deal anyway since `timestampMs` is always positive here.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org