[GitHub] [spark] jerrypeng opened a new pull request, #38517: [WIP][SPARK-39591][SS] Async Progress Tracking
jerrypeng opened a new pull request, #38517:
URL: https://github.com/apache/spark/pull/38517

### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] wankunde commented on pull request #38495: [SPARK-35531][SQL] Update hive table stats without unnecessary convert
wankunde commented on PR #38495: URL: https://github.com/apache/spark/pull/38495#issuecomment-1304400795 @cloud-fan @AngersZh Could you help review this PR? Another PR, https://github.com/apache/spark/pull/38496, depends on this one.
[GitHub] [spark] wankunde commented on pull request #38495: [SPARK-35531][SQL] Update hive table stats without unnecessary convert
wankunde commented on PR #38495: URL: https://github.com/apache/spark/pull/38495#issuecomment-1304399983 Retest this please
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect
zhengruifeng commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1014560227

##
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
       responseObserver.onNext(response.build())
     }
-    responseObserver.onNext(sendMetricsToResponse(clientId, rows))
+    responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
+    responseObserver.onCompleted()
+  }
+
+  def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = {
+    val spark = dataframe.sparkSession
+    val schema = dataframe.schema
+    // TODO: control the batch size instead of max records
+    val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
+    val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+
+    val rows = dataframe.queryExecution.executedPlan.execute()
+    var numBatches = 0L
+
+    if (rows.getNumPartitions > 0) {
+      val batches = rows.mapPartitionsInternal { iter =>
+        ArrowConverters
+          .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
+      }
+
+      val obj = new Object
+
+      val processPartition = (iter: Iterator[(Array[Byte], Long, Long)]) => iter.toArray

Review Comment:
With `batch_id`, we can send higher partitions before lower ones.
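In the diff above, `batchId` starts at `partitionId.toLong << 33`, so the partition index lives in the high bits of the id. The following minimal Java sketch shows why that lets batches go out of partition order: a receiver can sort by id to recover the original order. It assumes the low 33 bits count batches within a partition, because the increment is cut off in the excerpt. `BatchIdOrdering` and its methods are hypothetical names, not Spark Connect API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper illustrating the batch-id layout from the diff:
// the partition index is shifted into the high bits (partitionId << 33) and,
// by assumption, the batch's position inside its partition fills the low 33 bits.
class BatchIdOrdering {
    static final int PARTITION_SHIFT = 33;
    static final long INDEX_MASK = (1L << PARTITION_SHIFT) - 1;

    // Valid while partitionId < 2^30, so the resulting id stays non-negative.
    static long batchId(int partitionId, long indexInPartition) {
        return ((long) partitionId << PARTITION_SHIFT) + indexInPartition;
    }

    static int partitionOf(long batchId) {
        return (int) (batchId >>> PARTITION_SHIFT);
    }

    static long indexOf(long batchId) {
        return batchId & INDEX_MASK;
    }

    // Batches may arrive in any order (e.g. partition 2 before partition 0);
    // sorting by id restores partition order, then in-partition order.
    static List<Long> restoreOrder(List<Long> received) {
        List<Long> sorted = new ArrayList<>(received);
        sorted.sort(Comparator.naturalOrder());
        return sorted;
    }

    public static void main(String[] args) {
        List<Long> received = List.of(batchId(2, 0), batchId(0, 1), batchId(1, 0), batchId(0, 0));
        for (long id : restoreOrder(received)) {
            System.out.println("partition=" + partitionOf(id) + " index=" + indexOf(id));
        }
    }
}
```

Because the partition index sits in the high bits, a plain numeric sort orders batches by partition first and by position second. No extra ordering field is needed.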
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect
zhengruifeng commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1014559977

##
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
       responseObserver.onNext(response.build())
     }
-    responseObserver.onNext(sendMetricsToResponse(clientId, rows))
+    responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
+    responseObserver.onCompleted()
+  }
+
+  def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = {
+    val spark = dataframe.sparkSession
+    val schema = dataframe.schema
+    // TODO: control the batch size instead of max records
+    val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
+    val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+
+    val rows = dataframe.queryExecution.executedPlan.execute()

Review Comment:
good point
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect
zhengruifeng commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1014559938

##
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
       responseObserver.onNext(response.build())
     }
-    responseObserver.onNext(sendMetricsToResponse(clientId, rows))
+    responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
+    responseObserver.onCompleted()
+  }
+
+  def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = {
+    val spark = dataframe.sparkSession
+    val schema = dataframe.schema
+    // TODO: control the batch size instead of max records
+    val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
+    val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+
+    val rows = dataframe.queryExecution.executedPlan.execute()
+    var numBatches = 0L
+
+    if (rows.getNumPartitions > 0) {
+      val batches = rows.mapPartitionsInternal { iter =>
+        ArrowConverters
+          .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
+      }
+
+      val obj = new Object
+
+      val processPartition = (iter: Iterator[(Array[Byte], Long, Long)]) => iter.toArray
+
+      val resultHandler = (partitionId: Int, taskResult: Array[(Array[Byte], Long, Long)]) =>
+        obj.synchronized {
+          var batchId = partitionId.toLong << 33
+          taskResult.foreach { case (bytes, count, size) =>
+            val response = proto.Response.newBuilder().setClientId(clientId)
+            val batch = proto.Response.ArrowBatch
+              .newBuilder()
+              .setBatchId(batchId)
+              .setRowCount(count)
+              .setUncompressedBytes(size)
+              .setCompressedBytes(bytes.length)
+              .setData(ByteString.copyFrom(bytes))
+              .build()
+            response.setArrowBatch(batch)
+            responseObserver.onNext(response.build())

Review Comment:
OK, will update.
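The `resultHandler` in the diff wraps every `onNext` call in `obj.synchronized`. Below is a minimal Java sketch of that idea. It assumes result callbacks can arrive on more than one thread, simulated here with a thread pool, and that the observer itself is not thread-safe. `UnsafeObserver` and `SerializedResultHandler` are hypothetical stand-ins, not Spark or gRPC classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a response observer that is NOT safe to call
// from several threads at once (an assumption made for this sketch).
class UnsafeObserver {
    final List<String> messages = new ArrayList<>();

    void onNext(String message) {
        messages.add(message);
    }
}

// Mirrors the `obj.synchronized { ... }` idea from the diff: callbacks may run
// concurrently, so one shared lock serializes every call into the observer.
class SerializedResultHandler {
    private final Object lock = new Object();
    private final UnsafeObserver observer = new UnsafeObserver();

    void handle(int partitionId, List<String> batches) {
        synchronized (lock) {
            // All batches of one partition are emitted without interleaving.
            for (String batch : batches) {
                observer.onNext("p" + partitionId + ":" + batch);
            }
        }
    }

    // Read under the same lock so every write is visible to the caller.
    List<String> snapshot() {
        synchronized (lock) {
            return new ArrayList<>(observer.messages);
        }
    }

    // Simulates `numPartitions` partitions finishing on a thread pool.
    static List<String> run(int numPartitions) {
        SerializedResultHandler handler = new SerializedResultHandler();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int p = 0; p < numPartitions; p++) {
            final int partitionId = p;
            pool.submit(() -> handler.handle(partitionId, List.of("a", "b", "c")));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handler.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(run(8).size()); // 8 partitions x 3 batches
    }
}
```

Without the lock, two partitions could interleave their batches or corrupt the shared list. With it, each partition's batches stay contiguous, though partitions may still appear in any order.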
[GitHub] [spark] ljfgem commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
ljfgem commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014550696

##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Catalog methods for working with views.
+ */
+@DeveloperApi
+public interface ViewCatalog extends CatalogPlugin {

Review Comment:
I think they belong to `ViewCatalog` since there are similar properties in [TableCatalog](https://github.com/apache/spark/blob/5a71a7f7b5c1762677ddbfe39a7c35d645c25e94/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java#L48)?
[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
jzhuge commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014549689

##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Catalog methods for working with views.
+ */
+@DeveloperApi
+public interface ViewCatalog extends CatalogPlugin {

Review Comment:
I left them out as they felt more "implementation" than "API". Do they belong to `ViewCatalog`?
[GitHub] [spark] attilapiros commented on pull request #38516: [SPARK-32380][SQL] Fixing access of HBase table via Hive
attilapiros commented on PR #38516: URL: https://github.com/apache/spark/pull/38516#issuecomment-1304373701 cc @dongjoon-hyun, @HyukjinKwon
[GitHub] [spark] attilapiros opened a new pull request, #38516: Initial version
attilapiros opened a new pull request, #38516:
URL: https://github.com/apache/spark/pull/38516

### What changes were proposed in this pull request?

This is an update of https://github.com/apache/spark/pull/29178, which was closed because the root cause of the error was only vaguely defined there. Here I explain why `HiveHBaseTableInputFormat` does not work well with `NewHadoopRDD` (see the sections below).

This PR modifies `TableReader.scala` to create `OldHadoopRDD` when the input format is `org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat`.

- environments (Cloudera distribution 7.1.7.SP1):
  - hadoop 3.1.1
  - hive 3.1.300
  - spark 3.2.1
  - hbase 2.2.3

### Why are the changes needed?

With `NewHadoopRDD` the following exception is raised:

```
java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:253)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:446)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:765)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
  ... 47 elided
Caused by: java.lang.IllegalStateException: The input format instance has not been properly initialized. Ensure you call initializeTable either in your constructor or initialize method
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:557)
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:248)
  ... 86 more
```

### Short summary of the reason

There are two interfaces:

- the new `org.apache.hadoop.mapreduce.InputFormat`: provides a one-argument method `getSplits(JobContext context)` (returning `List<InputSplit>`)
- the old `org.apache.hadoop.mapred.InputFormat`: provides a two-argument method `getSplits(JobConf job, int numSplits)` (returning `InputSplit[]`)

In Hive, both are implemented by `HiveHBaseTableInputFormat`, but only the old method performs the required initialisation, and this is why `NewHadoopRDD` fails here.

### Details

Here all the links refer to the latest commits on the master branches of the components, to get the right line
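The failure mode summarised above can be modelled in a few lines. This is a simplified Java sketch, not the Hadoop, HBase or Hive code. `OldApiInputFormat`, `NewApiInputFormat` and `DualApiInputFormat` are hypothetical types that only mirror the shape of the two `getSplits` signatures. The rule that only the old path initialises is taken from the PR description above rather than from the library sources.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the two Hadoop interfaces; they only
// mirror the shape of the two getSplits signatures described in the PR.
interface OldApiInputFormat {          // like org.apache.hadoop.mapred.InputFormat
    String[] getSplits(String jobConf, int numSplits);
}

interface NewApiInputFormat {          // like org.apache.hadoop.mapreduce.InputFormat
    List<String> getSplits(String jobContext);
}

// Models the reported behaviour: one class implements both APIs, but only the
// old-API entry point runs the initialisation the shared split logic needs.
class DualApiInputFormat implements OldApiInputFormat, NewApiInputFormat {
    private String table; // stays null until initializeTable runs

    private void initializeTable(String conf) {
        table = "table-from-" + conf;
    }

    @Override
    public String[] getSplits(String jobConf, int numSplits) {
        initializeTable(jobConf);                       // old API: initialise first
        return getSplits(jobConf).toArray(new String[0]);
    }

    @Override
    public List<String> getSplits(String jobContext) {  // new API: no initialisation
        if (table == null) {
            throw new IllegalStateException(
                "The input format instance has not been properly initialized.");
        }
        List<String> splits = new ArrayList<>();
        splits.add(table + "/split-0");
        return splits;
    }

    public static void main(String[] args) {
        System.out.println(new DualApiInputFormat().getSplits("conf", 1)[0]);
        try {
            new DualApiInputFormat().getSplits("ctx");  // what a new-API caller does
        } catch (IllegalStateException e) {
            System.out.println("new API failed: " + e.getMessage());
        }
    }
}
```

An RDD that calls only the one-argument method never reaches `initializeTable`, which matches the `IllegalStateException` in the stack trace. Routing this input format through the old-API path avoids it.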
[GitHub] [spark] ljfgem commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
ljfgem commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014536719

##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Catalog methods for working with views.
+ */
+@DeveloperApi
+public interface ViewCatalog extends CatalogPlugin {

Review Comment:
Why do we remove the following reserved properties from the [umbrella PR](https://github.com/apache/spark/pull/35636/files#diff-7094226eb616c14235b9a88fb6d8bc4bef39fa4f9879aa8dc4cfe4d031b720e7)?
```
  /**
   * A reserved property to specify the description of the view.
   */
  String PROP_COMMENT = "comment";

  /**
   * A reserved property to specify the owner of the view.
   */
  String PROP_OWNER = "owner";

  /**
   * A reserved property to specify the software version used to create the view.
   */
  String PROP_CREATE_ENGINE_VERSION = "create_engine_version";

  /**
   * A reserved property to specify the software version used to change the view.
   */
  String PROP_ENGINE_VERSION = "engine_version";

  /**
   * All reserved properties of the view.
   */
  List<String> RESERVED_PROPERTIES = Arrays.asList(
      PROP_COMMENT,
      PROP_OWNER,
      PROP_CREATE_ENGINE_VERSION,
      PROP_ENGINE_VERSION);
```
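The constants quoted above are plain interface fields, so a catalog can check user input against `RESERVED_PROPERTIES` directly. The following minimal Java sketch assumes the intent is to stop users from overwriting engine-managed keys through a generic property map, similar in spirit to the reserved table properties in `TableCatalog`. `ReservedViewProperties` and `ViewPropertyCheck` are hypothetical names, not part of either PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical holder for the constants quoted from the umbrella PR.
interface ReservedViewProperties {
    String PROP_COMMENT = "comment";
    String PROP_OWNER = "owner";
    String PROP_CREATE_ENGINE_VERSION = "create_engine_version";
    String PROP_ENGINE_VERSION = "engine_version";

    List<String> RESERVED_PROPERTIES = Arrays.asList(
        PROP_COMMENT, PROP_OWNER, PROP_CREATE_ENGINE_VERSION, PROP_ENGINE_VERSION);
}

// Hypothetical helper: one way a catalog could use such a list, by refusing to
// let user-supplied properties overwrite keys the engine manages itself.
class ViewPropertyCheck {
    static Map<String, String> validateUserProperties(Map<String, String> props) {
        for (String key : props.keySet()) {
            if (ReservedViewProperties.RESERVED_PROPERTIES.contains(key)) {
                throw new IllegalArgumentException("Cannot set reserved view property: " + key);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("team", "analytics");
        System.out.println(validateUserProperties(props)); // accepted
        props.put(ReservedViewProperties.PROP_OWNER, "alice");
        try {
            validateUserProperties(props);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keeping the list on the API interface, rather than in one implementation, lets every catalog and the engine agree on which keys are off-limits.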
[GitHub] [spark] github-actions[bot] commented on pull request #34637: [SPARK-37349][SQL] add SQL Rest API parsing logic
github-actions[bot] commented on PR #34637: URL: https://github.com/apache/spark/pull/34637#issuecomment-1304354583 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] github-actions[bot] closed pull request #37083: [SPARK-39678][SQL] Improve stats estimation for v2 tables
github-actions[bot] closed pull request #37083: [SPARK-39678][SQL] Improve stats estimation for v2 tables URL: https://github.com/apache/spark/pull/37083
[GitHub] [spark] github-actions[bot] closed pull request #37226: [MINOR][SQL] Simplify the description of built-in function.
github-actions[bot] closed pull request #37226: [MINOR][SQL] Simplify the description of built-in function. URL: https://github.com/apache/spark/pull/37226
[GitHub] [spark] github-actions[bot] commented on pull request #37009: [SPARK-38292][PYTHON]Support na_filter for pyspark.pandas.read_csv
github-actions[bot] commented on PR #37009: URL: https://github.com/apache/spark/pull/37009#issuecomment-1304354576 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] github-actions[bot] closed pull request #37239: [SPARK-39825][SQL] Fix PushDownLeftSemiAntiJoin push through project
github-actions[bot] closed pull request #37239: [SPARK-39825][SQL] Fix PushDownLeftSemiAntiJoin push through project URL: https://github.com/apache/spark/pull/37239
[GitHub] [spark] github-actions[bot] commented on pull request #37104: [SPARK-39698][SQL] Use `TakeOrderedAndProject` if maxRows below the `spark.sql.execution.topKSortMaxRowsThreshold`
github-actions[bot] commented on PR #37104: URL: https://github.com/apache/spark/pull/37104#issuecomment-1304354561 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] github-actions[bot] commented on pull request #37309: [SPARK-39871][CORE] Jmx http interface supported for SparkHistoryServer
github-actions[bot] commented on PR #37309: URL: https://github.com/apache/spark/pull/37309#issuecomment-1304354542 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] github-actions[bot] commented on pull request #37315: [SPARK-39892][SQL] Use ArrowType.Decimal(precision, scale, bitWidth) instead of ArrowType.Decimal(precision, scale)
github-actions[bot] commented on PR #37315: URL: https://github.com/apache/spark/pull/37315#issuecomment-1304354536 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
jzhuge commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014532124

##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Catalog methods for working with views.
+ */
+@DeveloperApi
+public interface ViewCatalog extends CatalogPlugin {
+
+  /**
+   * List the views in a namespace from the catalog.
+   *
+   * If the catalog supports tables, this must return identifiers for only views and not tables.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of Identifiers for views
+   * @throws NoSuchNamespaceException If the namespace does not exist (optional).
+   */
+  Identifier[] listViews(String... namespace) throws NoSuchNamespaceException;
+
+  /**
+   * Load view metadata by {@link Identifier ident} from the catalog.
+   *
+   * If the catalog supports tables and contains a table for the identifier and not a view,
+   * this must throw {@link NoSuchViewException}.
+   *
+   * @param ident a view identifier
+   * @return the view description
+   * @throws NoSuchViewException If the view doesn't exist or is a table
+   */
+  View loadView(Identifier ident) throws NoSuchViewException;
+
+  /**
+   * Invalidate cached view metadata for an {@link Identifier identifier}.
+   *
+   * If the view is already loaded or cached, drop cached data. If the view does not exist or is
+   * not cached, do nothing. Calling this method should not query remote services.
+   *
+   * @param ident a view identifier
+   */
+  default void invalidateView(Identifier ident) {
+  }
+
+  /**
+   * Test whether a view exists using an {@link Identifier identifier} from the catalog.
+   *
+   * If the catalog supports views and contains a view for the identifier and not a table,
+   * this must return false.
+   *
+   * @param ident a view identifier
+   * @return true if the view exists, false otherwise
+   */
+  default boolean viewExists(Identifier ident) {
+    try {
+      return loadView(ident) != null;
+    } catch (NoSuchViewException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Create a view in the catalog.
+   *
+   * @param ident a view identifier
+   * @param sql the SQL text that defines the view
+   * @param currentCatalog the current catalog
+   * @param currentNamespace the current namespace
+   * @param schema the view query output schema
+   * @param columnAliases the column aliases
+   * @param columnComments the column comments
+   * @param properties the view properties
+   * @return the view created
+   * @throws ViewAlreadyExistsException If a view or table already exists for the identifier
+   * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
+   */
+  View createView(
+      Identifier ident,
+      String sql,
+      String currentCatalog,
+      String[] currentNamespace,
+      StructType schema,
+      String[] columnAliases,

Review Comment:
How about a builder?
```
public interface ViewBuilder {
  ViewBuilder withQuery(String query);
  ViewBuilder withCurrentCatalog(String defaultCatalog);
  ViewBuilder withCurrentNamespace(String[] defaultNamespaces);
  ViewBuilder withSchema(StructType schema);
  ViewBuilder withQueryColumnNames(String[] queryColumnNames);
  ViewBuilder withColumnAliases(String[] columnAliases);
  ViewBuilder withColumnComments(String[] columnComments);
  ViewBuilder withProperties(Map<String, String> properties);
  ViewBuilder withProperty(String key, String value);

  View create();
  View replace();
  View createOrReplace();
}

ViewCatalog {
  ViewBuilder buildView(Identifier ident);
}
```

##
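The builder suggested above replaces a long positional parameter list with named, optional setters and three terminal operations. Below is a minimal, self-contained Java sketch of that shape. It assumes an in-memory map as the backing store and uses `String` in place of Spark's `Identifier`, `StructType` and `View`. `InMemoryViewCatalog` and `SimpleView` are hypothetical, and only a subset of the proposed `with...` methods is included.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified view record; the real proposal returns Spark's View.
final class SimpleView {
    final String ident;
    final String query;
    final String currentCatalog;
    final String[] currentNamespace;
    final Map<String, String> properties;

    SimpleView(String ident, String query, String currentCatalog,
               String[] currentNamespace, Map<String, String> properties) {
        this.ident = ident;
        this.query = query;
        this.currentCatalog = currentCatalog;
        this.currentNamespace = currentNamespace;
        this.properties = properties;
    }
}

// Hypothetical in-memory catalog using String identifiers instead of Identifier.
class InMemoryViewCatalog {
    private final Map<String, SimpleView> views = new HashMap<>();

    ViewBuilder buildView(String ident) {
        return new ViewBuilder(ident);
    }

    // Optional fields get fluent setters; the terminal call decides whether an
    // existing view is an error (create), required (replace), or either.
    final class ViewBuilder {
        private final String ident;
        private String query;
        private String currentCatalog;
        private String[] currentNamespace = new String[0];
        private final Map<String, String> properties = new LinkedHashMap<>();

        private ViewBuilder(String ident) { this.ident = ident; }

        ViewBuilder withQuery(String query) { this.query = query; return this; }
        ViewBuilder withCurrentCatalog(String catalog) { this.currentCatalog = catalog; return this; }
        ViewBuilder withCurrentNamespace(String[] namespace) { this.currentNamespace = namespace.clone(); return this; }
        ViewBuilder withProperties(Map<String, String> props) { properties.putAll(props); return this; }
        ViewBuilder withProperty(String key, String value) { properties.put(key, value); return this; }

        SimpleView create() {
            if (views.containsKey(ident)) {
                throw new IllegalStateException("View already exists: " + ident);
            }
            return store();
        }

        SimpleView replace() {
            if (!views.containsKey(ident)) {
                throw new IllegalStateException("View not found: " + ident);
            }
            return store();
        }

        SimpleView createOrReplace() {
            return store();
        }

        private SimpleView store() {
            if (query == null) {
                throw new IllegalStateException("A query is required to build a view");
            }
            SimpleView view = new SimpleView(ident, query, currentCatalog,
                currentNamespace.clone(), new LinkedHashMap<>(properties));
            views.put(ident, view);
            return view;
        }
    }

    public static void main(String[] args) {
        InMemoryViewCatalog catalog = new InMemoryViewCatalog();
        SimpleView v = catalog.buildView("db.v1")
            .withQuery("SELECT 1")
            .withCurrentCatalog("spark_catalog")
            .withCurrentNamespace(new String[] {"db"})
            .withProperty("owner", "alice")
            .create();
        System.out.println(v.ident + " -> " + v.query);
    }
}
```

In this shape, `create()`, `replace()` and `createOrReplace()` share the same accumulated state. The only difference between them is the existence check, and new optional fields can be added without changing a method signature.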
[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
jzhuge commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014532124 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java: ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException; +import org.apache.spark.sql.types.StructType; + +/** + * Catalog methods for working with views. + */ +@DeveloperApi +public interface ViewCatalog extends CatalogPlugin { + + /** + * List the views in a namespace from the catalog. + * + * If the catalog supports tables, this must return identifiers for only views and not tables. + * + * @param namespace a multi-part namespace + * @return an array of Identifiers for views + * @throws NoSuchNamespaceException If the namespace does not exist (optional). + */ + Identifier[] listViews(String... 
namespace) throws NoSuchNamespaceException; + + /** + * Load view metadata by {@link Identifier ident} from the catalog. + * + * If the catalog supports tables and contains a table for the identifier and not a view, + * this must throw {@link NoSuchViewException}. + * + * @param ident a view identifier + * @return the view description + * @throws NoSuchViewException If the view doesn't exist or is a table + */ + View loadView(Identifier ident) throws NoSuchViewException; + + /** + * Invalidate cached view metadata for an {@link Identifier identifier}. + * + * If the view is already loaded or cached, drop cached data. If the view does not exist or is + * not cached, do nothing. Calling this method should not query remote services. + * + * @param ident a view identifier + */ + default void invalidateView(Identifier ident) { + } + + /** + * Test whether a view exists using an {@link Identifier identifier} from the catalog. + * + * If the catalog supports views and contains a view for the identifier and not a table, + * this must return false. + * + * @param ident a view identifier + * @return true if the view exists, false otherwise + */ + default boolean viewExists(Identifier ident) { +try { + return loadView(ident) != null; +} catch (NoSuchViewException e) { + return false; +} + } + + /** + * Create a view in the catalog. 
+ * + * @param ident a view identifier + * @param sql the SQL text that defines the view + * @param currentCatalog the current catalog + * @param currentNamespace the current namespace + * @param schema the view query output schema + * @param columnAliases the column aliases + * @param columnComments the column comments + * @param properties the view properties + * @return the view created + * @throws ViewAlreadyExistsException If a view or table already exists for the identifier + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + */ + View createView( + Identifier ident, + String sql, + String currentCatalog, + String[] currentNamespace, + StructType schema, + String[] columnAliases, Review Comment: How about a builder?

```
public interface ViewBuilder {
  ViewBuilder withQuery(String query);
  ViewBuilder withCurrentCatalog(String defaultCatalog);
  ViewBuilder withCurrentNamespace(String[] defaultNamespaces);
  ViewBuilder withSchema(StructType schema);
  ViewBuilder withQueryColumnNames(String[] queryColumnNames);
  ViewBuilder withColumnAliases(String[] columnAliases);
  ViewBuilder withColumnComments(String[] columnComments);
  ViewBuilder withProperties(Map<String, String> properties);
  ViewBuilder withProperty(String key, String value);
  View create();
  View replace();
  View createOrReplace();
}

public interface ViewCatalog extends CatalogPlugin {
  // Existing ViewCatalog methods omitted.
  ViewBuilder buildView(Identifier ident);
}
```
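One possible shape for the builder suggested in this comment, as a self-contained sketch. `BuilderViewCatalog`, `BuiltView`, and the string identifiers are hypothetical simplifications (no `StructType` schema or column comments), and `IllegalStateException` stands in for Spark's `ViewAlreadyExistsException` and `NoSuchViewException`. The three terminal methods encode the create, replace, and createOrReplace semantics the comment lists.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, trimmed-down view metadata; Spark's View has more fields.
final class BuiltView {
    final String name;
    final String query;
    final String currentCatalog;
    final String[] currentNamespace;
    final String[] columnAliases;
    final Map<String, String> properties;

    BuiltView(String name, String query, String currentCatalog, String[] currentNamespace,
              String[] columnAliases, Map<String, String> properties) {
        this.name = name;
        this.query = query;
        this.currentCatalog = currentCatalog;
        this.currentNamespace = currentNamespace;
        this.columnAliases = columnAliases;
        this.properties = properties;
    }
}

// Hypothetical catalog with a buildView entry point, following the suggested shape.
class BuilderViewCatalog {
    private final Map<String, BuiltView> views = new HashMap<>();

    ViewBuilder buildView(String ident) {
        return new ViewBuilder(ident);
    }

    BuiltView get(String ident) {
        return views.get(ident);
    }

    final class ViewBuilder {
        private final String ident;
        private String query;
        private String currentCatalog;
        private String[] currentNamespace = new String[0];
        private String[] columnAliases = new String[0];
        private final Map<String, String> properties = new LinkedHashMap<>();

        private ViewBuilder(String ident) {
            this.ident = ident;
        }

        ViewBuilder withQuery(String query) { this.query = query; return this; }
        ViewBuilder withCurrentCatalog(String catalog) { this.currentCatalog = catalog; return this; }
        ViewBuilder withCurrentNamespace(String[] namespace) { this.currentNamespace = namespace.clone(); return this; }
        ViewBuilder withColumnAliases(String[] aliases) { this.columnAliases = aliases.clone(); return this; }
        ViewBuilder withProperties(Map<String, String> props) { properties.putAll(props); return this; }
        ViewBuilder withProperty(String key, String value) { properties.put(key, value); return this; }

        // Fails if a view already exists under this identifier.
        BuiltView create() {
            if (views.containsKey(ident)) {
                throw new IllegalStateException("View already exists: " + ident);
            }
            return store();
        }

        // Fails if there is no existing view to replace.
        BuiltView replace() {
            if (!views.containsKey(ident)) {
                throw new IllegalStateException("View not found: " + ident);
            }
            return store();
        }

        // Creates the view or overwrites an existing one.
        BuiltView createOrReplace() {
            return store();
        }

        private BuiltView store() {
            if (query == null) {
                throw new IllegalStateException("withQuery(...) is required");
            }
            BuiltView view = new BuiltView(ident, query, currentCatalog, currentNamespace,
                columnAliases, new LinkedHashMap<>(properties));
            views.put(ident, view);
            return view;
        }
    }
}
```

The trade-off is one extra interface in exchange for not passing optional fields such as comments and properties positionally to a long `createView` signature.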
[GitHub] [spark] SandishKumarHN commented on pull request #38515: [SPARK-41015][SQL][PROTOBUF] UnitTest null check for data generator
SandishKumarHN commented on PR #38515: URL: https://github.com/apache/spark/pull/38515#issuecomment-1304351416 @rangadi Because some random numbers cannot be converted to a Catalyst type, a null check for the data generator is required.
[GitHub] [spark] SandishKumarHN opened a new pull request, #38515: [SPARK-41015][SQL][PROTOBUF] UnitTest null check for data generator
SandishKumarHN opened a new pull request, #38515: URL: https://github.com/apache/spark/pull/38515 ### What changes were proposed in this pull request? A null check for the data generator after type conversion. ### Why are the changes needed? NA ### Does this PR introduce _any_ user-facing change? NA ### How was this patch tested? I manually tested all the random cases with the current unit tests.
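A sketch of one way to apply the null check described here, written in Java rather than the Scala used by Spark's test suites. `NonNullGenerator` and the conversion function are hypothetical; the idea is to discard random inputs whose conversion yields `null` and to bound the retries so a broken generator fails fast instead of looping forever.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical test helper: keep drawing random inputs until one converts to a
// non-null value, so tests never pass an unconvertible (null) value downstream.
final class NonNullGenerator {
    private NonNullGenerator() {}

    static <T, R> R nextConverted(Supplier<T> generator, Function<T, R> convert, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            R converted = convert.apply(generator.get());
            if (converted != null) {
                return converted;
            }
        }
        // Bounded retries: fail loudly instead of looping forever on a bad generator.
        throw new IllegalStateException("No convertible value after " + maxAttempts + " attempts");
    }
}
```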
[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
jzhuge commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014530432 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java: ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException; +import org.apache.spark.sql.types.StructType; + +/** + * Catalog methods for working with views. + */ +@DeveloperApi +public interface ViewCatalog extends CatalogPlugin { + + /** + * List the views in a namespace from the catalog. + * + * If the catalog supports tables, this must return identifiers for only views and not tables. + * + * @param namespace a multi-part namespace + * @return an array of Identifiers for views + * @throws NoSuchNamespaceException If the namespace does not exist (optional). + */ + Identifier[] listViews(String... 
namespace) throws NoSuchNamespaceException; + + /** + * Load view metadata by {@link Identifier ident} from the catalog. + * + * If the catalog supports tables and contains a table for the identifier and not a view, + * this must throw {@link NoSuchViewException}. + * + * @param ident a view identifier + * @return the view description + * @throws NoSuchViewException If the view doesn't exist or is a table + */ + View loadView(Identifier ident) throws NoSuchViewException; + + /** + * Invalidate cached view metadata for an {@link Identifier identifier}. + * + * If the view is already loaded or cached, drop cached data. If the view does not exist or is + * not cached, do nothing. Calling this method should not query remote services. + * + * @param ident a view identifier + */ + default void invalidateView(Identifier ident) { + } + + /** + * Test whether a view exists using an {@link Identifier identifier} from the catalog. + * + * If the catalog supports views and contains a view for the identifier and not a table, + * this must return false. + * + * @param ident a view identifier + * @return true if the view exists, false otherwise + */ + default boolean viewExists(Identifier ident) { +try { + return loadView(ident) != null; +} catch (NoSuchViewException e) { + return false; +} + } + + /** + * Create a view in the catalog. 
+ * + * @param ident a view identifier + * @param sql the SQL text that defines the view + * @param currentCatalog the current catalog + * @param currentNamespace the current namespace + * @param schema the view query output schema + * @param columnAliases the column aliases + * @param columnComments the column comments + * @param properties the view properties + * @return the view created + * @throws ViewAlreadyExistsException If a view or table already exists for the identifier + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + */ + View createView( + Identifier ident, + String sql, + String currentCatalog, + String[] currentNamespace, + StructType schema, + String[] columnAliases, Review Comment: I'd hesitate to do so because these have different meanings: one is more like "CreateViewRequest", the other like "ViewMetadata". Using the same interface may cause confusion and make it harder to evolve either of them independently in the future.
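A small sketch of the separation argued for here, assuming two distinct Java types: `CreateViewRequest` carries only caller-supplied input, while `ViewMetadata` is what a catalog would hand back. Both names echo the comment but the classes are hypothetical, and `createdAtMillis` is an invented example of a field that only the stored form would have.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical input type: only what a caller supplies when asking for a view.
final class CreateViewRequest {
    final String name;
    final String query;
    final Map<String, String> properties;

    CreateViewRequest(String name, String query, Map<String, String> properties) {
        this.name = name;
        this.query = query;
        this.properties = Collections.unmodifiableMap(new LinkedHashMap<>(properties));
    }
}

// Hypothetical stored type: what loading a view returns. It can gain catalog-assigned
// fields (here an illustrative creation timestamp) without changing the request type.
final class ViewMetadata {
    final String name;
    final String query;
    final Map<String, String> properties;
    final long createdAtMillis;

    private ViewMetadata(String name, String query, Map<String, String> properties,
                         long createdAtMillis) {
        this.name = name;
        this.query = query;
        this.properties = properties;
        this.createdAtMillis = createdAtMillis;
    }

    // The catalog, not the caller, fills in fields that only exist after creation.
    static ViewMetadata fromRequest(CreateViewRequest request, long nowMillis) {
        return new ViewMetadata(request.name, request.query, request.properties, nowMillis);
    }
}
```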
[GitHub] [spark] xkrogen commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
xkrogen commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014526120 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java: ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException; +import org.apache.spark.sql.types.StructType; + +/** + * Catalog methods for working with views. + */ +@DeveloperApi +public interface ViewCatalog extends CatalogPlugin { + + /** + * List the views in a namespace from the catalog. + * + * If the catalog supports tables, this must return identifiers for only views and not tables. + * + * @param namespace a multi-part namespace + * @return an array of Identifiers for views + * @throws NoSuchNamespaceException If the namespace does not exist (optional). + */ + Identifier[] listViews(String... 
namespace) throws NoSuchNamespaceException; + + /** + * Load view metadata by {@link Identifier ident} from the catalog. + * + * If the catalog supports tables and contains a table for the identifier and not a view, + * this must throw {@link NoSuchViewException}. + * + * @param ident a view identifier + * @return the view description + * @throws NoSuchViewException If the view doesn't exist or is a table + */ + View loadView(Identifier ident) throws NoSuchViewException; + + /** + * Invalidate cached view metadata for an {@link Identifier identifier}. + * + * If the view is already loaded or cached, drop cached data. If the view does not exist or is + * not cached, do nothing. Calling this method should not query remote services. + * + * @param ident a view identifier + */ + default void invalidateView(Identifier ident) { + } + + /** + * Test whether a view exists using an {@link Identifier identifier} from the catalog. + * + * If the catalog supports views and contains a view for the identifier and not a table, + * this must return false. + * + * @param ident a view identifier + * @return true if the view exists, false otherwise + */ + default boolean viewExists(Identifier ident) { +try { + return loadView(ident) != null; +} catch (NoSuchViewException e) { + return false; +} + } + + /** + * Create a view in the catalog. 
+ * + * @param ident a view identifier + * @param sql the SQL text that defines the view + * @param currentCatalog the current catalog + * @param currentNamespace the current namespace + * @param schema the view query output schema + * @param columnAliases the column aliases + * @param columnComments the column comments + * @param properties the view properties + * @return the view created + * @throws ViewAlreadyExistsException If a view or table already exists for the identifier + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + */ + View createView( + Identifier ident, + String sql, + String currentCatalog, + String[] currentNamespace, + StructType schema, + String[] columnAliases, Review Comment: Actually, should the argument to this just be a `View`...? We are basically passing all of the same information here. `View` doesn't contain any "derived" fields you can only learn after creating the view. Similar to `ViewChange`.
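A sketch of the alternative proposed in this comment, where `createView` takes the view description itself. `ViewDefinition`, `SimpleViewDefinition`, and `ViewArgCatalog` are hypothetical, trimmed-down stand-ins for `View` and `ViewCatalog`, and identifiers are plain strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, trimmed-down view description with no fields that only exist after creation.
interface ViewDefinition {
    String name();
    String query();
    String[] columnAliases();
}

// Hypothetical simple value implementation of ViewDefinition.
final class SimpleViewDefinition implements ViewDefinition {
    private final String name;
    private final String query;
    private final String[] columnAliases;

    SimpleViewDefinition(String name, String query, String[] columnAliases) {
        this.name = name;
        this.query = query;
        this.columnAliases = columnAliases.clone();
    }

    @Override public String name() { return name; }
    @Override public String query() { return query; }
    @Override public String[] columnAliases() { return columnAliases.clone(); }
}

// Hypothetical catalog whose createView takes the whole description as one argument
// instead of one parameter per field.
class ViewArgCatalog {
    private final Map<String, ViewDefinition> views = new HashMap<>();

    ViewDefinition createView(String ident, ViewDefinition view) {
        if (views.putIfAbsent(ident, view) != null) {
            throw new IllegalStateException("View already exists: " + ident);
        }
        // Nothing is derived during creation, so the stored value is the input itself.
        return view;
    }

    ViewDefinition loadView(String ident) {
        return views.get(ident);
    }
}
```

This only stays clean while `View` carries no fields learned after creation; the reply above arguing for separate request and metadata types is about exactly that risk.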
[GitHub] [spark] srowen commented on a diff in pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar
srowen commented on code in PR #38510: URL: https://github.com/apache/spark/pull/38510#discussion_r1014525886 ## docs/sql-performance-tuning.md: ## @@ -77,8 +77,8 @@ that these options will be deprecated in future release as more optimizations ar spark.sql.files.openCostInBytes 4194304 (4 MB) - The estimated cost to open a file, measured by the number of bytes could be scanned in the same - time. This is used when putting multiple files into a partition. It is better to over-estimated, + The estimated cost to open a file, measured by the number of bytes could that be scanned in the same Review Comment: One more - this should be "that could be"
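The effect of the open cost can be seen in a simplified sketch of packing files into partitions. It is loosely modeled on the next-fit-decreasing loop in Spark's `FilePartition` (Spark 3.x), but `FilePacking` and its parameters are hypothetical, and the real code also derives `maxSplitBytes` from `spark.sql.files.maxPartitionBytes`, the open cost, and the available parallelism. Each file is charged its length plus `openCostInBytes`, so a larger open cost closes partitions sooner and spreads many small files across more partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical sketch: pack file sizes (in bytes) into partitions with
// next-fit-decreasing, charging every file an extra openCostInBytes.
final class FilePacking {
    private FilePacking() {}

    static List<List<Long>> pack(long[] fileSizes, long maxSplitBytes, long openCostInBytes) {
        long[] sorted = fileSizes.clone();
        Arrays.sort(sorted); // ascending; iterated in reverse below (largest first)
        List<List<Long>> partitions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentSize = 0L;
        for (int i = sorted.length - 1; i >= 0; i--) {
            long size = sorted[i];
            // Close the current partition if this file would overflow it.
            if (!current.isEmpty() && currentSize + size > maxSplitBytes) {
                partitions.add(current);
                current = new ArrayList<>();
                currentSize = 0L;
            }
            current.add(size);
            // The open cost is added on top of the bytes actually read.
            currentSize += size + openCostInBytes;
        }
        if (!current.isEmpty()) {
            partitions.add(current);
        }
        return partitions;
    }
}
```

For example, four 10-byte files with `maxSplitBytes = 40` land in one partition when the open cost is 0, but in two partitions when it is 10.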
[GitHub] [spark] xkrogen commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
xkrogen commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014524565 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a persisted view. + */ +@DeveloperApi +public interface View { + /** + * A name to identify this view. + */ + String name(); + + /** + * The view query SQL text. + */ + String sql(); + + /** + * The current catalog when the view is created. + */ + String currentCatalog(); + + /** + * The current namespace when the view is created. + */ + String[] currentNamespace(); + + /** + * The schema for the SQL text when the view is created. + */ + StructType schema(); + + /** + * The view column aliases. + */ + String[] columnAliases(); + Review Comment: LGTM. We are capturing some redundant information but I recognize the value in trying to capture everything from the `CREATE VIEW` statement to make things clear and not rely on derived attributes. 
[GitHub] [spark] xkrogen commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
xkrogen commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014524565 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a persisted view. + */ +@DeveloperApi +public interface View { + /** + * A name to identify this view. + */ + String name(); + + /** + * The view query SQL text. + */ + String sql(); + + /** + * The current catalog when the view is created. + */ + String currentCatalog(); + + /** + * The current namespace when the view is created. + */ + String[] currentNamespace(); + + /** + * The schema for the SQL text when the view is created. + */ + StructType schema(); + + /** + * The view column aliases. + */ + String[] columnAliases(); + Review Comment: LGTM. We are capturing some redundant information but I recognize the value in trying to persist everything from the `CREATE VIEW` statement to make things clear and not rely on derived attributes. 
[GitHub] [spark] swamirishi commented on a diff in pull request #38377: [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path
swamirishi commented on code in PR #38377: URL: https://github.com/apache/spark/pull/38377#discussion_r1014524002 ## core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala: ## @@ -142,7 +142,7 @@ private[spark] class DriverLogger(conf: SparkConf) extends Logging { threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("dfsSyncThread") threadpool.scheduleWithFixedDelay(this, UPLOAD_INTERVAL_IN_SECS, UPLOAD_INTERVAL_IN_SECS, TimeUnit.SECONDS) - logInfo(s"Started driver log file sync to: ${dfsLogFile}") + logInfo(s"Started driver log file sync to: ${dfsLogFile.toString}") Review Comment: done
[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
jzhuge commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014523963 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java: ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException; +import org.apache.spark.sql.types.StructType; + +/** + * Catalog methods for working with views. + */ +@DeveloperApi +public interface ViewCatalog extends CatalogPlugin { + + /** + * List the views in a namespace from the catalog. + * + * If the catalog supports tables, this must return identifiers for only views and not tables. + * + * @param namespace a multi-part namespace + * @return an array of Identifiers for views + * @throws NoSuchNamespaceException If the namespace does not exist (optional). + */ + Identifier[] listViews(String... 
namespace) throws NoSuchNamespaceException; + + /** + * Load view metadata by {@link Identifier ident} from the catalog. + * + * If the catalog supports tables and contains a table for the identifier and not a view, + * this must throw {@link NoSuchViewException}. + * + * @param ident a view identifier + * @return the view description + * @throws NoSuchViewException If the view doesn't exist or is a table + */ + View loadView(Identifier ident) throws NoSuchViewException; + + /** + * Invalidate cached view metadata for an {@link Identifier identifier}. + * + * If the view is already loaded or cached, drop cached data. If the view does not exist or is + * not cached, do nothing. Calling this method should not query remote services. + * + * @param ident a view identifier + */ + default void invalidateView(Identifier ident) { + } + + /** + * Test whether a view exists using an {@link Identifier identifier} from the catalog. + * + * If the catalog supports views and contains a view for the identifier and not a table, + * this must return false. + * + * @param ident a view identifier + * @return true if the view exists, false otherwise + */ + default boolean viewExists(Identifier ident) { +try { + return loadView(ident) != null; +} catch (NoSuchViewException e) { + return false; +} + } + + /** + * Create a view in the catalog. 
+ * + * @param ident a view identifier + * @param sql the SQL text that defines the view + * @param currentCatalog the current catalog + * @param currentNamespace the current namespace + * @param schema the view query output schema + * @param columnAliases the column aliases + * @param columnComments the column comments + * @param properties the view properties + * @return the view created + * @throws ViewAlreadyExistsException If a view or table already exists for the identifier + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + */ + View createView( + Identifier ident, + String sql, + String currentCatalog, + String[] currentNamespace, + StructType schema, + String[] columnAliases, Review Comment: Should we create a Builder?
[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
jzhuge commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014522636 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a persisted view. + */ +@DeveloperApi +public interface View { + /** + * A name to identify this view. + */ + String name(); + + /** + * The view query SQL text. + */ + String sql(); + + /** + * The current catalog when the view is created. + */ + String currentCatalog(); + + /** + * The current namespace when the view is created. + */ + String[] currentNamespace(); + + /** + * The schema for the SQL text when the view is created. + */ + StructType schema(); + + /** + * The view column aliases. 
+ */ + String[] columnAliases(); + Review Comment: Pushed a few changes:
- Updated the Javadoc for `View.schema` to indicate that column aliases are applied
- Added `View.queryColumnNames` to store the output column names of the query that creates this view
- Renamed `View.sql` to `View.query` because the term "query" more accurately describes the SELECT query that creates the view.
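A rough sketch of why storing both `queryColumnNames` and `columnAliases` can be useful, assuming the view is resolved by re-running its query, selecting the stored output columns by name (ignoring any extra columns the query now produces), and renaming them to the aliases. As far as I know this mirrors how Spark's built-in session catalog resolves persisted views, but the `View` interface here does not prescribe it. Rows are modeled as plain maps, and `ViewProjection` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: map one row of the (re-run) view query onto the view's columns.
final class ViewProjection {
    private ViewProjection() {}

    static Map<String, Object> project(Map<String, Object> queryRow,
                                       String[] queryColumnNames,
                                       String[] columnAliases) {
        boolean hasAliases = columnAliases != null && columnAliases.length > 0;
        if (hasAliases && columnAliases.length != queryColumnNames.length) {
            throw new IllegalArgumentException("Alias count does not match query column count");
        }
        Map<String, Object> viewRow = new LinkedHashMap<>();
        for (int i = 0; i < queryColumnNames.length; i++) {
            String source = queryColumnNames[i];
            // Select by stored name, so extra or reordered query columns are tolerated.
            if (!queryRow.containsKey(source)) {
                throw new IllegalStateException("Query no longer produces column: " + source);
            }
            // Rename to the alias when one was given in CREATE VIEW.
            String target = hasAliases ? columnAliases[i] : source;
            viewRow.put(target, queryRow.get(source));
        }
        return viewRow;
    }
}
```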
[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
jzhuge commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014522636 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a persisted view. + */ +@DeveloperApi +public interface View { + /** + * A name to identify this view. + */ + String name(); + + /** + * The view query SQL text. + */ + String sql(); + + /** + * The current catalog when the view is created. + */ + String currentCatalog(); + + /** + * The current namespace when the view is created. + */ + String[] currentNamespace(); + + /** + * The schema for the SQL text when the view is created. + */ + StructType schema(); + + /** + * The view column aliases. 
+ */ + String[] columnAliases(); + Review Comment: Pushed a few changes: - Updated the Javadoc for `View.schema` to indicate that aliases are applied - Added `View.queryColumnNames` to store the output column names - Renamed `View.sql` to `View.query` because "query" more accurately describes the SELECT query for the view.
[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
jzhuge commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014522636 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a persisted view. + */ +@DeveloperApi +public interface View { + /** + * A name to identify this view. + */ + String name(); + + /** + * The view query SQL text. + */ + String sql(); + + /** + * The current catalog when the view is created. + */ + String currentCatalog(); + + /** + * The current namespace when the view is created. + */ + String[] currentNamespace(); + + /** + * The schema for the SQL text when the view is created. + */ + StructType schema(); + + /** + * The view column aliases. 
+ */ + String[] columnAliases(); + Review Comment: Pushed a few changes: - Updated the Javadoc for `View.schema` to indicate that aliases are applied - Added `View.queryColumnNames` to store the output column names, which is useful for `SELECT *` queries - Renamed `View.sql` to `View.query` because "query" more accurately describes the SELECT query for the view.
[GitHub] [spark] xkrogen commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
xkrogen commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014522309 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java: ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException; +import org.apache.spark.sql.types.StructType; + +/** + * Catalog methods for working with views. + */ +@DeveloperApi +public interface ViewCatalog extends CatalogPlugin { + + /** + * List the views in a namespace from the catalog. + * + * If the catalog supports tables, this must return identifiers for only views and not tables. + * + * @param namespace a multi-part namespace + * @return an array of Identifiers for views + * @throws NoSuchNamespaceException If the namespace does not exist (optional). + */ + Identifier[] listViews(String... 
namespace) throws NoSuchNamespaceException; + + /** + * Load view metadata by {@link Identifier ident} from the catalog. + * + * If the catalog supports tables and contains a table for the identifier and not a view, + * this must throw {@link NoSuchViewException}. + * + * @param ident a view identifier + * @return the view description + * @throws NoSuchViewException If the view doesn't exist or is a table + */ + View loadView(Identifier ident) throws NoSuchViewException; + + /** + * Invalidate cached view metadata for an {@link Identifier identifier}. + * + * If the view is already loaded or cached, drop cached data. If the view does not exist or is + * not cached, do nothing. Calling this method should not query remote services. + * + * @param ident a view identifier + */ + default void invalidateView(Identifier ident) { + } + + /** + * Test whether a view exists using an {@link Identifier identifier} from the catalog. + * + * If the catalog supports views and contains a view for the identifier and not a table, + * this must return false. + * + * @param ident a view identifier + * @return true if the view exists, false otherwise + */ + default boolean viewExists(Identifier ident) { +try { + return loadView(ident) != null; +} catch (NoSuchViewException e) { + return false; +} + } + + /** + * Create a view in the catalog. 
+ * + * @param ident a view identifier + * @param sql the SQL text that defines the view + * @param currentCatalog the current catalog + * @param currentNamespace the current namespace + * @param schema the view query output schema + * @param columnAliases the column aliases + * @param columnComments the column comments + * @param properties the view properties + * @return the view created + * @throws ViewAlreadyExistsException If a view or table already exists for the identifier + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + */ + View createView( + Identifier ident, + String sql, + String currentCatalog, + String[] currentNamespace, + StructType schema, + String[] columnAliases, Review Comment: We need `queryColumnNames` here as well, right?
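For context on the `queryColumnNames` question above, here is a minimal Python sketch, assuming a toy in-memory catalog. `InMemoryViewCatalog`, `create_view`, and the exception classes are hypothetical stand-ins, not Spark APIs. The sketch shows a create call that records the query's output column names alongside the aliases, and it uses the same load-then-catch pattern as the interface's default `viewExists`:

```python
class NoSuchViewException(Exception):
    """Hypothetical stand-in for Spark's NoSuchViewException."""


class ViewAlreadyExistsException(Exception):
    """Hypothetical stand-in for Spark's ViewAlreadyExistsException."""


class InMemoryViewCatalog:
    """Toy catalog keyed by (namespace, name) tuples; not a Spark API."""

    def __init__(self):
        self._views = {}

    def load_view(self, ident):
        try:
            return self._views[ident]
        except KeyError:
            raise NoSuchViewException(ident) from None

    def view_exists(self, ident):
        # Same shape as the default viewExists(): try to load the view
        # and map "not found" to False.
        try:
            return self.load_view(ident) is not None
        except NoSuchViewException:
            return False

    def create_view(self, ident, query, query_column_names, column_aliases=()):
        if ident in self._views:
            raise ViewAlreadyExistsException(ident)
        # Aliases, if given, must line up one-to-one with the query's output columns.
        if column_aliases and len(column_aliases) != len(query_column_names):
            raise ValueError("column_aliases must match query_column_names in length")
        view = {
            "query": query,
            "query_column_names": list(query_column_names),
            "column_aliases": list(column_aliases),
        }
        self._views[ident] = view
        return view


catalog = InMemoryViewCatalog()
ident = (("db",), "v")
catalog.create_view(ident, "SELECT * FROM t", ["a", "b"], ["x", "y"])
print(catalog.view_exists(ident), catalog.view_exists((("db",), "missing")))  # True False
```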
[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
jzhuge commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014515719 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a persisted view. + */ +@DeveloperApi +public interface View { + /** + * A name to identify this view. + */ + String name(); + + /** + * The view query SQL text. + */ + String sql(); + + /** + * The current catalog when the view is created. + */ + String currentCatalog(); + + /** + * The current namespace when the view is created. + */ + String[] currentNamespace(); + + /** + * The schema for the SQL text when the view is created. + */ + StructType schema(); + + /** + * The view column aliases. + */ + String[] columnAliases(); + Review Comment: Yeah, for select star, we need something similar to `viewQueryColumnNames`. If the schema is pre-alias, then `schema.fieldNames` can serve that purpose.
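The following sketch shows why `SELECT *` needs the stored query column names. It assumes the view's SQL is re-run against an underlying table that has since gained a column. `project_stored_columns` is a hypothetical helper: it looks up the originally captured columns by name, so the new column does not silently appear in the view. It also assumes exact, case-sensitive name matching, which is a simplification:

```python
def project_stored_columns(stored_names, current_output):
    """Map the view's stored query column names to positions in today's query output.

    Assumes exact, case-sensitive name matches and no duplicate names in current_output.
    """
    positions = {name: i for i, name in enumerate(current_output)}
    missing = [name for name in stored_names if name not in positions]
    if missing:
        raise ValueError(f"view query no longer produces columns: {missing}")
    # Columns added to the underlying table after the view was created are skipped.
    return [positions[name] for name in stored_names]


# The view was created as SELECT * when table t had columns (a, b);
# t has since gained column c, so SELECT * now returns (a, c, b).
print(project_stored_columns(["a", "b"], ["a", "c", "b"]))  # [0, 2]
```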
[GitHub] [spark] xkrogen commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
xkrogen commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014507922 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a persisted view. + */ +@DeveloperApi +public interface View { + /** + * A name to identify this view. + */ + String name(); + + /** + * The view query SQL text. + */ + String sql(); + + /** + * The current catalog when the view is created. + */ + String currentCatalog(); + + /** + * The current namespace when the view is created. + */ + String[] currentNamespace(); + + /** + * The schema for the SQL text when the view is created. + */ + StructType schema(); + + /** + * The view column aliases. + */ + String[] columnAliases(); + Review Comment: If both `columnAliases` and `schema` contain the final output names (post-alias), then where are the original column names stored? 
We need those to reconstruct the aliasing similar to the code I linked from `SessionCatalog`. Each element of the projection looks like `Alias(UpCast(, ), )`, which cannot be constructed without access to the original column names.
[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
jzhuge commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014506227 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a persisted view. + */ +@DeveloperApi +public interface View { + /** + * A name to identify this view. + */ + String name(); + + /** + * The view query SQL text. + */ + String sql(); + + /** + * The current catalog when the view is created. + */ + String currentCatalog(); + + /** + * The current namespace when the view is created. + */ + String[] currentNamespace(); + + /** + * The schema for the SQL text when the view is created. + */ + StructType schema(); + + /** + * The view column aliases. + */ + String[] columnAliases(); + Review Comment: No, `schema()` contains the final schema, the same as V1, to reduce confusion. I will update javadoc to clarify. Thank you and @wmoustafa for calling it out! For V1, schema stores column aliases. 
But for V2, we decided to use these metadata fields to capture the entire CREATE VIEW statement instead of relying on `schema`, which feels more "derived": - sql - columnAliases - columnComments Let me know if this makes sense.
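To illustrate the idea of capturing the whole CREATE VIEW statement in metadata fields, a hypothetical Python helper can rebuild the statement from `sql`, `columnAliases`, and `columnComments`. The helper assumes unquoted identifiers and comments that contain no single quotes. It is only a sketch and does not reflect how Spark itself renders views:

```python
def render_create_view(name, sql, column_aliases=(), column_comments=()):
    """Rebuild CREATE VIEW text from captured metadata (illustrative only).

    Assumes identifiers need no quoting and comments contain no single quotes.
    """
    if column_comments and len(column_comments) != len(column_aliases):
        raise ValueError("column_comments must align with column_aliases")
    columns = []
    for i, alias in enumerate(column_aliases):
        comment = column_comments[i] if column_comments else None
        columns.append(f"{alias} COMMENT '{comment}'" if comment else alias)
    column_list = f" ({', '.join(columns)})" if columns else ""
    return f"CREATE VIEW {name}{column_list} AS {sql}"


print(render_create_view("v", "SELECT a, b FROM t", ["x", "y"], ["first", None]))
# CREATE VIEW v (x COMMENT 'first', y) AS SELECT a, b FROM t
```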
[GitHub] [spark] xkrogen commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface
xkrogen commented on code in PR #37556: URL: https://github.com/apache/spark/pull/37556#discussion_r1014502881 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a persisted view. + */ +@DeveloperApi +public interface View { + /** + * A name to identify this view. + */ + String name(); + + /** + * The view query SQL text. + */ + String sql(); + + /** + * The current catalog when the view is created. + */ + String currentCatalog(); + + /** + * The current namespace when the view is created. + */ + String[] currentNamespace(); + + /** + * The schema for the SQL text when the view is created. + */ + StructType schema(); + + /** + * The view column aliases. + */ + String[] columnAliases(); + Review Comment: @jzhuge is the intent that `schema()` contains the _pre-alias_ schema and `columnAliases()` contains the list of aliases, such that you need to combine `schema() + columnAliases()` to get the _post-alias_ schema? 
This is in contrast to how V1 views work, where `CatalogTable.schema()` contains the _post-alias_ schema and `viewQueryColumnNames` contains the original column names. The plan is constructed by aliasing from the `viewQueryColumnNames` to the names stored in `schema` (see [this code in SessionCatalog](https://github.com/apache/spark/blob/933dc0c42f0caf74aaa077fd4f2c2e7208452b9b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala#L949-L985)). I don't have a strong opinion about whether it makes more sense for `schema()` to be pre- or post-aliasing, but we need to make this very clear in the interface description. If we go with pre-aliasing, we can convert it to post-alias inside of [V2ViewDescription](https://github.com/apache/spark/pull/28147/files#diff-d0c6f499b30df039d13bf2740c559251ab63ba4cea312e622e23b74fcbb2fcf0) to make it match V1 in the analyzer, reducing implementation complexity by avoiding having to special-case V1 vs V2.
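In V1, the projection pairs each original query column with its final name and wraps the column in `Alias(UpCast(...))` together with its type. The following minimal sketch covers only the name bookkeeping. It assumes a hypothetical `alias_projection` helper and ignores column types. It shows how pre-alias names combined with aliases produce the post-alias schema names, which is the conversion a V2 description could perform:

```python
def alias_projection(query_column_names, column_aliases=()):
    """Pair each original query column with its final (post-alias) name.

    With no aliases the original names are kept; otherwise there must be
    exactly one alias per query column.
    """
    if not column_aliases:
        return [(name, name) for name in query_column_names]
    if len(column_aliases) != len(query_column_names):
        raise ValueError("expected one alias per query column")
    return list(zip(query_column_names, column_aliases))


pairs = alias_projection(["id", "amt"], ["user_id", "amount"])
post_alias_names = [final for _, final in pairs]
print(pairs)             # [('id', 'user_id'), ('amt', 'amount')]
print(post_alias_names)  # ['user_id', 'amount']
```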
[GitHub] [spark] liuzqt commented on pull request #38064: [SPARK-40622][SQL][CORE]Result of a single task in collect() must fit in 2GB
liuzqt commented on PR #38064: URL: https://github.com/apache/spark/pull/38064#issuecomment-1304306588 @mridulm I got an error when running that command locally: ``` [error] /Users/ziqi.liu/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala:51: File line length exceeds 100 characters ``` but it seems unrelated to this PR, and I just merged with upstream master.
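The error quoted above comes from a style check that rejects source lines longer than 100 characters. The following function is only an illustration of what such a check does; it is not Spark's scalastyle rule:

```python
def long_lines(text, max_len=100):
    """Return (line_number, length) for every line longer than max_len characters."""
    return [
        (number, len(line))
        for number, line in enumerate(text.splitlines(), start=1)
        if len(line) > max_len
    ]


source = "val ok = 1\n" + "val tooLong = \"" + "x" * 100 + "\"\n"
print(long_lines(source))  # [(2, 116)]
```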
[GitHub] [spark] mridulm commented on pull request #38064: [SPARK-40622][SQL][CORE]Result of a single task in collect() must fit in 2GB
mridulm commented on PR #38064: URL: https://github.com/apache/spark/pull/38064#issuecomment-1304281819 It looks like the doc build is failing, which in turn fails the build... Can you run `build/sbt -Phadoop-3 -Pyarn -Pdocker-integration-tests -Pspark-ganglia-lgpl -Phive -Pmesos -Phive-thriftserver -Pkubernetes -Pkinesis-asl -Phadoop-cloud unidoc` against your local repo and see if it succeeds? The fix might simply be updating to the latest master (it works fine in my local build).
[GitHub] [spark] leewyang commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions
leewyang commented on code in PR #37734: URL: https://github.com/apache/spark/pull/37734#discussion_r1014476080 ## python/pyspark/ml/functions.py: ## @@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column: return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col))) +def _batched( +data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int +) -> Iterator[pd.DataFrame]: +"""Generator that splits a pandas dataframe/series into batches.""" +if isinstance(data, pd.DataFrame): +index = 0 +data_size = len(data) +while index < data_size: +yield data.iloc[index : index + batch_size] +index += batch_size +else: +# convert (tuple of) pd.Series into pd.DataFrame +if isinstance(data, pd.Series): +df = pd.concat((data,), axis=1) +else: # isinstance(data, Tuple[pd.Series]): +df = pd.concat(data, axis=1) + +index = 0 +data_size = len(df) +while index < data_size: +yield df.iloc[index : index + batch_size] +index += batch_size + + +def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool: +if isinstance(data, pd.Series): +return data.dtype == np.object_ and isinstance(data.iloc[0], (np.ndarray, list)) +elif isinstance(data, pd.DataFrame): +return any(data.dtypes == np.object_) and any( +[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]] +) +else: +raise ValueError( +"Unexpected data type: {}, expected pd.Series or pd.DataFrame.".format(type(data)) +) + + +def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> bool: +"""Check if input Series/DataFrame/Tuple contains any tensor-valued columns.""" +if isinstance(data, (pd.Series, pd.DataFrame)): +return _is_tensor_col(data) +else: # isinstance(data, Tuple): +return any(_is_tensor_col(elem) for elem in data) + + +def _validate( +preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]], +num_input_rows: int, +return_type: DataType, +) -> None: +"""Validate model predictions against the expected pandas_udf return_type.""" +if 
isinstance(return_type, StructType): +struct_rtype: StructType = return_type +fieldNames = struct_rtype.names +if isinstance(preds, dict): +# dictionary of columns +predNames = list(preds.keys()) +if not all(v.shape == (num_input_rows,) for v in preds.values()): +raise ValueError("Prediction results for StructType fields must be scalars.") +elif isinstance(preds, list) and isinstance(preds[0], dict): +# rows of dictionaries +predNames = list(preds[0].keys()) +if len(preds) != num_input_rows: +raise ValueError("Prediction results must have same length as input data.") Review Comment: Yeah, @mengxr had the same question. Some models, e.g. [Huggingface pipeline for sentiment analysis](https://huggingface.co/docs/transformers/quicktour#pipeline-usage), can produce results in this format, so he agreed to keep this case.
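The "rows of dictionaries" case handles output in which each row is a dict keyed by field name, as some pipelines produce. The following is a hypothetical sketch; `rows_to_columns` is not part of `pyspark.ml.functions`. It pivots such rows into per-field columns and validates the row count and field names:

```python
def rows_to_columns(preds, num_input_rows, field_names):
    """Pivot per-row prediction dicts into per-field columns, validating shape."""
    if len(preds) != num_input_rows:
        raise ValueError("Prediction results must have same length as input data.")
    expected = set(field_names)
    for row in preds:
        if set(row) != expected:
            raise ValueError(f"Row fields {sorted(row)} do not match {sorted(expected)}")
    return {name: [row[name] for row in preds] for name in field_names}


# Shape similar to a sentiment-analysis pipeline's output (values made up).
rows = [{"label": "POSITIVE", "score": 0.98}, {"label": "NEGATIVE", "score": 0.75}]
print(rows_to_columns(rows, 2, ["label", "score"]))
# {'label': ['POSITIVE', 'NEGATIVE'], 'score': [0.98, 0.75]}
```

If pandas is available, `pd.DataFrame(rows)` performs the same row-to-column pivot.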
[GitHub] [spark] leewyang commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions
leewyang commented on code in PR #37734: URL: https://github.com/apache/spark/pull/37734#discussion_r1014475066 ## python/pyspark/ml/functions.py: ## @@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column: return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col))) +def _batched( +data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int +) -> Iterator[pd.DataFrame]: +"""Generator that splits a pandas dataframe/series into batches.""" +if isinstance(data, pd.DataFrame): +index = 0 +data_size = len(data) +while index < data_size: +yield data.iloc[index : index + batch_size] +index += batch_size Review Comment: Ended up refactoring the code a bit to simplify this.
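One way to simplify a batching loop like the one above is to iterate over slice offsets with `range` instead of maintaining a manual index. The following minimal sketch works on any sliceable sequence; the name `batched` is illustrative. For a pandas DataFrame, the slice would be `df.iloc[start:start + batch_size]`, applied after normalizing a Series or a tuple of Series with `pd.concat(..., axis=1)` as the original code does:

```python
def batched(data, batch_size):
    """Yield consecutive slices of at most batch_size items from a sliceable sequence."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(data), batch_size):
        yield data[start:start + batch_size]


print(list(batched([1, 2, 3, 4, 5], 2)))  # [[1, 2], [3, 4], [5]]
```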
[GitHub] [spark] AmplabJenkins commented on pull request #38506: [SPARK-41010][CONNECT][PYTHON] Complete Support for Except and Intersect in Python client
AmplabJenkins commented on PR #38506: URL: https://github.com/apache/spark/pull/38506#issuecomment-1304270266 Can one of the admins verify this patch?
[GitHub] [spark] AmplabJenkins commented on pull request #38505: [SPARK-40622][WIP]do not merge(try to fix build error)
AmplabJenkins commented on PR #38505: URL: https://github.com/apache/spark/pull/38505#issuecomment-1304270304 Can one of the admins verify this patch?
[GitHub] [spark] mridulm commented on a diff in pull request #38377: [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path
mridulm commented on code in PR #38377: URL: https://github.com/apache/spark/pull/38377#discussion_r1014469439 ## core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala: ## @@ -142,7 +142,7 @@ private[spark] class DriverLogger(conf: SparkConf) extends Logging { threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("dfsSyncThread") threadpool.scheduleWithFixedDelay(this, UPLOAD_INTERVAL_IN_SECS, UPLOAD_INTERVAL_IN_SECS, TimeUnit.SECONDS) - logInfo(s"Started driver log file sync to: ${dfsLogFile}") + logInfo(s"Started driver log file sync to: ${dfsLogFile.toString}") Review Comment: nit: remove `.toString` ## core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala: ## @@ -126,13 +126,13 @@ private[spark] class DriverLogger(conf: SparkConf) extends Logging { throw new RuntimeException(s"${rootDir} does not exist." + s" Please create this dir in order to persist driver logs") } - val dfsLogFile: String = FileUtils.getFile(rootDir, appId -+ DriverLogger.DRIVER_LOG_FILE_SUFFIX).getAbsolutePath() + val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, appId ++ DriverLogger.DRIVER_LOG_FILE_SUFFIX)) Review Comment: Looking at it more, given that all resolution is relative to `fileSystem`, we technically don't need to resolve or qualify it; the only reason to do so is the log message below, so I am fine with this change.
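Qualifying the path mainly changes what the log message prints. The following rough Python approximation assumes a hypothetical `make_qualified` helper. It is much simpler than Hadoop's `FileSystem.makeQualified`: it does no normalization and does not handle Windows drive letters. The file name in the example is made up:

```python
import posixpath
from urllib.parse import urlparse


def make_qualified(path, fs_uri, working_dir="/"):
    """Roughly mimic qualifying a path against a filesystem URI (not Hadoop's code).

    A path that already has a scheme is returned unchanged; otherwise it is made
    absolute against working_dir and prefixed with the filesystem's scheme and authority.
    """
    if urlparse(path).scheme:
        return path
    fs = urlparse(fs_uri)
    absolute = path if path.startswith("/") else posixpath.join(working_dir, path)
    return f"{fs.scheme}://{fs.netloc}{absolute}"


print(make_qualified("/driver-logs/app-123.log", "hdfs://namenode:8020"))
# hdfs://namenode:8020/driver-logs/app-123.log
```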
[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
alex-balikov commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1014428293 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -157,10 +172,11 @@ object UnsupportedOperationChecker extends Logging { // Disallow multiple streaming aggregations val aggregates = collectStreamingAggregates(plan) -if (aggregates.size > 1) { +if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) { throwError( "Multiple streaming aggregations are not supported with " + Review Comment: And we need to change the message accordingly: 'aggregations' -> 'stateful operators'.
[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
alex-balikov commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1014425174 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -157,10 +193,11 @@ object UnsupportedOperationChecker extends Logging { // Disallow multiple streaming aggregations val aggregates = collectStreamingAggregates(plan) -if (aggregates.size > 1) { +if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) { Review Comment: Absolutely. I agree that we should allow multiple stateful ops only in append mode. The other modes are not implemented.
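The rule agreed on above is that multiple stateful operators are allowed only in Append mode, and that the error message should refer to stateful operators. It can be sketched as a standalone check. The function and exception names are hypothetical, and the message wording is illustrative rather than Spark's exact text:

```python
class UnsupportedOperationError(Exception):
    """Hypothetical stand-in for Spark's AnalysisException."""


def check_stateful_operators(num_stateful_ops, output_mode):
    """Allow more than one stateful operator only in Append mode."""
    if num_stateful_ops > 1 and output_mode.lower() != "append":
        raise UnsupportedOperationError(
            "Multiple stateful operators are not supported with "
            f"{output_mode} output mode"
        )


check_stateful_operators(2, "Append")    # passes
check_stateful_operators(1, "Complete")  # passes: only one stateful operator
```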
[GitHub] [spark] dwsmith1983 commented on a diff in pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar
dwsmith1983 commented on code in PR #38510: URL: https://github.com/apache/spark/pull/38510#discussion_r1014424662 ## docs/sql-performance-tuning.md: ## @@ -295,7 +294,7 @@ AQE converts sort-merge join to broadcast hash join when the runtime statistics spark.sql.adaptive.autoBroadcastJoinThreshold (none) - Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. The default value is same with spark.sql.autoBroadcastJoinThreshold. Note that, this config is used only in adaptive framework. + Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled. The default value is the same with spark.sql.autoBroadcastJoinThreshold. Note that, this config is used only in adaptive framework. Review Comment: @srowen done
[GitHub] [spark] srowen commented on a diff in pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar
srowen commented on code in PR #38510: URL: https://github.com/apache/spark/pull/38510#discussion_r1014421466 ## docs/sql-performance-tuning.md: ## @@ -295,7 +294,7 @@ AQE converts sort-merge join to broadcast hash join when the runtime statistics spark.sql.adaptive.autoBroadcastJoinThreshold (none) - Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. The default value is same with spark.sql.autoBroadcastJoinThreshold. Note that, this config is used only in adaptive framework. + Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled. The default value is the same with spark.sql.autoBroadcastJoinThreshold. Note that, this config is used only in adaptive framework. Review Comment: I'm not suggesting revert; it's a further change. "same with" isn't grammatical (either)
[GitHub] [spark] dwsmith1983 commented on pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar
dwsmith1983 commented on PR #38510: URL: https://github.com/apache/spark/pull/38510#issuecomment-1304103037 > OK, any other related files you want to check while you're here? I am doing some studying, so I'm not sure what other docs I will read and when.
[GitHub] [spark] dwsmith1983 commented on a diff in pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar
dwsmith1983 commented on code in PR #38510: URL: https://github.com/apache/spark/pull/38510#discussion_r1014419552 ## docs/sql-performance-tuning.md: ## @@ -295,7 +294,7 @@ AQE converts sort-merge join to broadcast hash join when the runtime statistics spark.sql.adaptive.autoBroadcastJoinThreshold (none) - Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. The default value is same with spark.sql.autoBroadcastJoinThreshold. Note that, this config is used only in adaptive framework. + Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled. The default value is the same with spark.sql.autoBroadcastJoinThreshold. Note that, this config is used only in adaptive framework. Review Comment: @srowen thanks for the reply. It just sounded a bit strange. I can convert it back if you like.
[GitHub] [spark] srowen commented on a diff in pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar
srowen commented on code in PR #38510: URL: https://github.com/apache/spark/pull/38510#discussion_r1014417907 ## docs/sql-performance-tuning.md: ## @@ -295,7 +294,7 @@ AQE converts sort-merge join to broadcast hash join when the runtime statistics spark.sql.adaptive.autoBroadcastJoinThreshold (none) - Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. The default value is same with spark.sql.autoBroadcastJoinThreshold. Note that, this config is used only in adaptive framework. + Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled. The default value is the same with spark.sql.autoBroadcastJoinThreshold. Note that, this config is used only in adaptive framework. Review Comment: the same with -> the same as
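The documented fallback behaviour can be modelled in a few lines. The AQE-specific threshold is used when it is set. Otherwise `spark.sql.autoBroadcastJoinThreshold` applies. A value of `-1` disables broadcasting because no estimated size can satisfy the check. This is a simplified sketch, and several parts are assumptions: the `Map`-based configuration, the object and helper names, and the plain-byte-count parsing. Spark itself reads these values through its SQL configuration and also accepts size strings with units.

```scala
// Simplified model of the documented fallback; the Map-based conf is hypothetical.
object BroadcastThreshold {
  val AdaptiveKey = "spark.sql.adaptive.autoBroadcastJoinThreshold"
  val DefaultKey = "spark.sql.autoBroadcastJoinThreshold"
  // Documented default of spark.sql.autoBroadcastJoinThreshold: 10485760 bytes (10 MB).
  val DefaultBytes: Long = 10L * 1024 * 1024

  /** AQE threshold if set, otherwise the non-adaptive one, otherwise its default.
    * Assumes plain byte counts; unit strings are not parsed in this sketch. */
  def effective(conf: Map[String, String]): Long =
    conf.get(AdaptiveKey)
      .orElse(conf.get(DefaultKey))
      .map(_.trim.toLong)
      .getOrElse(DefaultBytes)

  /** Broadcast only relations whose estimated size fits; -1 therefore disables it. */
  def canBroadcast(sizeInBytes: Long, threshold: Long): Boolean =
    sizeInBytes >= 0 && sizeInBytes <= threshold
}
```

In a running session, `spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")` disables broadcasting only for AQE and leaves the non-adaptive threshold untouched.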
[GitHub] [spark] mridulm commented on pull request #38427: [SPARK-40950][CORE] Fix isRemoteAddressMaxedOut performance overhead on scala 2.13
mridulm commented on PR #38427: URL: https://github.com/apache/spark/pull/38427#issuecomment-1304076366 Merged to master. Thanks for working on this @eejbyfeldt ! Thanks for the reviews @srowen, @dongjoon-hyun, @LuciferYang :-)
[GitHub] [spark] asfgit closed pull request #38427: [SPARK-40950][CORE] Fix isRemoteAddressMaxedOut performance overhead on scala 2.13
asfgit closed pull request #38427: [SPARK-40950][CORE] Fix isRemoteAddressMaxedOut performance overhead on scala 2.13 URL: https://github.com/apache/spark/pull/38427
[GitHub] [spark] AmplabJenkins commented on pull request #38509: [SPARK-41014][PySpark][DOC] Improve documentation and typing of groupby and cogroup applyInPandas
AmplabJenkins commented on PR #38509: URL: https://github.com/apache/spark/pull/38509#issuecomment-1304060587 Can one of the admins verify this patch?
[GitHub] [spark] AmplabJenkins commented on pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar
AmplabJenkins commented on PR #38510: URL: https://github.com/apache/spark/pull/38510#issuecomment-1304060535 Can one of the admins verify this patch?
[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
mridulm commented on code in PR #36165: URL: https://github.com/apache/spark/pull/36165#discussion_r1014392620 ## core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala: ## @@ -1780,7 +1802,19 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Remote Bytes Read" : 0, | "Remote Bytes Read To Disk" : 0, | "Local Bytes Read" : 0, - | "Total Records Read" : 0 + | "Total Records Read" : 0, + | "Remote Requests Duration": 0, + | "Push Based": { Review Comment: +CC @zhouyejoe
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1014391666 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala: ## @@ -507,15 +507,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { assertPassOnGlobalWatermarkLimit( s"single $joinType join in Append mode", streamRelation.join(streamRelation, joinType = RightOuter, - condition = Some(attributeWithWatermark === attribute)), Review Comment: Thanks for the check! Resolved.
[GitHub] [spark] aokolnychyi commented on pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands
aokolnychyi commented on PR #36304: URL: https://github.com/apache/spark/pull/36304#issuecomment-1304020160 Still remember about following up on this and another PR. Slowly getting there.
[GitHub] [spark] MaxGekk opened a new pull request, #38514: [WIP][SQL] Provide a query context to `failAnalysis()`
MaxGekk opened a new pull request, #38514: URL: https://github.com/apache/spark/pull/38514 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] gengliangwang commented on pull request #38513: [SPARK-40903][SQL][FOLLOWUP] Cast canonicalized Add as its original data type if necessary
gengliangwang commented on PR #38513: URL: https://github.com/apache/spark/pull/38513#issuecomment-1304002389 cc @cloud-fan @srielau @ulysses-you @peter-toth
[GitHub] [spark] gengliangwang opened a new pull request, #38513: [SPARK-40903][SQL][FOLLOWUP] Cast canonicalized Add as its original data type if necessary
gengliangwang opened a new pull request, #38513: URL: https://github.com/apache/spark/pull/38513 ### What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/38379. On second thought, if the canonicalized `Add` has a different type, casting it to the original data type can still match more semantically equivalent `Add`s. ### Why are the changes needed? A better solution for the issue https://issues.apache.org/jira/browse/SPARK-40903. We can avoid regressions that mark certain semantically equivalent `Add`s as not equivalent. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New UT
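A toy model can illustrate the follow-up's idea. Canonicalizing an `Add` flattens and sorts its operands. This can change the result type when that type depends on operand order, as it does with decimal precision, which is assumed here as the motivating case. When the type changes, the sketch wraps the reordered tree in a cast back to the original type, so equivalent expressions written in different orders still canonicalize to the same tree. All classes here are hypothetical stand-ins rather than Catalyst's `Add`/`Cast`, and the decimal rule ignores Spark's 38-digit precision cap.

```scala
// Toy model only: hypothetical stand-ins, not Catalyst's Add/Cast.
final case class Dec(precision: Int, scale: Int)

sealed trait Expr { def dataType: Dec }

final case class Col(name: String, dataType: Dec) extends Expr

final case class Add(left: Expr, right: Expr) extends Expr {
  // Decimal addition: larger scale, larger integral part, plus one carry digit
  // (Spark's 38-digit precision cap is ignored here).
  def dataType: Dec = {
    val l = left.dataType
    val r = right.dataType
    val scale = math.max(l.scale, r.scale)
    Dec(math.max(l.precision - l.scale, r.precision - r.scale) + scale + 1, scale)
  }
}

final case class Cast(child: Expr, dataType: Dec) extends Expr

object Canonicalize {
  // Flatten a tree of nested Adds into its operands.
  private def flatten(e: Expr): Seq[Expr] = e match {
    case Add(l, r) => flatten(l) ++ flatten(r)
    case other     => Seq(other)
  }

  def apply(e: Expr): Expr = e match {
    case add: Add =>
      // Sort operands into a deterministic order and rebuild a left-deep tree.
      val reordered =
        flatten(add).sortBy(_.toString).reduceLeft[Expr]((acc, next) => Add(acc, next))
      // If reordering changed the result type, cast back to the original type.
      if (reordered.dataType == add.dataType) reordered else Cast(reordered, add.dataType)
    case other => other
  }
}
```

Take `a` and `b` of type `Dec(1, 0)` and `c` of type `Dec(10, 0)`. Both `(a + c) + b` and `(c + a) + b` have type `Dec(12, 0)`, while the sorted form `(a + b) + c` has type `Dec(11, 0)`. Both therefore canonicalize to the same `Cast(..., Dec(12, 0))`.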
[GitHub] [spark] amaliujia commented on pull request #38488: [SPARK-41002][CONNECT][PYTHON] Compatible `take`, `head` and `first` API in Python client
amaliujia commented on PR #38488: URL: https://github.com/apache/spark/pull/38488#issuecomment-1303988901 OK, added short descriptions for the new test cases.
[GitHub] [spark] amaliujia commented on pull request #38506: [SPARK-41010][CONNECT][PYTHON] Complete Support for Except and Intersect in Python client
amaliujia commented on PR #38506: URL: https://github.com/apache/spark/pull/38506#issuecomment-1303988661 OK, added short descriptions for the new test cases.
[GitHub] [spark] gengliangwang closed pull request #38479: [SPARK-40697][SQL][FOLLOWUP] Read-side char padding should only be applied if necessary
gengliangwang closed pull request #38479: [SPARK-40697][SQL][FOLLOWUP] Read-side char padding should only be applied if necessary URL: https://github.com/apache/spark/pull/38479
[GitHub] [spark] gengliangwang commented on pull request #38479: [SPARK-40697][SQL][FOLLOWUP] Read-side char padding should only be applied if necessary
gengliangwang commented on PR #38479: URL: https://github.com/apache/spark/pull/38479#issuecomment-1303969098 Thanks for fixing it. Merging to master
[GitHub] [spark] MaxGekk closed pull request #38498: [SPARK-40769][CORE][SQL] Migrate type check failures of aggregate expressions onto error classes
MaxGekk closed pull request #38498: [SPARK-40769][CORE][SQL] Migrate type check failures of aggregate expressions onto error classes URL: https://github.com/apache/spark/pull/38498
[GitHub] [spark] MaxGekk commented on pull request #38498: [SPARK-40769][CORE][SQL] Migrate type check failures of aggregate expressions onto error classes
MaxGekk commented on PR #38498: URL: https://github.com/apache/spark/pull/38498#issuecomment-1303953617 +1, LGTM. Merging to master. Thank you, @LuciferYang.
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1014302073 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging { } } + def hasRangeExpr(e: Expression): Boolean = e.exists { +case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) +case _ => false + } + + def hasEventTimeColNeq(neq: Expression): Boolean = { +val exp = neq.asInstanceOf[BinaryComparison] +hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) + } + + def hasEventTimeCol(exps: Expression): Boolean = +exps.exists { + case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) + case _ => false +} + + // TODO: This function and hasRangeExpr + // should be deleted after we support range join with states + def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = { +plan match { + case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) => +left.isStreaming && right.isStreaming +otherCondition.isDefined && hasRangeExpr(otherCondition.get) + case _ => false +} + } + /** * Checks for possible correctness issue in chained stateful operators. The behavior is * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`. * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just * print a warning message. */ def checkStreamingQueryGlobalWatermarkLimit( - plan: LogicalPlan, - outputMode: OutputMode): Unit = { + plan: LogicalPlan): Unit = { def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { - case s: Aggregate -if s.isStreaming && outputMode == InternalOutputModes.Append => true case Join(left, right, joinType, _, _) Review Comment: Will do!
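The predicates above can be modelled on a toy expression tree to show what they test. A join counts as a stream-stream interval join when both sides are streaming and the non-equi join condition contains a range comparison that references an event-time column. The classes below are hypothetical stand-ins for Catalyst's expressions and `ExtractEquiJoinKeys`, and only `<` and `>` are modelled. One detail is worth noting. As the diff reads in this archive, `left.isStreaming && right.isStreaming` sits on its own line inside the `case` body. Scala evaluates such a line as a separate statement and discards its value, so this sketch combines all the conditions in one `&&` expression.

```scala
// Hypothetical stand-ins for Catalyst expressions; only what the predicates inspect.
sealed trait Expr {
  def children: Seq[Expr]
  /** True if `p` holds for this node or any descendant. */
  def exists(p: Expr => Boolean): Boolean = p(this) || children.exists(_.exists(p))
}

final case class Attr(name: String, isEventTime: Boolean) extends Expr {
  def children: Seq[Expr] = Nil
}
final case class Lit(value: Long) extends Expr {
  def children: Seq[Expr] = Nil
}
sealed trait Comparison extends Expr {
  def left: Expr
  def right: Expr
  def children: Seq[Expr] = Seq(left, right)
}
sealed trait RangeComparison extends Comparison
final case class EqualTo(left: Expr, right: Expr) extends Comparison
final case class LessThan(left: Expr, right: Expr) extends RangeComparison
final case class GreaterThan(left: Expr, right: Expr) extends RangeComparison
final case class And(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
}

/** Only the streaming flag of each join side matters here. */
final case class JoinSide(isStreaming: Boolean)

object IntervalJoinCheck {
  private def hasEventTimeCol(e: Expr): Boolean = e.exists {
    case a: Attr => a.isEventTime
    case _       => false
  }

  /** A `<` or `>` comparison with an event-time column on either side. */
  def hasRangeExpr(e: Expr): Boolean = e.exists {
    case c: RangeComparison => hasEventTimeCol(c.left) || hasEventTimeCol(c.right)
    case _                  => false
  }

  /** All three conditions are combined with && in a single expression. */
  def isStreamStreamIntervalJoin(
      left: JoinSide,
      right: JoinSide,
      otherCondition: Option[Expr]): Boolean =
    left.isStreaming && right.isStreaming && otherCondition.exists(hasRangeExpr)
}
```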
[GitHub] [spark] ueshin commented on a diff in pull request #38223: [SPARK-40770][PYTHON] Improved error messages for applyInPandas for schema mismatch
ueshin commented on code in PR #38223: URL: https://github.com/apache/spark/pull/38223#discussion_r1014300546 ## python/pyspark/worker.py: ## @@ -159,27 +226,13 @@ def wrapped(left_key_series, left_value_series, right_key_series, right_value_se key_series = left_key_series if not left_df.empty else right_key_series key = tuple(s[0] for s in key_series) result = f(key, left_df, right_df) -if not isinstance(result, pd.DataFrame): -raise TypeError( -"Return type of the user-defined function should be " -"pandas.DataFrame, but is {}".format(type(result)) -) -# the number of columns of result have to match the return type -# but it is fine for result to have no columns at all if it is empty -if not ( -len(result.columns) == len(return_type) or len(result.columns) == 0 and result.empty -): -raise RuntimeError( -"Number of columns of the returned pandas.DataFrame " -"doesn't match specified schema. " -"Expected: {} Actual: {}".format(len(return_type), len(result.columns)) -) +verify_pandas_result(result, return_type, assign_cols_by_name(runner_conf)) Review Comment: `wrapped` will be called many times. We want to reduce as much overhead as possible.
[GitHub] [spark] MaxGekk commented on a diff in pull request #37887: [SPARK-40360] ALREADY_EXISTS and NOT_FOUND exceptions
MaxGekk commented on code in PR #37887: URL: https://github.com/apache/spark/pull/37887#discussion_r1014297862 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala: ## @@ -20,66 +20,112 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ -import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.catalyst.util.{quoteIdentifier, quoteNameParts } import org.apache.spark.sql.types.StructType /** * Thrown by a catalog when an item already exists. The analyzer will rethrow the exception * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information. */ class DatabaseAlreadyExistsException(db: String) - extends NamespaceAlreadyExistsException(s"Database '$db' already exists") + extends NamespaceAlreadyExistsException(Array(db)) -class NamespaceAlreadyExistsException(message: String) - extends AnalysisException( -message, -errorClass = Some("_LEGACY_ERROR_TEMP_1118"), -messageParameters = Map("msg" -> message)) { + +class NamespaceAlreadyExistsException(errorClass: String, messageParameters: Map[String, String]) + extends AnalysisException(errorClass, messageParameters) { def this(namespace: Array[String]) = { -this(s"Namespace '${namespace.quoted}' already exists") +this(errorClass = "SCHEMA_ALREADY_EXISTS", + Map("schemaName" -> quoteNameParts(namespace))) } } -class TableAlreadyExistsException(message: String, cause: Option[Throwable] = None) - extends AnalysisException( -message, -errorClass = Some("_LEGACY_ERROR_TEMP_1116"), -messageParameters = Map("msg" -> message), -cause = cause) { + +class TableAlreadyExistsException(errorClass: String, messageParameters: Map[String, String], + cause: Option[Throwable] = None) + extends 
AnalysisException(errorClass, messageParameters, cause = cause) { def this(db: String, table: String) = { -this(s"Table or view '$table' already exists in database '$db'") +this(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", + messageParameters = Map("relationName" -> +(quoteIdentifier(db) + "." + quoteIdentifier(table + } + + def this(table: String) = { +this(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", + messageParameters = Map("relationName" -> +quoteNameParts(UnresolvedAttribute.parseAttributeName(table } - def this(tableIdent: Identifier) = { -this(s"Table ${tableIdent.quoted} already exists") + def this(table: Seq[String]) = { +this(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", + messageParameters = Map("relationName" -> quoteNameParts(table))) + } +} + +class TempTableAlreadyExistsException(errorClass: String, messageParameters: Map[String, String], + cause: Option[Throwable] = None) + extends AnalysisException(errorClass, messageParameters, cause = cause) { + def this(table: String) = { +this(errorClass = "TEMP_TABLE_OR_VIEW_ALREADY_EXISTS", + messageParameters = Map("relationName" +-> quoteNameParts(UnresolvedAttribute.parseAttributeName(table } } -class TempTableAlreadyExistsException(table: String) - extends TableAlreadyExistsException(s"Temporary view '$table' already exists") +class PartitionAlreadyExistsException(errorClass: String, messageParameters: Map[String, String]) + extends AnalysisException(errorClass, messageParameters) { + def this(db: String, table: String, spec: TablePartitionSpec) = { +this(errorClass = "PARTITIONS_ALREADY_EXIST", + Map("partitionList" -> ("PARTITION (" + +spec.map( kv => quoteIdentifier(kv._1) + s" = ${kv._2}").mkString(", ") + ")"), +"tableName" -> (quoteIdentifier(db) + "." 
+ quoteIdentifier(table + } + + def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = { +this(errorClass = "PARTITIONS_ALREADY_EXIST", + Map("partitionList" -> +("PARTITION (" + partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name)) +.map( kv => quoteIdentifier(s"${kv._2}") + s" = ${kv._1}").mkString(", ") + ")"), +"tableName" -> quoteNameParts(UnresolvedAttribute.parseAttributeName(tableName + } +} -class PartitionsAlreadyExistException(message: String) extends AnalysisException(message) { +class PartitionsAlreadyExistException(errorClass: String, messageParameters: Map[String, String]) + extends AnalysisException(errorClass, messageParameters) { def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = { -this(s"The following partitions already exist in table '$table' database '$db':\n" - + specs.mkString("\n===\n")) +this(errorClass =
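The error-class style adopted in this diff can be sketched as follows. Instead of a pre-rendered message string, each exception carries an error class name plus named message parameters. `ErrorClassException`, the templates and their wording are hypothetical, since Spark renders messages from its own error-class definitions. `quoteIdentifier` and `quoteNameParts` are re-implemented here under an assumption about their behaviour: each name part is wrapped in backticks, any embedded backtick is doubled, and the parts are joined with dots. The partition list takes ordered pairs so that its output is deterministic.

```scala
// Hypothetical sketch of error-class based exceptions; not Spark's AnalysisException.
object ErrorClassSketch {
  /** Wrap a name part in backticks, doubling any backtick inside it (assumed behaviour). */
  def quoteIdentifier(name: String): String = "`" + name.replace("`", "``") + "`"

  /** Quote each part and join with dots, e.g. Seq("db", "t") becomes `db`.`t`. */
  def quoteNameParts(parts: Seq[String]): String = parts.map(quoteIdentifier).mkString(".")

  // Hypothetical message templates; <param> placeholders are filled from messageParameters.
  private val templates: Map[String, String] = Map(
    "SCHEMA_ALREADY_EXISTS" -> "Schema <schemaName> already exists.",
    "TABLE_OR_VIEW_ALREADY_EXISTS" -> "Table or view <relationName> already exists.")

  private def render(errorClass: String, params: Map[String, String]): String =
    params.foldLeft(templates.getOrElse(errorClass, errorClass)) {
      case (msg, (key, value)) => msg.replace(s"<$key>", value)
    }

  class ErrorClassException(val errorClass: String, val messageParameters: Map[String, String])
    extends Exception(render(errorClass, messageParameters))

  def tableAlreadyExists(db: String, table: String): ErrorClassException =
    new ErrorClassException(
      "TABLE_OR_VIEW_ALREADY_EXISTS",
      Map("relationName" -> (quoteIdentifier(db) + "." + quoteIdentifier(table))))

  /** Render a partition spec like the diff does: PARTITION (`k` = v, ...). */
  def partitionList(spec: Seq[(String, String)]): String =
    spec.map { case (k, v) => quoteIdentifier(k) + s" = $v" }.mkString("PARTITION (", ", ", ")")
}
```

Keeping the parameters separate from the rendered text lets callers and tests inspect `errorClass` and `messageParameters` directly instead of matching on message strings.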
[GitHub] [spark] jerrypeng commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog
jerrypeng commented on code in PR #38430: URL: https://github.com/apache/spark/pull/38430#discussion_r1014297681 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala: ## @@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: for (batchId <- batchIds if batchId > thresholdBatchId) { val path = batchIdToPath(batchId) fileManager.delete(path) + if (metadataCacheEnabled) batchCache.remove(batchId) logTrace(s"Removed metadata log file: $path") } } + + /** + * List the available batches on file system. As a workaround for S3 inconsistent list, it also Review Comment: I think this comment is out of date. Amazon S3 now delivers strong read-after-write consistency. I will remove it in a subsequent PR.
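The caching pattern in this hunk evicts an entry from the in-memory `batchCache` whenever its metadata file is deleted. A small standalone class can sketch it. An in-memory map stands in for the metadata files, and `CachedMetadataLog`, `storageReads` and the method signatures are illustrative rather than `HDFSMetadataLog`'s actual API. `purgeAfter` mirrors the quoted loop by removing batches whose ids are greater than the threshold.

```scala
import scala.collection.mutable

// Hypothetical sketch; an in-memory map stands in for the metadata files on storage.
class CachedMetadataLog[T](metadataCacheEnabled: Boolean) {
  private val storage = mutable.Map.empty[Long, T]
  private val batchCache = mutable.Map.empty[Long, T]
  /** Number of lookups that had to go to storage (i.e. cache misses). */
  var storageReads: Int = 0

  /** Writes metadata for a new batch; returns false if the batch already exists. */
  def add(batchId: Long, metadata: T): Boolean =
    if (storage.contains(batchId)) false
    else {
      storage(batchId) = metadata
      if (metadataCacheEnabled) batchCache(batchId) = metadata
      true
    }

  /** Serves from the cache when possible, otherwise reads storage and fills the cache. */
  def get(batchId: Long): Option[T] =
    batchCache.get(batchId).orElse {
      storageReads += 1
      val result = storage.get(batchId)
      if (metadataCacheEnabled) result.foreach(m => batchCache.update(batchId, m))
      result
    }

  /** Deletes batches with ids greater than the threshold, evicting them from the cache too. */
  def purgeAfter(thresholdBatchId: Long): Unit =
    for (batchId <- storage.keys.toList if batchId > thresholdBatchId) {
      storage.remove(batchId)
      if (metadataCacheEnabled) batchCache.remove(batchId)
    }
}
```

Evicting in the same loop that deletes the file keeps the cache from serving metadata for batches that no longer exist in storage.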
[GitHub] [spark] anchovYu commented on a diff in pull request #37887: [SPARK-40360] ALREADY_EXISTS and NOT_FOUND exceptions
anchovYu commented on code in PR #37887: URL: https://github.com/apache/spark/pull/37887#discussion_r1014283621 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala: ## @@ -20,66 +20,112 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ -import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.catalyst.util.{quoteIdentifier, quoteNameParts } import org.apache.spark.sql.types.StructType /** * Thrown by a catalog when an item already exists. The analyzer will rethrow the exception * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information. */ class DatabaseAlreadyExistsException(db: String) - extends NamespaceAlreadyExistsException(s"Database '$db' already exists") + extends NamespaceAlreadyExistsException(Array(db)) -class NamespaceAlreadyExistsException(message: String) - extends AnalysisException( -message, -errorClass = Some("_LEGACY_ERROR_TEMP_1118"), -messageParameters = Map("msg" -> message)) { + +class NamespaceAlreadyExistsException(errorClass: String, messageParameters: Map[String, String]) + extends AnalysisException(errorClass, messageParameters) { def this(namespace: Array[String]) = { -this(s"Namespace '${namespace.quoted}' already exists") +this(errorClass = "SCHEMA_ALREADY_EXISTS", + Map("schemaName" -> quoteNameParts(namespace))) } } -class TableAlreadyExistsException(message: String, cause: Option[Throwable] = None) - extends AnalysisException( -message, -errorClass = Some("_LEGACY_ERROR_TEMP_1116"), -messageParameters = Map("msg" -> message), -cause = cause) { + +class TableAlreadyExistsException(errorClass: String, messageParameters: Map[String, String], + cause: Option[Throwable] = None) + extends 
AnalysisException(errorClass, messageParameters, cause = cause) { def this(db: String, table: String) = { -this(s"Table or view '$table' already exists in database '$db'") +this(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", + messageParameters = Map("relationName" -> +(quoteIdentifier(db) + "." + quoteIdentifier(table + } + + def this(table: String) = { +this(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", + messageParameters = Map("relationName" -> +quoteNameParts(UnresolvedAttribute.parseAttributeName(table } - def this(tableIdent: Identifier) = { -this(s"Table ${tableIdent.quoted} already exists") + def this(table: Seq[String]) = { +this(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", + messageParameters = Map("relationName" -> quoteNameParts(table))) + } +} + +class TempTableAlreadyExistsException(errorClass: String, messageParameters: Map[String, String], + cause: Option[Throwable] = None) + extends AnalysisException(errorClass, messageParameters, cause = cause) { + def this(table: String) = { +this(errorClass = "TEMP_TABLE_OR_VIEW_ALREADY_EXISTS", + messageParameters = Map("relationName" +-> quoteNameParts(UnresolvedAttribute.parseAttributeName(table } } -class TempTableAlreadyExistsException(table: String) - extends TableAlreadyExistsException(s"Temporary view '$table' already exists") +class PartitionAlreadyExistsException(errorClass: String, messageParameters: Map[String, String]) + extends AnalysisException(errorClass, messageParameters) { + def this(db: String, table: String, spec: TablePartitionSpec) = { +this(errorClass = "PARTITIONS_ALREADY_EXIST", + Map("partitionList" -> ("PARTITION (" + +spec.map( kv => quoteIdentifier(kv._1) + s" = ${kv._2}").mkString(", ") + ")"), +"tableName" -> (quoteIdentifier(db) + "." 
+ quoteIdentifier(table + } + + def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = { +this(errorClass = "PARTITIONS_ALREADY_EXIST", + Map("partitionList" -> +("PARTITION (" + partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name)) +.map( kv => quoteIdentifier(s"${kv._2}") + s" = ${kv._1}").mkString(", ") + ")"), +"tableName" -> quoteNameParts(UnresolvedAttribute.parseAttributeName(tableName + } +} -class PartitionsAlreadyExistException(message: String) extends AnalysisException(message) { +class PartitionsAlreadyExistException(errorClass: String, messageParameters: Map[String, String]) + extends AnalysisException(errorClass, messageParameters) { def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = { -this(s"The following partitions already exist in table '$table' database '$db':\n" - + specs.mkString("\n===\n")) +this(errorClass =
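The new constructors build their `relationName`, `tableName` and `partitionList` message parameters with `quoteIdentifier` and `quoteNameParts` (imported from `org.apache.spark.sql.catalyst.util` in the diff). Below is a minimal, self-contained sketch of that formatting. It assumes `quoteIdentifier` wraps a name in backticks and doubles any embedded backtick, and that `quoteNameParts` quotes every part and joins the parts with dots. `QuotingSketch` and `partitionList` are hypothetical names, and this is not Spark's source:

```scala
// Hypothetical re-implementation for illustration only; not Spark's code.
object QuotingSketch {
  // Wrap one name part in backticks, doubling any backtick inside it.
  def quoteIdentifier(name: String): String =
    "`" + name.replace("`", "``") + "`"

  // Quote each part of a multi-part name and join the parts with dots.
  def quoteNameParts(parts: Seq[String]): String =
    parts.map(quoteIdentifier).mkString(".")

  // Render a partition spec the way the PARTITIONS_ALREADY_EXIST constructor
  // in the diff does: PARTITION (`k1` = v1, `k2` = v2).
  // A Seq is used instead of TablePartitionSpec (a Map) to keep the order explicit.
  def partitionList(spec: Seq[(String, String)]): String =
    "PARTITION (" + spec.map { case (k, v) => quoteIdentifier(k) + s" = $v" }.mkString(", ") + ")"
}
```

For example, `QuotingSketch.quoteNameParts(Seq("db", "t"))` yields `` `db`.`t` ``, which is the shape the `relationName` parameter takes in these messages.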
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1014281945 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging { } } + def hasRangeExpr(e: Expression): Boolean = e.exists { +case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) => + hasEventTimeColNeq(neq) +case _ => false + } + + def hasEventTimeColNeq(neq: Expression): Boolean = { +val exp = neq.asInstanceOf[BinaryComparison] +hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right) + } + + def hasEventTimeCol(exps: Expression): Boolean = +exps.exists { + case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) + case _ => false +} + + // TODO: This function and hasRangeExpr + // should be deleted after we support range join with states + def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = { +plan match { + case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) => +left.isStreaming && right.isStreaming Review Comment: Oh, thank you so much for spotting that! ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging { } } + def hasRangeExpr(e: Expression): Boolean = e.exists { Review Comment: Done. I put it under isStreamStreamIntervalJoin and also added `private` to the functions to limit their scope.
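The helpers in this diff treat a join condition as a range condition when a `<`, `<=`, `>` or `>=` comparison has an event-time (watermarked) column on either side. The sketch below shows that check on a toy expression tree. `Expr`, `Attr`, `Lit`, `Cmp`, `And` and the `watermarked` flag are hypothetical stand-ins for Catalyst's `Expression`, `AttributeReference` and the `EventTimeWatermark.delayKey` metadata lookup. It is meant only to illustrate the logic, not to reproduce Catalyst's API:

```scala
object RangeCheckSketch {
  // Toy expression tree standing in for Catalyst expressions (hypothetical).
  sealed trait Expr {
    def children: Seq[Expr]
    // Like Catalyst's TreeNode.exists: true if p holds for this node or any descendant.
    def exists(p: Expr => Boolean): Boolean = p(this) || children.exists(_.exists(p))
  }
  // `watermarked` stands in for metadata.contains(EventTimeWatermark.delayKey).
  final case class Attr(name: String, watermarked: Boolean) extends Expr {
    def children: Seq[Expr] = Nil
  }
  final case class Lit(value: Int) extends Expr {
    def children: Seq[Expr] = Nil
  }
  final case class Cmp(op: String, left: Expr, right: Expr) extends Expr {
    def children: Seq[Expr] = Seq(left, right)
  }
  final case class And(left: Expr, right: Expr) extends Expr {
    def children: Seq[Expr] = Seq(left, right)
  }

  private val rangeOps = Set("<", "<=", ">", ">=")

  // True if the expression references a watermarked (event-time) column.
  private def hasEventTimeCol(e: Expr): Boolean = e.exists {
    case a: Attr => a.watermarked
    case _ => false
  }

  // True if some range comparison has an event-time column on either side.
  def hasRangeExpr(e: Expr): Boolean = e.exists {
    case Cmp(op, l, r) if rangeOps(op) => hasEventTimeCol(l) || hasEventTimeCol(r)
    case _ => false
  }
}
```

An equality between two event-time columns, or a range comparison on a non-event-time column, is not flagged. Only a range comparison that touches an event-time column is.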
[GitHub] [spark] hvanhovell commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect
hvanhovell commented on code in PR #38468: URL: https://github.com/apache/spark/pull/38468#discussion_r1014280302 ## sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala: ## @@ -128,6 +128,65 @@ private[sql] object ArrowConverters extends Logging { } } + private[sql] def toArrowBatchIterator( + rowIter: Iterator[InternalRow], + schema: StructType, + maxRecordsPerBatch: Int, + timeZoneId: String, + context: TaskContext): Iterator[(Array[Byte], Long, Long)] = { +val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) +val allocator = ArrowUtils.rootAllocator.newChildAllocator( + "toArrowBatchIterator", 0, Long.MaxValue) + +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val unloader = new VectorUnloader(root) +val arrowWriter = ArrowWriter.create(root) + +if (context != null) { // for test at driver + context.addTaskCompletionListener[Unit] { _ => +root.close() +allocator.close() + } +} + +new Iterator[(Array[Byte], Long, Long)] { + + override def hasNext: Boolean = rowIter.hasNext || { +root.close() +allocator.close() +false + } + + override def next(): (Array[Byte], Long, Long) = { +val out = new ByteArrayOutputStream() +val writeChannel = new WriteChannel(Channels.newChannel(out)) + +var rowCount = 0L +var estimatedSize = 0L +Utils.tryWithSafeFinally { + while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || rowCount < maxRecordsPerBatch)) { +val row = rowIter.next() +arrowWriter.write(row) +rowCount += 1 +estimatedSize += SizeEstimator.estimate(row) + } + arrowWriter.finish() + val batch = unloader.getRecordBatch() + + MessageSerializer.serialize(writeChannel, arrowSchema) Review Comment: sgtm
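`toArrowBatchIterator` drains the row iterator into batches. It starts a new batch once `maxRecordsPerBatch` rows have been written and treats a non-positive limit as "no limit". The sketch below isolates just that batching rule with plain Scala collections. Arrow serialization, size estimation and resource cleanup are omitted, and `BatchingSketch` and `batches` are hypothetical names:

```scala
object BatchingSketch {
  // Group rows into batches; maxRecordsPerBatch <= 0 means one unbounded batch.
  def batches[T](rows: Iterator[T], maxRecordsPerBatch: Int): Iterator[Seq[T]] =
    new Iterator[Seq[T]] {
      override def hasNext: Boolean = rows.hasNext

      override def next(): Seq[T] = {
        if (!rows.hasNext) throw new NoSuchElementException("no more batches")
        val buf = Seq.newBuilder[T]
        var rowCount = 0L
        // Same loop condition as the diff: stop at the limit unless it is disabled.
        while (rows.hasNext && (maxRecordsPerBatch <= 0 || rowCount < maxRecordsPerBatch)) {
          buf += rows.next()
          rowCount += 1
        }
        buf.result()
      }
    }
}
```

With a limit of 2, five rows become batches of 2, 2 and 1. With a limit of 0, every row lands in a single batch.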
[GitHub] [spark] hvanhovell commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect
hvanhovell commented on code in PR #38468: URL: https://github.com/apache/spark/pull/38468#discussion_r1014279677 ## connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala: ## @@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte responseObserver.onNext(response.build()) } -responseObserver.onNext(sendMetricsToResponse(clientId, rows)) +responseObserver.onNext(sendMetricsToResponse(clientId, dataframe)) +responseObserver.onCompleted() + } + + def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = { +val spark = dataframe.sparkSession +val schema = dataframe.schema +// TODO: control the batch size instead of max records +val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch +val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone + +val rows = dataframe.queryExecution.executedPlan.execute() +var numBatches = 0L + +if (rows.getNumPartitions > 0) { + val batches = rows.mapPartitionsInternal { iter => +ArrowConverters + .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId) + } + + val obj = new Object + + val processPartition = (iter: Iterator[(Array[Byte], Long, Long)]) => iter.toArray + + val resultHandler = (partitionId: Int, taskResult: Array[(Array[Byte], Long, Long)]) => +obj.synchronized { + var batchId = partitionId.toLong << 33 + taskResult.foreach { case (bytes, count, size) => +val response = proto.Response.newBuilder().setClientId(clientId) +val batch = proto.Response.ArrowBatch + .newBuilder() + .setBatchId(batchId) + .setRowCount(count) + .setUncompressedBytes(size) + .setCompressedBytes(bytes.length) + .setData(ByteString.copyFrom(bytes)) + .build() +response.setArrowBatch(batch) +responseObserver.onNext(response.build()) Review Comment: This callback is currently executed by the DAGScheduler thread, and if it is expensive, no job or stage can be scheduled during this call. We really should move this to a separate thread.
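One way to follow this suggestion is to have the result handler only enqueue each partition's batches and let a dedicated thread do the expensive work of building responses and calling `onNext`. The sketch below uses `java.util.concurrent.LinkedBlockingQueue`. It is one possible approach, not the PR's code: `AsyncSender`, `send` and the thread name are hypothetical, and error propagation from the sender thread is left out. Batches are sent in arrival order, so partition ordering must be handled separately.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical helper: the scheduler-side callback only enqueues, and a
// dedicated thread performs the (potentially expensive) send per batch.
final class AsyncSender(send: (Int, Array[Byte]) => Unit) {
  // Some(result) carries one partition's batches; None means "no more results".
  private val queue = new LinkedBlockingQueue[Option[(Int, Array[Array[Byte]])]]()

  private val worker = new Thread(new Runnable {
    override def run(): Unit = {
      var done = false
      while (!done) {
        queue.take() match {
          case Some((partitionId, batches)) => batches.foreach(b => send(partitionId, b))
          case None => done = true
        }
      }
    }
  }, "arrow-batch-sender")
  worker.setDaemon(true)
  worker.start()

  // Cheap enough to call from a scheduler callback: a single enqueue.
  def resultHandler(partitionId: Int, batches: Array[Array[Byte]]): Unit =
    queue.put(Some((partitionId, batches)))

  // Signal completion and wait until every queued batch has been sent.
  def finish(): Unit = {
    queue.put(None)
    worker.join()
  }
}
```

Because a single worker drains a FIFO queue, batches go out in the order the handler received them, and `finish()` returns only after the last one has been sent.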
[GitHub] [spark] hvanhovell commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect
hvanhovell commented on code in PR #38468: URL: https://github.com/apache/spark/pull/38468#discussion_r1014270879 ## connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala: ## @@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte responseObserver.onNext(response.build()) } -responseObserver.onNext(sendMetricsToResponse(clientId, rows)) +responseObserver.onNext(sendMetricsToResponse(clientId, dataframe)) +responseObserver.onCompleted() + } + + def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = { +val spark = dataframe.sparkSession +val schema = dataframe.schema +// TODO: control the batch size instead of max records +val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch +val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone + +val rows = dataframe.queryExecution.executedPlan.execute() Review Comment: You need to wrap this, and all the code below, in `SQLExecution.withExecutionId(..)` or use `Dataset.withAction`; otherwise you break the UI and a number of other things.
[GitHub] [spark] hvanhovell commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect
hvanhovell commented on code in PR #38468: URL: https://github.com/apache/spark/pull/38468#discussion_r1014269647 ## connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala: ## @@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte responseObserver.onNext(response.build()) } -responseObserver.onNext(sendMetricsToResponse(clientId, rows)) +responseObserver.onNext(sendMetricsToResponse(clientId, dataframe)) +responseObserver.onCompleted() + } + + def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = { +val spark = dataframe.sparkSession +val schema = dataframe.schema +// TODO: control the batch size instead of max records +val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch +val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone + +val rows = dataframe.queryExecution.executedPlan.execute() +var numBatches = 0L + +if (rows.getNumPartitions > 0) { + val batches = rows.mapPartitionsInternal { iter => +ArrowConverters + .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId) + } + + val obj = new Object + + val processPartition = (iter: Iterator[(Array[Byte], Long, Long)]) => iter.toArray Review Comment: This breaks sorted results: a higher partition can complete earlier than lower ones, which breaks the order. That is why the snippet I posted buffered the partitions in the handler while the main thread scanned over them one by one.
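Partitions can finish in any order, so emitting results directly from the handler loses the sort order. The sketch below follows the approach the comment describes. The handler only stores each partition's result, and the consuming thread emits partitions strictly by index, waiting whenever the next one has not arrived yet. `OrderedCollector`, `add` and `drain` are hypothetical names. This is a reconstruction of the idea, not the snippet referred to in the comment.

```scala
import scala.collection.mutable

// Hypothetical sketch: buffer out-of-order partition results and release them
// strictly in partition-index order.
final class OrderedCollector[T](numPartitions: Int) {
  private val lock = new Object
  private val buffered = mutable.Map.empty[Int, T]

  // Called from the result handler, in any order: store only, then wake the consumer.
  def add(partitionId: Int, result: T): Unit = lock.synchronized {
    buffered(partitionId) = result
    lock.notifyAll()
  }

  // Called by the consuming (main) thread: emit partitions 0, 1, 2, ... in order,
  // blocking until the next expected partition has arrived.
  def drain(emit: T => Unit): Unit = {
    var next = 0
    while (next < numPartitions) {
      val result = lock.synchronized {
        while (!buffered.contains(next)) lock.wait()
        buffered.remove(next).get
      }
      emit(result)
      next += 1
    }
  }
}
```

`emit` runs outside the lock, so a slow send does not block the handler from buffering later partitions.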
[GitHub] [spark] SandishKumarHN commented on a diff in pull request #37972: [SPARK-40654][SQL] Protobuf support for Spark - from_protobuf AND to_protobuf
SandishKumarHN commented on code in PR #37972: URL: https://github.com/apache/spark/pull/37972#discussion_r1014260137 ## connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala: ## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.protobuf + +import com.google.protobuf.{ByteString, DynamicMessage, Message} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{RandomDataGenerator, Row} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, NoopFilters, OrderedFilters, StructFilters} +import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData} +import org.apache.spark.sql.protobuf.utils.{ProtobufUtils, SchemaConverters} +import org.apache.spark.sql.sources.{EqualTo, Not} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class ProtobufCatalystDataConversionSuite +extends SparkFunSuite +with SharedSparkSession +with ExpressionEvalHelper { + + private def checkResult( + data: Literal, + descFilePath: String, + messageName: String, + expected: Any): Unit = { +checkEvaluation( + ProtobufDataToCatalyst( +CatalystDataToProtobuf(data, descFilePath, messageName), +descFilePath, +messageName, +Map.empty), + prepareExpectedResult(expected)) + } + + protected def checkUnsupportedRead( + data: Literal, + descFilePath: String, + actualSchema: String, + badSchema: String): Unit = { + +val binary = CatalystDataToProtobuf(data, descFilePath, actualSchema) + +intercept[Exception] { + ProtobufDataToCatalyst(binary, descFilePath, badSchema, Map("mode" -> "FAILFAST")).eval() +} + +val expected = { + val expectedSchema = ProtobufUtils.buildDescriptor(descFilePath, badSchema) + SchemaConverters.toSqlType(expectedSchema).dataType match { +case st: StructType => + Row.fromSeq((0 until st.length).map { _ => +null + }) +case _ => null + } +} + +checkEvaluation( + ProtobufDataToCatalyst(binary, descFilePath, badSchema, Map("mode" -> "PERMISSIVE")), + expected) + } + + protected def prepareExpectedResult(expected: Any): Any = 
expected match { +// Spark byte and short both map to Protobuf int +case b: Byte => b.toInt +case s: Short => s.toInt +case row: GenericInternalRow => InternalRow.fromSeq(row.values.map(prepareExpectedResult)) +case array: GenericArrayData => new GenericArrayData(array.array.map(prepareExpectedResult)) +case map: MapData => + val keys = new GenericArrayData( + map.keyArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult)) + val values = new GenericArrayData( + map.valueArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult)) + new ArrayBasedMapData(keys, values) +case other => other + } + + private val testingTypes = Seq( +StructType(StructField("int32_type", IntegerType, nullable = true) :: Nil), +StructType(StructField("double_type", DoubleType, nullable = true) :: Nil), +StructType(StructField("float_type", FloatType, nullable = true) :: Nil), +StructType(StructField("bytes_type", BinaryType, nullable = true) :: Nil), +StructType(StructField("string_type", StringType, nullable = true) :: Nil)) + + private val catalystTypesToProtoMessages: Map[DataType, String] = Map( +IntegerType -> "IntegerMsg", +DoubleType -> "DoubleMsg", +FloatType -> "FloatMsg", +BinaryType -> "BytesMsg", +StringType -> "StringMsg") + + testingTypes.foreach { dt => +val seed = 1 + scala.util.Random.nextInt((1024 - 1) + 1) Review Comment: @MaxGekk Thank you, I will look into it.
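`prepareExpectedResult` normalizes the expected value before comparison. Spark bytes and shorts both map to a Protobuf int (per the comment in the diff), so they come back as 32-bit ints, and the function recurses into rows, arrays and maps. The sketch below is stripped down to plain Scala values. The object name and the use of `Seq`/`Map` in place of Spark's `GenericInternalRow`, `GenericArrayData` and `MapData` are assumptions made to keep it self-contained:

```scala
object ExpectedResultSketch {
  // Widen Byte/Short to Int (both map to a Protobuf int), recursing into
  // nested sequences and maps; leave every other value unchanged.
  def prepare(expected: Any): Any = expected match {
    case b: Byte => b.toInt
    case s: Short => s.toInt
    case seq: Seq[_] => seq.map(prepare)
    case m: Map[_, _] => m.map { case (k, v) => prepare(k) -> prepare(v) }
    case other => other
  }
}
```

Checking the runtime type matters here. Scala's `==` compares boxed numbers by value, so `(1.toByte: Any) == 1` already holds even without widening.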
[GitHub] [spark] swamirishi commented on pull request #38377: [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path
swamirishi commented on PR #38377: URL: https://github.com/apache/spark/pull/38377#issuecomment-1303865771 > Two points: > > * spark.driver.log.dfsDir is typically expected to be a path to hdfs - so resolving it relative to current working directory does not make sense > * If `rootDir` is referencing local filesystem, it will get resolved relative to local fs by the `fileSystem` for `rootDir` This can be done using the `makeQualified` API instead of `resolvePath`.
[GitHub] [spark] FouadApp closed pull request #38512: WIP: [SPARK-38564] Support read hive table from subdirectory source
FouadApp closed pull request #38512: WIP: [SPARK-38564] Support read hive table from subdirectory source URL: https://github.com/apache/spark/pull/38512
[GitHub] [spark] FouadApp opened a new pull request, #38512: WIP: [SPARK-38564] Support read hive table from subdirectory source
FouadApp opened a new pull request, #38512: URL: https://github.com/apache/spark/pull/38512 ### What changes were proposed in this pull request? This adds support for reading the source files of a partitioned Hive table that contains subdirectories. ### Why are the changes needed? When the Spark engine reads a partitioned Hive table with subdirectories, it cannot read the source files in those subdirectories. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New test.
[GitHub] [spark] LuciferYang commented on pull request #38498: [SPARK-40769][CORE][SQL] Migrate type check failures of aggregate expressions onto error classes
LuciferYang commented on PR #38498: URL: https://github.com/apache/spark/pull/38498#issuecomment-1303797442 GA passed
[GitHub] [spark] FouadApp commented on pull request #32679: [SPARK-28098][SQL]Support read hive table while LeafDir had multi-level paths
FouadApp commented on PR #32679: URL: https://github.com/apache/spark/pull/32679#issuecomment-1303780636 I have the same problem. When the Tez engine writes data in the presence of UNION ALL, the layout is: part_date=/HIVE_UNION_SUBDIR_1/part_000 (parquet) part_date=/HIVE_UNION_SUBDIR_2 part_date=/HIVE_UNION_SUBDIR_x When I run a query on this data: df = spark.sql("select * from table") df.count() ---> 0 Spark cannot read the subdirectories! I have a workaround, but it is not recommended: spark.conf.set("mapred.input.dir.recursive", "true") spark.conf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true") spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false") # This param is not recommended in Spark
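In the layout above, the data files sit one level below the partition directory, inside `HIVE_UNION_SUBDIR_*`. A reader that lists only the files directly inside each partition directory therefore finds nothing, which is consistent with the `count()` of 0. The sketch below shows the difference between a leaf-only and a recursive listing using `java.io.File` on local paths. `ListingSketch` and its methods are hypothetical and are not Spark's or Hadoop's file-listing code:

```scala
import java.io.File

object ListingSketch {
  // Entries of a directory; listFiles() returns null if `dir` is not a readable directory.
  private def children(dir: File): Seq[File] =
    Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)

  // Leaf-only listing: regular files directly inside the partition directory.
  def listDirect(partitionDir: File): Seq[File] =
    children(partitionDir).filter(_.isFile)

  // Recursive listing: regular files at any depth below the partition directory.
  def listRecursive(dir: File): Seq[File] =
    children(dir).flatMap(f => if (f.isDirectory) listRecursive(f) else Seq(f))
}
```

For a partition directory containing only `HIVE_UNION_SUBDIR_1/part_000`, `listDirect` returns nothing while `listRecursive` returns `part_000`.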
[GitHub] [spark] FouadApp commented on pull request #32679: [SPARK-28098][SQL]Support read hive table while LeafDir had multi-level paths
FouadApp commented on PR #32679: URL: https://github.com/apache/spark/pull/32679#issuecomment-1303772792 > Any chance of this getting picked up again? I saw it was merged in a fork: [lyft#40](https://github.com/lyft/spark/pull/40) but it would be great to have it upstream But it's not in the official repo (apache/spark)!
[GitHub] [spark] cloud-fan opened a new pull request, #38511: [WIP][SPARK-41017][SQL] Do not push Filter through reference-only Project
cloud-fan opened a new pull request, #38511: URL: https://github.com/apache/spark/pull/38511 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] HyukjinKwon closed pull request #38462: [SPARK-40533] [CONNECT] [PYTHON] Support most built-in literal types for Python in Spark Connect
HyukjinKwon closed pull request #38462: [SPARK-40533] [CONNECT] [PYTHON] Support most built-in literal types for Python in Spark Connect URL: https://github.com/apache/spark/pull/38462
[GitHub] [spark] HyukjinKwon commented on pull request #38462: [SPARK-40533] [CONNECT] [PYTHON] Support most built-in literal types for Python in Spark Connect
HyukjinKwon commented on PR #38462: URL: https://github.com/apache/spark/pull/38462#issuecomment-1303686102 Merged to master. Let's address complete types in a follow-up.
[GitHub] [spark] HyukjinKwon closed pull request #38485: [SPARK-41001] [CONNECT] [PYTHON] Implementing Connection String for Python Client
HyukjinKwon closed pull request #38485: [SPARK-41001] [CONNECT] [PYTHON] Implementing Connection String for Python Client URL: https://github.com/apache/spark/pull/38485
[GitHub] [spark] dwsmith1983 commented on pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar
dwsmith1983 commented on PR #38510: URL: https://github.com/apache/spark/pull/38510#issuecomment-1303640935 @itholic While going over another topic, I also made some updates to the SQL performance tuning page. I added a screenshot of the markdown. Is this how you want it?
[GitHub] [spark] dwsmith1983 opened a new pull request, #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar
dwsmith1983 opened a new pull request, #38510: URL: https://github.com/apache/spark/pull/38510 ### What changes were proposed in this pull request? I made some small grammar fixes related to dependent clauses followed by independent clauses, sentences starting with an introductory phrase, using the plural when "are" is present in the sentence, and other small fixes to improve readability. https://spark.apache.org/docs/latest/sql-performance-tuning.html https://user-images.githubusercontent.com/7563201/18862-d9418bc1-2fcd-4eff-be8e-af412add6946.png https://user-images.githubusercontent.com/7563201/18871-b5629ec6-9a9a-4f5a-96ce-0f90bd3e97b3.png https://user-images.githubusercontent.com/7563201/18877-c7f1ac95-618d-4c6d-a5ed-f84e927bb5b9.png ### Why are the changes needed? These changes improve the readability of the Spark documentation for new users and for those studying it. ### Does this PR introduce _any_ user-facing change? Yes, these changes affect the Spark documentation. ### How was this patch tested? No tests were added, as these changes were solely in Markdown.
[GitHub] [spark] srielau commented on a diff in pull request #38490: [SPARK-41009][SQL] Rename the error class `_LEGACY_ERROR_TEMP_1070` to `LOCATION_ALREADY_EXISTS`
srielau commented on code in PR #38490: URL: https://github.com/apache/spark/pull/38490#discussion_r1014078710 ## core/src/main/resources/error/error-classes.json: ## @@ -668,6 +668,24 @@ } } }, + "LOCATION_ALREADY_EXISTS" : { +"message" : [ + "Cannot create the location because it already exists.", + "Choose a different path or remove the existing location." +], +"subClass" : { Review Comment: Is this level of differentiation necessary? Aside from that, the sub-error class text partially repeats what the general text says. To make this message useful, I think we should explain how/that the table's directory name (?) is derived from the table name?
[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions
WeichenXu123 commented on code in PR #37734: URL: https://github.com/apache/spark/pull/37734#discussion_r1013973661 ## python/pyspark/ml/functions.py: ## @@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column: return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col))) +def _batched( +data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int +) -> Iterator[pd.DataFrame]: +"""Generator that splits a pandas dataframe/series into batches.""" +if isinstance(data, pd.DataFrame): +index = 0 +data_size = len(data) +while index < data_size: +yield data.iloc[index : index + batch_size] +index += batch_size +else: +# convert (tuple of) pd.Series into pd.DataFrame +if isinstance(data, pd.Series): +df = pd.concat((data,), axis=1) +else: # isinstance(data, Tuple[pd.Series]): +df = pd.concat(data, axis=1) + +index = 0 +data_size = len(df) +while index < data_size: +yield df.iloc[index : index + batch_size] +index += batch_size + + +def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool: +if isinstance(data, pd.Series): +return data.dtype == np.object_ and isinstance(data.iloc[0], (np.ndarray, list)) +elif isinstance(data, pd.DataFrame): +return any(data.dtypes == np.object_) and any( +[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]] +) +else: +raise ValueError( +"Unexpected data type: {}, expected pd.Series or pd.DataFrame.".format(type(data)) +) + + +def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> bool: +"""Check if input Series/DataFrame/Tuple contains any tensor-valued columns.""" +if isinstance(data, (pd.Series, pd.DataFrame)): +return _is_tensor_col(data) +else: # isinstance(data, Tuple): +return any(_is_tensor_col(elem) for elem in data) + + +def _validate( +preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]], +num_input_rows: int, +return_type: DataType, +) -> None: +"""Validate model predictions against the expected pandas_udf return_type.""" +if 
isinstance(return_type, StructType): +struct_rtype: StructType = return_type +fieldNames = struct_rtype.names +if isinstance(preds, dict): +# dictionary of columns +predNames = list(preds.keys()) +if not all(v.shape == (num_input_rows,) for v in preds.values()): +raise ValueError("Prediction results for StructType fields must be scalars.") +elif isinstance(preds, list) and isinstance(preds[0], dict): +# rows of dictionaries +predNames = list(preds[0].keys()) +if len(preds) != num_input_rows: +raise ValueError("Prediction results must have same length as input data.") +else: +raise ValueError( +"Prediction results for StructType must be a dictionary or " +"a list of dictionary, got: {}".format(type(preds)) +) + +# check column names +if len(predNames) != len(fieldNames) or not all( +[predNames[i] == fieldNames[i] for i in range(len(fieldNames))] +): +raise ValueError( +"Prediction result columns did not match expected return_type " +"columns: expected {}, got: {}".format(fieldNames, predNames) +) +elif isinstance(return_type, ArrayType): +if isinstance(preds, np.ndarray): +if len(preds) != num_input_rows: +raise ValueError("Prediction results must have same length as input data.") +else: +raise ValueError("Prediction results for ArrayType must be an ndarray.") +else: +if len(preds) != num_input_rows: +raise ValueError("Prediction results must have same length as input data.") + + +def predict_batch_udf( +predict_batch_fn: Callable[ +[], +Callable[ +[np.ndarray | List[np.ndarray]], +np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, np.dtype]], +], +], +*, +return_type: DataType, +batch_size: int, +input_tensor_shapes: List[List[int] | None] | Mapping[int, List[int]] | None = None, +) -> UserDefinedFunctionLike: +"""Given a function which loads a model, returns a pandas_udf for inferencing over that model. + +This will handle: +- conversion of the Spark DataFrame to numpy arrays. +- batching of the inputs sent to the model predict() function. 
+- caching of the model and prediction function on the executors. + +This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for +running the model or the Spark executor environment already
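The `_batched` helper in the diff above walks an index through the data in `batch_size` strides and yields `iloc` slices, so the final batch may be shorter. A minimal stdlib sketch of the same loop, assuming any sliceable sequence in place of a pandas DataFrame (the name `batched_slices` is hypothetical, not part of the PR):

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def batched_slices(data: Sequence[T], batch_size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `batch_size` items (hypothetical helper)."""
    if batch_size <= 0:
        # With a non-positive step the index below would never advance.
        raise ValueError("batch_size must be positive, got {}".format(batch_size))
    index = 0
    data_size = len(data)
    while index < data_size:
        # Mirrors data.iloc[index : index + batch_size]; the last slice may be shorter.
        yield data[index : index + batch_size]
        index += batch_size
```

For example, `list(batched_slices(list(range(7)), 3))` gives `[[0, 1, 2], [3, 4, 5], [6]]`. The positive-`batch_size` guard is an addition for illustration; the diff's loop does not check it.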
[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions
WeichenXu123 commented on code in PR #37734: URL: https://github.com/apache/spark/pull/37734#discussion_r1013972337 ## python/pyspark/ml/functions.py: ## @@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column: return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col))) +def _batched( +data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int +) -> Iterator[pd.DataFrame]: +"""Generator that splits a pandas dataframe/series into batches.""" +if isinstance(data, pd.DataFrame): +index = 0 +data_size = len(data) +while index < data_size: +yield data.iloc[index : index + batch_size] +index += batch_size +else: +# convert (tuple of) pd.Series into pd.DataFrame +if isinstance(data, pd.Series): +df = pd.concat((data,), axis=1) +else: # isinstance(data, Tuple[pd.Series]): +df = pd.concat(data, axis=1) + +index = 0 +data_size = len(df) +while index < data_size: +yield df.iloc[index : index + batch_size] +index += batch_size + + +def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool: +if isinstance(data, pd.Series): +return data.dtype == np.object_ and isinstance(data.iloc[0], (np.ndarray, list)) +elif isinstance(data, pd.DataFrame): +return any(data.dtypes == np.object_) and any( +[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]] +) +else: +raise ValueError( +"Unexpected data type: {}, expected pd.Series or pd.DataFrame.".format(type(data)) +) + + +def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> bool: +"""Check if input Series/DataFrame/Tuple contains any tensor-valued columns.""" +if isinstance(data, (pd.Series, pd.DataFrame)): +return _is_tensor_col(data) +else: # isinstance(data, Tuple): +return any(_is_tensor_col(elem) for elem in data) + + +def _validate( +preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]], +num_input_rows: int, +return_type: DataType, +) -> None: +"""Validate model predictions against the expected pandas_udf return_type.""" +if 
isinstance(return_type, StructType): +struct_rtype: StructType = return_type +fieldNames = struct_rtype.names +if isinstance(preds, dict): +# dictionary of columns +predNames = list(preds.keys()) +if not all(v.shape == (num_input_rows,) for v in preds.values()): Review Comment: I suggest we support the case of a StructType containing a field of ArrayType, since this is a common case. Correspondingly, if we support this, let's add a similar check like https://github.com/apache/spark/pull/37734/files#r1013971238
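The suggestion above means a dict-of-columns prediction can no longer require every value to have shape `(num_input_rows,)`: ArrayType fields would carry one array per row. A minimal sketch of a per-field shape check, assuming the struct schema is reduced to a hypothetical `array_fields` mapping (field name to "is ArrayType") instead of a real pyspark `StructType`, and that each column exposes a NumPy-style `.shape` tuple:

```python
from typing import Any, Mapping


def validate_struct_columns(
    preds: Mapping[str, Any], num_input_rows: int, array_fields: Mapping[str, bool]
) -> None:
    """Check a dict of prediction columns against a struct schema (hypothetical helper).

    `array_fields` maps each struct field name to True for an ArrayType field and
    False for a scalar field; it stands in for a pyspark StructType here.
    """
    for name, is_array in array_fields.items():
        if name not in preds:
            raise ValueError("Missing prediction column: {}".format(name))
        shape = tuple(preds[name].shape)
        if is_array:
            # ArrayType field: one 1-D array per input row, i.e. a 2-D batch.
            if len(shape) != 2 or shape[0] != num_input_rows:
                raise ValueError(
                    "Field {} must have shape ({}, n), got {}".format(name, num_input_rows, shape)
                )
        elif shape != (num_input_rows,):
            # Scalar field: exactly one value per input row.
            raise ValueError(
                "Field {} must have shape ({},), got {}".format(name, num_input_rows, shape)
            )
```

With NumPy inputs, `{"label": np.zeros(4), "probs": np.zeros((4, 3))}` would pass for a schema where only `probs` is an array field. How the PR ultimately represents such fields is not shown here.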
[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions
WeichenXu123 commented on code in PR #37734: URL: https://github.com/apache/spark/pull/37734#discussion_r1013971238 ## python/pyspark/ml/functions.py: ## @@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column: return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col))) +def _batched( +data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int +) -> Iterator[pd.DataFrame]: +"""Generator that splits a pandas dataframe/series into batches.""" +if isinstance(data, pd.DataFrame): +index = 0 +data_size = len(data) +while index < data_size: +yield data.iloc[index : index + batch_size] +index += batch_size +else: +# convert (tuple of) pd.Series into pd.DataFrame +if isinstance(data, pd.Series): +df = pd.concat((data,), axis=1) +else: # isinstance(data, Tuple[pd.Series]): +df = pd.concat(data, axis=1) + +index = 0 +data_size = len(df) +while index < data_size: +yield df.iloc[index : index + batch_size] +index += batch_size + + +def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool: +if isinstance(data, pd.Series): +return data.dtype == np.object_ and isinstance(data.iloc[0], (np.ndarray, list)) +elif isinstance(data, pd.DataFrame): +return any(data.dtypes == np.object_) and any( +[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]] +) +else: +raise ValueError( +"Unexpected data type: {}, expected pd.Series or pd.DataFrame.".format(type(data)) +) + + +def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> bool: +"""Check if input Series/DataFrame/Tuple contains any tensor-valued columns.""" +if isinstance(data, (pd.Series, pd.DataFrame)): +return _is_tensor_col(data) +else: # isinstance(data, Tuple): +return any(_is_tensor_col(elem) for elem in data) + + +def _validate( +preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]], +num_input_rows: int, +return_type: DataType, +) -> None: +"""Validate model predictions against the expected pandas_udf return_type.""" +if 
isinstance(return_type, StructType): +struct_rtype: StructType = return_type +fieldNames = struct_rtype.names +if isinstance(preds, dict): +# dictionary of columns +predNames = list(preds.keys()) +if not all(v.shape == (num_input_rows,) for v in preds.values()): +raise ValueError("Prediction results for StructType fields must be scalars.") +elif isinstance(preds, list) and isinstance(preds[0], dict): +# rows of dictionaries +predNames = list(preds[0].keys()) +if len(preds) != num_input_rows: +raise ValueError("Prediction results must have same length as input data.") +else: +raise ValueError( +"Prediction results for StructType must be a dictionary or " +"a list of dictionary, got: {}".format(type(preds)) +) + +# check column names +if len(predNames) != len(fieldNames) or not all( +[predNames[i] == fieldNames[i] for i in range(len(fieldNames))] +): +raise ValueError( +"Prediction result columns did not match expected return_type " +"columns: expected {}, got: {}".format(fieldNames, predNames) +) +elif isinstance(return_type, ArrayType): +if isinstance(preds, np.ndarray): +if len(preds) != num_input_rows: Review Comment: We need to add an additional check: `len(preds.shape) == 2`
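For an ArrayType return type, the requested `len(preds.shape) == 2` check ensures each row of the batch maps to one array value, rather than accepting a 1-D result of the right length. A minimal sketch of the ArrayType branch with that check, assuming `preds` exposes a NumPy-style `.shape` (the name `validate_array_preds` is hypothetical, and the diff's `isinstance(preds, np.ndarray)` test is omitted to keep the sketch dependency-free):

```python
from typing import Any


def validate_array_preds(preds: Any, num_input_rows: int) -> None:
    """Validate batch predictions for an ArrayType return type (hypothetical helper)."""
    shape = tuple(preds.shape)
    if len(shape) != 2:
        # The reviewer's extra check: a batch of per-row arrays must be 2-D.
        raise ValueError(
            "Prediction results for ArrayType must be 2-D, got shape {}".format(shape)
        )
    if shape[0] != num_input_rows:
        # Same as the diff's len(preds) check, since len(ndarray) == shape[0].
        raise ValueError("Prediction results must have same length as input data.")
```

With NumPy, `np.zeros((5, 2))` would pass for five input rows, while `np.zeros(5)` would now be rejected even though its length matches.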
[GitHub] [spark] gaoyajun02 commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size
gaoyajun02 commented on code in PR #38333: URL: https://github.com/apache/spark/pull/38333#discussion_r1013969510 ## core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala: ## @@ -794,7 +794,15 @@ final class ShuffleBlockFetcherIterator( // since the last call. val msg = s"Received a zero-size buffer for block $blockId from $address " + s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)" -throwFetchFailedException(blockId, mapIndex, address, new IOException(msg)) +if (blockId.isShuffleChunk) { + logWarning(msg) + pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address) Review Comment: ok, updated
[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions
WeichenXu123 commented on code in PR #37734: URL: https://github.com/apache/spark/pull/37734#discussion_r1013960675 ## python/pyspark/ml/functions.py: ## @@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column: return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col))) +def _batched( +data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int +) -> Iterator[pd.DataFrame]: +"""Generator that splits a pandas dataframe/series into batches.""" +if isinstance(data, pd.DataFrame): +index = 0 +data_size = len(data) +while index < data_size: +yield data.iloc[index : index + batch_size] +index += batch_size +else: +# convert (tuple of) pd.Series into pd.DataFrame +if isinstance(data, pd.Series): +df = pd.concat((data,), axis=1) +else: # isinstance(data, Tuple[pd.Series]): +df = pd.concat(data, axis=1) + +index = 0 +data_size = len(df) +while index < data_size: +yield df.iloc[index : index + batch_size] +index += batch_size + + +def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool: +if isinstance(data, pd.Series): +return data.dtype == np.object_ and isinstance(data.iloc[0], (np.ndarray, list)) +elif isinstance(data, pd.DataFrame): +return any(data.dtypes == np.object_) and any( +[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]] +) +else: +raise ValueError( +"Unexpected data type: {}, expected pd.Series or pd.DataFrame.".format(type(data)) +) + + +def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> bool: +"""Check if input Series/DataFrame/Tuple contains any tensor-valued columns.""" +if isinstance(data, (pd.Series, pd.DataFrame)): +return _is_tensor_col(data) +else: # isinstance(data, Tuple): +return any(_is_tensor_col(elem) for elem in data) + + +def _validate( +preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]], +num_input_rows: int, +return_type: DataType, +) -> None: +"""Validate model predictions against the expected pandas_udf return_type.""" +if 
isinstance(return_type, StructType): +struct_rtype: StructType = return_type +fieldNames = struct_rtype.names +if isinstance(preds, dict): +# dictionary of columns +predNames = list(preds.keys()) +if not all(v.shape == (num_input_rows,) for v in preds.values()): +raise ValueError("Prediction results for StructType fields must be scalars.") +elif isinstance(preds, list) and isinstance(preds[0], dict): +# rows of dictionaries +predNames = list(preds[0].keys()) +if len(preds) != num_input_rows: +raise ValueError("Prediction results must have same length as input data.") +else: +raise ValueError( +"Prediction results for StructType must be a dictionary or " +"a list of dictionary, got: {}".format(type(preds)) +) + +# check column names +if len(predNames) != len(fieldNames) or not all( +[predNames[i] == fieldNames[i] for i in range(len(fieldNames))] +): Review Comment: The check here forces the returned dict to have the same key order as the struct type's field list. I think this is not necessary (note that in some cases the dict key order is undefined); we can simply check `set(predNames) == set(fieldNames)` instead.
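The suggested `set(predNames) == set(fieldNames)` comparison accepts the same columns in any order. A minimal sketch of that order-insensitive check, which also reports which names are missing or unexpected (the name `check_field_names` and the extra diagnostics are hypothetical additions, not from the PR):

```python
from typing import Iterable


def check_field_names(pred_names: Iterable[str], field_names: Iterable[str]) -> None:
    """Order-insensitive column-name check (hypothetical helper)."""
    pred_set, field_set = set(pred_names), set(field_names)
    if pred_set != field_set:
        missing = sorted(field_set - pred_set)
        unexpected = sorted(pred_set - field_set)
        raise ValueError(
            "Prediction result columns did not match expected return_type columns: "
            "missing {}, unexpected {}".format(missing, unexpected)
        )
```

Because dict keys are unique, comparing sets loses no information about the keys themselves. If later code relies on column position, it would still need to reorder the columns by `fieldNames`; that step is an assumption and is not shown in the diff.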