[spark] branch master updated: [SPARK-45604][SQL] Add LogicalType checking on INT64 -> DateTime conversion on Parquet Vectorized Reader
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 13b67ee8cc3 [SPARK-45604][SQL] Add LogicalType checking on INT64 -> DateTime conversion on Parquet Vectorized Reader
13b67ee8cc3 is described below

commit 13b67ee8cc377a5cc47d02b9addbc00eabfc8b6c
Author: Zamil Majdy
AuthorDate: Sun Oct 22 10:53:22 2023 +0500

    [SPARK-45604][SQL] Add LogicalType checking on INT64 -> DateTime conversion on Parquet Vectorized Reader

    ### What changes were proposed in this pull request?

    Currently, the read logical type is not checked when converting the physical type INT64 into a DateTime value. One scenario where this breaks: the Parquet column is annotated as `timestamp_ntz` while the requested Spark read type is an `array`. Because the logical-type check does not happen, the conversion is allowed; the vectorized reader does not support it and produces an NPE in on-heap memory mode and a SEGFAULT in off-heap memory mode. Segmentation fault on off-heap memory mode c [...]

    ### Why are the changes needed?

    Prevent the NPE or segfault from happening.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    A new test is added in `ParquetSchemaSuite`.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #43451 from majdyz/SPARK-45604.
Lead-authored-by: Zamil Majdy
Co-authored-by: Zamil Majdy
Signed-off-by: Max Gekk
---
 .../parquet/ParquetVectorUpdaterFactory.java     | 10 ++++++++--
 .../datasources/parquet/ParquetSchemaSuite.scala | 21 +++++++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index d5675db4c3a..26bef0fe3a6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -109,7 +109,8 @@ public class ParquetVectorUpdaterFactory {
         // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary
         // fallbacks. We read them as decimal values.
         return new UnsignedLongUpdater();
-      } else if (isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+      } else if (isTimestamp(sparkType) &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
         validateTimestampType(sparkType);
         if ("CORRECTED".equals(datetimeRebaseMode)) {
           return new LongUpdater();
@@ -117,7 +118,8 @@ public class ParquetVectorUpdaterFactory {
           boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
           return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
         }
-      } else if (isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+      } else if (isTimestamp(sparkType) &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
         validateTimestampType(sparkType);
         if ("CORRECTED".equals(datetimeRebaseMode)) {
           return new LongAsMicrosUpdater();
@@ -1149,6 +1151,10 @@ public class ParquetVectorUpdaterFactory {
     return false;
   }

+  private static boolean isTimestamp(DataType dt) {
+    return dt == DataTypes.TimestampType || dt == DataTypes.TimestampNTZType;
+  }
+
   private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataType dt) {
     DecimalType d = (DecimalType) dt;
     LogicalTypeAnnotation typeAnnotation =
       descriptor.getPrimitiveType().getLogicalTypeAnnotation();
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index ef06e64d2eb..19feb9b8bb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1087,6 +1087,27 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }

+  test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") {
+    import testImplicits._
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val timestamp = java.time.LocalDateTime.of(1, 2, 3, 4, 5)
+      val df1 = Seq((1, timestamp)).toDF()
+      val df2 = Seq((2, Array(timestamp))).toDF()
+
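The effect of the guard added above can be illustrated with a standalone sketch. This is a simplified model, not Spark's actual classes: the enum values, method names, and updater-name strings are hypothetical stand-ins for `ParquetVectorUpdaterFactory`'s real logic. The point it demonstrates is that updater selection must key on both the Parquet logical-type annotation and the requested Spark type; checking only the annotation is what let an `array` read type fall into a long-based timestamp updater.

```java
import java.util.Optional;

// Simplified, self-contained model of the SPARK-45604 fix. Before the fix,
// selection keyed only on the Parquet logical type; with the fix, the
// requested Spark type must also be a timestamp type, so a mismatched read
// schema surfaces as a schema error instead of an NPE/segfault downstream.
public class UpdaterSelectionSketch {
    enum SparkType { TIMESTAMP, TIMESTAMP_NTZ, ARRAY_OF_TIMESTAMP_NTZ, LONG }
    enum ParquetLogicalType { TIMESTAMP_MICROS, TIMESTAMP_MILLIS, NONE }

    // Mirrors the new isTimestamp(DataType) helper from the diff.
    static boolean isTimestamp(SparkType t) {
        return t == SparkType.TIMESTAMP || t == SparkType.TIMESTAMP_NTZ;
    }

    /** Returns an updater name, or empty when the conversion is unsupported. */
    static Optional<String> selectUpdater(ParquetLogicalType annotation, SparkType sparkType) {
        if (isTimestamp(sparkType) && annotation == ParquetLogicalType.TIMESTAMP_MICROS) {
            return Optional.of("LongUpdater");
        } else if (isTimestamp(sparkType) && annotation == ParquetLogicalType.TIMESTAMP_MILLIS) {
            return Optional.of("LongAsMicrosUpdater");
        }
        // In the real factory the caller raises a schema-mismatch error here.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Matching types: a timestamp updater is selected.
        if (!selectUpdater(ParquetLogicalType.TIMESTAMP_MICROS, SparkType.TIMESTAMP_NTZ).isPresent()) {
            throw new AssertionError("expected an updater for matching timestamp types");
        }
        // The SPARK-45604 scenario: an INT64 timestamp column read with an array
        // type no longer selects a long-based updater; it fails fast instead.
        if (selectUpdater(ParquetLogicalType.TIMESTAMP_MICROS, SparkType.ARRAY_OF_TIMESTAMP_NTZ).isPresent()) {
            throw new AssertionError("array read type must not select a timestamp updater");
        }
        System.out.println("ok");
    }
}
```

With only the annotation check (the pre-fix behavior), the second case would have returned an updater and the vectorized reader would have written longs through a vector of the wrong shape, which is where the NPE/segfault came from.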
[spark] branch branch-3.4 updated: [SPARK-45604][SQL] Add LogicalType checking on INT64 -> DateTime conversion on Parquet Vectorized Reader
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new b24bc5c1a27 [SPARK-45604][SQL] Add LogicalType checking on INT64 -> DateTime conversion on Parquet Vectorized Reader
b24bc5c1a27 is described below

commit b24bc5c1a27e847143a8159d4291ddac87b7f387
Author: Zamil Majdy
AuthorDate: Sun Oct 22 10:53:22 2023 +0500

    [SPARK-45604][SQL] Add LogicalType checking on INT64 -> DateTime conversion on Parquet Vectorized Reader

    ### What changes were proposed in this pull request?

    Currently, the read logical type is not checked when converting the physical type INT64 into a DateTime value. One scenario where this breaks: the Parquet column is annotated as `timestamp_ntz` while the requested Spark read type is an `array`. Because the logical-type check does not happen, the conversion is allowed; the vectorized reader does not support it and produces an NPE in on-heap memory mode and a SEGFAULT in off-heap memory mode. Segmentation fault on off-heap memory mode c [...]

    ### Why are the changes needed?

    Prevent the NPE or segfault from happening.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    A new test is added in `ParquetSchemaSuite`.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #43451 from majdyz/SPARK-45604.
Lead-authored-by: Zamil Majdy
Co-authored-by: Zamil Majdy
Signed-off-by: Max Gekk
(cherry picked from commit 13b67ee8cc377a5cc47d02b9addbc00eabfc8b6c)
Signed-off-by: Max Gekk
---
 .../parquet/ParquetVectorUpdaterFactory.java     | 10 ++++++++--
 .../datasources/parquet/ParquetSchemaSuite.scala | 21 +++++++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 15d58f0c757..42442cf8ea8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -109,7 +109,8 @@ public class ParquetVectorUpdaterFactory {
         // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary
         // fallbacks. We read them as decimal values.
         return new UnsignedLongUpdater();
-      } else if (isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+      } else if (isTimestamp(sparkType) &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
         validateTimestampType(sparkType);
         if ("CORRECTED".equals(datetimeRebaseMode)) {
           return new LongUpdater();
@@ -117,7 +118,8 @@ public class ParquetVectorUpdaterFactory {
           boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
           return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
         }
-      } else if (isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+      } else if (isTimestamp(sparkType) &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
         validateTimestampType(sparkType);
         if ("CORRECTED".equals(datetimeRebaseMode)) {
           return new LongAsMicrosUpdater();
@@ -1150,6 +1152,10 @@ public class ParquetVectorUpdaterFactory {
     return false;
   }

+  private static boolean isTimestamp(DataType dt) {
+    return dt == DataTypes.TimestampType || dt == DataTypes.TimestampNTZType;
+  }
+
   private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataType dt) {
     DecimalType d = (DecimalType) dt;
     LogicalTypeAnnotation typeAnnotation =
       descriptor.getPrimitiveType().getLogicalTypeAnnotation();
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 5589c61be7a..bf5c51b89bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1067,6 +1067,27 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }

+  test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") {
+    import testImplicits._
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val timestamp = java.time.LocalDateTime.of(1, 2, 3, 4, 5)
+      val df1 = Seq((1,
[spark] branch branch-3.5 updated: [SPARK-45604][SQL] Add LogicalType checking on INT64 -> DateTime conversion on Parquet Vectorized Reader
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 6c55a6c0c68 [SPARK-45604][SQL] Add LogicalType checking on INT64 -> DateTime conversion on Parquet Vectorized Reader
6c55a6c0c68 is described below

commit 6c55a6c0c680f80a6cdef7f1a83045b6400b4d09
Author: Zamil Majdy
AuthorDate: Sun Oct 22 10:53:22 2023 +0500

    [SPARK-45604][SQL] Add LogicalType checking on INT64 -> DateTime conversion on Parquet Vectorized Reader

    ### What changes were proposed in this pull request?

    Currently, the read logical type is not checked when converting the physical type INT64 into a DateTime value. One scenario where this breaks: the Parquet column is annotated as `timestamp_ntz` while the requested Spark read type is an `array`. Because the logical-type check does not happen, the conversion is allowed; the vectorized reader does not support it and produces an NPE in on-heap memory mode and a SEGFAULT in off-heap memory mode. Segmentation fault on off-heap memory mode c [...]

    ### Why are the changes needed?

    Prevent the NPE or segfault from happening.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    A new test is added in `ParquetSchemaSuite`.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #43451 from majdyz/SPARK-45604.
Lead-authored-by: Zamil Majdy
Co-authored-by: Zamil Majdy
Signed-off-by: Max Gekk
(cherry picked from commit 13b67ee8cc377a5cc47d02b9addbc00eabfc8b6c)
Signed-off-by: Max Gekk
---
 .../parquet/ParquetVectorUpdaterFactory.java     | 10 ++++++++--
 .../datasources/parquet/ParquetSchemaSuite.scala | 21 +++++++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 15d58f0c757..42442cf8ea8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -109,7 +109,8 @@ public class ParquetVectorUpdaterFactory {
         // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary
         // fallbacks. We read them as decimal values.
         return new UnsignedLongUpdater();
-      } else if (isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+      } else if (isTimestamp(sparkType) &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
         validateTimestampType(sparkType);
         if ("CORRECTED".equals(datetimeRebaseMode)) {
           return new LongUpdater();
@@ -117,7 +118,8 @@ public class ParquetVectorUpdaterFactory {
           boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
           return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
         }
-      } else if (isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+      } else if (isTimestamp(sparkType) &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
         validateTimestampType(sparkType);
         if ("CORRECTED".equals(datetimeRebaseMode)) {
           return new LongAsMicrosUpdater();
@@ -1150,6 +1152,10 @@ public class ParquetVectorUpdaterFactory {
     return false;
   }

+  private static boolean isTimestamp(DataType dt) {
+    return dt == DataTypes.TimestampType || dt == DataTypes.TimestampNTZType;
+  }
+
   private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataType dt) {
     DecimalType d = (DecimalType) dt;
     LogicalTypeAnnotation typeAnnotation =
       descriptor.getPrimitiveType().getLogicalTypeAnnotation();
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index facc9b90ff7..3f47c5e506f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1087,6 +1087,27 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }

+  test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") {
+    import testImplicits._
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val timestamp = java.time.LocalDateTime.of(1, 2, 3, 4, 5)
+      val df1 = Seq((1,
[spark] branch master updated: [SPARK-45484][SQL][FOLLOWUP][DOCS] Update the document of parquet compression codec
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 4023ec9bb44 [SPARK-45484][SQL][FOLLOWUP][DOCS] Update the document of parquet compression codec
4023ec9bb44 is described below

commit 4023ec9bb4471efee36afcec041c114a4b86a2c8
Author: Jiaan Geng
AuthorDate: Sat Oct 21 16:39:13 2023 -0500

    [SPARK-45484][SQL][FOLLOWUP][DOCS] Update the document of parquet compression codec

    ### What changes were proposed in this pull request?

    This PR follows up https://github.com/apache/spark/pull/43310 to update the documentation of the parquet compression codec.

    ### Why are the changes needed?

    Update the documented list of parquet compression codecs to include `lz4_raw`.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    N/A

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #43464 from beliefer/SPARK-45484_followup.

Authored-by: Jiaan Geng
Signed-off-by: Sean Owen
---
 docs/sql-data-sources-parquet.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/sql-data-sources-parquet.md b/docs/sql-data-sources-parquet.md
index 925e47504e5..c2af58248ea 100644
--- a/docs/sql-data-sources-parquet.md
+++ b/docs/sql-data-sources-parquet.md
@@ -423,7 +423,7 @@ Data source options of Parquet can be set via:
     compression
     snappy
-    Compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, uncompressed, snappy, gzip, lzo, brotli, lz4, and zstd). This will override spark.sql.parquet.compression.codec.
+    Compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, uncompressed, snappy, gzip, lzo, brotli, lz4, lz4_raw, and zstd). This will override spark.sql.parquet.compression.codec.
     write
@@ -484,7 +484,7 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
     Sets the compression codec used when writing Parquet files. If either compression or parquet.compression is specified in the table-specific options/properties, the precedence would be compression, parquet.compression, spark.sql.parquet.compression.codec. Acceptable values include:
-    none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
+    none, uncompressed, snappy, gzip, lzo, brotli, lz4, lz4_raw, zstd.
     Note that brotli requires BrotliCodec to be installed.
     1.1.1

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
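For reference, the session-level codec documented above could be set in a configuration file as follows. This is an illustrative sketch only; `zstd` is an arbitrary choice among the documented codec names, and a per-write `compression` option or `parquet.compression` table property would take precedence over it, as the doc text explains.

```properties
# spark-defaults.conf (illustrative). Acceptable values per the updated docs:
# none, uncompressed, snappy, gzip, lzo, brotli, lz4, lz4_raw, zstd.
# Note that brotli additionally requires BrotliCodec to be installed.
spark.sql.parquet.compression.codec    zstd
```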
[spark] branch master updated: [MINOR] Fix typos
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 920fb673b26 [MINOR] Fix typos
920fb673b26 is described below

commit 920fb673b264c0bdcad0426020dedf57d8b11cc7
Author: shuoer86 <129674997+shuoe...@users.noreply.github.com>
AuthorDate: Sat Oct 21 16:37:27 2023 -0500

    [MINOR] Fix typos

    Closes #43434 from shuoer86/master.

Authored-by: shuoer86 <129674997+shuoe...@users.noreply.github.com>
Signed-off-by: Sean Owen
---
 binder/postBuild                                                    | 4 ++--
 .../scala/org/apache/spark/sql/connect/service/SessionHolder.scala  | 2 +-
 .../spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala  | 2 +-
 core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala | 2 +-
 core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala | 6 +++---
 .../main/scala/org/apache/spark/ui/jobs/TaskThreadDumpPage.scala    | 2 +-
 .../scala/org/apache/spark/status/AutoCleanupLiveUIDirSuite.scala   | 2 +-
 docs/sql-ref-syntax-ddl-declare-variable.md                         | 2 +-
 8 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/binder/postBuild b/binder/postBuild
index 70ae23b3937..b6bdf72324c 100644
--- a/binder/postBuild
+++ b/binder/postBuild
@@ -38,7 +38,7 @@ else
   pip install plotly "pandas<2.0.0" "pyspark[sql,ml,mllib,pandas_on_spark]$SPECIFIER$VERSION"
 fi

-# Set 'PYARROW_IGNORE_TIMEZONE' to surpress warnings from PyArrow.
+# Set 'PYARROW_IGNORE_TIMEZONE' to suppress warnings from PyArrow.
 echo "export PYARROW_IGNORE_TIMEZONE=1" >> ~/.profile

 # Add sbin to PATH to run `start-connect-server.sh`.
@@ -50,7 +50,7 @@ echo "export SPARK_HOME=${SPARK_HOME}" >> ~/.profile
 SPARK_VERSION=$(python -c "import pyspark; print(pyspark.__version__)")
 echo "export SPARK_VERSION=${SPARK_VERSION}" >> ~/.profile

-# Surpress warnings from Spark jobs, and UI progress bar.
+# Suppress warnings from Spark jobs, and UI progress bar.
 mkdir -p ~/.ipython/profile_default/startup
 echo """from pyspark.sql import SparkSession
 SparkSession.builder.config('spark.ui.showConsoleProgress', 'false').getOrCreate().sparkContext.setLogLevel('FATAL')
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 27f471233f1..dcced21f371 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -77,7 +77,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private[service] def addExecuteHolder(executeHolder: ExecuteHolder): Unit = {
     val oldExecute = executions.putIfAbsent(executeHolder.operationId, executeHolder)
     if (oldExecute != null) {
-      // the existance of this should alrady be checked by SparkConnectExecutionManager
+      // the existence of this should alrady be checked by SparkConnectExecutionManager
       throw new IllegalStateException(
         s"ExecuteHolder with opId=${executeHolder.operationId} already exists!")
     }
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala
index ea9ae3ed9d9..e1de6b04d21 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala
@@ -226,7 +226,7 @@ class SparkConnectPluginRegistrySuite extends SharedSparkSession with SparkConne
     }
   }

-  test("Emtpy registries are really empty and work") {
+  test("Empty registries are really empty and work") {
     assert(SparkConnectPluginRegistry.loadRelationPlugins().isEmpty)
     assert(SparkConnectPluginRegistry.loadExpressionPlugins().isEmpty)
     assert(SparkConnectPluginRegistry.loadCommandPlugins().isEmpty)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index f80190c96e8..73e72b7f1df 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -259,7 +259,7 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
   }

   /**
-   * Apply function `f` on the [[BlockInfo]] object and the aquisition [[Condition]] for `blockId`.
+   * Apply function `f` on the [[BlockInfo]] object and
Re: [PR] update the canonical link, due to a change in some addresses in the latest version of the document [spark-website]
panbingkun commented on PR #483:
URL: https://github.com/apache/spark-website/pull/483#issuecomment-1773725400

   In order to prevent possible merge conflicts, after this PR I will add `Matomo analytics` to the already-published HTML documents.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org