[spark] branch master updated: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0745333 [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
0745333 is described below
commit 07454d01afdd7862a1ac6c5a7a672bcce3f8
Author: chakravarthiT <45845595+chakravart...@users.noreply.github.com>
AuthorDate: Thu Apr 11 10:02:27 2019 +0900

[SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor

## What changes were proposed in this pull request?
Similar to #22406, which made the log level for plan changes by each rule configurable, this PR makes the log level for plan changes by each batch configurable. It reuses the same configuration: "spark.sql.optimizer.planChangeLog.level".

Config proposed in this PR: spark.sql.optimizer.planChangeLog.batches enables plan change logging only for a set of specified batches, separated by commas.

## How was this patch tested?
Added a unit test. Also tested manually, with screenshots attached below.
1) Setting spark.sql.optimizer.planChangeLog.level to warn.
![settingLogLevelToWarn](https://user-images.githubusercontent.com/45845595/54556730-8803dd00-49df-11e9-95ab-ebb0c8d735ef.png)
2) Setting spark.sql.optimizer.planChangeLog.batches to Resolution and Subquery.
![settingBatchestoLog](https://user-images.githubusercontent.com/45845595/54556740-8cc89100-49df-11e9-80ab-fbbbe1ff2cdf.png)
3) Plan change logging enabled only for the specified batches (Resolution and Subquery).
![batchloggingOp](https://user-images.githubusercontent.com/45845595/54556788-ab2e8c80-49df-11e9-9ae0-57815f552896.png)

Closes #24136 from chakravarthiT/logBatches.
Lead-authored-by: chakravarthiT <45845595+chakravart...@users.noreply.github.com> Co-authored-by: chakravarthiT Signed-off-by: HyukjinKwon --- .../spark/sql/catalyst/rules/RuleExecutor.scala| 55 ++ .../org/apache/spark/sql/internal/SQLConf.scala| 18 +-- .../catalyst/optimizer/OptimizerLoggingSuite.scala | 45 +++--- 3 files changed, 87 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 088f1fe..3e8a6e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -113,7 +113,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { if (effective) { queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName) queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime) - planChangeLogger.log(rule.ruleName, plan, result) + planChangeLogger.logRule(rule.ruleName, plan, result) } queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime) queryExecutionMetrics.incNumExecution(rule.ruleName) @@ -152,15 +152,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { lastPlan = curPlan } - if (!batchStartPlan.fastEquals(curPlan)) { -logDebug( - s""" -|=== Result of Batch ${batch.name} === -|${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")} - """.stripMargin) - } else { -logTrace(s"Batch ${batch.name} has no effect.") - } + planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan) } curPlan @@ -172,21 +164,46 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { private val logRules = SQLConf.get.optimizerPlanChangeRules.map(Utils.stringToSeq) -def log(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = { +private val logBatches = 
SQLConf.get.optimizerPlanChangeBatches.map(Utils.stringToSeq) + +def logRule(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = { if (logRules.isEmpty || logRules.get.contains(ruleName)) { -lazy val message = +def message(): String = { s""" |=== Applying Rule ${ruleName} === |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")} """.stripMargin -logLevel match { - case "TRACE" => logTrace(message) - case "DEBUG" => logDebug(message) - case "INFO" => logInfo(message) - case "WARN" => logWarning(message) - case "ERROR" => logError(message) - case _ => logTrace(message) } + +
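The per-batch filter described in this commit can be sketched in Java. This is a hypothetical model, not Spark's `RuleExecutor` code; the class `PlanChangeLogFilter` and its methods are invented for illustration. It makes several assumptions:

- An unset `spark.sql.optimizer.planChangeLog.batches` means "log every batch", mirroring `logRules.isEmpty` for rules in the diff.
- The comma-separated list is trimmed, which is assumed behaviour for `Utils.stringToSeq`.
- The level string is upper-cased before the TRACE/DEBUG/INFO/WARN/ERROR match, and unknown values fall back to TRACE.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical sketch of per-batch plan-change log filtering (not Spark's actual code).
class PlanChangeLogFilter {
    // Empty Optional = config unset, i.e. every batch is eligible for logging.
    private final Optional<List<String>> logBatches;

    PlanChangeLogFilter(String batchesConf) {
        this.logBatches = Optional.ofNullable(batchesConf).map(PlanChangeLogFilter::stringToSeq);
    }

    // Assumed behaviour: split on commas, trim each entry, drop empty entries.
    static List<String> stringToSeq(String s) {
        return Arrays.stream(s.split(","))
                .map(String::trim)
                .filter(x -> !x.isEmpty())
                .collect(Collectors.toList());
    }

    boolean shouldLogBatch(String batchName) {
        return !logBatches.isPresent() || logBatches.get().contains(batchName);
    }

    // Mirrors the TRACE/DEBUG/INFO/WARN/ERROR match; anything else falls back to TRACE.
    static String effectiveLevel(String levelConf) {
        String level = levelConf.toUpperCase(Locale.ROOT);  // assumption: case-insensitive config
        switch (level) {
            case "TRACE":
            case "DEBUG":
            case "INFO":
            case "WARN":
            case "ERROR":
                return level;
            default:
                return "TRACE";
        }
    }

    public static void main(String[] args) {
        PlanChangeLogFilter filter = new PlanChangeLogFilter("Resolution, Subquery");
        for (String batch : Arrays.asList("Resolution", "Subquery", "Operator Optimization")) {
            String action = filter.shouldLogBatch(batch) ? "log at " + effectiveLevel("warn") : "skip";
            System.out.println(batch + " -> " + action);
        }
    }
}
```

With the batches set to Resolution and Subquery, as in the manual test, only those two batch names pass the filter.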
[spark] branch master updated: [MINOR][SQL] Unnecessary access to externalCatalog
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 181d190 [MINOR][SQL] Unnecessary access to externalCatalog
181d190 is described below
commit 181d190c606ec6cbd09f6d618347b60eaa4ea828
Author: ocaballero
AuthorDate: Thu Apr 11 10:00:09 2019 +0900

[MINOR][SQL] Unnecessary access to externalCatalog

Avoid accessing the external catalog when it is not necessary.

## What changes were proposed in this pull request?
The `functionExists` function has been changed. It unnecessarily accessed the externalCatalog to check whether the database exists, even when the function is already in the functionRegistry.

## How was this patch tested?
It has been tested through spark-shell while watching the Hive metastore logs. Inside spark-shell we ran spark.table(%tableA%).selectExpr("trim(%columnA%)"). With the current version, the following entry appears every time:
org.apache.hadoop.hive.metastore.HiveMetaStore.audit: cmd = get_database: default
Once the change is made, no such entry appears.

Closes #24312 from OCaballero/master.
Authored-by: ocaballero Signed-off-by: HyukjinKwon --- .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 4b862a5..c05f777 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1105,10 +1105,11 @@ class SessionCatalog( * Check if the function with the specified name exists */ def functionExists(name: FunctionIdentifier): Boolean = { -val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) -requireDbExists(db) -functionRegistry.functionExists(name) || +functionRegistry.functionExists(name) || { + val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) + requireDbExists(db) externalCatalog.functionExists(db, name.funcName) +} } // - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
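The fix relies on `||` short-circuiting: the database check and the external lookup only run when the function registry misses. The hypothetical Java model below shows that effect. `FunctionLookup`, its sets, and the `externalCalls` counter are invented for illustration. In the spark-shell test described above, the simulated `requireDbExists` round trip corresponds to the `get_database` audit entry.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the short-circuit in SessionCatalog.functionExists (not Spark code).
class FunctionLookup {
    private final Set<String> registry;           // stands in for functionRegistry
    private final Set<String> externalFunctions;  // stands in for externalCatalog, keys "db.func"
    private final Set<String> databases;          // databases known to the external catalog
    int externalCalls = 0;                        // counts simulated metastore round trips

    FunctionLookup(Set<String> registry, Set<String> externalFunctions, Set<String> databases) {
        this.registry = registry;
        this.externalFunctions = externalFunctions;
        this.databases = databases;
    }

    private void requireDbExists(String db) {
        externalCalls++;  // simulated get_database call
        if (!databases.contains(db)) {
            throw new IllegalArgumentException("Database '" + db + "' not found");
        }
    }

    private boolean existsInExternalCatalog(String db, String funcName) {
        requireDbExists(db);
        externalCalls++;  // simulated function lookup in the external catalog
        return externalFunctions.contains(db + "." + funcName);
    }

    // `||` short-circuits: the external catalog is consulted only when the registry misses.
    boolean functionExists(String db, String funcName) {
        return registry.contains(funcName) || existsInExternalCatalog(db, funcName);
    }

    public static void main(String[] args) {
        FunctionLookup lookup = new FunctionLookup(
                new HashSet<>(Arrays.asList("trim", "upper")),
                new HashSet<>(Arrays.asList("default.my_udf")),
                new HashSet<>(Arrays.asList("default")));
        System.out.println(lookup.functionExists("default", "trim") + ", external calls: " + lookup.externalCalls);
        System.out.println(lookup.functionExists("default", "my_udf") + ", external calls: " + lookup.externalCalls);
    }
}
```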
[spark] branch branch-2.4 updated: [SPARK-27394][WEBUI] Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate (backport 2.4)
This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new a8a2ba1 [SPARK-27394][WEBUI] Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate (backport 2.4) a8a2ba1 is described below commit a8a2ba11ac10051423e58920062b50f328b06421 Author: Shixiong Zhu AuthorDate: Wed Apr 10 15:17:04 2019 -0700 [SPARK-27394][WEBUI] Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate (backport 2.4) ## What changes were proposed in this pull request? This PR backports #24303 to 2.4. ## How was this patch tested? Jenkins Closes #24328 from zsxwing/SPARK-27394-2.4. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu --- .../apache/spark/status/AppStatusListener.scala| 40 -- .../scala/org/apache/spark/status/config.scala | 6 .../org/apache/spark/ui/UISeleniumSuite.scala | 35 +-- docs/configuration.md | 8 + 4 files changed, 75 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index c4dd47d..cb7ab7f 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -58,6 +58,12 @@ private[spark] class AppStatusListener( // operations that we can live without when rapidly processing incoming task events. private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + /** + * Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming + * task events are not fired frequently. 
+ */ + private val liveUpdateMinFlushPeriod = conf.get(LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD) + private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE) private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES) @@ -73,6 +79,9 @@ private[spark] class AppStatusListener( // around liveExecutors. @volatile private var activeExecutorCount = 0 + /** The last time when flushing `LiveEntity`s. This is to avoid flushing too frequently. */ + private var lastFlushTimeNs = System.nanoTime() + kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS)) { count => cleanupExecutors(count) } @@ -86,7 +95,8 @@ private[spark] class AppStatusListener( kvstore.onFlush { if (!live) { - flush() + val now = System.nanoTime() + flush(update(_, now)) } } @@ -744,6 +754,15 @@ private[spark] class AppStatusListener( } } } + +// Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush +// here to ensure the staleness of Spark UI doesn't last more than +// `max(heartbeat interval, liveUpdateMinFlushPeriod)`. +if (now - lastFlushTimeNs > liveUpdateMinFlushPeriod) { + flush(maybeUpdate(_, now)) + // Re-get the current system time because `flush` may be slow and `now` is stale. + lastFlushTimeNs = System.nanoTime() +} } override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { @@ -755,18 +774,17 @@ private[spark] class AppStatusListener( } } - /** Flush all live entities' data to the underlying store. */ - private def flush(): Unit = { -val now = System.nanoTime() + /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. 
*/ + private def flush(entityFlushFunc: LiveEntity => Unit): Unit = { liveStages.values.asScala.foreach { stage => - update(stage, now) - stage.executorSummaries.values.foreach(update(_, now)) + entityFlushFunc(stage) + stage.executorSummaries.values.foreach(entityFlushFunc) } -liveJobs.values.foreach(update(_, now)) -liveExecutors.values.foreach(update(_, now)) -liveTasks.values.foreach(update(_, now)) -liveRDDs.values.foreach(update(_, now)) -pools.values.foreach(update(_, now)) +liveJobs.values.foreach(entityFlushFunc) +liveExecutors.values.foreach(entityFlushFunc) +liveTasks.values.foreach(entityFlushFunc) +liveRDDs.values.foreach(entityFlushFunc) +pools.values.foreach(entityFlushFunc) } /** diff --git a/core/src/main/scala/org/apache/spark/status/config.scala b/core/src/main/scala/org/apache/spark/status/config.scala index 67801b8..87204fd 100644 --- a/core/src/main/scala/org/apache/spark/status/config.scala +++ b/core/src/main/scala/org/apache/spark/status/config.scala @@ -31,6 +31,12 @@ private[spark] object config { .timeConf(TimeUnit.NANOSECONDS) .createWithDefaultString("100ms") + val
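The heartbeat-driven flush condition can be modelled with an injectable clock. This is a hypothetical Java sketch: `ThrottledFlusher`, `onHeartbeat`, and the `LongSupplier` clock are invented names. Here `flush()` only counts calls, whereas the listener in the diff walks every live entity with `maybeUpdate`. The clock is re-read after flushing, for the reason given in the diff comment.

```java
import java.util.function.LongSupplier;

// Hypothetical model of the throttled flush added to AppStatusListener (not Spark code).
class ThrottledFlusher {
    private final long minFlushPeriodNs;  // plays the role of liveUpdateMinFlushPeriod
    private final LongSupplier clock;     // stand-in for System.nanoTime(), injectable for testing
    private long lastFlushTimeNs;
    int flushCount = 0;

    ThrottledFlusher(long minFlushPeriodNs, LongSupplier clock) {
        this.minFlushPeriodNs = minFlushPeriodNs;
        this.clock = clock;
        this.lastFlushTimeNs = clock.getAsLong();
    }

    // Invoked for each executor heartbeat (SparkListenerExecutorMetricsUpdate).
    void onHeartbeat() {
        long now = clock.getAsLong();
        if (now - lastFlushTimeNs > minFlushPeriodNs) {
            flush();
            // Re-read the clock: flushing may be slow, so `now` could already be stale.
            lastFlushTimeNs = clock.getAsLong();
        }
    }

    private void flush() {
        flushCount++;  // the real listener writes live entities to the store here
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L};
        ThrottledFlusher flusher = new ThrottledFlusher(1_000L, () -> fakeNow[0]);
        for (long t : new long[] {500L, 1_001L, 1_500L, 2_002L}) {
            fakeNow[0] = t;
            flusher.onHeartbeat();
            System.out.println("t=" + t + " flushes=" + flusher.flushCount);
        }
    }
}
```

Heartbeats arriving faster than the minimum period are absorbed. UI staleness is therefore bounded by roughly the larger of the heartbeat interval and the minimum flush period.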
[spark] branch master updated: [SPARK-27423][SQL] Cast DATE <-> TIMESTAMP according to the SQL standard
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ab8710b [SPARK-27423][SQL] Cast DATE <-> TIMESTAMP according to the SQL standard
ab8710b is described below
commit ab8710b57916a129fcb89464209361120d224535
Author: Maxim Gekk
AuthorDate: Wed Apr 10 22:41:19 2019 +0800

[SPARK-27423][SQL] Cast DATE <-> TIMESTAMP according to the SQL standard

## What changes were proposed in this pull request?
According to the SQL standard, a value of `DATE` type is a union of year, month, and day-of-month, and it is independent of any time zone. To convert it to Catalyst's `TIMESTAMP`, the `DATE` value should be "extended" with the time at midnight, `00:00:00`. The resulting local date-time should be interpreted as a timestamp in the session time zone and converted to microseconds since the epoch in `UTC`.

The reverse cast from `TIMESTAMP` to `DATE` is performed in a similar way. The `TIMESTAMP` value is represented as a local date-time in the session time zone, and the time component is simply removed. For example, `TIMESTAMP 2019-04-10 00:10:12` -> `DATE 2019-04-10`. The resulting date is converted to days since the epoch `1970-01-01`.

## How was this patch tested?
The changes were tested by the existing test suites `DateFunctionsSuite`, `DateExpressionsSuite`, and `CastSuite`.

Closes #24332 from MaxGekk/cast-timestamp-to-date2.
Lead-authored-by: Maxim Gekk Co-authored-by: Maxim Gekk Signed-off-by: Wenchen Fan --- .../apache/spark/sql/catalyst/expressions/Cast.scala | 19 +++ .../spark/sql/catalyst/util/DateTimeUtils.scala | 11 +++ 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 848195f..f7bc8b9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions import java.math.{BigDecimal => JavaBigDecimal} +import java.time.{LocalDate, LocalDateTime, LocalTime} import java.util.concurrent.TimeUnit._ import org.apache.spark.SparkException @@ -381,7 +382,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String case ByteType => buildCast[Byte](_, b => longToTimestamp(b.toLong)) case DateType => - buildCast[Int](_, d => MILLISECONDS.toMicros(DateTimeUtils.daysToMillis(d, timeZone))) + buildCast[Int](_, d => epochDaysToMicros(d, zoneId)) // TimestampWritable.decimalToTimestamp case DecimalType() => buildCast[Decimal](_, d => decimalToTimestamp(d)) @@ -418,7 +419,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String case TimestampType => // throw valid precision more than seconds, according to Hive. // Timestamp.nanos is in 0 to 999,999,999, no more than a second. 
- buildCast[Long](_, t => DateTimeUtils.millisToDays(MICROSECONDS.toMillis(t), timeZone)) + buildCast[Long](_, t => microsToEpochDays(t, zoneId)) } // IntervalConverter @@ -935,11 +936,12 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String } """ case TimestampType => - val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass) + val zid = JavaCode.global( +ctx.addReferenceObj("zoneId", zoneId, "java.time.ZoneId"), +zoneId.getClass) (c, evPrim, evNull) => code"""$evPrim = - org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToDays( -$c / $MICROS_PER_MILLIS, $tz);""" + org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToEpochDays($c, $zid);""" case _ => (c, evPrim, evNull) => code"$evNull = true;" } @@ -1043,11 +1045,12 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String case _: IntegralType => (c, evPrim, evNull) => code"$evPrim = ${longToTimeStampCode(c)};" case DateType => - val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass) + val zid = JavaCode.global( +ctx.addReferenceObj("zoneId", zoneId, "java.time.ZoneId"), +zoneId.getClass) (c, evPrim, evNull) => code"""$evPrim = - org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMillis( -$c, $tz) * $MICROS_PER_MILLIS;""" + org.apache.spark.sql.catalyst.util.DateTimeUtils.epochDaysToMicros($c, $zid);""" case DecimalType() => (c, evPrim,
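The two conversions described in this commit can be sketched with `java.time`. The helper names match the `DateTimeUtils` methods called in the diff, but the bodies below are an illustrative reconstruction from the commit description, not the committed code. Microseconds are split with floor division so that pre-1970 timestamps land on the correct day.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;

// Illustrative reconstruction of DATE <-> TIMESTAMP casting in a session time zone.
class DateTimestampCast {
    static final long MICROS_PER_SECOND = 1_000_000L;

    // DATE -> TIMESTAMP: extend the date with midnight, read that local date-time in the
    // session zone, and express the resulting instant as microseconds since the epoch (UTC).
    static long epochDaysToMicros(int epochDays, ZoneId zoneId) {
        LocalDateTime midnight = LocalDateTime.of(LocalDate.ofEpochDay(epochDays), LocalTime.MIDNIGHT);
        Instant instant = midnight.atZone(zoneId).toInstant();
        return Math.addExact(Math.multiplyExact(instant.getEpochSecond(), MICROS_PER_SECOND),
                instant.getNano() / 1_000L);
    }

    // TIMESTAMP -> DATE: view the instant as a local date-time in the session zone and drop
    // the time component; floorDiv/floorMod keep negative (pre-1970) values correct.
    static int microsToEpochDays(long epochMicros, ZoneId zoneId) {
        Instant instant = Instant.ofEpochSecond(
                Math.floorDiv(epochMicros, MICROS_PER_SECOND),
                Math.floorMod(epochMicros, MICROS_PER_SECOND) * 1_000L);
        return Math.toIntExact(instant.atZone(zoneId).toLocalDate().toEpochDay());
    }

    public static void main(String[] args) {
        ZoneId la = ZoneId.of("America/Los_Angeles");
        int day = Math.toIntExact(LocalDate.of(2019, 4, 10).toEpochDay());
        // 00:10:12 after local midnight still belongs to 2019-04-10 in the session zone.
        long withTime = epochDaysToMicros(day, la) + (10 * 60 + 12) * MICROS_PER_SECOND;
        System.out.println(LocalDate.ofEpochDay(microsToEpochDays(withTime, la)));
    }
}
```

The same `DATE` therefore maps to different microsecond values in different session zones. Casting back within the same zone always recovers the original day.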
[spark] branch master updated: [SPARK-27422][SQL] current_date() should return current date in the session time zone
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1470f23 [SPARK-27422][SQL] current_date() should return current date in the session time zone
1470f23 is described below
commit 1470f23ec93f13d6c847832c06d40f8fc9803129
Author: Maxim Gekk
AuthorDate: Wed Apr 10 21:54:50 2019 +0800

[SPARK-27422][SQL] current_date() should return current date in the session time zone

## What changes were proposed in this pull request?
In this PR, I propose to revert 2 commits, https://github.com/apache/spark/commit/06abd06112965cd73417ccceacdbd94b6b3d2793 and https://github.com/apache/spark/commit/61561c1c2d4e47191fdfe9bf3539a3db29e89fa9. I also propose to take the current date via `LocalDate.now` in the session time zone. The result is stored as days since the epoch `1970-01-01`.

## How was this patch tested?
It was tested by `DateExpressionsSuite`, `DateFunctionsSuite`, `DateTimeUtilsSuite`, and `ComputeCurrentTimeSuite`.

Closes #24330 from MaxGekk/current-date2.
Lead-authored-by: Maxim Gekk Co-authored-by: Maxim Gekk Signed-off-by: Wenchen Fan --- docs/sql-migration-guide-upgrade.md| 2 -- .../catalyst/expressions/datetimeExpressions.scala | 18 ++-- .../sql/catalyst/optimizer/finishAnalysis.scala| 25 +++--- .../expressions/DateExpressionsSuite.scala | 6 +++--- .../optimizer/ComputeCurrentTimeSuite.scala| 7 +++--- .../execution/streaming/MicroBatchExecution.scala | 2 +- .../scala/org/apache/spark/sql/functions.scala | 2 +- .../apache/spark/sql/DataFrameAggregateSuite.scala | 2 +- .../org/apache/spark/sql/DateFunctionsSuite.scala | 18 +++- 9 files changed, 40 insertions(+), 42 deletions(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 741c510..b193522 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -120,8 +120,6 @@ license: | - In Spark version 2.4 and earlier, when reading a Hive Serde table with Spark native data sources(parquet/orc), Spark will infer the actual file schema and update the table schema in metastore. Since Spark 3.0, Spark doesn't infer the schema anymore. This should not cause any problems to end users, but if it does, please set `spark.sql.hive.caseSensitiveInferenceMode` to `INFER_AND_SAVE`. - - In Spark version 2.4 and earlier, the `current_date` function returns the current date shifted according to the SQL config `spark.sql.session.timeZone`. Since Spark 3.0, the function always returns the current date in the `UTC` time zone. - - Since Spark 3.0, `TIMESTAMP` literals are converted to strings using the SQL config `spark.sql.session.timeZone`. In Spark version 2.4 and earlier, the conversion uses the default time zone of the Java virtual machine. - In Spark version 2.4, when a spark session is created via `cloneSession()`, the newly created spark session inherits its configuration from its parent `SparkContext` even though the same configuration may exist with a different value in its parent spark session. 
Since Spark 3.0, the configurations of a parent `SparkSession` have a higher precedence over the parent `SparkContext`. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 5fb0b85..aad9f20 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.expressions import java.sql.Timestamp -import java.time.{Instant, LocalDate, ZoneId, ZoneOffset} +import java.time.{Instant, LocalDate, ZoneId} import java.time.temporal.IsoFields import java.util.{Locale, TimeZone} @@ -54,26 +54,30 @@ trait TimeZoneAwareExpression extends Expression { @transient lazy val zoneId: ZoneId = DateTimeUtils.getZoneId(timeZoneId.get) } -// scalastyle:off line.size.limit /** - * Returns the current date in the UTC time zone at the start of query evaluation. + * Returns the current date at the start of query evaluation. * All calls of current_date within the same query return the same value. * * There is no code generation since this expression should get constant folded by the optimizer. */ @ExpressionDescription( - usage = "_FUNC_() - Returns the current date in the UTC time zone at the start of query evaluation.", + usage = "_FUNC_() - Returns the current date at the start of query evaluation.", since = "1.5.0") -// scalastyle:on line.size.limit -case class
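The behaviour proposed here can be sketched with `LocalDate.now` and a fixed `Clock`. In this hypothetical Java sketch, `CurrentDateSketch` is an invented name, and the `Clock` argument stands in for "the start of query evaluation". The sketch shows that one query-start instant yields different dates in different session time zones.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

// Sketch of "current date in the session time zone" as days since 1970-01-01.
class CurrentDateSketch {
    // `clock` stands in for the query start time, so every call within one query agrees.
    static int currentDate(Clock clock, ZoneId sessionZone) {
        return Math.toIntExact(LocalDate.now(clock.withZone(sessionZone)).toEpochDay());
    }

    public static void main(String[] args) {
        Clock queryStart = Clock.fixed(Instant.parse("2019-04-10T23:30:00Z"), ZoneId.of("UTC"));
        for (String zone : new String[] {"UTC", "Asia/Tokyo", "America/Los_Angeles"}) {
            int days = currentDate(queryStart, ZoneId.of(zone));
            System.out.println(zone + ": " + LocalDate.ofEpochDay(days));
        }
    }
}
```

At 23:30 UTC it is already the next day in Tokyo. Under the removed migration-guide behaviour (always `UTC`), a Tokyo session would therefore have seen the previous day.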
[spark] branch master updated: [SPARK-26012][SQL] Null and '' values should not cause dynamic partition failure of string types
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5ea4dee [SPARK-26012][SQL] Null and '' values should not cause dynamic partition failure of string types
5ea4dee is described below
commit 5ea4deec447fb413fd90b8c1a8d983d6bee89d91
Author: 10129659
AuthorDate: Wed Apr 10 19:54:19 2019 +0800

[SPARK-26012][SQL] Null and '' values should not cause dynamic partition failure of string types

Dynamic partitioning fails when both '' and null values are used as dynamic partition values at the same time. For example, the test below fails before this PR:

    test("Null and '' values should not cause dynamic partition failure of string types") {
      withTable("t1", "t2") {
        spark.range(3).write.saveAsTable("t1")
        spark.sql("select id, cast(case when id = 1 then '' else null end as string) as p" +
          " from t1").write.partitionBy("p").saveAsTable("t2")
        checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, null), Row(2, null)))
      }
    }

The error is: 'org.apache.hadoop.fs.FileAlreadyExistsException: File already exists'.

This PR converts empty strings to null for partition values. It is an alternative approach to PR https://github.com/apache/spark/pull/23010.

## How was this patch tested?
Added a new test.

Closes #24334 from eatoncys/FileFormatWriter.
Authored-by: 10129659 Signed-off-by: Wenchen Fan --- .../execution/datasources/FileFormatWriter.scala | 36 +++--- .../datasources/FileFormatWriterSuite.scala| 19 +++- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index a9de649..f1fc5d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -35,9 +35,12 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} -import org.apache.spark.sql.execution.{SortExec, SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution} +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -49,6 +52,22 @@ object FileFormatWriter extends Logging { customPartitionLocations: Map[TablePartitionSpec, String], outputColumns: Seq[Attribute]) + /** A function that converts the empty string to null for partition values. 
*/ + case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression { +override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v +override def nullable: Boolean = true +override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + nullSafeCodeGen(ctx, ev, c => { +s"""if ($c.numBytes() == 0) { + | ${ev.isNull} = true; + | ${ev.value} = null; + |} else { + | ${ev.value} = $c; + |}""".stripMargin + }) +} + } + /** * Basic work flow of this command is: * 1. Driver side setup, including output committer initialization and data source specific @@ -84,6 +103,15 @@ object FileFormatWriter extends Logging { val partitionSet = AttributeSet(partitionColumns) val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains) +var needConvert = false +val projectList: Seq[NamedExpression] = plan.output.map { + case p if partitionSet.contains(p) && p.dataType == StringType && p.nullable => +needConvert = true +Alias(Empty2Null(p), p.name)() + case attr => attr +} +val empty2NullPlan = if (needConvert) ProjectExec(projectList, plan) else plan + val bucketIdExpression = bucketSpec.map { spec => val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get) // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can @@ -123,7 +151,7 @@ object FileFormatWriter extends Logging { // We
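The effect of `Empty2Null` on partition values can be illustrated without Spark. This hypothetical Java sketch makes the following substitutions:

- `PartitionValues` and `distinctPartitions` are invented names.
- `String.isEmpty()` plays the role of the `numBytes() == 0` check on `UTF8String`.
- A `LinkedHashSet` stands in for the set of distinct partition values the writer sees.

After the conversion, '' and null collapse into a single partition value, so the writer no longer treats them as two different partitions.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the Empty2Null idea for string partition values.
class PartitionValues {
    // Empty string -> null; every other value (including null) passes through unchanged.
    static String empty2Null(String value) {
        return (value != null && value.isEmpty()) ? null : value;
    }

    // Distinct partition values after normalization, in first-seen order.
    static Set<String> distinctPartitions(List<String> values) {
        Set<String> partitions = new LinkedHashSet<>();
        for (String v : values) {
            partitions.add(empty2Null(v));
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Partition column `p` from the test above: '' for id = 1, null otherwise.
        List<String> p = Arrays.asList(null, "", null);
        System.out.println(distinctPartitions(p));  // [null]
    }
}
```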
[spark] branch branch-2.4 updated: [SPARK-27406][SQL] UnsafeArrayData serialization breaks when two machi…
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 3352803 [SPARK-27406][SQL] UnsafeArrayData serialization breaks when two machi… 3352803 is described below commit 33528036bded3601548b46f2e225ee5a3b4a14c0 Author: mingbo_pb AuthorDate: Wed Apr 10 17:05:11 2019 +0800 [SPARK-27406][SQL] UnsafeArrayData serialization breaks when two machi… This PR is the branch-2.4 version for https://github.com/apache/spark/pull/24317 Closes #24324 from pengbo/SPARK-27406-branch-2.4. Authored-by: mingbo_pb Signed-off-by: Wenchen Fan --- .../sql/catalyst/expressions/UnsafeArrayData.java | 39 +- .../spark/sql/catalyst/util/UnsafeArraySuite.scala | 17 +- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java index 9002abd..8e4ecf3 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java @@ -17,6 +17,10 @@ package org.apache.spark.sql.catalyst.expressions; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; @@ -30,6 +34,8 @@ import org.apache.spark.unsafe.hash.Murmur3_x86_32; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; +import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET; + /** * An Unsafe implementation of Array which is backed by raw memory instead of Java objects. 
* @@ -52,7 +58,7 @@ import org.apache.spark.unsafe.types.UTF8String; * Instances of `UnsafeArrayData` act as pointers to row data stored in this format. */ -public final class UnsafeArrayData extends ArrayData { +public final class UnsafeArrayData extends ArrayData implements Externalizable { public static int calculateHeaderPortionInBytes(int numFields) { return (int)calculateHeaderPortionInBytes((long)numFields); @@ -523,4 +529,35 @@ public final class UnsafeArrayData extends ArrayData { public static UnsafeArrayData fromPrimitiveArray(double[] arr) { return fromPrimitiveArray(arr, Platform.DOUBLE_ARRAY_OFFSET, arr.length, 8); } + + + public byte[] getBytes() { +if (baseObject instanceof byte[] +&& baseOffset == Platform.BYTE_ARRAY_OFFSET +&& (((byte[]) baseObject).length == sizeInBytes)) { + return (byte[]) baseObject; +} else { + byte[] bytes = new byte[sizeInBytes]; + Platform.copyMemory(baseObject, baseOffset, bytes, Platform.BYTE_ARRAY_OFFSET, sizeInBytes); + return bytes; +} + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { +byte[] bytes = getBytes(); +out.writeInt(bytes.length); +out.writeInt(this.numElements); +out.write(bytes); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { +this.baseOffset = BYTE_ARRAY_OFFSET; +this.sizeInBytes = in.readInt(); +this.numElements = in.readInt(); +this.elementOffset = baseOffset + calculateHeaderPortionInBytes(this.numElements); +this.baseObject = new byte[sizeInBytes]; +in.readFully((byte[]) baseObject); + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala index 755c889..818b2bd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala @@ -17,11 +17,13 @@ 
package org.apache.spark.sql.catalyst.util -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.serializer.JavaSerializer import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} class UnsafeArraySuite extends SparkFunSuite { @@ -204,4 +206,17 @@ class UnsafeArraySuite extends SparkFunSuite { val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind() assert(doubleEncoder.toRow(doubleArray).getArray(0).toDoubleArray.sameElements(doubleArray)) } + + test("unsafe
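The `Externalizable` pattern in this backport can be shown with a plain byte-array holder. In this hypothetical Java sketch, `ArrayBlob` and `ArrayBlobDemo` are invented names, and the payload bytes are arbitrary rather than a real `UnsafeArrayData` layout. Only the wire layout follows the diff: size in bytes, element count, then the raw bytes. The sender's memory offset is never written. In the diff, `readExternal` instead sets `baseOffset` from the receiving JVM's own `BYTE_ARRAY_OFFSET`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.Arrays;

class ArrayBlobDemo {
    // Hypothetical, simplified analogue of UnsafeArrayData's Externalizable support.
    public static final class ArrayBlob implements Externalizable {
        byte[] bytes;
        int numElements;

        public ArrayBlob() {}  // Externalizable requires a public no-arg constructor

        ArrayBlob(byte[] bytes, int numElements) {
            this.bytes = bytes;
            this.numElements = numElements;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            // Same field order as the diff: size in bytes, element count, then the raw bytes.
            out.writeInt(bytes.length);
            out.writeInt(numElements);
            out.write(bytes);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException {
            int sizeInBytes = in.readInt();
            numElements = in.readInt();
            bytes = new byte[sizeInBytes];
            in.readFully(bytes);  // only data comes off the wire; no JVM-specific offset
        }
    }

    // Serializes and deserializes through standard Java serialization.
    static ArrayBlob roundTrip(ArrayBlob blob) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(blob);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (ArrayBlob) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        ArrayBlob copy = roundTrip(new ArrayBlob(new byte[] {8, 0, 0, 0, 42}, 1));
        System.out.println(copy.numElements + " " + Arrays.toString(copy.bytes));
    }
}
```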
[spark] branch master updated: [SPARK-24872] Replace taking the $symbol with $sqlOperator in BinaryOperator's toString method
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 85e5d4f [SPARK-24872] Replace taking the $symbol with $sqlOperator in BinaryOperator's toString method
85e5d4f is described below
commit 85e5d4f141eedd571dfa0dcdabedced19736a351
Author: 韩田田00222924
AuthorDate: Wed Apr 10 16:58:01 2019 +0800

[SPARK-24872] Replace taking the $symbol with $sqlOperator in BinaryOperator's toString method

## What changes were proposed in this pull request?
For BinaryOperator's toString method, it is better to use `$sqlOperator` instead of `$symbol`.

## How was this patch tested?
Added a unit test to `PredicateSuite`.

Closes #21826 from httfighter/SPARK-24872.
Authored-by: 韩田田00222924 Signed-off-by: Wenchen Fan --- .../org/apache/spark/sql/catalyst/expressions/Expression.scala| 2 +- .../apache/spark/sql/catalyst/expressions/PredicateSuite.scala| 8 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index d5d1195..2cd84b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -627,7 +627,7 @@ abstract class BinaryOperator extends BinaryExpression with ExpectsInputTypes { def sqlOperator: String = symbol - override def toString: String = s"($left $symbol $right)" + override def toString: String = s"($left $sqlOperator $right)" override def inputTypes: Seq[AbstractDataType] = Seq(inputType, inputType) diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala index e38bdeb..9b6896f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.encoders.ExamplePointUDT import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -513,4 +514,11 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper { interpreted.initialize(0) assert(interpreted.eval(new UnsafeRow())) } + + test("SPARK-24872: Replace taking the $symbol with $sqlOperator in BinaryOperator's" + +" toString method") { +val expression = CatalystSqlParser.parseExpression("id=1 or id=2").toString() +val expected = "(('id = 1) OR ('id = 2))" +assert(expression == expected) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
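To illustrate what the `toString` change does, here is a minimal Scala sketch built on a hypothetical, heavily simplified model of Catalyst expressions. The names `Expr`, `Attr`, `Lit`, `BinaryOp`, `EqualTo` and `Or` are assumptions written for illustration, not Spark's actual classes. Only the relationship `def sqlOperator: String = symbol` comes from the diff; the `||`/`OR` pair on `Or` is assumed to mirror Spark's own `Or` expression.

```scala
// Hypothetical, simplified stand-ins for Catalyst expressions (not Spark's classes).
sealed trait Expr

// Prints like an unresolved attribute, with a leading quote: 'id
final case class Attr(name: String) extends Expr {
  override def toString: String = s"'$name"
}

final case class Lit(value: Int) extends Expr {
  override def toString: String = value.toString
}

abstract class BinaryOp(left: Expr, right: Expr) extends Expr {
  def symbol: String
  // As in the diff: the SQL spelling defaults to the symbol.
  def sqlOperator: String = symbol
  // After the patch: print the SQL spelling (previously s"($left $symbol $right)").
  override def toString: String = s"($left $sqlOperator $right)"
}

final case class EqualTo(l: Expr, r: Expr) extends BinaryOp(l, r) {
  def symbol: String = "="
}

// Assumed to mirror Spark's Or: symbol "||", SQL spelling "OR".
final case class Or(l: Expr, r: Expr) extends BinaryOp(l, r) {
  def symbol: String = "||"
  override def sqlOperator: String = "OR"
}

val expr = Or(EqualTo(Attr("id"), Lit(1)), EqualTo(Attr("id"), Lit(2)))
println(expr) // (('id = 1) OR ('id = 2))
```

Because `BinaryOp` defines a concrete `toString`, the case classes inherit it instead of generating their own. With the pre-patch interpolation the same tree would print as `(('id = 1) || ('id = 2))` in this model; `EqualTo` is unaffected because its symbol and SQL spelling coincide.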
[spark] branch master updated: [SPARK-27414][SQL] make it clear that date type is timezone independent
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 2e90574  [SPARK-27414][SQL] make it clear that date type is timezone independent

2e90574 is described below

commit 2e90574dd0e60ea960a33580dfb29654671b66f4
Author: Wenchen Fan
AuthorDate: Wed Apr 10 16:39:28 2019 +0800

[SPARK-27414][SQL] make it clear that date type is timezone independent

## What changes were proposed in this pull request?

In the SQL standard, the date type consists of the `year`, `month` and `day` fields. It is time-zone independent, which means a date does not represent a specific point on the timeline.

Spark SQL follows the SQL standard, and this PR makes it clear that the date type is time-zone independent:
1. Improve the docs to highlight that dates are time-zone independent.
2. When converting a string to a date, use the Java time API to parse a `LocalDate` directly from the string, instead of first converting the `LocalDate` to an `Instant` at UTC.
3. When converting a date to a string, use the Java time API to format the `LocalDate` directly, instead of first converting it to an `Instant` at UTC.

Changes 2 and 3 should not introduce any behavior changes.

## How was this patch tested?

Existing tests.

Closes #24325 from cloud-fan/doc.

Authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
---
 docs/sql-migration-guide-upgrade.md                             |  2 +-
 docs/sql-reference.md                                           |  6 --
 .../org/apache/spark/sql/catalyst/util/DateFormatter.scala      | 12
 .../org/apache/spark/sql/catalyst/util/DateTimeUtils.scala      |  7 ---
 4 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index c3837f6..741c510 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -122,7 +122,7 @@ license: |

   - In Spark version 2.4 and earlier, the `current_date` function returns the current date shifted according to the SQL config `spark.sql.session.timeZone`. Since Spark 3.0, the function always returns the current date in the `UTC` time zone.

-  - Since Spark 3.0, `TIMESTAMP` literals are converted to strings using the SQL config `spark.sql.session.timeZone`, and `DATE` literals are formatted using the UTC time zone. In Spark version 2.4 and earlier, both conversions use the default time zone of the Java virtual machine.
+  - Since Spark 3.0, `TIMESTAMP` literals are converted to strings using the SQL config `spark.sql.session.timeZone`. In Spark version 2.4 and earlier, the conversion uses the default time zone of the Java virtual machine.

   - In Spark version 2.4, when a spark session is created via `cloneSession()`, the newly created spark session inherits its configuration from its parent `SparkContext` even though the same configuration may exist with a different value in its parent spark session. Since Spark 3.0, the configurations of a parent `SparkSession` have a higher precedence over the parent `SparkContext`.

diff --git a/docs/sql-reference.md b/docs/sql-reference.md
index ee99ed8..2ec26ec 100644
--- a/docs/sql-reference.md
+++ b/docs/sql-reference.md
@@ -46,8 +46,10 @@ Spark SQL and DataFrames support the following data types:
   - `BooleanType`: Represents boolean values.
 * Datetime type
   - `TimestampType`: Represents values comprising values of fields year, month, day,
-  hour, minute, and second.
-  - `DateType`: Represents values comprising values of fields year, month, day.
+  hour, minute, and second, with the session local time-zone. The timestamp value represents an
+  absolute point in time.
+  - `DateType`: Represents values comprising values of fields year, month and day, without a
+  time-zone.
 * Complex types
   - `ArrayType(elementType, containsNull)`: Represents values comprising a sequence of
   elements with the type of `elementType`. `containsNull` is used to indicate if
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
index 20e043a..9843297 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -17,9 +17,8 @@

 package org.apache.spark.sql.catalyst.util

-import java.time.{Instant, ZoneOffset}
+import java.time.LocalDate
 import java.util.Locale
-import java.util.concurrent.TimeUnit.SECONDS

 sealed trait DateFormatter extends Serializable {
   def parse(s: String): Int // returns days since epoch
@@ -34,15 +33,12 @@ class Iso8601DateFormatter(
   private
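Points 2 and 3 of the commit message can be sketched with plain `java.time` calls from Scala. This is not Spark's `DateFormatter`: the helper names (`parseDateDirect`, `parseDateViaUtc`, `formatDateDirect`, `formatDateViaUtc`) are hypothetical, and the only assumption taken from the diff is that a date is stored as days since the epoch. The direct path never touches a time zone, while the detour through a UTC `Instant` yields the same numbers, which is consistent with the claim that the change should not alter behavior. The last lines contrast this with a timestamp, whose rendering does depend on the zone.

```scala
import java.time.{Instant, LocalDate, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helpers: a date is represented as days since 1970-01-01.
val SecondsPerDay = 86400L

// Direct path: LocalDate is a plain year-month-day value with no time zone.
def parseDateDirect(s: String): Int = LocalDate.parse(s).toEpochDay.toInt
def formatDateDirect(days: Int): String = LocalDate.ofEpochDay(days.toLong).toString

// Roundabout path: go through an Instant at midnight UTC first.
def parseDateViaUtc(s: String): Int = {
  val instant = LocalDate.parse(s).atStartOfDay(ZoneOffset.UTC).toInstant
  (instant.getEpochSecond / SecondsPerDay).toInt
}
def formatDateViaUtc(days: Int): String =
  Instant.ofEpochSecond(days * SecondsPerDay).atZone(ZoneOffset.UTC).toLocalDate.toString

val days = parseDateDirect("2019-04-10")
println(days)                                  // 17996
println(parseDateViaUtc("2019-04-10") == days) // true
println(formatDateDirect(days))                // 2019-04-10

// A timestamp, by contrast, is an absolute instant: its text depends on the zone.
val ts = Instant.parse("2019-04-10T02:00:00Z")
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
println(ts.atZone(ZoneId.of("UTC")).format(fmt))                 // 2019-04-10 02:00
println(ts.atZone(ZoneId.of("America/Los_Angeles")).format(fmt)) // 2019-04-09 19:00
```

The two date paths agree because midnight UTC is always an exact multiple of 86,400 seconds from the epoch, so the detour adds work without adding information.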
[spark] branch master updated: [SPARK-27181][SQL] Add public transform API
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 58674d5  [SPARK-27181][SQL] Add public transform API

58674d5 is described below

commit 58674d54baedc048ae9bf237f268dce5f3d06d77
Author: Ryan Blue
AuthorDate: Wed Apr 10 14:30:39 2019 +0800

[SPARK-27181][SQL] Add public transform API

## What changes were proposed in this pull request?

This adds a public Expression API that can be used to pass partition transformations to data sources.

## How was this patch tested?

Existing tests validate that there are no regressions. Added transform cases to the DDL suite and the v1 conversions suite.

Closes #24117 from rdblue/add-public-transform-api.

Authored-by: Ryan Blue
Signed-off-by: Wenchen Fan
---
 .../apache/spark/sql/catalyst/parser/SqlBase.g4    |  17 +-
 .../sql/catalog/v2/expressions/Expression.java     |  31
 .../sql/catalog/v2/expressions/Expressions.java    | 162
 .../spark/sql/catalog/v2/expressions/Literal.java  |  42 +
 .../sql/catalog/v2/expressions/NamedReference.java |  33
 .../sql/catalog/v2/expressions/Transform.java      |  44 +
 .../sql/catalog/v2/expressions/expressions.scala   | 203 +
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 101 +-
 .../plans/logical/sql/CreateTableStatement.scala   |   5 +-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |  52 +-
 .../datasources/DataSourceResolution.scala         |   7 +-
 .../execution/command/PlanResolutionSuite.scala    |  20 ++
 12 files changed, 705 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 0f9387b..d261b56 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -92,7 +92,7 @@ statement
     | SHOW DATABASES (LIKE? pattern=STRING)?                          #showDatabases
     | createTableHeader ('(' colTypeList ')')? tableProvider
         ((OPTIONS options=tablePropertyList) |
-        (PARTITIONED BY partitionColumnNames=identifierList) |
+        (PARTITIONED BY partitioning=transformList) |
         bucketSpec |
         locationSpec |
         (COMMENT comment=STRING) |
@@ -587,6 +587,21 @@ namedExpressionSeq
     : namedExpression (',' namedExpression)*
     ;

+transformList
+    : '(' transforms+=transform (',' transforms+=transform)* ')'
+    ;
+
+transform
+    : qualifiedName                                                          #identityTransform
+    | transformName=identifier
+      '(' argument+=transformArgument (',' argument+=transformArgument)* ')' #applyTransform
+    ;
+
+transformArgument
+    : qualifiedName
+    | constant
+    ;
+
 expression
     : booleanExpression
     ;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expression.java
new file mode 100644
index 000..1e2aca9
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expression.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2.expressions;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * Base class of the public logical expression API.
+ */
+@Experimental
+public interface Expression {
+  /**
+   * Format the expression as a human readable SQL-like string.
+   */
+  String describe();
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
new file mode 100644
index 000..009e89b
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
@@ -0,0 +1,162 @@
+/*
+
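The new `transformList` grammar rule accepts either a bare (possibly qualified) column name, labeled `#identityTransform`, or a function-style transform whose arguments are column names or constants, labeled `#applyTransform`. Below is a minimal hand-written recursive-descent sketch of that rule in Scala. It is an illustration under stated assumptions, not Spark's ANTLR-generated parser or its `AstBuilder`: the token rules are simplified (identifiers, integer literals, single-quoted strings), and `NamedRef`, `Lit`, `Transform`, `tokenize` and `TransformParser` are hypothetical names. The `describe` method only loosely mirrors `Expression.describe()` from the new Java interface.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical expression model; describe loosely mirrors Expression.describe().
sealed trait Expr { def describe: String }

final case class NamedRef(parts: Seq[String]) extends Expr {
  def describe: String = parts.mkString(".")
}

final case class Lit(value: String) extends Expr {
  def describe: String = value
}

final case class Transform(name: String, args: Seq[Expr]) extends Expr {
  def describe: String = s"$name(${args.map(_.describe).mkString(", ")})"
}

// Simplified tokens: identifiers, integers, single-quoted strings, and ( ) , .
// Characters matching none of these (e.g. spaces) are skipped.
def tokenize(s: String): List[String] =
  """[A-Za-z_][A-Za-z0-9_]*|\d+|'[^']*'|[(),.]""".r.findAllIn(s).toList

final class TransformParser(tokens: List[String]) {
  private var rest = tokens

  private def peek: Option[String] = rest.headOption

  private def next(): String = rest match {
    case head :: tail =>
      rest = tail
      head
    case Nil =>
      throw new IllegalArgumentException("unexpected end of input")
  }

  private def expect(tok: String): Unit = {
    val found = next()
    if (found != tok) throw new IllegalArgumentException(s"expected '$tok' but found '$found'")
  }

  private def isIdent(t: String): Boolean = t.head.isLetter || t.head == '_'
  private def isConstant(t: String): Boolean = t.forall(_.isDigit) || t.startsWith("'")

  // transformList : '(' transform (',' transform)* ')'
  def transformList(): List[Transform] = {
    expect("(")
    val out = ListBuffer(transform())
    while (peek.contains(",")) { next(); out += transform() }
    expect(")")
    if (rest.nonEmpty) throw new IllegalArgumentException(s"trailing input: ${rest.mkString(" ")}")
    out.toList
  }

  // qualifiedName : identifier ('.' identifier)*
  private def qualifiedName(): NamedRef = {
    val parts = ListBuffer(identifier())
    while (peek.contains(".")) { next(); parts += identifier() }
    NamedRef(parts.toList)
  }

  private def identifier(): String = {
    val t = next()
    if (!isIdent(t)) throw new IllegalArgumentException(s"expected identifier but found '$t'")
    t
  }

  // transform : qualifiedName                                      #identityTransform
  //           | identifier '(' transformArgument (',' ...)* ')'   #applyTransform
  private def transform(): Transform = {
    val ref = qualifiedName()
    if (peek.contains("(") && ref.parts.size == 1) {
      next()
      val args = ListBuffer(argument())
      while (peek.contains(",")) { next(); args += argument() }
      expect(")")
      Transform(ref.parts.head, args.toList)
    } else Transform("identity", Seq(ref)) // naming it "identity" is this sketch's choice
  }

  // transformArgument : qualifiedName | constant
  private def argument(): Expr = peek match {
    case Some(t) if isIdent(t)    => qualifiedName()
    case Some(t) if isConstant(t) => Lit(next())
    case other =>
      throw new IllegalArgumentException(s"expected argument but found ${other.getOrElse("end of input")}")
  }
}

val parsed = new TransformParser(tokenize("(bucket(16, id), days(ts), region)")).transformList()
println(parsed.map(_.describe).mkString("; ")) // bucket(16, id); days(ts); identity(region)
```

Peeking one token after the name is enough to choose between the two alternatives, which is why this rule is easy to parse by hand; the transform names `bucket` and `days` in the usage line are only sample inputs, since the grammar itself accepts any identifier.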