[GitHub] [spark] abhishekd0907 commented on pull request #35683: [SPARK-30835][SPARK-39018][CORE][YARN] Add support for YARN decommissioning when ESS is disabled
abhishekd0907 commented on PR #35683: URL: https://github.com/apache/spark/pull/35683#issuecomment-1133541810 ping @mridulm @attilapiros -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] panbingkun commented on pull request #36617: [SPARK-38687][SQL] Use error classes in the compilation errors of generators
panbingkun commented on PR #36617: URL: https://github.com/apache/spark/pull/36617#issuecomment-1133539935 ping @MaxGekk
[GitHub] [spark] gengliangwang closed pull request #36618: [SPARK-39237][DOCS][3.2] Update the ANSI SQL mode documentation
gengliangwang closed pull request #36618: [SPARK-39237][DOCS][3.2] Update the ANSI SQL mode documentation URL: https://github.com/apache/spark/pull/36618
[GitHub] [spark] gengliangwang commented on pull request #36618: [SPARK-39237][DOCS][3.2] Update the ANSI SQL mode documentation
gengliangwang commented on PR #36618: URL: https://github.com/apache/spark/pull/36618#issuecomment-1133538689 @dongjoon-hyun Thanks for the review Merging to 3.2
[GitHub] [spark] MaxGekk closed pull request #36580: [SPARK-39167][SQL] Throw an exception w/ an error class for multiple rows from a subquery used as an expression
MaxGekk closed pull request #36580: [SPARK-39167][SQL] Throw an exception w/ an error class for multiple rows from a subquery used as an expression URL: https://github.com/apache/spark/pull/36580
[GitHub] [spark] attilapiros commented on a diff in pull request #36512: [SPARK-39152][CORE] Deregistering disk persisted local RDD blocks in case of IO related errors
attilapiros commented on code in PR #36512: URL: https://github.com/apache/spark/pull/36512#discussion_r878644162 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -933,46 +935,56 @@ private[spark] class BlockManager( }) Some(new BlockResult(ci, DataReadMethod.Memory, info.size)) } else if (level.useDisk && diskStore.contains(blockId)) { + var diskData: BlockData = null try { -val diskData = diskStore.getBytes(blockId) -val iterToReturn: Iterator[Any] = { - if (level.deserialized) { -val diskValues = serializerManager.dataDeserializeStream( - blockId, - diskData.toInputStream())(info.classTag) -maybeCacheDiskValuesInMemory(info, blockId, level, diskValues) - } else { -val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData) - .map { _.toInputStream(dispose = false) } - .getOrElse { diskData.toInputStream() } -serializerManager.dataDeserializeStream(blockId, stream)(info.classTag) - } +diskData = diskStore.getBytes(blockId) +val iterToReturn = if (level.deserialized) { + val diskValues = serializerManager.dataDeserializeStream( +blockId, +diskData.toInputStream())(info.classTag) + maybeCacheDiskValuesInMemory(info, blockId, level, diskValues) +} else { + val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData) +.map { _.toInputStream(dispose = false) } +.getOrElse { diskData.toInputStream() } + serializerManager.dataDeserializeStream(blockId, stream)(info.classTag) } val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, { releaseLockAndDispose(blockId, diskData, taskContext) }) Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) } catch { -case ex: KryoException if ex.getCause.isInstanceOf[IOException] => - // We need to have detailed log message to catch environmental problems easily. 
- // Further details: https://issues.apache.org/jira/browse/SPARK-37710 - processKryoException(ex, blockId) - throw ex +case t: Throwable => + if (diskData != null) { +diskData.dispose() +diskData = null + } + releaseLock(blockId, taskContext) + if (isIORelatedException(t)) { +logInfo(extendMessageWithBlockDetails(t.getMessage, blockId)) +// Remove the block so that its unavailability is reported to the driver +removeBlock(blockId) Review Comment: This should only affect RDD blocks (`removeBlock` is also called via `handleLocalReadFailure` from line 971 when the block info manager knows the block but neither the memory store nor the disk store contains it).
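The diff above calls `isIORelatedException(t)` without showing its body. As a rough illustration only, the sketch below treats a throwable as IO related when it, or anything in its cause chain, is an `IOException`. The object name `IoErrorCheck`, the method name `isIoRelated`, and the depth limit are assumptions made for this example; the PR's actual check may be narrower (the removed code only matched a `KryoException` whose direct cause is an `IOException`).

```scala
import java.io.IOException

object IoErrorCheck {
  // Hypothetical stand-in for the PR's isIORelatedException (not Spark's code):
  // returns true when the throwable itself, or any cause reachable from it,
  // is an IOException.
  @scala.annotation.tailrec
  def isIoRelated(t: Throwable, depth: Int = 0): Boolean = {
    if (t == null || depth > 16) false // depth bound guards against cyclic cause chains
    else if (t.isInstanceOf[IOException]) true
    else isIoRelated(t.getCause, depth + 1)
  }
}
```

Walking the whole cause chain is a deliberately broad choice here; a stricter variant would inspect only the direct cause of a known wrapper exception type.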
[GitHub] [spark] weixiuli commented on a diff in pull request #36162: [SPARK-32170][CORE] Improve the speculation through the stage task metrics.
weixiuli commented on code in PR #36162: URL: https://github.com/apache/spark/pull/36162#discussion_r878639022 ## core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala: ## @@ -863,6 +870,29 @@ private[spark] class TaskSchedulerImpl( executorUpdates) } + private def getTaskAccumulableInfosAndProgressRate( + updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = { + var records = 0L + var runTime = 0L + val accInfos = updates.map { acc => + if (calculateTaskProgressRate && acc.name.isDefined) { + val name = acc.name.get + if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) { + records += acc.value.asInstanceOf[Long] + } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) { + runTime = acc.value.asInstanceOf[Long] + } + } + acc.toInfo(Some(acc.value), None) + } + val taskProgressRate = if (calculateTaskProgressRate && runTime > 0) { + records / (runTime / 1000.0) + } else { + 0.0D + } Review Comment: Setting the accumUpdates on the TaskInfo may be unnecessary, for two reasons: 1. We only need the records and runTime from accumUpdates. 2. Storing the accumUpdates on the TaskInfo may take up more storage space, and calculating the rate would still require traversing them. Following your suggestion, we could store only the records and runTime on the TaskInfo and calculate the rate in InefficientTaskCalculator when it is required. But if we do that, we have to make sure the records and runTime are set and read under a lock, which may make the logic more complicated than storing only a taskProgressRate on the TaskInfo, right?
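The rate computed in the diff above is records read divided by executor run time in seconds, with 0.0 as the fallback when no run time has been recorded. A minimal standalone sketch of that arithmetic follows; `Metric`, `RecordsRead`, `ExecutorRunTime`, and `recordsPerSecond` are hypothetical names for this example and are not Spark's accumulator API.

```scala
object ProgressRate {
  // Hypothetical, simplified stand-in for an accumulator update:
  // just a metric name and a Long value.
  final case class Metric(name: String, value: Long)

  // Assumed metric names, for illustration only.
  val RecordsRead = "recordsRead"
  val ExecutorRunTime = "executorRunTime" // value is in milliseconds

  // Records per second: sum all record counts, take the last reported
  // run time, and fall back to 0.0 when no run time is available.
  def recordsPerSecond(updates: Seq[Metric]): Double = {
    val records = updates.filter(_.name == RecordsRead).map(_.value).sum
    val runTimeMs =
      updates.filter(_.name == ExecutorRunTime).map(_.value).lastOption.getOrElse(0L)
    if (runTimeMs > 0) records / (runTimeMs / 1000.0) else 0.0
  }
}
```

For example, 2000 records over a 2000 ms run time gives a rate of 1000.0 records per second. Storing only this one `Double` per task is the trade-off the comment argues for, as opposed to keeping the two raw counters and synchronizing access to both.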
[GitHub] [spark] alex-balikov commented on a diff in pull request #36620: [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStr
alex-balikov commented on code in PR #36620: URL: https://github.com/apache/spark/pull/36620#discussion_r878638504 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala: ## @@ -96,8 +96,13 @@ class RateStreamProviderSuite extends StreamTest { // We have to use the lambda version of CheckAnswer because we don't know the right range // until we see the last offset. +// SPARK-39242 - its possible that the next output to sink has happened +// since the last query progress and the output rows reflect that. +// We just need to compare for the saved stream duration here and hence +// we only use those number of sorted elements from output rows. def expectedResultsFromDuration(rows: Seq[Row]): Unit = { - assert(rows.map(_.getLong(0)).sorted == (0 until (streamDuration * 10))) + assert(rows.map(_.getLong(0)).sorted.take(streamDuration * 10) +== (0 until (streamDuration * 10))) } Review Comment: Sorry, I realized that the right way to do this is not `newOffset.toString.toLong` but rather `newOffset.asInstanceOf[LongOffset].offset`; then of course the test is not needed any more.
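The point of the comment above is to read the numeric value from the offset object directly instead of round-tripping through `toString`. A small sketch of the resulting wait condition, under stated assumptions: `Offset`, `LongOffset`, and `OpaqueOffset` below are minimal hypothetical types defined only for this example (Spark's real classes are not reproduced), and `notDone` mirrors the predicate discussed in this thread rather than the actual `StreamExecution` code.

```scala
object AwaitOffsetSketch {
  // Hypothetical minimal offset types, defined only for this illustration.
  sealed trait Offset
  final case class LongOffset(offset: Long) extends Offset
  final case class OpaqueOffset(json: String) extends Offset

  // True while the caller should keep waiting for `expected` to be committed.
  def notDone(committed: Option[Offset], expected: Offset): Boolean =
    (committed, expected) match {
      // Nothing committed yet for this source: keep waiting.
      case (None, _) => true
      // Numeric offsets: stop once the committed offset has reached or passed
      // the expected one, since exact equality might never be observed.
      case (Some(LongOffset(c)), LongOffset(e)) => c < e
      // Any other offset type: fall back to the equality check.
      case (Some(c), e) => c != e
    }
}
```

Pattern matching extracts the `offset` field without a cast or string parsing, and because the predicate means "not done", the strict `<` stops waiting as soon as the committed offset equals the expected one.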
[GitHub] [spark] weixiuli commented on a diff in pull request #36162: [SPARK-32170][CORE] Improve the speculation through the stage task metrics.
weixiuli commented on code in PR #36162: URL: https://github.com/apache/spark/pull/36162#discussion_r878111861 ## core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala: ## @@ -863,6 +870,29 @@ private[spark] class TaskSchedulerImpl( executorUpdates) } + private def getTaskAccumulableInfosAndProgressRate( + updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = { + var records = 0L + var runTime = 0L + val accInfos = updates.map { acc => + if (calculateTaskProgressRate && acc.name.isDefined) { + val name = acc.name.get + if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) { + records += acc.value.asInstanceOf[Long] + } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) { + runTime = acc.value.asInstanceOf[Long] + } + } + acc.toInfo(Some(acc.value), None) + } + val taskProgressRate = if (calculateTaskProgressRate && runTime > 0) { + records / (runTime / 1000.0) + } else { + 0.0D + } Review Comment: If we calculate an in-progress task's taskProgressRate in InefficientTaskCalculator, we can only use TaskInfo._accumulables instead of accumUpdates, but TaskInfo._accumulables is updated by events, which may be lost, so it may be unreliable. Following @mridulm's suggestion in https://github.com/apache/spark/pull/36162#discussion_r865591651 for in-progress tasks, we can update it via the executor heartbeat, which is both the latest and reliable. Getting the taskProgressRate reuses the existing traversal logic, so the additional calculation cost is negligible.
[GitHub] [spark] dtenedor opened a new pull request, #36623: [WIP][SPARK-39245][SQL] Support Avro scans with DEFAULT values
dtenedor opened a new pull request, #36623: URL: https://github.com/apache/spark/pull/36623 ### What changes were proposed in this pull request? Support Avro scans when the table schema has associated DEFAULT column values. Example: ``` create table t(i int) using avro; insert into t values(42); alter table t add column s string default concat('abc', 'def'); select * from t; > 42, 'abcdef' ``` ### Why are the changes needed? This change makes it easier to build, query, and maintain tables backed by Avro data. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? This PR includes new test coverage.
[GitHub] [spark] github-actions[bot] commented on pull request #32769: [SPARK-35630][SQL] ExpandExec should not introduce unnecessary exchanges
github-actions[bot] commented on PR #32769: URL: https://github.com/apache/spark/pull/32769#issuecomment-1133479258 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] github-actions[bot] commented on pull request #34800: [SPARK-37538][SQL] Replace single projection expand
github-actions[bot] commented on PR #34800: URL: https://github.com/apache/spark/pull/34800#issuecomment-1133479253 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] github-actions[bot] commented on pull request #35360: [SPARK-38065][SQL] Improve the performance of DS V2 aggregate push-down
github-actions[bot] commented on PR #35360: URL: https://github.com/apache/spark/pull/35360#issuecomment-1133479251 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] github-actions[bot] closed pull request #35424: [WIP][SPARK-38116] Add auto commit option to JDBC PostgreSQL driver and set the option false default
github-actions[bot] closed pull request #35424: [WIP][SPARK-38116] Add auto commit option to JDBC PostgreSQL driver and set the option false default URL: https://github.com/apache/spark/pull/35424
[GitHub] [spark] sunchao commented on a diff in pull request #35965: [SPARK-38647][SQL] Add SupportsReportOrdering mix in interface for Scan (DataSourceV2)
sunchao commented on code in PR #35965: URL: https://github.com/apache/spark/pull/35965#discussion_r878596697 ## sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaOrderAndPartitionAwareDataSource.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package test.org.apache.spark.sql.connector; + +import java.util.Arrays; + +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.expressions.*; +import org.apache.spark.sql.connector.read.*; +import org.apache.spark.sql.connector.read.partitioning.KeyGroupedPartitioning; +import org.apache.spark.sql.connector.read.partitioning.Partitioning; +import org.apache.spark.sql.connector.read.partitioning.UnknownPartitioning; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +public class JavaOrderAndPartitionAwareDataSource extends JavaPartitionAwareDataSource { + + static class MyScanBuilder extends JavaPartitionAwareDataSource.MyScanBuilder + implements SupportsReportOrdering { + +private final Partitioning partitioning; +private final SortOrder[] ordering; + +MyScanBuilder(String partitionKeys, String orderKeys) { + if (partitionKeys != null) { +String[] keys = partitionKeys.split(","); +Expression[] clustering = new Transform[keys.length]; +for (int i = 0; i < keys.length; i++) { + clustering[i] = Expressions.identity(keys[i]); +} +this.partitioning = new KeyGroupedPartitioning(clustering, 2); + } else { +this.partitioning = new UnknownPartitioning(2); + } + + if (orderKeys != null) { +String[] keys = orderKeys.split(","); +this.ordering = new SortOrder[keys.length]; +for (int i = 0; i < keys.length; i++) { + this.ordering[i] = new MySortOrder(keys[i]); +} + } else { +this.ordering = new SortOrder[0]; + } +} + +@Override +public InputPartition[] planInputPartitions() { + InputPartition[] partitions = new InputPartition[2]; + partitions[0] = new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 5, 5}); + partitions[1] = new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 1, 2}); + return partitions; +} + +@Override +public Partitioning outputPartitioning() { + return this.partitioning; +} + +@Override +public SortOrder[] outputOrdering() { + return this.ordering; +} + } + + @Override + public Table 
getTable(CaseInsensitiveStringMap options) { +return new JavaSimpleBatchTable() { + @Override + public Transform[] partitioning() { +String partitionKeys = options.get("partitionKeys"); +if (partitionKeys == null) { + return new Transform[0]; +} else { + return (Transform[])Arrays.stream(partitionKeys.split(",")) Review Comment: nit: space before `Arrays` ## sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala: ## @@ -886,6 +972,66 @@ class PartitionAwareDataSource extends TestingV2Source { } } +class OrderAndPartitionAwareDataSource extends PartitionAwareDataSource { + + class MyScanBuilder(val partitionKeys: Option[Seq[String]], + val orderKeys: Seq[String]) extends SimpleScanBuilder Review Comment: nit: indent ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala: ## @@ -138,6 +138,15 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { } } + override def outputOrdering: Seq[SortOrder] = scan match { +case s: SupportsReportOrdering if this.logicalLink.isDefined && Review Comment: I'm not totally sure this is the correct approach. Could we follow the existing pattern and add an optimizer rule to populate this? we can re-use existing `V2ScanPartitioning` too (and change its name). ## sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaOrderAndPartitionAwareDataSource.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the
[GitHub] [spark] williamhyun opened a new pull request, #36622: [SPARK-39244][INFRA] Use --no-echo instead of --slave in R 4.0+
williamhyun opened a new pull request, #36622: URL: https://github.com/apache/spark/pull/36622 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] srowen commented on pull request #36602: [MINOR][SQL] JDBCTableCatalog: "initialized" typo
srowen commented on PR #36602: URL: https://github.com/apache/spark/pull/36602#issuecomment-1133455400 @GavinRay97 can you enable the test workflows to run in your forked repo? It looks like they aren't running. It's a trivial change, I know, but it's best to get a green light from the tests.
[GitHub] [spark] anishshri-db commented on a diff in pull request #36620: [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStr
anishshri-db commented on code in PR #36620: URL: https://github.com/apache/spark/pull/36620#discussion_r878579849 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala: ## @@ -96,8 +96,13 @@ class RateStreamProviderSuite extends StreamTest { // We have to use the lambda version of CheckAnswer because we don't know the right range // until we see the last offset. +// SPARK-39242 - its possible that the next output to sink has happened +// since the last query progress and the output rows reflect that. +// We just need to compare for the saved stream duration here and hence +// we only use those number of sorted elements from output rows. def expectedResultsFromDuration(rows: Seq[Row]): Unit = { - assert(rows.map(_.getLong(0)).sorted == (0 until (streamDuration * 10))) + assert(rows.map(_.getLong(0)).sorted.take(streamDuration * 10) +== (0 until (streamDuration * 10))) } Review Comment: Done - Added basic test for longOffset conversion validation
[GitHub] [spark] alex-balikov commented on a diff in pull request #36620: [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStr
alex-balikov commented on code in PR #36620: URL: https://github.com/apache/spark/pull/36620#discussion_r878574332 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala: ## @@ -96,8 +96,13 @@ class RateStreamProviderSuite extends StreamTest { // We have to use the lambda version of CheckAnswer because we don't know the right range // until we see the last offset. +// SPARK-39242 - its possible that the next output to sink has happened +// since the last query progress and the output rows reflect that. +// We just need to compare for the saved stream duration here and hence +// we only use those number of sorted elements from output rows. def expectedResultsFromDuration(rows: Seq[Row]): Unit = { - assert(rows.map(_.getLong(0)).sorted == (0 until (streamDuration * 10))) + assert(rows.map(_.getLong(0)).sorted.take(streamDuration * 10) +== (0 until (streamDuration * 10))) } Review Comment: Can you add a small test that `LongOffset.toString.toLong` works, as your code change depends on that?
[GitHub] [spark] anishshri-db commented on a diff in pull request #36620: [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStr
anishshri-db commented on code in PR #36620: URL: https://github.com/apache/spark/pull/36620#discussion_r878569698 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala: ## @@ -445,7 +445,17 @@ abstract class StreamExecution( false } else { val source = sources(sourceIndex) -!localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset +// SPARK-39242 For numeric increasing offsets, we could have called awaitOffset +// after the stream has moved past the expected newOffset or if committedOffsets +// changed after notify. In this case, its safe to exit, since at-least the given +// Offset has been reached and the equality condition might never be met. +if (!localCommittedOffsets.contains(source)) { + true +} else if (newOffset.isInstanceOf[LongOffset]) { + localCommittedOffsets(source).toString.toLong < newOffset.toString.toLong Review Comment: Right yea
[GitHub] [spark] viirya commented on a diff in pull request #36620: [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStreamPro
viirya commented on code in PR #36620: URL: https://github.com/apache/spark/pull/36620#discussion_r878569462 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala: ## @@ -445,7 +445,17 @@ abstract class StreamExecution( false } else { val source = sources(sourceIndex) -!localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset +// SPARK-39242 For numeric increasing offsets, we could have called awaitOffset +// after the stream has moved past the expected newOffset or if committedOffsets +// changed after notify. In this case, its safe to exit, since at-least the given +// Offset has been reached and the equality condition might never be met. +if (!localCommittedOffsets.contains(source)) { + true +} else if (newOffset.isInstanceOf[LongOffset]) { + localCommittedOffsets(source).toString.toLong < newOffset.toString.toLong Review Comment: Oh, it is `notDone`, nvm.
[GitHub] [spark] viirya commented on a diff in pull request #36620: [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStreamPro
viirya commented on code in PR #36620: URL: https://github.com/apache/spark/pull/36620#discussion_r878569111 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala: ## @@ -445,7 +445,17 @@ abstract class StreamExecution( false } else { val source = sources(sourceIndex) -!localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset +// SPARK-39242 For numeric increasing offsets, we could have called awaitOffset +// after the stream has moved past the expected newOffset or if committedOffsets +// changed after notify. In this case, its safe to exit, since at-least the given +// Offset has been reached and the equality condition might never be met. +if (!localCommittedOffsets.contains(source)) { + true +} else if (newOffset.isInstanceOf[LongOffset]) { + localCommittedOffsets(source).toString.toLong < newOffset.toString.toLong Review Comment: ```suggestion localCommittedOffsets(source).toString.toLong <= newOffset.toString.toLong ```
[GitHub] [spark] amaliujia commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
amaliujia commented on code in PR #36586:
URL: https://github.com/apache/spark/pull/36586#discussion_r878542464

## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala: ##

@@ -125,6 +135,31 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       isTemporary = isTemp)
   }

+  private def makeTable(ident: Seq[String]): Table = {
+    val plan = UnresolvedTableOrView(ident, "Catalog.listTables", true)
+    val node = sparkSession.sessionState.executePlan(plan).analyzed
+    node match {
+      case t: ResolvedTable =>
+        val isExternal = t.table.properties().getOrDefault("external", "false").equals("true")
+        new Table(
+          name = t.identifier.name(),
+          database = t.identifier.namespace().head,
+          description = t.table.properties().get("comment"),
+          tableType =
+            if (isExternal) CatalogTableType.EXTERNAL.name
+            else CatalogTableType.MANAGED.name,
+          isTemporary = false)
+      case v: ResolvedView =>
+        new Table(

Review Comment:
Why is `database` null if `qualifier.length` != 2? Couldn't `qualifier` contain only one string, which is a database name?
[GitHub] [spark] anishshri-db commented on pull request #36620: [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStreamProvide
anishshri-db commented on PR #36620: URL: https://github.com/apache/spark/pull/36620#issuecomment-1133342453 @alex-balikov - This is the PR on the OSS Spark side. Could you please review this one? Thx
[GitHub] [spark] MaxGekk opened a new pull request, #36621: [WIP][SQL][DOCS] Describe the rules of quoting elements in error messages
MaxGekk opened a new pull request, #36621:
URL: https://github.com/apache/spark/pull/36621

### What changes were proposed in this pull request?
In the PR, I propose to describe the rules of quoting elements in error messages introduced by the PRs:
- https://github.com/apache/spark/pull/36210
- https://github.com/apache/spark/pull/36233
- https://github.com/apache/spark/pull/36259
- https://github.com/apache/spark/pull/36324
- https://github.com/apache/spark/pull/36335
- https://github.com/apache/spark/pull/36359
- https://github.com/apache/spark/pull/36579

### Why are the changes needed?
To improve code maintenance, and the process of code review.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By existing GAs.
[GitHub] [spark] anishshri-db commented on pull request #36620: [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStreamProvide
anishshri-db commented on PR #36620: URL: https://github.com/apache/spark/pull/36620#issuecomment-1133278252 @viirya - Yea not sure. Merged from master and seems to be running now :)
[GitHub] [spark] viirya commented on pull request #36620: [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStreamProvider test
viirya commented on PR #36620: URL: https://github.com/apache/spark/pull/36620#issuecomment-1133273529 The GA CI does not seem to be enabled to run. Did you enable it in your repo?
[GitHub] [spark] MaxGekk closed pull request #36584: [SPARK-39213][SQL] Create ANY_VALUE aggregate function
MaxGekk closed pull request #36584: [SPARK-39213][SQL] Create ANY_VALUE aggregate function URL: https://github.com/apache/spark/pull/36584
[GitHub] [spark] MaxGekk commented on pull request #36584: [SPARK-39213][SQL] Create ANY_VALUE aggregate function
MaxGekk commented on PR #36584: URL: https://github.com/apache/spark/pull/36584#issuecomment-1133247766 +1, LGTM. Merging to master. Thank you, @vli-databricks, and @dtenedor for review.
[GitHub] [spark] dtenedor commented on pull request #36475: [SPARK-38869][SQL] Respect table capability ACCEPT_ANY_SCHEMA in DEFAULT column resolution
dtenedor commented on PR #36475:
URL: https://github.com/apache/spark/pull/36475#issuecomment-1133225214

> @dtenedor after a closer look, I think we can resolve this in a simpler way. I make a PR on your repo: https://github.com/dtenedor/spark/pull/4. You can merge it on your repo if you think it is OK.

Thanks, that works well! I merged it in.
[GitHub] [spark] amaliujia commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
amaliujia commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r878455439 ## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala: ## @@ -125,6 +135,31 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { isTemporary = isTemp) } + private def makeTable(ident: Seq[String]): Table = { +val plan = UnresolvedTableOrView(ident, "Catalog.listTables", true) +val node = sparkSession.sessionState.executePlan(plan).analyzed +node match { + case t: ResolvedTable => +val isExternal = t.table.properties().getOrDefault("external", "false").equals("true") +new Table( + name = t.identifier.name(), + database = t.identifier.namespace().head, + description = t.table.properties().get("comment"), + tableType = +if (isExternal) CatalogTableType.EXTERNAL.name +else CatalogTableType.MANAGED.name, + isTemporary = false) + case v: ResolvedView => +new Table( + name = v.identifier.name(), + database = v.identifier.namespace().toString, + description = "", + tableType = "", Review Comment: Good catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] anishshri-db commented on pull request #36620: [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStreamProvide
anishshri-db commented on PR #36620: URL: https://github.com/apache/spark/pull/36620#issuecomment-1133196464 @HeartSaVioR @viirya - Please take a look. Thanks
[GitHub] [spark] anishshri-db opened a new pull request, #36620: [SPARK-39242][SS] Fix awaitOffset to wait for committedOffset to reach at-least expected offset for longOffset and fix RateStreamProvid
anishshri-db opened a new pull request, #36620:
URL: https://github.com/apache/spark/pull/36620

### What changes were proposed in this pull request?
Fix awaitOffset to wait for committedOffset to reach at least the expected offset, instead of the exact value, for long offsets. Also fix the RateStreamProvider test to use only row values for the requested range. For numerically increasing offsets, awaitOffset could be called after the stream has moved past the expected newOffset, or committedOffsets could change after notify. In this case it is safe to exit, since at least the given offset has been reached and the equality condition might never be met.

### Why are the changes needed?
Fixes a bug in the awaitOffset logic and in the RateStreamProvider test.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
With the current code, we were seeing race conditions where the test would hang and get terminated after a 60 min timeout. With the change, ran the specific test 100 times, repeated over multiple rounds, and verified that we don't see the test failure any more. Was also able to simulate the failure by introducing arbitrary sleep in the code paths and ensured that the test passes with the above fix.

```
[info] RateStreamProviderSuite:
[info] Passed: Total 0, Failed 0, Errors 0, Passed 0
[info] No tests to run for avro / Test / testOnly
11:08:40.357 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable [info] Passed: Total 0, Failed 0, Errors 0, Passed 0 [info] No tests to run for hive / Test / testOnly [info] - RateStreamProvider in registry (531 milliseconds) [info] - compatible with old path in registry (3 milliseconds) [warn] multiple main classes detected: run 'show discoveredMainClasses' to see the list 11:08:43.608 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled. [info] Passed: Total 0, Failed 0, Errors 0, Passed 0 [info] No tests to run for mllib / Test / testOnly [info] Passed: Total 0, Failed 0, Errors 0, Passed 0 [info] No tests to run for sql-kafka-0-10 / Test / testOnly [info] - microbatch - basic (3 seconds, 966 milliseconds) 11:08:45.807 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled. [info] Passed: Total 0, Failed 0, Errors 0, Passed 0 [info] No tests to run for repl / Test / testOnly 11:08:48.493 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled. [warn] multiple main classes detected: run 'show discoveredMainClasses' to see the list [info] Passed: Total 0, Failed 0, Errors 0, Passed 0 [info] No tests to run for examples / Test / testOnly [info] - microbatch - restart (4 seconds, 365 milliseconds) 11:08:50.278 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled. 
[info] Passed: Total 0, Failed 0, Errors 0, Passed 0 [info] No tests to run for assembly / Test / testOnly [info] - microbatch - uniform distribution of event timestamps (696 milliseconds) [info] - microbatch - infer offsets (98 milliseconds) [info] - microbatch - predetermined batch size (86 milliseconds) [info] - microbatch - data read (85 milliseconds) [info] - valueAtSecond (0 milliseconds) [info] - overflow (265 milliseconds) [info] - illegal option values (4 milliseconds) [info] - user-specified schema given (9 milliseconds) [info] - continuous data (1 second, 6 milliseconds) 11:08:55.295 WARN org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite: = POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.sources.RateStreamProviderSuite, threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true), state-store-maintenance-task (daemon=true) = [info] Run completed in 17 seconds, 887 milliseconds. [info] Total number of tests run: 15 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 15, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. 18:59:59) INFO: Current date is 2022-05-19 (18:59:59) INFO: Analyzed target //sql/core:org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite-hive-2.3__hadoop-3.2 (0 packages loaded, 0 targets configured). (18:59:59) INFO: Found 1 test target... Target //sql/core:org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite-hive-2.3__hadoop-3.2 up-to-date:
[GitHub] [spark] dtenedor commented on a diff in pull request #36583: [SPARK-39211][SQL] Support JSON scans with DEFAULT values
dtenedor commented on code in PR #36583:
URL: https://github.com/apache/spark/pull/36583#discussion_r878419700

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala: ##

@@ -419,7 +419,13 @@ class JacksonParser(
     val row = new GenericInternalRow(schema.length)
     var badRecordException: Option[Throwable] = None
     var skipRow = false
-
+    // Apply default values from the column metadata to the initial row, if any.
+    if (schema.hasExistenceDefaultValues) {
+      for ((value: Any, i: Int) <- schema.existenceDefaultValues.zipWithIndex) {

Review Comment:
Thanks for pointing this out. I implemented a suggestion from Gengliang to keep a boolean array that tracks which columns we need to assign default values to. This way it's fast, and we only update columns with explicit default values that never got values assigned during the scan.
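The boolean-array idea described in this comment can be sketched roughly as follows. This is only an illustration under assumptions: `DefaultFillSketch`, `fillRow`, and the `(index, value)` representation of parsed fields are hypothetical names invented here, and the row is a plain `Array[Any]` rather than Spark's `GenericInternalRow`.

```scala
object DefaultFillSketch {
  /**
   * Builds a row from parsed (columnIndex, value) pairs, then applies defaults
   * only to columns that have an explicit default and were never assigned.
   */
  def fillRow(
      numFields: Int,
      defaults: Array[Option[Any]],
      parsed: Seq[(Int, Any)]): Array[Any] = {
    val row = new Array[Any](numFields)
    // needsDefault(i) starts out true only for columns with an explicit default.
    val needsDefault = Array.tabulate(numFields)(i => defaults(i).isDefined)
    parsed.foreach { case (i, value) =>
      row(i) = value
      needsDefault(i) = false // the scan supplied a value, so skip the default
    }
    // One cheap pass at the end instead of pre-filling every row up front.
    var i = 0
    while (i < numFields) {
      if (needsDefault(i)) row(i) = defaults(i).get
      i += 1
    }
    row
  }
}
```

The flag array keeps the per-field work to a single boolean write during the scan, and columns without a default are never touched by the final pass.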
[GitHub] [spark] akpatnam25 commented on pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true
akpatnam25 commented on PR #36601: URL: https://github.com/apache/spark/pull/36601#issuecomment-1133173384 @otterc @mridulm updated to address the latest comments
[GitHub] [spark] akpatnam25 commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is se
akpatnam25 commented on code in PR #36601: URL: https://github.com/apache/spark/pull/36601#discussion_r878407651 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler( mapOutputTracker. unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex)) } + } else { +// Unregister the merge result of if +// there is a FetchFailed event and is not a +// MetaDataFetchException which is signified by bmAddress being null +if (bmAddress != null) { + mapOutputTracker. +unregisterMergeResult(shuffleId, reduceId, bmAddress, None) +} Review Comment: yep, added a condition to check for the special shuffle-push-merger executor, where we only remove the outputs and not the executor itself. added a corresponding UT too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] akpatnam25 commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is se
akpatnam25 commented on code in PR #36601: URL: https://github.com/apache/spark/pull/36601#discussion_r878406574 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler( mapOutputTracker. unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex)) } + } else { +// Unregister the merge result of if +// the FetchFailed event contains a mapIndex of -1, and is not a +// MetaDataFetchException which is signified by bmAddress being null +if (bmAddress != null && pushBasedShuffleEnabled) { Review Comment: added the assertion -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] amaliujia commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
amaliujia commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r878354239 ## sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala: ## @@ -78,7 +78,7 @@ class Table( s"tableType='$tableType', " + s"isTemporary='$isTemporary']" } - + Review Comment: I will revert this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] vli-databricks commented on a diff in pull request #36584: [SPARK-39213][SQL] Create ANY_VALUE aggregate function
vli-databricks commented on code in PR #36584: URL: https://github.com/apache/spark/pull/36584#discussion_r878352841 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AnyValue.scala: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.types._ + +/** + * Returns the first value of `child` for a group of rows. If the first value of `child` + * is `null`, it returns `null` (respecting nulls). Even if [[AnyValue]] is used on an already + * sorted column, if we do partial aggregation and final aggregation (when mergeExpression + * is used) its result will not be deterministic (unless the input table is sorted and has + * a single partition, and we use a single reducer to do the aggregation.). + * Interchangeable with [[First]]. + */ +@ExpressionDescription( + usage = """ +_FUNC_(expr[, isIgnoreNull]) - Returns some value of `expr` for a group of rows. 
+ If `isIgnoreNull` is true, returns only non-null values.""", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES (10), (5), (20) AS tab(col); + 10 + > SELECT _FUNC_(col) FROM VALUES (NULL), (5), (20) AS tab(col); + NULL + > SELECT _FUNC_(col, true) FROM VALUES (NULL), (5), (20) AS tab(col); + 5 + """, + note = """ +The function is non-deterministic. + """, + group = "agg_funcs", + since = "3.4.0") +case class AnyValue(child: Expression, ignoreNulls: Boolean) Review Comment: This is primarily for documentation purposes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] vli-databricks commented on a diff in pull request #36584: [SPARK-39213][SQL] Create ANY_VALUE aggregate function
vli-databricks commented on code in PR #36584: URL: https://github.com/apache/spark/pull/36584#discussion_r878349755 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AnyValue.scala: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] LuciferYang commented on a diff in pull request #36611: [SPARK-39204][CORE] Change `Utils.createTempDir` and `Utils.createDirectory` call the same logic method in `JavaUtils`
LuciferYang commented on code in PR #36611:
URL: https://github.com/apache/spark/pull/36611#discussion_r878325387

## core/src/main/scala/org/apache/spark/util/Utils.scala: ##

@@ -308,28 +308,7 @@ private[spark] object Utils extends Logging {
    * newly created, and is not marked for automatic deletion.
    */
   def createDirectory(root: String, namePrefix: String = "spark"): File = {
-    var attempts = 0
-    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
-    var dir: File = null
-    while (dir == null) {
-      attempts += 1
-      if (attempts > maxAttempts) {
-        throw new IOException("Failed to create a temp directory (under " + root + ") after " +
-          maxAttempts + " attempts!")
-      }
-      try {
-        dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
-        // SPARK-35907:
-        // This could throw more meaningful exception information if directory creation failed.
-        Files.createDirectories(dir.toPath)
-      } catch {
-        case e @ (_ : IOException | _ : SecurityException) =>
-          logError(s"Failed to create directory $dir", e)
-          dir = null
-      }
-    }
-
-    dir.getCanonicalFile
+    JavaUtils.createDirectory(root, namePrefix)

Review Comment:
To address

```
Avoid the use of Guava's Files.createTempDir() in Spark code due to [CVE-2020-8908](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-8908)
```

I added two compatible methods to `JavaUtils` ([SPARK-39102](https://issues.apache.org/jira/browse/SPARK-39102)).

> Hm, why did we not just implement the logic here? rather than use JavaUtils? I forget if there was a reason

Code in the `network-common` and `network-shuffle` modules cannot call `Utils.createDirectory` or `Utils.createTempDir` in the `core` module.
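As a rough illustration of the logic being consolidated, the bounded-retry directory creation from the removed lines can be written as a standalone helper. This is a sketch only, not the actual `JavaUtils` code (which is Java): `DirSketch` is a hypothetical name, logging is omitted, and the default of 10 attempts is an assumption standing in for `MAX_DIR_CREATION_ATTEMPTS`.

```scala
import java.io.{File, IOException}
import java.nio.file.Files
import java.util.UUID

object DirSketch {
  /** Creates a uniquely named directory under `root`, retrying a bounded number of times. */
  def createDirectory(
      root: String,
      namePrefix: String = "spark",
      maxAttempts: Int = 10): File = { // 10 is an assumed default, not taken from the thread
    var attempts = 0
    var dir: File = null
    while (dir == null) {
      attempts += 1
      if (attempts > maxAttempts) {
        throw new IOException(
          s"Failed to create a temp directory (under $root) after $maxAttempts attempts!")
      }
      try {
        dir = new File(root, s"$namePrefix-${UUID.randomUUID}")
        // createDirectories reports the underlying cause if creation fails.
        Files.createDirectories(dir.toPath)
      } catch {
        case _: IOException | _: SecurityException => dir = null // retry with a new name
      }
    }
    dir.getCanonicalFile
  }
}
```

Keeping one copy of this loop in a module that both `core` and the network modules can depend on is the point of the change discussed above.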
[GitHub] [spark] srowen commented on a diff in pull request #36611: [SPARK-39204][CORE] Change `Utils.createTempDir` and `Utils.createDirectory` call the same logic method in `JavaUtils`
srowen commented on code in PR #36611: URL: https://github.com/apache/spark/pull/36611#discussion_r878325944 ## core/src/main/scala/org/apache/spark/util/Utils.scala: ## @@ -308,28 +308,7 @@ private[spark] object Utils extends Logging { * newly created, and is not marked for automatic deletion. */ def createDirectory(root: String, namePrefix: String = "spark"): File = { -var attempts = 0 -val maxAttempts = MAX_DIR_CREATION_ATTEMPTS -var dir: File = null -while (dir == null) { - attempts += 1 - if (attempts > maxAttempts) { -throw new IOException("Failed to create a temp directory (under " + root + ") after " + - maxAttempts + " attempts!") - } - try { -dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString) -// SPARK-35907: -// This could throw more meaningful exception information if directory creation failed. -Files.createDirectories(dir.toPath) - } catch { -case e @ (_ : IOException | _ : SecurityException) => - logError(s"Failed to create directory $dir", e) - dir = null - } -} - -dir.getCanonicalFile +JavaUtils.createDirectory(root, namePrefix) Review Comment: Ah right, that's fine -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] LuciferYang commented on a diff in pull request #36611: [SPARK-39204][CORE] Change `Utils.createTempDir` and `Utils.createDirectory` call the same logic method in `JavaUtils`
LuciferYang commented on code in PR #36611: URL: https://github.com/apache/spark/pull/36611#discussion_r878325387 ## core/src/main/scala/org/apache/spark/util/Utils.scala: ## @@ -308,28 +308,7 @@ private[spark] object Utils extends Logging { * newly created, and is not marked for automatic deletion. */ def createDirectory(root: String, namePrefix: String = "spark"): File = { -var attempts = 0 -val maxAttempts = MAX_DIR_CREATION_ATTEMPTS -var dir: File = null -while (dir == null) { - attempts += 1 - if (attempts > maxAttempts) { -throw new IOException("Failed to create a temp directory (under " + root + ") after " + - maxAttempts + " attempts!") - } - try { -dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString) -// SPARK-35907: -// This could throw more meaningful exception information if directory creation failed. -Files.createDirectories(dir.toPath) - } catch { -case e @ (_ : IOException | _ : SecurityException) => - logError(s"Failed to create directory $dir", e) - dir = null - } -} - -dir.getCanonicalFile +JavaUtils.createDirectory(root, namePrefix) Review Comment: In order to solve ``` Avoid the use of Guava's Files.createTempDir() in Spark code due to [CVE-2020-8908](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-8908) ``` I added two compatible methods in `JavaUtils`. > Hm, why did we not just implement the logic here? rather than use JavaUtils? I forget if there was a reason Code in `network-common` and `network-shuffle` module cannot call `Utils.createDirectory ` or `Utils. createTempDir` in `core` module. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] srowen commented on a diff in pull request #36611: [SPARK-39204][CORE] Change `Utils.createTempDir` and `Utils.createDirectory` call the same logic method in `JavaUtils`
srowen commented on code in PR #36611: URL: https://github.com/apache/spark/pull/36611#discussion_r878304272

## core/src/main/scala/org/apache/spark/util/Utils.scala: ##
@@ -308,28 +308,7 @@ private[spark] object Utils extends Logging {
    * newly created, and is not marked for automatic deletion.
    */
   def createDirectory(root: String, namePrefix: String = "spark"): File = {
-    var attempts = 0
-    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
-    var dir: File = null
-    while (dir == null) {
-      attempts += 1
-      if (attempts > maxAttempts) {
-        throw new IOException("Failed to create a temp directory (under " + root + ") after " +
-          maxAttempts + " attempts!")
-      }
-      try {
-        dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
-        // SPARK-35907:
-        // This could throw more meaningful exception information if directory creation failed.
-        Files.createDirectories(dir.toPath)
-      } catch {
-        case e @ (_ : IOException | _ : SecurityException) =>
-          logError(s"Failed to create directory $dir", e)
-          dir = null
-      }
-    }
-
-    dir.getCanonicalFile
+    JavaUtils.createDirectory(root, namePrefix)

Review Comment:
Hm, why did we not just implement the logic here? rather than use JavaUtils? I forget if there was a reason
[GitHub] [spark] hai-tao-1 commented on pull request #36606: [SPARK-39232][CORE] History Server Main Page App List Filtering
hai-tao-1 commented on PR #36606: URL: https://github.com/apache/spark/pull/36606#issuecomment-1133064816

> The PR test fails with `[error] spark-core: Failed binary compatibility check against org.apache.spark:spark-core_2.12:3.2.0! Found 9 potential problems (filtered 924)` Anyone could advise what may be wrong? Thanks.

NVM, I added MimaExclude and it got fixed.
[GitHub] [spark] srowen commented on pull request #36619: [SPARK-39240][INFRA][BUILD] Source and binary releases using different tool to generate hashes for integrity
srowen commented on PR #36619: URL: https://github.com/apache/spark/pull/36619#issuecomment-1133064715 Merged to master/3.3/3.2
[GitHub] [spark] srowen closed pull request #36619: [SPARK-39240][INFRA][BUILD] Source and binary releases using different tool to generate hashes for integrity
srowen closed pull request #36619: [SPARK-39240][INFRA][BUILD] Source and binary releases using different tool to generate hashes for integrity URL: https://github.com/apache/spark/pull/36619
[GitHub] [spark] AmplabJenkins commented on pull request #36617: [SPARK-38687][SQL] Use error classes in the compilation errors of generators
AmplabJenkins commented on PR #36617: URL: https://github.com/apache/spark/pull/36617#issuecomment-1132923102 Can one of the admins verify this patch?
[GitHub] [spark] yaooqinn commented on pull request #36619: [SPARK-39240][INFRA][BUILD] Source and binary releases using different tool to generate hashes for integrity
yaooqinn commented on PR #36619: URL: https://github.com/apache/spark/pull/36619#issuecomment-1132920553 Mac seems OK https://www.apache.org/info/verification.html
[GitHub] [spark] Ngone51 commented on a diff in pull request #36162: [SPARK-32170][CORE] Improve the speculation through the stage task metrics.
Ngone51 commented on code in PR #36162: URL: https://github.com/apache/spark/pull/36162#discussion_r878134911

## core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala: ##
@@ -863,6 +870,29 @@ private[spark] class TaskSchedulerImpl(
       executorUpdates)
   }

+  private def getTaskAccumulableInfosAndProgressRate(
+      updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = {
+    var records = 0L
+    var runTime = 0L
+    val accInfos = updates.map { acc =>
+      if (calculateTaskProgressRate && acc.name.isDefined) {
+        val name = acc.name.get
+        if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) {
+          records += acc.value.asInstanceOf[Long]
+        } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) {
+          runTime = acc.value.asInstanceOf[Long]
+        }
+      }
+      acc.toInfo(Some(acc.value), None)
+    }
+    val taskProgressRate = if (calculateTaskProgressRate && runTime > 0) {
+      records / (runTime / 1000.0)
+    } else {
+      0.0D
+    }

Review Comment:
OK, so could we just set `accumUpdates` to the `TaskInfo` here and calculate the rate in `InefficientTaskCalculator` when it's required?
[GitHub] [spark] weixiuli commented on a diff in pull request #36162: [SPARK-32170][CORE] Improve the speculation through the stage task metrics.
weixiuli commented on code in PR #36162: URL: https://github.com/apache/spark/pull/36162#discussion_r878111861

## core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala: ##
@@ -863,6 +870,29 @@ private[spark] class TaskSchedulerImpl(
       executorUpdates)
   }

+  private def getTaskAccumulableInfosAndProgressRate(
+      updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = {
+    var records = 0L
+    var runTime = 0L
+    val accInfos = updates.map { acc =>
+      if (calculateTaskProgressRate && acc.name.isDefined) {
+        val name = acc.name.get
+        if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) {
+          records += acc.value.asInstanceOf[Long]
+        } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) {
+          runTime = acc.value.asInstanceOf[Long]
+        }
+      }
+      acc.toInfo(Some(acc.value), None)
+    }
+    val taskProgressRate = if (calculateTaskProgressRate && runTime > 0) {
+      records / (runTime / 1000.0)
+    } else {
+      0.0D
+    }

Review Comment:
If we calculate an in-progress task's taskProgressRate in InefficientTaskCalculator, we can only use TaskInfo._accumulables instead of accumUpdates. TaskInfo._accumulables is updated by events, which may be lost, so it may be unreliable.
Following @mridulm's suggestion in https://github.com/apache/spark/pull/36162#discussion_r865591651: for in-progress tasks, we can update the rate via the executor heartbeat, which is both the latest and reliable.
The taskProgressRate is computed within the existing traversal logic, so the additional cost is negligible.
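The rate computed in the diff is records read divided by run time in seconds, with run time reported in milliseconds. A minimal standalone sketch of that arithmetic follows; the `Update` case class and the metric name strings are stand-ins invented for this example, not Spark's `AccumulatorV2` or its real accumulator names.

```scala
object ProgressRateSketch {
  // Simplified stand-in for a named accumulator update (hypothetical type).
  final case class Update(name: String, value: Long)

  // Hypothetical metric names used only in this sketch.
  val RecordsRead = "recordsRead"
  val ShuffleRecordsRead = "shuffleRecordsRead"
  val ExecutorRunTime = "executorRunTime"

  // Single pass over the updates: sum the record counts, pick up the run time,
  // then return records per second (0.0 when no run time has been reported).
  def progressRate(updates: Seq[Update]): Double = {
    var records = 0L
    var runTimeMs = 0L
    updates.foreach { u =>
      if (u.name == RecordsRead || u.name == ShuffleRecordsRead) records += u.value
      else if (u.name == ExecutorRunTime) runTimeMs = u.value
    }
    if (runTimeMs > 0) records / (runTimeMs / 1000.0) else 0.0
  }
}
```

For example, 2000 records over 4000 ms gives a rate of 500 records per second.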
[GitHub] [spark] srowen commented on pull request #36619: [SPARK-39240][INFRA][BUILD] Source and binary releases using different tool to generate hashes for integrity
srowen commented on PR #36619: URL: https://github.com/apache/spark/pull/36619#issuecomment-1132829049 That seems OK if we rely on shasum elsewhere. I thought at one point the issue was that Mac OS didn't have this binary or something.
[GitHub] [spark] yaooqinn commented on pull request #36619: [SPARK-39240][INFRA][BUILD] Source and binary releases using different tool to generate hashes for integrity
yaooqinn commented on PR #36619: URL: https://github.com/apache/spark/pull/36619#issuecomment-1132694690 cc @dongjoon-hyun @srowen @HyukjinKwon @MaxGekk thanks
[GitHub] [spark] yaooqinn opened a new pull request, #36619: [SPARK-39240][INFRA][BUILD] Source and binary releases using different tool to generate hashes for integrity
yaooqinn opened a new pull request, #36619: URL: https://github.com/apache/spark/pull/36619

### What changes were proposed in this pull request?

Unify the hash generator for release files.

### Why are the changes needed?

Currently, we use `shasum` for source but `gpg` for binary, since https://github.com/apache/spark/pull/30123. This confused me when validating the integrity of the Spark 3.3.0 RC at https://dist.apache.org/repos/dist/dev/spark/v3.3.0-rc2-bin/

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tested the script manually.
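For readers checking a release artifact by hand, the digest itself can be recomputed independently of either tool. The sketch below uses the JDK's `MessageDigest`; it assumes the published checksum is SHA-512 and says nothing about the exact file layout that `shasum` or `gpg --print-md` emit, which is what the PR is about.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object HashSketch {
  // Lowercase hex SHA-512 digest of the given bytes.
  def sha512Hex(bytes: Array[Byte]): String =
    MessageDigest.getInstance("SHA-512")
      .digest(bytes)
      .map(b => "%02x".format(b))
      .mkString
}
```

To verify a download, one would read the file's bytes (for example with `java.nio.file.Files.readAllBytes`) and compare the result, ignoring case and whitespace, with the hex string in the published checksum file.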
[GitHub] [spark] panbingkun commented on pull request #36580: [SPARK-39167][SQL] Throw an exception w/ an error class for multiple rows from a subquery used as an expression
panbingkun commented on PR #36580: URL: https://github.com/apache/spark/pull/36580#issuecomment-1132691070

> git commit --allow-empty -m "Trigger build"

Done
[GitHub] [spark] gengliangwang commented on pull request #36618: [SPARK-39237][DOCS][3.2] Update the ANSI SQL mode documentation
gengliangwang commented on PR #36618: URL: https://github.com/apache/spark/pull/36618#issuecomment-1132685875 This is to port https://github.com/apache/spark/pull/36614 to branch-3.2 cc @tanvn as well
[GitHub] [spark] gengliangwang opened a new pull request, #36618: [SPARK-39237][DOCS][3.2] Update the ANSI SQL mode documentation
gengliangwang opened a new pull request, #36618: URL: https://github.com/apache/spark/pull/36618

### What changes were proposed in this pull request?

1. Remove the Experimental notation in ANSI SQL compliance doc
2. Update the description of `spark.sql.ansi.enabled`

### Why are the changes needed?

1. The ANSI SQL dialect is GAed in Spark 3.2 release: https://spark.apache.org/releases/spark-release-3-2-0.html We should not mark it as "Experimental" in the doc.
2. Mention type coercion in the doc of `spark.sql.ansi.enabled`

### Does this PR introduce _any_ user-facing change?

No, just doc change

### How was this patch tested?

Doc preview:
https://user-images.githubusercontent.com/1097932/169444094-de9c33c2-1b01-4fc3-b583-b752c71e16d8.png
https://user-images.githubusercontent.com/1097932/169499090-690ce919-1de8-4a64-bd12-32fe5549d890.png
[GitHub] [spark] panbingkun opened a new pull request, #36617: [SPARK-38687][SQL] Use error classes in the compilation errors of generators
panbingkun opened a new pull request, #36617: URL: https://github.com/apache/spark/pull/36617

## What changes were proposed in this pull request?

Migrate the following errors in QueryCompilationErrors to use error classes:
- nestedGeneratorError => UNSUPPORTED_GENERATOR.NESTED_IN_EXPRESSIONS
- moreThanOneGeneratorError => UNSUPPORTED_GENERATOR.MULTI_GENERATOR
- generatorOutsideSelectError => UNSUPPORTED_GENERATOR.OUTSIDE_SELECT
- generatorNotExpectedError => UNSUPPORTED_GENERATOR.NOT_GENERATOR

### Why are the changes needed?

Porting the compilation errors of generators to the new error framework improves test coverage and documents the expected error messages in tests.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

By running the new test:
```
$ build/sbt "sql/testOnly *QueryCompilationErrorsSuite*"
```
[GitHub] [spark] gengliangwang closed pull request #36614: [SPARK-39237][DOCS] Update the ANSI SQL mode documentation
gengliangwang closed pull request #36614: [SPARK-39237][DOCS] Update the ANSI SQL mode documentation URL: https://github.com/apache/spark/pull/36614
[GitHub] [spark] gengliangwang commented on pull request #36614: [SPARK-39237][DOCS] Update the ANSI SQL mode documentation
gengliangwang commented on PR #36614: URL: https://github.com/apache/spark/pull/36614#issuecomment-1132654378 Merging to master/3.3
[GitHub] [spark] Ngone51 commented on a diff in pull request #36162: [SPARK-32170][CORE] Improve the speculation through the stage task metrics.
Ngone51 commented on code in PR #36162: URL: https://github.com/apache/spark/pull/36162#discussion_r877700327

## core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala: ##
@@ -863,6 +870,29 @@ private[spark] class TaskSchedulerImpl(
       executorUpdates)
   }

+  private def getTaskAccumulableInfosAndProgressRate(
+      updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = {
+    var records = 0L
+    var runTime = 0L
+    val accInfos = updates.map { acc =>
+      if (calculateTaskProgressRate && acc.name.isDefined) {
+        val name = acc.name.get
+        if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) {
+          records += acc.value.asInstanceOf[Long]
+        } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) {
+          runTime = acc.value.asInstanceOf[Long]
+        }
+      }
+      acc.toInfo(Some(acc.value), None)
+    }
+    val taskProgressRate = if (calculateTaskProgressRate && runTime > 0) {
+      records / (runTime / 1000.0)
+    } else {
+      0.0D
+    }

Review Comment:
Can we centralize the calculation of the task progress rate in `InefficientTaskCalculator` only? It seems that not every calculation is necessary here, since the speculation check only happens under certain conditions, e.g., `numSuccessfulTasks >= minFinishedForSpeculation`. And I think we can reuse the existing `TaskInfo._accumulables` directly, which could make the code cleaner.
[GitHub] [spark] beliefer commented on a diff in pull request #36531: [SPARK-39171][SQL] Unify the Cast expression
beliefer commented on code in PR #36531: URL: https://github.com/apache/spark/pull/36531#discussion_r877893550

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala: ##
@@ -275,6 +376,55 @@ object Cast {
       case _ => null
     }
   }
+
+  // Show suggestion on how to complete the disallowed explicit casting with built-in type
+  // conversion functions.
+  private def suggestionOnConversionFunctions (
+      from: DataType,
+      to: DataType,
+      functionNames: String): String = {
+    // scalastyle:off line.size.limit
+    s"""cannot cast ${from.catalogString} to ${to.catalogString}.
+       |To convert values from ${from.catalogString} to ${to.catalogString}, you can use $functionNames instead.
+       |""".stripMargin
+    // scalastyle:on line.size.limit
+  }
+
+  def typeCheckFailureMessage(
+      from: DataType,
+      to: DataType,
+      fallbackConfKey: Option[String],
+      fallbackConfValue: Option[String]): String =
+    (from, to) match {
+      case (_: NumericType, TimestampType) =>
+        suggestionOnConversionFunctions(from, to,
+          "functions TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS")
+
+      case (TimestampType, _: NumericType) =>
+        suggestionOnConversionFunctions(from, to, "functions UNIX_SECONDS/UNIX_MILLIS/UNIX_MICROS")
+
+      case (_: NumericType, DateType) =>
+        suggestionOnConversionFunctions(from, to, "function DATE_FROM_UNIX_DATE")
+
+      case (DateType, _: NumericType) =>
+        suggestionOnConversionFunctions(from, to, "function UNIX_DATE")
+
+      // scalastyle:off line.size.limit
+      case _ if fallbackConfKey.isDefined && fallbackConfValue.isDefined && Cast.canCast(from, to) =>
+        s"""
+           | cannot cast ${from.catalogString} to ${to.catalogString} with ANSI mode on.
+           | If you have to cast ${from.catalogString} to ${to.catalogString}, you can set ${fallbackConfKey.get} as ${fallbackConfValue.get}.

Review Comment:
OK
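The message selection in the diff is a pattern match on the (from, to) type pair that picks a conversion-function hint. A reduced sketch of the same idea is shown below; the `DType` hierarchy, `isNumeric`, and the exact message wording are simplified assumptions for illustration and do not mirror Spark's `DataType` classes or final error text.

```scala
object CastHintSketch {
  // Simplified stand-ins for Spark's DataType hierarchy (assumptions for this sketch).
  sealed trait DType { def catalogString: String }
  case object IntT extends DType { val catalogString = "int" }
  case object TimestampT extends DType { val catalogString = "timestamp" }
  case object DateT extends DType { val catalogString = "date" }
  case object StringT extends DType { val catalogString = "string" }

  private def isNumeric(t: DType): Boolean = t == IntT

  // Returns the base failure message, plus a hint when a built-in
  // conversion function covers the disallowed cast.
  def failureMessage(from: DType, to: DType): String = {
    val hint = (from, to) match {
      case (f, TimestampT) if isNumeric(f) =>
        Some("functions TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS")
      case (TimestampT, t) if isNumeric(t) =>
        Some("functions UNIX_SECONDS/UNIX_MILLIS/UNIX_MICROS")
      case (f, DateT) if isNumeric(f) => Some("function DATE_FROM_UNIX_DATE")
      case (DateT, t) if isNumeric(t) => Some("function UNIX_DATE")
      case _ => None
    }
    val base = s"cannot cast ${from.catalogString} to ${to.catalogString}"
    hint.fold(base)(h => s"$base. You can use $h instead.")
  }
}
```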
[GitHub] [spark] LuciferYang commented on pull request #36616: [SPARK-39231][SQL] Change to use `ConstantColumnVector` to store partition columns in `VectorizedParquetRecordReader`
LuciferYang commented on PR #36616: URL: https://github.com/apache/spark/pull/36616#issuecomment-1132640913 If the contents and results of the new benchmark are obvious, please tell me and I will delete it.
[GitHub] [spark] Ngone51 commented on a diff in pull request #36512: [SPARK-39152][CORE] Deregistering disk persisted local RDD blocks in case of IO related errors
Ngone51 commented on code in PR #36512: URL: https://github.com/apache/spark/pull/36512#discussion_r877873226

## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ##
@@ -933,46 +935,56 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
+          var diskData: BlockData = null
           try {
-            val diskData = diskStore.getBytes(blockId)
-            val iterToReturn: Iterator[Any] = {
-              if (level.deserialized) {
-                val diskValues = serializerManager.dataDeserializeStream(
-                  blockId,
-                  diskData.toInputStream())(info.classTag)
-                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-              } else {
-                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                  .map { _.toInputStream(dispose = false) }
-                  .getOrElse { diskData.toInputStream() }
-                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
-              }
+            diskData = diskStore.getBytes(blockId)
+            val iterToReturn = if (level.deserialized) {
+              val diskValues = serializerManager.dataDeserializeStream(
+                blockId,
+                diskData.toInputStream())(info.classTag)
+              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+            } else {
+              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                .map { _.toInputStream(dispose = false) }
+                .getOrElse { diskData.toInputStream() }
+              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
             }
             val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
               releaseLockAndDispose(blockId, diskData, taskContext)
             })
             Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
           } catch {
-            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
-              // We need to have detailed log message to catch environmental problems easily.
-              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
-              processKryoException(ex, blockId)
-              throw ex
+            case t: Throwable =>
+              if (diskData != null) {
+                diskData.dispose()
+                diskData = null
+              }
+              releaseLock(blockId, taskContext)
+              if (isIORelatedException(t)) {
+                logInfo(extendMessageWithBlockDetails(t.getMessage, blockId))
+                // Remove the block so that its unavailability is reported to the driver
+                removeBlock(blockId)

Review Comment:
This doesn't only affect RDD blocks, does it?

## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ##
@@ -933,10 +933,29 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          try {
-            val diskData = diskStore.getBytes(blockId)
-            val iterToReturn: Iterator[Any] = {
-              if (level.deserialized) {
+          var retryCount = 0
+          val retryLimit = 3

Review Comment:
+1 to not retry.
[GitHub] [spark] MaxGekk commented on a diff in pull request #36584: [SPARK-39213][SQL] Create ANY_VALUE aggregate function
MaxGekk commented on code in PR #36584: URL: https://github.com/apache/spark/pull/36584#discussion_r877871837

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AnyValue.scala: ##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.types._
+
+/**
+ * Returns the first value of `child` for a group of rows. If the first value of `child`
+ * is `null`, it returns `null` (respecting nulls). Even if [[AnyValue]] is used on an already
+ * sorted column, if we do partial aggregation and final aggregation (when mergeExpression
+ * is used) its result will not be deterministic (unless the input table is sorted and has
+ * a single partition, and we use a single reducer to do the aggregation.).
+ * Interchangeable with [[First]].
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(expr[, isIgnoreNull]) - Returns some value of `expr` for a group of rows.
+      If `isIgnoreNull` is true, returns only non-null values.""",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES (10), (5), (20) AS tab(col);
+       10
+      > SELECT _FUNC_(col) FROM VALUES (NULL), (5), (20) AS tab(col);
+       NULL
+      > SELECT _FUNC_(col, true) FROM VALUES (NULL), (5), (20) AS tab(col);
+       5
+  """,
+  note = """
+    The function is non-deterministic.
+  """,
+  group = "agg_funcs",
+  since = "3.4.0")
+case class AnyValue(child: Expression, ignoreNulls: Boolean)

Review Comment:
Could you please explain why you need a separate expression, and why `any_value()` is not implemented as an alias of `First` like `first_value()`: https://github.com/apache/spark/blob/7221ea31b6bbad0d87b22e5413b8979bee56321c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L469
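The three examples in the expression description can be reproduced on a plain in-memory sequence, which makes the `ignoreNulls` behaviour easy to see. This is a local, single-partition illustration only: `AnyValueSketch` and `firstValue` are hypothetical names, SQL NULL is modelled as `None`, and the non-determinism that arises from partial and final aggregation across partitions is not modelled.

```scala
object AnyValueSketch {
  // Returns the first element of the group; with ignoreNulls = true,
  // returns the first non-null element instead. NULL is modelled as None.
  def firstValue[A](rows: Seq[Option[A]], ignoreNulls: Boolean): Option[A] =
    if (ignoreNulls) rows.flatten.headOption
    else rows.headOption.flatten
}
```

With the rows `(NULL), (5), (20)`, the sketch yields `None` when nulls are respected and `Some(5)` when they are ignored, matching the documented outputs.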
[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
cloud-fan commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r877869223 ## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala: ## @@ -125,6 +135,31 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { isTemporary = isTemp) } + private def makeTable(ident: Seq[String]): Table = { +val plan = UnresolvedTableOrView(ident, "Catalog.listTables", true) +val node = sparkSession.sessionState.executePlan(plan).analyzed +node match { + case t: ResolvedTable => +val isExternal = t.table.properties().getOrDefault("external", "false").equals("true") +new Table( + name = t.identifier.name(), + database = t.identifier.namespace().head, + description = t.table.properties().get("comment"), + tableType = +if (isExternal) CatalogTableType.EXTERNAL.name +else CatalogTableType.MANAGED.name, + isTemporary = false) + case v: ResolvedView => +new Table( Review Comment: We will hit similar problems in `listDatabases` and `listFunctions`
[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
cloud-fan commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r877867840 ## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala: ## @@ -125,6 +135,31 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { isTemporary = isTemp) } + private def makeTable(ident: Seq[String]): Table = { +val plan = UnresolvedTableOrView(ident, "Catalog.listTables", true) +val node = sparkSession.sessionState.executePlan(plan).analyzed +node match { + case t: ResolvedTable => +val isExternal = t.table.properties().getOrDefault("external", "false").equals("true") +new Table( + name = t.identifier.name(), + database = t.identifier.namespace().head, + description = t.table.properties().get("comment"), + tableType = +if (isExternal) CatalogTableType.EXTERNAL.name +else CatalogTableType.MANAGED.name, + isTemporary = false) + case v: ResolvedView => +new Table( Review Comment: My major concern is this `Table` API. It needs to support n-part names while keeping backward compatibility. I'm thinking about something like ``` class Table( val name: String, val qualifier: Array[String], ... ) { def database: String = if (qualifier.length == 2) qualifier(1) else null } ```
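A self-contained sketch of the shape proposed above may help readers follow the backward-compatibility argument. It is a simplification, not the actual `org.apache.spark.sql.catalog.Table`. The extra constructor fields mirror the ones used in the diff, and the ordering of `qualifier` (catalog first, then database) is an assumption read off the `qualifier(1)` access in the comment.

```scala
// Hypothetical, simplified version of the proposed `Table` shape.
class Table(
    val name: String,
    val qualifier: Array[String],
    val description: String,
    val tableType: String,
    val isTemporary: Boolean) {

  // Backward-compatible accessor: only a two-part qualifier (assumed here to
  // be catalog followed by database) maps onto the legacy `database` field.
  // Any other depth has no single database, so callers get null, as proposed.
  def database: String =
    if (qualifier.length == 2) qualifier(1) else null
}
```

Existing callers that read `table.database` keep compiling, while catalogs with deeper namespaces expose the full path through `qualifier`.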
[GitHub] [spark] MaxGekk commented on a diff in pull request #36584: [SPARK-39213][SQL] Create ANY_VALUE aggregate function
MaxGekk commented on code in PR #36584: URL: https://github.com/apache/spark/pull/36584#discussion_r877864544 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AnyValue.scala: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate Review Comment: Could you move `AnyValue.scala` to `expressions/aggregate`, please?
[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
cloud-fan commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r877861919 ## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala: ## @@ -125,6 +135,31 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { isTemporary = isTemp) } + private def makeTable(ident: Seq[String]): Table = { +val plan = UnresolvedTableOrView(ident, "Catalog.listTables", true) +val node = sparkSession.sessionState.executePlan(plan).analyzed +node match { + case t: ResolvedTable => +val isExternal = t.table.properties().getOrDefault("external", "false").equals("true") +new Table( + name = t.identifier.name(), + database = t.identifier.namespace().head, + description = t.table.properties().get("comment"), + tableType = +if (isExternal) CatalogTableType.EXTERNAL.name +else CatalogTableType.MANAGED.name, + isTemporary = false) + case v: ResolvedView => +new Table( + name = v.identifier.name(), + database = v.identifier.namespace().toString, + description = "", + tableType = "", Review Comment: Looking at the previous behavior, I think this should be `if (v.isTemp) "TEMPORARY" else "VIEW"`
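The suggested `tableType` rule, together with the table branch from the diff, can be sketched in isolation. `ResolvedTableLike` and `ResolvedViewLike` are hypothetical stand-ins for the analyzer nodes, and the `"EXTERNAL"` and `"MANAGED"` literals are assumed to match what `CatalogTableType.EXTERNAL.name` and `CatalogTableType.MANAGED.name` return.

```scala
// Hypothetical stand-ins for the analyzer nodes matched in `makeTable`.
sealed trait Resolved
final case class ResolvedTableLike(isExternal: Boolean) extends Resolved
final case class ResolvedViewLike(isTemp: Boolean) extends Resolved

object TableTypes {
  // Tables are EXTERNAL or MANAGED; views are TEMPORARY or VIEW, following
  // the reviewer's suggestion rather than the empty string in the diff.
  def tableType(node: Resolved): String = node match {
    case ResolvedTableLike(isExternal) =>
      if (isExternal) "EXTERNAL" else "MANAGED"
    case ResolvedViewLike(isTemp) =>
      if (isTemp) "TEMPORARY" else "VIEW"
  }
}
```

Because the trait is sealed, the compiler can warn if a new node kind is added without a corresponding `tableType` branch.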
[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
cloud-fan commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r877853440 ## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala: ## @@ -125,6 +135,31 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { isTemporary = isTemp) } + private def makeTable(ident: Seq[String]): Table = { +val plan = UnresolvedTableOrView(ident, "Catalog.listTables", true) +val node = sparkSession.sessionState.executePlan(plan).analyzed +node match { + case t: ResolvedTable => +val isExternal = t.table.properties().getOrDefault("external", "false").equals("true") Review Comment: ```suggestion val isExternal = t.table.properties().getOrDefault(TableCatalog.PROP_EXTERNAL, "false").equals("true") ```
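The suggestion above replaces a string literal with a named constant. A minimal sketch of the same lookup without Spark on the classpath follows. `TableProps.PropExternal` is a local stand-in for `TableCatalog.PROP_EXTERNAL`, and its value is taken from the `"external"` literal in the original diff line.

```scala
import java.util.{Map => JMap}

object TableProps {
  // Local stand-in for `TableCatalog.PROP_EXTERNAL`; the value is the literal
  // that the suggestion replaces.
  val PropExternal: String = "external"

  // A table counts as external only when the property is exactly "true";
  // a missing property falls back to "false".
  def isExternal(props: JMap[String, String]): Boolean =
    props.getOrDefault(PropExternal, "false").equals("true")
}
```

Routing every read through one constant means a typo in the key fails at compile time instead of silently returning the default.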
[GitHub] [spark] beliefer commented on pull request #36531: [SPARK-39171][SQL] Unify the Cast expression
beliefer commented on PR #36531: URL: https://github.com/apache/spark/pull/36531#issuecomment-1132584853 ping @cloud-fan
[GitHub] [spark] gengliangwang commented on pull request #36614: [SPARK-39237][DOCS] Update the ANSI SQL mode documentation
gengliangwang commented on PR #36614: URL: https://github.com/apache/spark/pull/36614#issuecomment-1132549324 @cloud-fan I have updated the doc and the screenshot. PTAL.
[GitHub] [spark] gengliangwang commented on a diff in pull request #36614: [SPARK-39237][DOCS] Update the ANSI SQL mode documentation
gengliangwang commented on code in PR #36614: URL: https://github.com/apache/spark/pull/36614#discussion_r877809823 ## docs/sql-ref-ansi-compliance.md: ## @@ -28,10 +28,10 @@ The casting behaviours are defined as store assignment rules in the standard. When `spark.sql.storeAssignmentPolicy` is set to `ANSI`, Spark SQL complies with the ANSI store assignment rules. This is a separate configuration because its default value is `ANSI`, while the configuration `spark.sql.ansi.enabled` is disabled by default. -|Property Name|Default|Meaning|Since Version| -|-|---|---|-| -|`spark.sql.ansi.enabled`|false|(Experimental) When true, Spark tries to conform to the ANSI SQL specification: 1. Spark will throw a runtime exception if an overflow occurs in any operation on integral/decimal field. 2. Spark will forbid using the reserved keywords of ANSI SQL as identifiers in the SQL parser.|3.0.0| -|`spark.sql.storeAssignmentPolicy`|ANSI|(Experimental) When inserting a value into a column with different data type, Spark will perform type conversion. Currently, we support 3 policies for the type coercion rules: ANSI, legacy and strict. With ANSI policy, Spark performs the type coercion as per ANSI SQL. In practice, the behavior is mostly the same as PostgreSQL. It disallows certain unreasonable type conversions such as converting string to int or double to boolean. With legacy policy, Spark allows the type coercion as long as it is a valid Cast, which is very loose. e.g. converting string to int or double to boolean is allowed. It is also the only behavior in Spark 2.x and it is compatible with Hive. With strict policy, Spark doesn't allow any possible precision loss or data truncation in type coercion, e.g. converting double to int or decimal to double is not allowed.|3.0.0| +|Property Name|Default| Meaning |Since Version| +|-|---|---|-| +|`spark.sql.ansi.enabled`|false| When true, Spark tries to conform to the ANSI SQL specification: 1. 
Spark will throw a runtime exception if an overflow occurs in any operation on integral/decimal field. 2. Spark will use different type coercion rules for resolving conflicts among data types. The rules are consistently based on data type precedence. |3.0.0| +|`spark.sql.storeAssignmentPolicy`|ANSI| When inserting a value into a column with different data type, Spark will perform type conversion. Currently, we support 3 policies for the type coercion rules: ANSI, legacy and strict. With ANSI policy, Spark performs the type coercion as per ANSI SQL. In practice, the behavior is mostly the same as PostgreSQL. It disallows certain unreasonable type conversions such as converting string to int or double to boolean. With legacy
[GitHub] [spark] beliefer commented on a diff in pull request #36593: [SPARK-39139][SQL] DS V2 push-down framework supports DS V2 UDF
beliefer commented on code in PR #36593: URL: https://github.com/apache/spark/pull/36593#discussion_r877803218 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCCatalog.scala: ## @@ -32,11 +35,14 @@ import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap -class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging { +class JDBCCatalog extends TableCatalog with SupportsNamespaces with FunctionCatalog with Logging { private var catalogName: String = null private var options: JDBCOptions = _ private var dialect: JdbcDialect = _ + private val functions: util.Map[Identifier, UnboundFunction] = +new ConcurrentHashMap[Identifier, UnboundFunction]() Review Comment: OK
[GitHub] [spark] wangyum commented on pull request #36588: [SPARK-39217][SQL] Makes DPP support the pruning side has Union
wangyum commented on PR #36588: URL: https://github.com/apache/spark/pull/36588#issuecomment-1132514681 A case from production: ![image](https://user-images.githubusercontent.com/5399861/169463931-65bfd0c0-1759-4f9d-8a0a-66b32463b76a.png)
[GitHub] [spark] cloud-fan commented on a diff in pull request #36608: [SPARK-39230][SQL] Support ANSI Aggregate Function: regr_slope
cloud-fan commented on code in PR #36608: URL: https://github.com/apache/spark/pull/36608#discussion_r877763059 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala: ## @@ -34,7 +34,7 @@ abstract class Covariance(val left: Expression, val right: Expression, nullOnDiv override def dataType: DataType = DoubleType override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType, DoubleType) - protected val n = AttributeReference("n", DoubleType, nullable = false)() + protected val count = AttributeReference("count", DoubleType, nullable = false)() Review Comment: Shall we make it `protected[sql]` so that we can access it directly in the new expression?
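For readers less familiar with Scala's qualified access modifiers, here is a minimal sketch of what `protected[sql]` would allow. Everything in it is hypothetical: the object `sql` stands in for the enclosing `org.apache.spark.sql` package so that the example fits in one file, `count` is a plain `String` rather than an `AttributeReference`, and the `*Like` names are illustrative only.

```scala
// `sql` stands in for the enclosing package; an object is used so the sketch
// is self-contained.
object sql {
  abstract class CovarianceLike {
    // Visible to subclasses and to any code nested inside `sql`.
    protected[sql] val count: String = "count"
  }

  final class CovarPopLike extends CovarianceLike

  // Not a subclass of CovarianceLike, yet it may read `count` because it is
  // defined inside `sql`. With plain `protected` this would not compile.
  object RegrSlopeLike {
    def bufferName(c: CovarianceLike): String = c.count
  }
}
```

This is the access pattern the comment seems to be after: a new expression that composes a `Covariance` instance, rather than extending it, can still reach the shared buffer attribute.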