[GitHub] [spark] jerrypeng opened a new pull request, #38517: [WIP][SPARK-39591][SS] Async Progress Tracking

2022-11-04 Thread GitBox


jerrypeng opened a new pull request, #38517:
URL: https://github.com/apache/spark/pull/38517

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   





[GitHub] [spark] wankunde commented on pull request #38495: [SPARK-35531][SQL] Update hive table stats without unnecessary convert

2022-11-04 Thread GitBox


wankunde commented on PR #38495:
URL: https://github.com/apache/spark/pull/38495#issuecomment-1304400795

   @cloud-fan @AngersZh Could you help review this PR? Another PR,
https://github.com/apache/spark/pull/38496, depends on it.





[GitHub] [spark] wankunde commented on pull request #38495: [SPARK-35531][SQL] Update hive table stats without unnecessary convert

2022-11-04 Thread GitBox


wankunde commented on PR #38495:
URL: https://github.com/apache/spark/pull/38495#issuecomment-1304399983

   Retest this please





[GitHub] [spark] zhengruifeng commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect

2022-11-04 Thread GitBox


zhengruifeng commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1014560227


##
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[Response]) exte
   responseObserver.onNext(response.build())
 }
 
-responseObserver.onNext(sendMetricsToResponse(clientId, rows))
+responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
+responseObserver.onCompleted()
+  }
+
+  def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit 
= {
+val spark = dataframe.sparkSession
+val schema = dataframe.schema
+// TODO: control the batch size instead of max records
+val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
+val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+
+val rows = dataframe.queryExecution.executedPlan.execute()
+var numBatches = 0L
+
+if (rows.getNumPartitions > 0) {
+  val batches = rows.mapPartitionsInternal { iter =>
+ArrowConverters
+  .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
+  }
+
+  val obj = new Object
+
+  val processPartition = (iter: Iterator[(Array[Byte], Long, Long)]) => 
iter.toArray

Review Comment:
   with batch_id, we can send batches from a higher partition before those from lower ones
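   
   A minimal sketch of that idea (hypothetical client-side code, not part of this PR): assuming the
   batch id is built as `partitionId.toLong << 33` plus a counter that increases per batch within a
   partition, batches can be sent and received in any partition order and still be sorted back into
   the original order.
   
   ```scala
   object BatchOrdering {
     // Sketch only: the names and shapes here are assumptions, not actual Spark Connect client code.
     final case class ReceivedBatch(batchId: Long, data: Array[Byte])

     // The high bits recover the partition index (assuming batchId = (partitionId << 33) + i).
     def partitionOf(batchId: Long): Long = batchId >> 33

     // Sorting by the full id restores partition order and the in-partition batch order.
     def restoreOrder(batches: Seq[ReceivedBatch]): Seq[ReceivedBatch] =
       batches.sortBy(_.batchId)
   }
   ```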






[GitHub] [spark] zhengruifeng commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect

2022-11-04 Thread GitBox


zhengruifeng commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1014559977


##
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[Response]) exte
   responseObserver.onNext(response.build())
 }
 
-responseObserver.onNext(sendMetricsToResponse(clientId, rows))
+responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
+responseObserver.onCompleted()
+  }
+
+  def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit 
= {
+val spark = dataframe.sparkSession
+val schema = dataframe.schema
+// TODO: control the batch size instead of max records
+val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
+val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+
+val rows = dataframe.queryExecution.executedPlan.execute()

Review Comment:
   good point






[GitHub] [spark] zhengruifeng commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect

2022-11-04 Thread GitBox


zhengruifeng commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1014559938


##
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[Response]) exte
   responseObserver.onNext(response.build())
 }
 
-responseObserver.onNext(sendMetricsToResponse(clientId, rows))
+responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
+responseObserver.onCompleted()
+  }
+
+  def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit 
= {
+val spark = dataframe.sparkSession
+val schema = dataframe.schema
+// TODO: control the batch size instead of max records
+val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
+val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+
+val rows = dataframe.queryExecution.executedPlan.execute()
+var numBatches = 0L
+
+if (rows.getNumPartitions > 0) {
+  val batches = rows.mapPartitionsInternal { iter =>
+ArrowConverters
+  .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
+  }
+
+  val obj = new Object
+
+  val processPartition = (iter: Iterator[(Array[Byte], Long, Long)]) => 
iter.toArray
+
+  val resultHandler = (partitionId: Int, taskResult: Array[(Array[Byte], 
Long, Long)]) =>
+obj.synchronized {
+  var batchId = partitionId.toLong << 33
+  taskResult.foreach { case (bytes, count, size) =>
+val response = proto.Response.newBuilder().setClientId(clientId)
+val batch = proto.Response.ArrowBatch
+  .newBuilder()
+  .setBatchId(batchId)
+  .setRowCount(count)
+  .setUncompressedBytes(size)
+  .setCompressedBytes(bytes.length)
+  .setData(ByteString.copyFrom(bytes))
+  .build()
+response.setArrowBatch(batch)
+responseObserver.onNext(response.build())

Review Comment:
   ok will update






[GitHub] [spark] ljfgem commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


ljfgem commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014550696


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Catalog methods for working with views.
+ */
+@DeveloperApi
+public interface ViewCatalog extends CatalogPlugin {

Review Comment:
   I think they belong to `ViewCatalog` since there are similar properties in 
[TableCatalog](https://github.com/apache/spark/blob/5a71a7f7b5c1762677ddbfe39a7c35d645c25e94/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java#L48)?






[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


jzhuge commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014549689


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Catalog methods for working with views.
+ */
+@DeveloperApi
+public interface ViewCatalog extends CatalogPlugin {

Review Comment:
   I left them out as they felt more "implementation" than "API". Do they
belong to `ViewCatalog`?






[GitHub] [spark] attilapiros commented on pull request #38516: [SPARK-32380][SQL] Fixing access of HBase table via Hive

2022-11-04 Thread GitBox


attilapiros commented on PR #38516:
URL: https://github.com/apache/spark/pull/38516#issuecomment-1304373701

   cc @dongjoon-hyun, @HyukjinKwon 





[GitHub] [spark] attilapiros opened a new pull request, #38516: Initial version

2022-11-04 Thread GitBox


attilapiros opened a new pull request, #38516:
URL: https://github.com/apache/spark/pull/38516

   ### What changes were proposed in this pull request?
   
   This is an update of https://github.com/apache/spark/pull/29178, which was
closed because the root cause of the error was only vaguely defined there; here
I explain why `HiveHBaseTableInputFormat` does not work well with
`NewHadoopRDD` (see the next section).
   
   The PR modifies `TableReader.scala` to create an `OldHadoopRDD` when the input
format is 'org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat'.
   
   - environments (Cloudera distribution 7.1.7.SP1):
   hadoop 3.1.1 
   hive 3.1.300
   spark 3.2.1 
   hbase 2.2.3
   
   ### Why are the changes needed?
   
   With the `NewHadoopRDD` the following exception is raised:
   
   ```
   java.io.IOException: Cannot create a record reader because of a previous 
error. Please look at the previous logs lines from the task's full log for more 
details.
 at 
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:253)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
 at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
 at scala.Option.getOrElse(Option.scala:189)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
 at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
 at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
 at scala.Option.getOrElse(Option.scala:189)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
 at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
 at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
 at scala.Option.getOrElse(Option.scala:189)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
 at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
 at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
 at scala.Option.getOrElse(Option.scala:189)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
 at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
 at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
 at scala.Option.getOrElse(Option.scala:189)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
 at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:446)
 at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429)
 at 
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
 at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
 at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
 at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
 at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
 at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
 at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
 at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
 at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
 at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
 at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
 at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
 at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
 at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
 at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
 at org.apache.spark.sql.Dataset.show(Dataset.scala:765)
 at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
 ... 47 elided
   Caused by: java.lang.IllegalStateException: The input format instance has 
not been properly initialized. Ensure you call initializeTable either in your 
constructor or initialize method
 at 
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:557)
 at 
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:248)
 ... 86 more
   ```
   
   
   ### Short summary of the reason
   
   There are two interfaces:
   
   - the new `org.apache.hadoop.mapreduce.InputFormat`: provides a one-arg
method `getSplits(JobContext context)` (returning `List<InputSplit>`)
   - the old `org.apache.hadoop.mapred.InputFormat`: provides a two-arg method
`getSplits(JobConf job, int numSplits)` (returning `InputSplit[]`)
   
   In Hive both are implemented by `HiveHBaseTableInputFormat`, but only the old
method performs the required initialisation, and this is why `NewHadoopRDD`
fails here.
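   
   As a rough sketch of the proposed dispatch (the helper names below are assumptions for
illustration, not the exact code in `TableReader.scala`):
   
   ```scala
   object TableReaderSketch {
     private val HiveHBaseInputFormat = "org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat"

     // Placeholders standing in for the two RDD-creation paths.
     private def createOldHadoopRdd(): String = "old mapred-based HadoopRDD"
     private def createNewHadoopRdd(): String = "new mapreduce-based NewHadoopRDD"

     def createRddFor(inputFormatClassName: String): String =
       if (inputFormatClassName == HiveHBaseInputFormat) {
         // Old org.apache.hadoop.mapred API: getSplits(JobConf, int) performs the table
         // initialisation, so the HBase table handle exists before splits are computed.
         createOldHadoopRdd()
       } else {
         // New org.apache.hadoop.mapreduce API: for HiveHBaseTableInputFormat this path skips
         // that initialisation, producing the IllegalStateException shown above.
         createNewHadoopRdd()
       }
   }
   ```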
   
   ### Details
   
   Here all the links refer to the latest commits on the components' master
branches, to get the right line 

[GitHub] [spark] ljfgem commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


ljfgem commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014536719


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Catalog methods for working with views.
+ */
+@DeveloperApi
+public interface ViewCatalog extends CatalogPlugin {

Review Comment:
   Why do we remove the following reserved properties from the [umbrella
PR](https://github.com/apache/spark/pull/35636/files#diff-7094226eb616c14235b9a88fb6d8bc4bef39fa4f9879aa8dc4cfe4d031b720e7)?
   ```
  /**
   * A reserved property to specify the description of the view.
   */
  String PROP_COMMENT = "comment";

  /**
   * A reserved property to specify the owner of the view.
   */
  String PROP_OWNER = "owner";

  /**
   * A reserved property to specify the software version used to create the view.
   */
  String PROP_CREATE_ENGINE_VERSION = "create_engine_version";

  /**
   * A reserved property to specify the software version used to change the view.
   */
  String PROP_ENGINE_VERSION = "engine_version";

  /**
   * All reserved properties of the view.
   */
  List<String> RESERVED_PROPERTIES = Arrays.asList(
    PROP_COMMENT,
    PROP_OWNER,
    PROP_CREATE_ENGINE_VERSION,
    PROP_ENGINE_VERSION);
   ```






[GitHub] [spark] github-actions[bot] commented on pull request #34637: [SPARK-37349][SQL] add SQL Rest API parsing logic

2022-11-04 Thread GitBox


github-actions[bot] commented on PR #34637:
URL: https://github.com/apache/spark/pull/34637#issuecomment-1304354583

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





[GitHub] [spark] github-actions[bot] closed pull request #37083: [SPARK-39678][SQL] Improve stats estimation for v2 tables

2022-11-04 Thread GitBox


github-actions[bot] closed pull request #37083: [SPARK-39678][SQL] Improve 
stats estimation for v2 tables
URL: https://github.com/apache/spark/pull/37083





[GitHub] [spark] github-actions[bot] closed pull request #37226: [MINOR][SQL] Simplify the description of built-in function.

2022-11-04 Thread GitBox


github-actions[bot] closed pull request #37226: [MINOR][SQL] Simplify the 
description of built-in function.
URL: https://github.com/apache/spark/pull/37226





[GitHub] [spark] github-actions[bot] commented on pull request #37009: [SPARK-38292][PYTHON]Support na_filter for pyspark.pandas.read_csv

2022-11-04 Thread GitBox


github-actions[bot] commented on PR #37009:
URL: https://github.com/apache/spark/pull/37009#issuecomment-1304354576

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





[GitHub] [spark] github-actions[bot] closed pull request #37239: [SPARK-39825][SQL] Fix PushDownLeftSemiAntiJoin push through project

2022-11-04 Thread GitBox


github-actions[bot] closed pull request #37239: [SPARK-39825][SQL] Fix 
PushDownLeftSemiAntiJoin push through project
URL: https://github.com/apache/spark/pull/37239





[GitHub] [spark] github-actions[bot] commented on pull request #37104: [SPARK-39698][SQL] Use `TakeOrderedAndProject` if maxRows below the `spark.sql.execution.topKSortMaxRowsThreshold`

2022-11-04 Thread GitBox


github-actions[bot] commented on PR #37104:
URL: https://github.com/apache/spark/pull/37104#issuecomment-1304354561

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





[GitHub] [spark] github-actions[bot] commented on pull request #37309: [SPARK-39871][CORE] Jmx http interface supported for SparkHistoryServer

2022-11-04 Thread GitBox


github-actions[bot] commented on PR #37309:
URL: https://github.com/apache/spark/pull/37309#issuecomment-1304354542

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





[GitHub] [spark] github-actions[bot] commented on pull request #37315: [SPARK-39892][SQL] Use ArrowType.Decimal(precision, scale, bitWidth) instead of ArrowType.Decimal(precision, scale)

2022-11-04 Thread GitBox


github-actions[bot] commented on PR #37315:
URL: https://github.com/apache/spark/pull/37315#issuecomment-1304354536

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


jzhuge commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014532124


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Catalog methods for working with views.
+ */
+@DeveloperApi
+public interface ViewCatalog extends CatalogPlugin {
+
+  /**
+   * List the views in a namespace from the catalog.
+   * 
+   * If the catalog supports tables, this must return identifiers for only 
views and not tables.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of Identifiers for views
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional).
+   */
+  Identifier[] listViews(String... namespace) throws NoSuchNamespaceException;
+
+  /**
+   * Load view metadata by {@link Identifier ident} from the catalog.
+   * 
+   * If the catalog supports tables and contains a table for the identifier 
and not a view,
+   * this must throw {@link NoSuchViewException}.
+   *
+   * @param ident a view identifier
+   * @return the view description
+   * @throws NoSuchViewException If the view doesn't exist or is a table
+   */
+  View loadView(Identifier ident) throws NoSuchViewException;
+
+  /**
+   * Invalidate cached view metadata for an {@link Identifier identifier}.
+   * 
+   * If the view is already loaded or cached, drop cached data. If the view 
does not exist or is
+   * not cached, do nothing. Calling this method should not query remote 
services.
+   *
+   * @param ident a view identifier
+   */
+  default void invalidateView(Identifier ident) {
+  }
+
+  /**
+   * Test whether a view exists using an {@link Identifier identifier} from 
the catalog.
+   * 
+   * If the catalog supports views and contains a view for the identifier and 
not a table,
+   * this must return false.
+   *
+   * @param ident a view identifier
+   * @return true if the view exists, false otherwise
+   */
+  default boolean viewExists(Identifier ident) {
+try {
+  return loadView(ident) != null;
+} catch (NoSuchViewException e) {
+  return false;
+}
+  }
+
+  /**
+   * Create a view in the catalog.
+   *
+   * @param ident a view identifier
+   * @param sql the SQL text that defines the view
+   * @param currentCatalog the current catalog
+   * @param currentNamespace the current namespace
+   * @param schema the view query output schema
+   * @param columnAliases the column aliases
+   * @param columnComments the column comments
+   * @param properties the view properties
+   * @return the view created
+   * @throws ViewAlreadyExistsException If a view or table already exists for 
the identifier
+   * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
+   */
+  View createView(
+  Identifier ident,
+  String sql,
+  String currentCatalog,
+  String[] currentNamespace,
+  StructType schema,
+  String[] columnAliases,

Review Comment:
   How about builder?
   ```
   public interface ViewBuilder {
   
 ViewBuilder withQuery(String query);
 ViewBuilder withCurrentCatalog(String defaultCatalog);
 ViewBuilder withCurrentNamespace(String[] defaultNamespaces);
 ViewBuilder withSchema(StructType schema);
 ViewBuilder withQueryColumnNames(String[] queryColumnNames);
 ViewBuilder withColumnAliases(String[] columnAliases);
 ViewBuilder withColumnComments(String[] columnComments);
  ViewBuilder withProperties(Map<String, String> properties);
 ViewBuilder withProperty(String key, String value);
 View create();
 View replace();
 View createOrReplace();
   }
   
   ViewCatalog {
 ViewBuilder buildView(Identifier ident);
   }
   ```
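   
   A hypothetical caller-side sketch of how such a builder could be used (the `catalog`, `ident`,
   and `schema` values below are placeholders, and none of this is final API):
   ```scala
   // Sketch only: exercises the proposed ViewBuilder fluent API from Scala.
   val view = catalog.buildView(ident)
     .withQuery("SELECT id, name FROM db.people")
     .withCurrentCatalog("spark_catalog")
     .withCurrentNamespace(Array("db"))
     .withSchema(schema)
     .withColumnComments(Array("person id", "person name"))
     .withProperty("comment", "people view")
     .create()
   ```
   Compared to a single `createView` with many positional arguments, a builder keeps call sites
   readable and lets optional pieces (aliases, comments, properties) be omitted.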




[GitHub] [spark] SandishKumarHN commented on pull request #38515: [SPARK-41015][SQL][PROTOBUF] UnitTest null check for data generator

2022-11-04 Thread GitBox


SandishKumarHN commented on PR #38515:
URL: https://github.com/apache/spark/pull/38515#issuecomment-1304351416

   @rangadi Because some random numbers do not convert to catalyst type, a null 
check for the data generator is required.
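   
   A minimal sketch of what such a guard could look like (helper names are hypothetical; the real
   generator lives in the protobuf test suite):
   
   ```scala
   object NullCheckSketch {
     // Placeholder conversion: returns null for values that have no Catalyst mapping,
     // standing in for the real type conversion used by the test data generator.
     private def toCatalyst(value: Any): Any = value match {
       case f: Float if f.isNaN => null
       case other               => other
     }

     // The null check described above: drop generated values that fail the conversion.
     def usableValues(candidates: Seq[Any]): Seq[Any] =
       candidates.filter(v => toCatalyst(v) != null)
   }
   ```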





[GitHub] [spark] SandishKumarHN opened a new pull request, #38515: [SPARK-41015][SQL][PROTOBUF] UnitTest null check for data generator

2022-11-04 Thread GitBox


SandishKumarHN opened a new pull request, #38515:
URL: https://github.com/apache/spark/pull/38515

   
   ### What changes were proposed in this pull request?
   Add a null check for the data generator after type conversion.
   
   NA
   ### Why are the changes needed?
   NA
   
   
   ### Does this PR introduce _any_ user-facing change?
   NA
   
   ### How was this patch tested?
   I have tested all the random values manually
   current unit tests
   





[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


jzhuge commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014530432


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Catalog methods for working with views.
+ */
+@DeveloperApi
+public interface ViewCatalog extends CatalogPlugin {
+
+  /**
+   * List the views in a namespace from the catalog.
+   * 
+   * If the catalog supports tables, this must return identifiers for only 
views and not tables.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of Identifiers for views
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional).
+   */
+  Identifier[] listViews(String... namespace) throws NoSuchNamespaceException;
+
+  /**
+   * Load view metadata by {@link Identifier ident} from the catalog.
+   * 
+   * If the catalog supports tables and contains a table for the identifier 
and not a view,
+   * this must throw {@link NoSuchViewException}.
+   *
+   * @param ident a view identifier
+   * @return the view description
+   * @throws NoSuchViewException If the view doesn't exist or is a table
+   */
+  View loadView(Identifier ident) throws NoSuchViewException;
+
+  /**
+   * Invalidate cached view metadata for an {@link Identifier identifier}.
+   * 
+   * If the view is already loaded or cached, drop cached data. If the view 
does not exist or is
+   * not cached, do nothing. Calling this method should not query remote 
services.
+   *
+   * @param ident a view identifier
+   */
+  default void invalidateView(Identifier ident) {
+  }
+
+  /**
+   * Test whether a view exists using an {@link Identifier identifier} from 
the catalog.
+   * 
+   * If the catalog supports views and contains a view for the identifier and 
not a table,
+   * this must return false.
+   *
+   * @param ident a view identifier
+   * @return true if the view exists, false otherwise
+   */
+  default boolean viewExists(Identifier ident) {
+try {
+  return loadView(ident) != null;
+} catch (NoSuchViewException e) {
+  return false;
+}
+  }
+
+  /**
+   * Create a view in the catalog.
+   *
+   * @param ident a view identifier
+   * @param sql the SQL text that defines the view
+   * @param currentCatalog the current catalog
+   * @param currentNamespace the current namespace
+   * @param schema the view query output schema
+   * @param columnAliases the column aliases
+   * @param columnComments the column comments
+   * @param properties the view properties
+   * @return the view created
+   * @throws ViewAlreadyExistsException If a view or table already exists for 
the identifier
+   * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
+   */
+  View createView(
+  Identifier ident,
+  String sql,
+  String currentCatalog,
+  String[] currentNamespace,
+  StructType schema,
+  String[] columnAliases,

Review Comment:
   I'd hesitate to do so because these have different meanings: one is more
like "CreateViewRequest", the other like "ViewMetadata". Using the same
interface may cause confusion and make it harder to evolve one of them in the future.






[GitHub] [spark] xkrogen commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


xkrogen commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014526120


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Catalog methods for working with views.
+ */
+@DeveloperApi
+public interface ViewCatalog extends CatalogPlugin {
+
+  /**
+   * List the views in a namespace from the catalog.
+   * 
+   * If the catalog supports tables, this must return identifiers for only 
views and not tables.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of Identifiers for views
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional).
+   */
+  Identifier[] listViews(String... namespace) throws NoSuchNamespaceException;
+
+  /**
+   * Load view metadata by {@link Identifier ident} from the catalog.
+   * 
+   * If the catalog supports tables and contains a table for the identifier 
and not a view,
+   * this must throw {@link NoSuchViewException}.
+   *
+   * @param ident a view identifier
+   * @return the view description
+   * @throws NoSuchViewException If the view doesn't exist or is a table
+   */
+  View loadView(Identifier ident) throws NoSuchViewException;
+
+  /**
+   * Invalidate cached view metadata for an {@link Identifier identifier}.
+   * 
+   * If the view is already loaded or cached, drop cached data. If the view 
does not exist or is
+   * not cached, do nothing. Calling this method should not query remote 
services.
+   *
+   * @param ident a view identifier
+   */
+  default void invalidateView(Identifier ident) {
+  }
+
+  /**
+   * Test whether a view exists using an {@link Identifier identifier} from 
the catalog.
+   * 
+   * If the catalog supports views and contains a view for the identifier and 
not a table,
+   * this must return false.
+   *
+   * @param ident a view identifier
+   * @return true if the view exists, false otherwise
+   */
+  default boolean viewExists(Identifier ident) {
+try {
+  return loadView(ident) != null;
+} catch (NoSuchViewException e) {
+  return false;
+}
+  }
+
+  /**
+   * Create a view in the catalog.
+   *
+   * @param ident a view identifier
+   * @param sql the SQL text that defines the view
+   * @param currentCatalog the current catalog
+   * @param currentNamespace the current namespace
+   * @param schema the view query output schema
+   * @param columnAliases the column aliases
+   * @param columnComments the column comments
+   * @param properties the view properties
+   * @return the view created
+   * @throws ViewAlreadyExistsException If a view or table already exists for 
the identifier
+   * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
+   */
+  View createView(
+  Identifier ident,
+  String sql,
+  String currentCatalog,
+  String[] currentNamespace,
+  StructType schema,
+  String[] columnAliases,

Review Comment:
   Actually, should the argument to this just be a `View`...? We are basically 
passing all of the same information here. `View` doesn't contain any "derived" 
fields you can only learn after creating the view. Similar to `ViewChange`.






[GitHub] [spark] srowen commented on a diff in pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar

2022-11-04 Thread GitBox


srowen commented on code in PR #38510:
URL: https://github.com/apache/spark/pull/38510#discussion_r1014525886


##
docs/sql-performance-tuning.md:
##
@@ -77,8 +77,8 @@ that these options will be deprecated in future release as 
more optimizations ar
 spark.sql.files.openCostInBytes
 4194304 (4 MB)
 
-  The estimated cost to open a file, measured by the number of bytes could 
be scanned in the same
-  time. This is used when putting multiple files into a partition. It is 
better to over-estimated,
+  The estimated cost to open a file, measured by the number of bytes could 
that be scanned in the same

Review Comment:
   One more - this should be "that could be"






[GitHub] [spark] xkrogen commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


xkrogen commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014524565


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a persisted view.
+ */
+@DeveloperApi
+public interface View {
+  /**
+   * A name to identify this view.
+   */
+  String name();
+
+  /**
+   * The view query SQL text.
+   */
+  String sql();
+
+  /**
+   * The current catalog when the view is created.
+   */
+  String currentCatalog();
+
+  /**
+   * The current namespace when the view is created.
+   */
+  String[] currentNamespace();
+
+  /**
+   * The schema for the SQL text when the view is created.
+   */
+  StructType schema();
+
+  /**
+   * The view column aliases.
+   */
+  String[] columnAliases();
+

Review Comment:
   LGTM. We are capturing some redundant information but I recognize the value 
in trying to capture everything from the `CREATE VIEW` statement to make things 
clear and not rely on derived attributes.






[GitHub] [spark] xkrogen commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


xkrogen commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014524565


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a persisted view.
+ */
+@DeveloperApi
+public interface View {
+  /**
+   * A name to identify this view.
+   */
+  String name();
+
+  /**
+   * The view query SQL text.
+   */
+  String sql();
+
+  /**
+   * The current catalog when the view is created.
+   */
+  String currentCatalog();
+
+  /**
+   * The current namespace when the view is created.
+   */
+  String[] currentNamespace();
+
+  /**
+   * The schema for the SQL text when the view is created.
+   */
+  StructType schema();
+
+  /**
+   * The view column aliases.
+   */
+  String[] columnAliases();
+

Review Comment:
   LGTM. We are capturing some redundant information but I recognize the value 
in trying to persist everything from the `CREATE VIEW` statement to make things 
clear and not rely on derived attributes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] swamirishi commented on a diff in pull request #38377: [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path

2022-11-04 Thread GitBox


swamirishi commented on code in PR #38377:
URL: https://github.com/apache/spark/pull/38377#discussion_r1014524002


##
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala:
##
@@ -142,7 +142,7 @@ private[spark] class DriverLogger(conf: SparkConf) extends 
Logging {
   threadpool = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dfsSyncThread")
   threadpool.scheduleWithFixedDelay(this, UPLOAD_INTERVAL_IN_SECS, 
UPLOAD_INTERVAL_IN_SECS,
 TimeUnit.SECONDS)
-  logInfo(s"Started driver log file sync to: ${dfsLogFile}")
+  logInfo(s"Started driver log file sync to: ${dfsLogFile.toString}")

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


jzhuge commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014523963


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Catalog methods for working with views.
+ */
+@DeveloperApi
+public interface ViewCatalog extends CatalogPlugin {
+
+  /**
+   * List the views in a namespace from the catalog.
+   * 
+   * If the catalog supports tables, this must return identifiers for only 
views and not tables.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of Identifiers for views
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional).
+   */
+  Identifier[] listViews(String... namespace) throws NoSuchNamespaceException;
+
+  /**
+   * Load view metadata by {@link Identifier ident} from the catalog.
+   * 
+   * If the catalog supports tables and contains a table for the identifier 
and not a view,
+   * this must throw {@link NoSuchViewException}.
+   *
+   * @param ident a view identifier
+   * @return the view description
+   * @throws NoSuchViewException If the view doesn't exist or is a table
+   */
+  View loadView(Identifier ident) throws NoSuchViewException;
+
+  /**
+   * Invalidate cached view metadata for an {@link Identifier identifier}.
+   * 
+   * If the view is already loaded or cached, drop cached data. If the view 
does not exist or is
+   * not cached, do nothing. Calling this method should not query remote 
services.
+   *
+   * @param ident a view identifier
+   */
+  default void invalidateView(Identifier ident) {
+  }
+
+  /**
+   * Test whether a view exists using an {@link Identifier identifier} from 
the catalog.
+   * 
+   * If the catalog supports views and contains a view for the identifier and 
not a table,
+   * this must return false.
+   *
+   * @param ident a view identifier
+   * @return true if the view exists, false otherwise
+   */
+  default boolean viewExists(Identifier ident) {
+try {
+  return loadView(ident) != null;
+} catch (NoSuchViewException e) {
+  return false;
+}
+  }
+
+  /**
+   * Create a view in the catalog.
+   *
+   * @param ident a view identifier
+   * @param sql the SQL text that defines the view
+   * @param currentCatalog the current catalog
+   * @param currentNamespace the current namespace
+   * @param schema the view query output schema
+   * @param columnAliases the column aliases
+   * @param columnComments the column comments
+   * @param properties the view properties
+   * @return the view created
+   * @throws ViewAlreadyExistsException If a view or table already exists for 
the identifier
+   * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
+   */
+  View createView(
+  Identifier ident,
+  String sql,
+  String currentCatalog,
+  String[] currentNamespace,
+  StructType schema,
+  String[] columnAliases,

Review Comment:
   Should we create a Builder?
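   For discussion's sake, a rough Scala-flavored sketch of what a builder-style 
alternative to the long `createView` signature could look like (purely 
illustrative; none of these names exist in the PR):

   ```scala
   import org.apache.spark.sql.connector.catalog.Identifier
   import org.apache.spark.sql.types.StructType

   // Purely illustrative shape; a real builder would live in the Java API and
   // finish by delegating to the catalog, e.g. something like
   //   catalog.createView(ident, query, ..., properties)
   class CreateViewBuilder(ident: Identifier, query: String) {
     private var viewSchema: StructType = new StructType()
     private var aliases: Array[String] = Array.empty
     private var comments: Array[String] = Array.empty
     private var props: Map[String, String] = Map.empty

     def withSchema(schema: StructType): this.type = { viewSchema = schema; this }
     def withColumnAliases(a: Array[String]): this.type = { aliases = a; this }
     def withColumnComments(c: Array[String]): this.type = { comments = c; this }
     def withProperty(key: String, value: String): this.type = {
       props = props + (key -> value)
       this
     }
   }
   ```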



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


jzhuge commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014522636


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a persisted view.
+ */
+@DeveloperApi
+public interface View {
+  /**
+   * A name to identify this view.
+   */
+  String name();
+
+  /**
+   * The view query SQL text.
+   */
+  String sql();
+
+  /**
+   * The current catalog when the view is created.
+   */
+  String currentCatalog();
+
+  /**
+   * The current namespace when the view is created.
+   */
+  String[] currentNamespace();
+
+  /**
+   * The schema for the SQL text when the view is created.
+   */
+  StructType schema();
+
+  /**
+   * The view column aliases.
+   */
+  String[] columnAliases();
+

Review Comment:
   Pushed a few changes:
   - Updated Javadoc for View.schema to indicate aliases applied
   - Added `View.queryColumnNames` to store output column names of the query 
that creates this view
   - Renamed `View.sql` to `View.query` because the term "query" is more 
accurate to describe the SELECT query that creates the view.
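   For readers following the thread, a Scala-flavored sketch of the resulting 
shape (the actual interface is Java; this is only to visualize the renamed and 
added methods, not code from the PR):

   ```scala
   import org.apache.spark.sql.types.StructType

   // Sketch only, mirroring the changes described above:
   // `sql` renamed to `query`, plus `queryColumnNames` for the query output.
   trait ViewSketch {
     def name: String
     def query: String                    // SELECT text that defines the view
     def queryColumnNames: Array[String]  // original output names of the query
     def currentCatalog: String
     def currentNamespace: Array[String]
     def schema: StructType               // final schema, with aliases applied
     def columnAliases: Array[String]
   }
   ```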



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


jzhuge commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014522636


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a persisted view.
+ */
+@DeveloperApi
+public interface View {
+  /**
+   * A name to identify this view.
+   */
+  String name();
+
+  /**
+   * The view query SQL text.
+   */
+  String sql();
+
+  /**
+   * The current catalog when the view is created.
+   */
+  String currentCatalog();
+
+  /**
+   * The current namespace when the view is created.
+   */
+  String[] currentNamespace();
+
+  /**
+   * The schema for the SQL text when the view is created.
+   */
+  StructType schema();
+
+  /**
+   * The view column aliases.
+   */
+  String[] columnAliases();
+

Review Comment:
   Pushed a few changes:
   - Updated Javadoc for View.schema to indicate aliases applied
   - Added `View.queryColumnNames` to store output column names
   - Renamed `View.sql` to `View.query` because "query" is more accurate to 
describe the SELECT query for the view.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


jzhuge commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014522636


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a persisted view.
+ */
+@DeveloperApi
+public interface View {
+  /**
+   * A name to identify this view.
+   */
+  String name();
+
+  /**
+   * The view query SQL text.
+   */
+  String sql();
+
+  /**
+   * The current catalog when the view is created.
+   */
+  String currentCatalog();
+
+  /**
+   * The current namespace when the view is created.
+   */
+  String[] currentNamespace();
+
+  /**
+   * The schema for the SQL text when the view is created.
+   */
+  StructType schema();
+
+  /**
+   * The view column aliases.
+   */
+  String[] columnAliases();
+

Review Comment:
   Pushed a few changes:
   - Updated Javadoc for View.schema to indicate aliases applied
   - Added `View.queryColumnNames` to store output column names, useful for 
SELECT star queries
   - Renamed `View.sql` to `View.query` because "query" is more accurate to 
describe the SELECT query for the view.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xkrogen commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


xkrogen commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014522309


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Catalog methods for working with views.
+ */
+@DeveloperApi
+public interface ViewCatalog extends CatalogPlugin {
+
+  /**
+   * List the views in a namespace from the catalog.
+   * 
+   * If the catalog supports tables, this must return identifiers for only 
views and not tables.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of Identifiers for views
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional).
+   */
+  Identifier[] listViews(String... namespace) throws NoSuchNamespaceException;
+
+  /**
+   * Load view metadata by {@link Identifier ident} from the catalog.
+   * 
+   * If the catalog supports tables and contains a table for the identifier 
and not a view,
+   * this must throw {@link NoSuchViewException}.
+   *
+   * @param ident a view identifier
+   * @return the view description
+   * @throws NoSuchViewException If the view doesn't exist or is a table
+   */
+  View loadView(Identifier ident) throws NoSuchViewException;
+
+  /**
+   * Invalidate cached view metadata for an {@link Identifier identifier}.
+   * 
+   * If the view is already loaded or cached, drop cached data. If the view 
does not exist or is
+   * not cached, do nothing. Calling this method should not query remote 
services.
+   *
+   * @param ident a view identifier
+   */
+  default void invalidateView(Identifier ident) {
+  }
+
+  /**
+   * Test whether a view exists using an {@link Identifier identifier} from 
the catalog.
+   * 
+   * If the catalog supports views and contains a view for the identifier and 
not a table,
+   * this must return false.
+   *
+   * @param ident a view identifier
+   * @return true if the view exists, false otherwise
+   */
+  default boolean viewExists(Identifier ident) {
+try {
+  return loadView(ident) != null;
+} catch (NoSuchViewException e) {
+  return false;
+}
+  }
+
+  /**
+   * Create a view in the catalog.
+   *
+   * @param ident a view identifier
+   * @param sql the SQL text that defines the view
+   * @param currentCatalog the current catalog
+   * @param currentNamespace the current namespace
+   * @param schema the view query output schema
+   * @param columnAliases the column aliases
+   * @param columnComments the column comments
+   * @param properties the view properties
+   * @return the view created
+   * @throws ViewAlreadyExistsException If a view or table already exists for 
the identifier
+   * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
+   */
+  View createView(
+  Identifier ident,
+  String sql,
+  String currentCatalog,
+  String[] currentNamespace,
+  StructType schema,
+  String[] columnAliases,

Review Comment:
   We need `queryColumnNames` here as well right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


jzhuge commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014515719


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a persisted view.
+ */
+@DeveloperApi
+public interface View {
+  /**
+   * A name to identify this view.
+   */
+  String name();
+
+  /**
+   * The view query SQL text.
+   */
+  String sql();
+
+  /**
+   * The current catalog when the view is created.
+   */
+  String currentCatalog();
+
+  /**
+   * The current namespace when the view is created.
+   */
+  String[] currentNamespace();
+
+  /**
+   * The schema for the SQL text when the view is created.
+   */
+  StructType schema();
+
+  /**
+   * The view column aliases.
+   */
+  String[] columnAliases();
+

Review Comment:
   Yeah, for select star, we need something similar to `viewQueryColumnNames`.
   
   If schema is pre-alias, then `schema.fieldNames` can serve the purpose.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xkrogen commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


xkrogen commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014507922


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a persisted view.
+ */
+@DeveloperApi
+public interface View {
+  /**
+   * A name to identify this view.
+   */
+  String name();
+
+  /**
+   * The view query SQL text.
+   */
+  String sql();
+
+  /**
+   * The current catalog when the view is created.
+   */
+  String currentCatalog();
+
+  /**
+   * The current namespace when the view is created.
+   */
+  String[] currentNamespace();
+
+  /**
+   * The schema for the SQL text when the view is created.
+   */
+  StructType schema();
+
+  /**
+   * The view column aliases.
+   */
+  String[] columnAliases();
+

Review Comment:
   If both `columnAliases` and `schema` contain the final output names 
(post-alias), then where are the original column names stored? We need those to 
reconstruct the aliasing similar to the code I linked from `SessionCatalog`. 
Each element of the projection looks like 
`Alias(UpCast(<original-name>, <type>), <output-name>)`, which cannot be 
constructed without access to the original column names.
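   To make the reconstruction concrete, here is a minimal Scala sketch of such a 
projection, assuming the V2 view exposes the original names as 
`queryColumnNames` next to the post-alias `schema` (illustrative only, loosely 
following the V1 `SessionCatalog` logic; not code from this PR):

   ```scala
   import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
   import org.apache.spark.sql.catalyst.expressions.{Alias, NamedExpression, UpCast}
   import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
   import org.apache.spark.sql.types.StructType

   // queryColumnNames: original output names of the view query (pre-alias)
   // schema: final (post-alias) view schema
   def aliasViewOutput(
       parsedQuery: LogicalPlan,
       queryColumnNames: Seq[String],
       schema: StructType): LogicalPlan = {
     val projectList: Seq[NamedExpression] =
       queryColumnNames.zip(schema.fields).map { case (original, field) =>
         val cast = UpCast(UnresolvedAttribute.quoted(original), field.dataType)
         Alias(cast, field.name)()
       }
     Project(projectList, parsedQuery)
   }
   ```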



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jzhuge commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


jzhuge commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014506227


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a persisted view.
+ */
+@DeveloperApi
+public interface View {
+  /**
+   * A name to identify this view.
+   */
+  String name();
+
+  /**
+   * The view query SQL text.
+   */
+  String sql();
+
+  /**
+   * The current catalog when the view is created.
+   */
+  String currentCatalog();
+
+  /**
+   * The current namespace when the view is created.
+   */
+  String[] currentNamespace();
+
+  /**
+   * The schema for the SQL text when the view is created.
+   */
+  StructType schema();
+
+  /**
+   * The view column aliases.
+   */
+  String[] columnAliases();
+

Review Comment:
   No, `schema()` contains the final schema, the same as V1, to reduce 
confusion. I will update javadoc to clarify.
   
   Thank you and @wmoustafa for calling it out!
   
   For V1, the schema stores the column aliases. But for V2, we decided to use 
these metadata fields to capture the entire CREATE VIEW statement instead of 
relying on `schema`, which feels more like a derived attribute:
   - sql
   - columnAliases
   - columnComments
   
   Let me know if this makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xkrogen commented on a diff in pull request #37556: [SPARK-39799][SQL] DataSourceV2: View catalog interface

2022-11-04 Thread GitBox


xkrogen commented on code in PR #37556:
URL: https://github.com/apache/spark/pull/37556#discussion_r1014502881


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/View.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a persisted view.
+ */
+@DeveloperApi
+public interface View {
+  /**
+   * A name to identify this view.
+   */
+  String name();
+
+  /**
+   * The view query SQL text.
+   */
+  String sql();
+
+  /**
+   * The current catalog when the view is created.
+   */
+  String currentCatalog();
+
+  /**
+   * The current namespace when the view is created.
+   */
+  String[] currentNamespace();
+
+  /**
+   * The schema for the SQL text when the view is created.
+   */
+  StructType schema();
+
+  /**
+   * The view column aliases.
+   */
+  String[] columnAliases();
+

Review Comment:
   @jzhuge is the intent that `schema()` contains the _pre-alias_ schema and 
`columnAliases()` contains the list of aliases, such that you need to combine 
`schema() + columnAliases()` to get the _post-alias_ schema? This is in 
contrast to how V1 views work, where `CatalogTable.schema()` contains the 
_post-alias_ schema and `viewQueryColumnNames` contains the original column 
names. The plan is constructed by aliasing from the `viewQueryColumnNames` to 
the names stored in `schema` (see [this code in 
SessionCatalog](https://github.com/apache/spark/blob/933dc0c42f0caf74aaa077fd4f2c2e7208452b9b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala#L949-L985)).
   
   I don't have a strong opinion about whether it makes more sense for 
`schema()` to be pre- or post-aliasing, but we need to make this very clear in 
the interface description. If we go with pre-aliasing, we can convert it to 
post-alias inside of 
[V2ViewDescription](https://github.com/apache/spark/pull/28147/files#diff-d0c6f499b30df039d13bf2740c559251ab63ba4cea312e622e23b74fcbb2fcf0)
 to make it match V1 in the analyzer, reducing implementation complexity by 
avoiding having to special-case V1 vs V2.
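   If we did go with a pre-alias `schema()`, the post-alias conversion inside 
such a description class could be as small as this sketch (illustrative only; 
it assumes `columnAliases` is either empty or matches the field count):

   ```scala
   import org.apache.spark.sql.types.StructType

   // Apply column aliases (if any) to a pre-alias schema to get the
   // post-alias schema the rest of the analyzer expects, V1-style.
   def applyAliases(
       schema: StructType,
       columnAliases: Array[String]): StructType = {
     if (columnAliases == null || columnAliases.isEmpty) {
       schema
     } else {
       StructType(schema.fields.zip(columnAliases).map { case (field, alias) =>
         field.copy(name = alias)
       })
     }
   }
   ```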



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] liuzqt commented on pull request #38064: [SPARK-40622][SQL][CORE]Result of a single task in collect() must fit in 2GB

2022-11-04 Thread GitBox


liuzqt commented on PR #38064:
URL: https://github.com/apache/spark/pull/38064#issuecomment-1304306588

   @mridulm  I got an error when running that command locally
   ```
   [error] 
/Users/ziqi.liu/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala:51:
 File line length exceeds 100 characters
   ```
   but it seems irrelevant to this PR, and I just merged with upstream master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on pull request #38064: [SPARK-40622][SQL][CORE]Result of a single task in collect() must fit in 2GB

2022-11-04 Thread GitBox


mridulm commented on PR #38064:
URL: https://github.com/apache/spark/pull/38064#issuecomment-1304281819

   Looks like the doc build is failing, and so the overall build fails ... 
   Can you run `build/sbt -Phadoop-3 -Pyarn -Pdocker-integration-tests 
-Pspark-ganglia-lgpl -Phive -Pmesos -Phive-thriftserver -Pkubernetes 
-Pkinesis-asl -Phadoop-cloud unidoc` against your local repo and see if it 
succeeds ?
   The fix might be a simple case of updating to the latest master
   
   (Works fine on local build for me)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] leewyang commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


leewyang commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1014476080


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> None:
+"""Validate model predictions against the expected pandas_udf 
return_type."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+if not all(v.shape == (num_input_rows,) for v in preds.values()):
+raise ValueError("Prediction results for StructType fields 
must be scalars.")
+elif isinstance(preds, list) and isinstance(preds[0], dict):
+# rows of dictionaries
+predNames = list(preds[0].keys())
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")

Review Comment:
   Yeah, @mengxr had the same question.  Some models, e.g. [Huggingface 
pipeline for sentiment 
analysis](https://huggingface.co/docs/transformers/quicktour#pipeline-usage), 
can produce results in this format, so he agreed to keep this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] leewyang commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


leewyang commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1014475066


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size

Review Comment:
   Ended up refactoring the code a bit to simplify this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #38506: [SPARK-41010][CONNECT][PYTHON] Complete Support for Except and Intersect in Python client

2022-11-04 Thread GitBox


AmplabJenkins commented on PR #38506:
URL: https://github.com/apache/spark/pull/38506#issuecomment-1304270266

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #38505: [SPARK-40622][WIP]do not merge(try to fix build error)

2022-11-04 Thread GitBox


AmplabJenkins commented on PR #38505:
URL: https://github.com/apache/spark/pull/38505#issuecomment-1304270304

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #38377: [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path

2022-11-04 Thread GitBox


mridulm commented on code in PR #38377:
URL: https://github.com/apache/spark/pull/38377#discussion_r1014469439


##
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala:
##
@@ -142,7 +142,7 @@ private[spark] class DriverLogger(conf: SparkConf) extends 
Logging {
   threadpool = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dfsSyncThread")
   threadpool.scheduleWithFixedDelay(this, UPLOAD_INTERVAL_IN_SECS, 
UPLOAD_INTERVAL_IN_SECS,
 TimeUnit.SECONDS)
-  logInfo(s"Started driver log file sync to: ${dfsLogFile}")
+  logInfo(s"Started driver log file sync to: ${dfsLogFile.toString}")

Review Comment:
   nit: remove `.toString`



##
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala:
##
@@ -126,13 +126,13 @@ private[spark] class DriverLogger(conf: SparkConf) 
extends Logging {
 throw new RuntimeException(s"${rootDir} does not exist." +
   s" Please create this dir in order to persist driver logs")
   }
-  val dfsLogFile: String = FileUtils.getFile(rootDir, appId
-+ DriverLogger.DRIVER_LOG_FILE_SUFFIX).getAbsolutePath()
+  val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, appId
++ DriverLogger.DRIVER_LOG_FILE_SUFFIX))

Review Comment:
   Looking at it more, given all resolution is done against `fileSystem`, we 
don't technically need to either resolve or qualify it; the only reason to do 
so is for the log message below, so I am fine with this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-04 Thread GitBox


alex-balikov commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014428293


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -157,10 +172,11 @@ object UnsupportedOperationChecker extends Logging {
 // Disallow multiple streaming aggregations
 val aggregates = collectStreamingAggregates(plan)
 
-if (aggregates.size > 1) {
+if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {
   throwError(
 "Multiple streaming aggregations are not supported with " +

Review Comment:
   and we need to change the message accordingly - 'aggregations' -> 'stateful 
operators'



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] alex-balikov commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-04 Thread GitBox


alex-balikov commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014425174


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -157,10 +193,11 @@ object UnsupportedOperationChecker extends Logging {
 // Disallow multiple streaming aggregations
 val aggregates = collectStreamingAggregates(plan)
 
-if (aggregates.size > 1) {
+if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {

Review Comment:
   Absolutely. I agree that we should allow multiple stateful ops only in 
append mode. The other modes are not implemented.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dwsmith1983 commented on a diff in pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar

2022-11-04 Thread GitBox


dwsmith1983 commented on code in PR #38510:
URL: https://github.com/apache/spark/pull/38510#discussion_r1014424662


##
docs/sql-performance-tuning.md:
##
@@ -295,7 +294,7 @@ AQE converts sort-merge join to broadcast hash join when 
the runtime statistics
spark.sql.adaptive.autoBroadcastJoinThreshold
(none)

- Configures the maximum size in bytes for a table that will be 
broadcast to all worker nodes when performing a join. By setting this value to 
-1 broadcasting can be disabled. The default value is same with 
spark.sql.autoBroadcastJoinThreshold. Note that, this config is 
used only in adaptive framework.
+ Configures the maximum size in bytes for a table that will be 
broadcast to all worker nodes when performing a join. By setting this value to 
-1, broadcasting can be disabled. The default value is the same with 
spark.sql.autoBroadcastJoinThreshold. Note that, this config is 
used only in adaptive framework.

Review Comment:
   @srowen done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srowen commented on a diff in pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar

2022-11-04 Thread GitBox


srowen commented on code in PR #38510:
URL: https://github.com/apache/spark/pull/38510#discussion_r1014421466


##
docs/sql-performance-tuning.md:
##
@@ -295,7 +294,7 @@ AQE converts sort-merge join to broadcast hash join when 
the runtime statistics
spark.sql.adaptive.autoBroadcastJoinThreshold
(none)

- Configures the maximum size in bytes for a table that will be 
broadcast to all worker nodes when performing a join. By setting this value to 
-1 broadcasting can be disabled. The default value is same with 
spark.sql.autoBroadcastJoinThreshold. Note that, this config is 
used only in adaptive framework.
+ Configures the maximum size in bytes for a table that will be 
broadcast to all worker nodes when performing a join. By setting this value to 
-1, broadcasting can be disabled. The default value is the same with 
spark.sql.autoBroadcastJoinThreshold. Note that, this config is 
used only in adaptive framework.

Review Comment:
   I'm not suggesting revert; it's a further change. "same with" isn't 
grammatical (either)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dwsmith1983 commented on pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar

2022-11-04 Thread GitBox


dwsmith1983 commented on PR #38510:
URL: https://github.com/apache/spark/pull/38510#issuecomment-1304103037

   > OK, any other related files you want to check while your'e here?
   
   I am doing some studying, so I'm not sure what other docs I will read and when. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dwsmith1983 commented on a diff in pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar

2022-11-04 Thread GitBox


dwsmith1983 commented on code in PR #38510:
URL: https://github.com/apache/spark/pull/38510#discussion_r1014419552


##
docs/sql-performance-tuning.md:
##
@@ -295,7 +294,7 @@ AQE converts sort-merge join to broadcast hash join when 
the runtime statistics
spark.sql.adaptive.autoBroadcastJoinThreshold
(none)

- Configures the maximum size in bytes for a table that will be 
broadcast to all worker nodes when performing a join. By setting this value to 
-1 broadcasting can be disabled. The default value is same with 
spark.sql.autoBroadcastJoinThreshold. Note that, this config is 
used only in adaptive framework.
+ Configures the maximum size in bytes for a table that will be 
broadcast to all worker nodes when performing a join. By setting this value to 
-1, broadcasting can be disabled. The default value is the same with 
spark.sql.autoBroadcastJoinThreshold. Note that, this config is 
used only in adaptive framework.

Review Comment:
   @srowen thanks for the reply. It just sounded a bit strange. I can convert 
it back if you like.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srowen commented on a diff in pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar

2022-11-04 Thread GitBox


srowen commented on code in PR #38510:
URL: https://github.com/apache/spark/pull/38510#discussion_r1014417907


##
docs/sql-performance-tuning.md:
##
@@ -295,7 +294,7 @@ AQE converts sort-merge join to broadcast hash join when 
the runtime statistics
spark.sql.adaptive.autoBroadcastJoinThreshold
(none)

- Configures the maximum size in bytes for a table that will be 
broadcast to all worker nodes when performing a join. By setting this value to 
-1 broadcasting can be disabled. The default value is same with 
spark.sql.autoBroadcastJoinThreshold. Note that, this config is 
used only in adaptive framework.
+ Configures the maximum size in bytes for a table that will be 
broadcast to all worker nodes when performing a join. By setting this value to 
-1, broadcasting can be disabled. The default value is the same with 
spark.sql.autoBroadcastJoinThreshold. Note that, this config is 
used only in adaptive framework.

Review Comment:
   the same with -> the same as



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on pull request #38427: [SPARK-40950][CORE] Fix isRemoteAddressMaxedOut performance overhead on scala 2.13

2022-11-04 Thread GitBox


mridulm commented on PR #38427:
URL: https://github.com/apache/spark/pull/38427#issuecomment-1304076366

   Merged to master.
   Thanks for working on this @eejbyfeldt !
   Thanks for the reviews @srowen, @dongjoon-hyun, @LuciferYang :-)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] asfgit closed pull request #38427: [SPARK-40950][CORE] Fix isRemoteAddressMaxedOut performance overhead on scala 2.13

2022-11-04 Thread GitBox


asfgit closed pull request #38427: [SPARK-40950][CORE] Fix 
isRemoteAddressMaxedOut performance overhead on scala 2.13
URL: https://github.com/apache/spark/pull/38427


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #38509: [SPARK-41014][PySpark][DOC] Improve documentation and typing of groupby and cogroup applyInPandas

2022-11-04 Thread GitBox


AmplabJenkins commented on PR #38509:
URL: https://github.com/apache/spark/pull/38509#issuecomment-1304060587

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar

2022-11-04 Thread GitBox


AmplabJenkins commented on PR #38510:
URL: https://github.com/apache/spark/pull/38510#issuecomment-1304060535

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-11-04 Thread GitBox


mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1014392620


##
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala:
##
@@ -1780,7 +1802,19 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
   |  "Remote Bytes Read" : 0,
   |  "Remote Bytes Read To Disk" : 0,
   |  "Local Bytes Read" : 0,
-  |  "Total Records Read" : 0
+  |  "Total Records Read" : 0,
+  |  "Remote Requests Duration": 0,
+  |  "Push Based": {

Review Comment:
   +CC @zhouyejoe 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-04 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014391666


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##
@@ -507,15 +507,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
   assertPassOnGlobalWatermarkLimit(
 s"single $joinType join in Append mode",
 streamRelation.join(streamRelation, joinType = RightOuter,
-  condition = Some(attributeWithWatermark === attribute)),

Review Comment:
   Thanks for the check! Resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] aokolnychyi commented on pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

2022-11-04 Thread GitBox


aokolnychyi commented on PR #36304:
URL: https://github.com/apache/spark/pull/36304#issuecomment-1304020160

   Still remember about following up on this and another PR. Slowly getting 
there.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk opened a new pull request, #38514: [WIP][SQL] Provide a query context to `failAnalysis()`

2022-11-04 Thread GitBox


MaxGekk opened a new pull request, #38514:
URL: https://github.com/apache/spark/pull/38514

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on pull request #38513: [SPARK-40903][SQL][FOLLOWUP] Cast canonicalized Add as its original data type if necessary

2022-11-04 Thread GitBox


gengliangwang commented on PR #38513:
URL: https://github.com/apache/spark/pull/38513#issuecomment-1304002389

   cc @cloud-fan @srielau @ulysses-you @peter-toth 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang opened a new pull request, #38513: [SPARK-40903][SQL][FOLLOWUP] Cast canonicalized Add as its original data type if necessary

2022-11-04 Thread GitBox


gengliangwang opened a new pull request, #38513:
URL: https://github.com/apache/spark/pull/38513

   
   
   ### What changes were proposed in this pull request?
   
   This is a follow-up of https://github.com/apache/spark/pull/38379. On second 
thought, if the canonicalized `Add` has a different type, casting it as the 
original data type can still match more semantically equivalent `Add`s 
   
   ### Why are the changes needed?
   
   A better solution for the issue 
https://issues.apache.org/jira/browse/SPARK-40903. We can avoid regressions on 
marking on certain semantically equivalent `Add`s as not equivalent.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   New UT
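
   As a rough illustration of the idea described above (not the actual patch; the helper and names here are hypothetical), the follow-up amounts to restoring the original type after canonicalization reorders the operands:

    import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}

    // Hypothetical sketch: after canonicalization reorders the operands of an Add,
    // the reordered expression may carry a different data type (e.g. decimal
    // precision), so cast it back to the original type instead of refusing to
    // canonicalize at all. This way semantically equivalent Adds still match.
    def restoreType(original: Expression, reordered: Expression): Expression =
      if (reordered.dataType == original.dataType) reordered
      else Cast(reordered, original.dataType)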


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on pull request #38488: [SPARK-41002][CONNECT][PYTHON] Compatible `take`, `head` and `first` API in Python client

2022-11-04 Thread GitBox


amaliujia commented on PR #38488:
URL: https://github.com/apache/spark/pull/38488#issuecomment-1303988901

   Ok added short description for the new test cases.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on pull request #38506: [SPARK-41010][CONNECT][PYTHON] Complete Support for Except and Intersect in Python client

2022-11-04 Thread GitBox


amaliujia commented on PR #38506:
URL: https://github.com/apache/spark/pull/38506#issuecomment-1303988661

   Ok added short description for the new test cases.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang closed pull request #38479: [SPARK-40697][SQL][FOLLOWUP] Read-side char padding should only be applied if necessary

2022-11-04 Thread GitBox


gengliangwang closed pull request #38479: [SPARK-40697][SQL][FOLLOWUP] 
Read-side char padding should only be applied if necessary
URL: https://github.com/apache/spark/pull/38479


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on pull request #38479: [SPARK-40697][SQL][FOLLOWUP] Read-side char padding should only be applied if necessary

2022-11-04 Thread GitBox


gengliangwang commented on PR #38479:
URL: https://github.com/apache/spark/pull/38479#issuecomment-1303969098

   Thanks for fixing it. Merging to master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk closed pull request #38498: [SPARK-40769][CORE][SQL] Migrate type check failures of aggregate expressions onto error classes

2022-11-04 Thread GitBox


MaxGekk closed pull request #38498: [SPARK-40769][CORE][SQL] Migrate type check 
failures of aggregate expressions onto error classes
URL: https://github.com/apache/spark/pull/38498


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on pull request #38498: [SPARK-40769][CORE][SQL] Migrate type check failures of aggregate expressions onto error classes

2022-11-04 Thread GitBox


MaxGekk commented on PR #38498:
URL: https://github.com/apache/spark/pull/38498#issuecomment-1303953617

   +1, LGTM. Merging to master.
   Thank you, @LuciferYang.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-04 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014302073


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+val exp = neq.asInstanceOf[BinaryComparison]
+hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+exps.exists {
+  case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+  case _ => false
+}
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+plan match {
+  case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+left.isStreaming && right.isStreaming
+otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+  case _ => false
+}
+  }
+
   /**
* Checks for possible correctness issue in chained stateful operators. The 
behavior is
* controlled by SQL config 
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
* Once it is enabled, an analysis exception will be thrown. Otherwise, 
Spark will just
* print a warning message.
*/
   def checkStreamingQueryGlobalWatermarkLimit(
-  plan: LogicalPlan,
-  outputMode: OutputMode): Unit = {
+  plan: LogicalPlan): Unit = {
 def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p 
match {
-  case s: Aggregate
-if s.isStreaming && outputMode == InternalOutputModes.Append => true
   case Join(left, right, joinType, _, _)

Review Comment:
   Will do!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin commented on a diff in pull request #38223: [SPARK-40770][PYTHON] Improved error messages for applyInPandas for schema mismatch

2022-11-04 Thread GitBox


ueshin commented on code in PR #38223:
URL: https://github.com/apache/spark/pull/38223#discussion_r1014300546


##
python/pyspark/worker.py:
##
@@ -159,27 +226,13 @@ def wrapped(left_key_series, left_value_series, 
right_key_series, right_value_se
 key_series = left_key_series if not left_df.empty else 
right_key_series
 key = tuple(s[0] for s in key_series)
 result = f(key, left_df, right_df)
-if not isinstance(result, pd.DataFrame):
-raise TypeError(
-"Return type of the user-defined function should be "
-"pandas.DataFrame, but is {}".format(type(result))
-)
-# the number of columns of result have to match the return type
-# but it is fine for result to have no columns at all if it is empty
-if not (
-len(result.columns) == len(return_type) or len(result.columns) == 
0 and result.empty
-):
-raise RuntimeError(
-"Number of columns of the returned pandas.DataFrame "
-"doesn't match specified schema. "
-"Expected: {} Actual: {}".format(len(return_type), 
len(result.columns))
-)
+verify_pandas_result(result, return_type, 
assign_cols_by_name(runner_conf))

Review Comment:
   `wrapped` will be called many times. We want to reduce as much overhead as 
possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on a diff in pull request #37887: [SPARK-40360] ALREADY_EXISTS and NOT_FOUND exceptions

2022-11-04 Thread GitBox


MaxGekk commented on code in PR #37887:
URL: https://github.com/apache/spark/pull/37887#discussion_r1014297862


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala:
##
@@ -20,66 +20,112 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.catalyst.util.{quoteIdentifier, quoteNameParts }
 import org.apache.spark.sql.types.StructType
 
 /**
  * Thrown by a catalog when an item already exists. The analyzer will rethrow 
the exception
  * as an [[org.apache.spark.sql.AnalysisException]] with the correct position 
information.
  */
 class DatabaseAlreadyExistsException(db: String)
-  extends NamespaceAlreadyExistsException(s"Database '$db' already exists")
+  extends NamespaceAlreadyExistsException(Array(db))
 
-class NamespaceAlreadyExistsException(message: String)
-  extends AnalysisException(
-message,
-errorClass = Some("_LEGACY_ERROR_TEMP_1118"),
-messageParameters = Map("msg" -> message)) {
+
+class NamespaceAlreadyExistsException(errorClass: String, messageParameters: 
Map[String, String])
+  extends AnalysisException(errorClass, messageParameters) {
   def this(namespace: Array[String]) = {
-this(s"Namespace '${namespace.quoted}' already exists")
+this(errorClass = "SCHEMA_ALREADY_EXISTS",
+  Map("schemaName" -> quoteNameParts(namespace)))
   }
 }
 
-class TableAlreadyExistsException(message: String, cause: Option[Throwable] = 
None)
-  extends AnalysisException(
-message,
-errorClass = Some("_LEGACY_ERROR_TEMP_1116"),
-messageParameters = Map("msg" -> message),
-cause = cause) {
+
+class TableAlreadyExistsException(errorClass: String, messageParameters: 
Map[String, String],
+  cause: Option[Throwable] = None)
+  extends AnalysisException(errorClass, messageParameters, cause = cause) {
   def this(db: String, table: String) = {
-this(s"Table or view '$table' already exists in database '$db'")
+this(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
+  messageParameters = Map("relationName" ->
+(quoteIdentifier(db) + "." + quoteIdentifier(table
+  }
+
+  def this(table: String) = {
+this(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
+  messageParameters = Map("relationName" ->
+quoteNameParts(UnresolvedAttribute.parseAttributeName(table
   }
 
-  def this(tableIdent: Identifier) = {
-this(s"Table ${tableIdent.quoted} already exists")
+  def this(table: Seq[String]) = {
+this(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
+  messageParameters = Map("relationName" -> quoteNameParts(table)))
+  }
+}
+
+class TempTableAlreadyExistsException(errorClass: String, messageParameters: 
Map[String, String],
+  cause: Option[Throwable] = None)
+  extends AnalysisException(errorClass, messageParameters, cause = cause) {
+  def this(table: String) = {
+this(errorClass = "TEMP_TABLE_OR_VIEW_ALREADY_EXISTS",
+  messageParameters = Map("relationName"
+-> quoteNameParts(UnresolvedAttribute.parseAttributeName(table
   }
 }
 
-class TempTableAlreadyExistsException(table: String)
-  extends TableAlreadyExistsException(s"Temporary view '$table' already 
exists")
+class PartitionAlreadyExistsException(errorClass: String, messageParameters: 
Map[String, String])
+  extends AnalysisException(errorClass, messageParameters) {
+  def this(db: String, table: String, spec: TablePartitionSpec) = {
+this(errorClass = "PARTITIONS_ALREADY_EXIST",
+  Map("partitionList" -> ("PARTITION (" +
+spec.map( kv => quoteIdentifier(kv._1) + s" = ${kv._2}").mkString(", 
") + ")"),
+"tableName" -> (quoteIdentifier(db) + "." + quoteIdentifier(table
+  }
+
+  def this(tableName: String, partitionIdent: InternalRow, partitionSchema: 
StructType) = {
+this(errorClass = "PARTITIONS_ALREADY_EXIST",
+  Map("partitionList" ->
+("PARTITION (" + 
partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name))
+.map( kv => quoteIdentifier(s"${kv._2}") + s" = ${kv._1}").mkString(", 
") + ")"),
+"tableName" -> 
quoteNameParts(UnresolvedAttribute.parseAttributeName(tableName
+  }
+}
 
-class PartitionsAlreadyExistException(message: String) extends 
AnalysisException(message) {
+class PartitionsAlreadyExistException(errorClass: String, messageParameters: 
Map[String, String])
+  extends AnalysisException(errorClass, messageParameters) {
   def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = {
-this(s"The following partitions already exist in table '$table' database 
'$db':\n"
-  + specs.mkString("\n===\n"))
+this(errorClass = 

[GitHub] [spark] jerrypeng commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

2022-11-04 Thread GitBox


jerrypeng commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1014297681


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
 for (batchId <- batchIds if batchId > thresholdBatchId) {
   val path = batchIdToPath(batchId)
   fileManager.delete(path)
+  if (metadataCacheEnabled) batchCache.remove(batchId)
   logTrace(s"Removed metadata log file: $path")
 }
   }
 
+
+  /**
+   * List the available batches on file system. As a workaround for S3 
inconsistent list, it also

Review Comment:
   I think this comment is out of date. Amazon now delivers strong read-after-write 
consistency. I will remove it in a subsequent PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] anchovYu commented on a diff in pull request #37887: [SPARK-40360] ALREADY_EXISTS and NOT_FOUND exceptions

2022-11-04 Thread GitBox


anchovYu commented on code in PR #37887:
URL: https://github.com/apache/spark/pull/37887#discussion_r1014283621


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala:
##
@@ -20,66 +20,112 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.catalyst.util.{quoteIdentifier, quoteNameParts }
 import org.apache.spark.sql.types.StructType
 
 /**
  * Thrown by a catalog when an item already exists. The analyzer will rethrow 
the exception
  * as an [[org.apache.spark.sql.AnalysisException]] with the correct position 
information.
  */
 class DatabaseAlreadyExistsException(db: String)
-  extends NamespaceAlreadyExistsException(s"Database '$db' already exists")
+  extends NamespaceAlreadyExistsException(Array(db))
 
-class NamespaceAlreadyExistsException(message: String)
-  extends AnalysisException(
-message,
-errorClass = Some("_LEGACY_ERROR_TEMP_1118"),
-messageParameters = Map("msg" -> message)) {
+
+class NamespaceAlreadyExistsException(errorClass: String, messageParameters: 
Map[String, String])
+  extends AnalysisException(errorClass, messageParameters) {
   def this(namespace: Array[String]) = {
-this(s"Namespace '${namespace.quoted}' already exists")
+this(errorClass = "SCHEMA_ALREADY_EXISTS",
+  Map("schemaName" -> quoteNameParts(namespace)))
   }
 }
 
-class TableAlreadyExistsException(message: String, cause: Option[Throwable] = 
None)
-  extends AnalysisException(
-message,
-errorClass = Some("_LEGACY_ERROR_TEMP_1116"),
-messageParameters = Map("msg" -> message),
-cause = cause) {
+
+class TableAlreadyExistsException(errorClass: String, messageParameters: 
Map[String, String],
+  cause: Option[Throwable] = None)
+  extends AnalysisException(errorClass, messageParameters, cause = cause) {
   def this(db: String, table: String) = {
-this(s"Table or view '$table' already exists in database '$db'")
+this(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
+  messageParameters = Map("relationName" ->
+(quoteIdentifier(db) + "." + quoteIdentifier(table
+  }
+
+  def this(table: String) = {
+this(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
+  messageParameters = Map("relationName" ->
+quoteNameParts(UnresolvedAttribute.parseAttributeName(table
   }
 
-  def this(tableIdent: Identifier) = {
-this(s"Table ${tableIdent.quoted} already exists")
+  def this(table: Seq[String]) = {
+this(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
+  messageParameters = Map("relationName" -> quoteNameParts(table)))
+  }
+}
+
+class TempTableAlreadyExistsException(errorClass: String, messageParameters: 
Map[String, String],
+  cause: Option[Throwable] = None)
+  extends AnalysisException(errorClass, messageParameters, cause = cause) {
+  def this(table: String) = {
+this(errorClass = "TEMP_TABLE_OR_VIEW_ALREADY_EXISTS",
+  messageParameters = Map("relationName"
+-> quoteNameParts(UnresolvedAttribute.parseAttributeName(table
   }
 }
 
-class TempTableAlreadyExistsException(table: String)
-  extends TableAlreadyExistsException(s"Temporary view '$table' already 
exists")
+class PartitionAlreadyExistsException(errorClass: String, messageParameters: 
Map[String, String])
+  extends AnalysisException(errorClass, messageParameters) {
+  def this(db: String, table: String, spec: TablePartitionSpec) = {
+this(errorClass = "PARTITIONS_ALREADY_EXIST",
+  Map("partitionList" -> ("PARTITION (" +
+spec.map( kv => quoteIdentifier(kv._1) + s" = ${kv._2}").mkString(", 
") + ")"),
+"tableName" -> (quoteIdentifier(db) + "." + quoteIdentifier(table
+  }
+
+  def this(tableName: String, partitionIdent: InternalRow, partitionSchema: 
StructType) = {
+this(errorClass = "PARTITIONS_ALREADY_EXIST",
+  Map("partitionList" ->
+("PARTITION (" + 
partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name))
+.map( kv => quoteIdentifier(s"${kv._2}") + s" = ${kv._1}").mkString(", 
") + ")"),
+"tableName" -> 
quoteNameParts(UnresolvedAttribute.parseAttributeName(tableName
+  }
+}
 
-class PartitionsAlreadyExistException(message: String) extends 
AnalysisException(message) {
+class PartitionsAlreadyExistException(errorClass: String, messageParameters: 
Map[String, String])
+  extends AnalysisException(errorClass, messageParameters) {
   def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = {
-this(s"The following partitions already exist in table '$table' database 
'$db':\n"
-  + specs.mkString("\n===\n"))
+this(errorClass = 

[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-04 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014281945


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+val exp = neq.asInstanceOf[BinaryComparison]
+hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+exps.exists {
+  case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+  case _ => false
+}
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+plan match {
+  case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+left.isStreaming && right.isStreaming

Review Comment:
   Oh thank you so much for spotting that out!
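
   For reference, a hypothetical corrected form of the diff above (presumably the spotted issue is the missing `&&`, which leaves the first line's result silently discarded; `hasRangeExpr` and `ExtractEquiJoinKeys` are the helpers already shown in the diff):

    private def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = plan match {
      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
        // both sides streaming AND a range predicate over an event-time column
        left.isStreaming && right.isStreaming &&
          otherCondition.isDefined && hasRangeExpr(otherCondition.get)
      case _ => false
    }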



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {

Review Comment:
   Done, put it under isStreamStreamIntervalJoin. Also add `private` before the 
functions to limit the scope. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect

2022-11-04 Thread GitBox


hvanhovell commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1014280302


##
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##
@@ -128,6 +128,65 @@ private[sql] object ArrowConverters extends Logging {
 }
   }
 
+  private[sql] def toArrowBatchIterator(
+  rowIter: Iterator[InternalRow],
+  schema: StructType,
+  maxRecordsPerBatch: Int,
+  timeZoneId: String,
+  context: TaskContext): Iterator[(Array[Byte], Long, Long)] = {
+val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
+val allocator = ArrowUtils.rootAllocator.newChildAllocator(
+  "toArrowBatchIterator", 0, Long.MaxValue)
+
+val root = VectorSchemaRoot.create(arrowSchema, allocator)
+val unloader = new VectorUnloader(root)
+val arrowWriter = ArrowWriter.create(root)
+
+if (context != null) { // for test at driver
+  context.addTaskCompletionListener[Unit] { _ =>
+root.close()
+allocator.close()
+  }
+}
+
+new Iterator[(Array[Byte], Long, Long)] {
+
+  override def hasNext: Boolean = rowIter.hasNext || {
+root.close()
+allocator.close()
+false
+  }
+
+  override def next(): (Array[Byte], Long, Long) = {
+val out = new ByteArrayOutputStream()
+val writeChannel = new WriteChannel(Channels.newChannel(out))
+
+var rowCount = 0L
+var estimatedSize = 0L
+Utils.tryWithSafeFinally {
+  while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || rowCount < 
maxRecordsPerBatch)) {
+val row = rowIter.next()
+arrowWriter.write(row)
+rowCount += 1
+estimatedSize += SizeEstimator.estimate(row)
+  }
+  arrowWriter.finish()
+  val batch = unloader.getRecordBatch()
+
+  MessageSerializer.serialize(writeChannel, arrowSchema)

Review Comment:
   sgtm



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect

2022-11-04 Thread GitBox


hvanhovell commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1014279677


##
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[Response]) exte
   responseObserver.onNext(response.build())
 }
 
-responseObserver.onNext(sendMetricsToResponse(clientId, rows))
+responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
+responseObserver.onCompleted()
+  }
+
+  def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit 
= {
+val spark = dataframe.sparkSession
+val schema = dataframe.schema
+// TODO: control the batch size instead of max records
+val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
+val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+
+val rows = dataframe.queryExecution.executedPlan.execute()
+var numBatches = 0L
+
+if (rows.getNumPartitions > 0) {
+  val batches = rows.mapPartitionsInternal { iter =>
+ArrowConverters
+  .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
+  }
+
+  val obj = new Object
+
+  val processPartition = (iter: Iterator[(Array[Byte], Long, Long)]) => 
iter.toArray
+
+  val resultHandler = (partitionId: Int, taskResult: Array[(Array[Byte], 
Long, Long)]) =>
+obj.synchronized {
+  var batchId = partitionId.toLong << 33
+  taskResult.foreach { case (bytes, count, size) =>
+val response = proto.Response.newBuilder().setClientId(clientId)
+val batch = proto.Response.ArrowBatch
+  .newBuilder()
+  .setBatchId(batchId)
+  .setRowCount(count)
+  .setUncompressedBytes(size)
+  .setCompressedBytes(bytes.length)
+  .setData(ByteString.copyFrom(bytes))
+  .build()
+response.setArrowBatch(batch)
+responseObserver.onNext(response.build())

Review Comment:
   This callback is currently executed by the DAGScheduler thread, and if this is 
expensive, no job/stage can be scheduled during this call. We really should move 
this to a separate thread.
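
   A minimal sketch of that suggestion (assumed names, not the PR's code): enqueue the serialized batches onto a dedicated sender thread so the scheduler callback only submits work and returns immediately. A single-threaded executor serializes the gRPC writes; ordering across partitions still needs the buffering discussed elsewhere in this review.

    import java.util.concurrent.Executors

    // dedicated thread for responseObserver.onNext calls
    val sender = Executors.newSingleThreadExecutor()

    val resultHandler = (partitionId: Int, taskResult: Array[(Array[Byte], Long, Long)]) =>
      sender.execute(new Runnable {
        override def run(): Unit = {
          // build the proto.Response for each (bytes, count, size) here and call
          // responseObserver.onNext(...) off the DAGScheduler thread
        }
      })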



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect

2022-11-04 Thread GitBox


hvanhovell commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1014279677


##
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[Response]) exte
   responseObserver.onNext(response.build())
 }
 
-responseObserver.onNext(sendMetricsToResponse(clientId, rows))
+responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
+responseObserver.onCompleted()
+  }
+
+  def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit 
= {
+val spark = dataframe.sparkSession
+val schema = dataframe.schema
+// TODO: control the batch size instead of max records
+val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
+val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+
+val rows = dataframe.queryExecution.executedPlan.execute()
+var numBatches = 0L
+
+if (rows.getNumPartitions > 0) {
+  val batches = rows.mapPartitionsInternal { iter =>
+ArrowConverters
+  .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
+  }
+
+  val obj = new Object
+
+  val processPartition = (iter: Iterator[(Array[Byte], Long, Long)]) => 
iter.toArray
+
+  val resultHandler = (partitionId: Int, taskResult: Array[(Array[Byte], 
Long, Long)]) =>
+obj.synchronized {
+  var batchId = partitionId.toLong << 33
+  taskResult.foreach { case (bytes, count, size) =>
+val response = proto.Response.newBuilder().setClientId(clientId)
+val batch = proto.Response.ArrowBatch
+  .newBuilder()
+  .setBatchId(batchId)
+  .setRowCount(count)
+  .setUncompressedBytes(size)
+  .setCompressedBytes(bytes.length)
+  .setData(ByteString.copyFrom(bytes))
+  .build()
+response.setArrowBatch(batch)
+responseObserver.onNext(response.build())

Review Comment:
   This callback is currently executed by the DAGScheduler thread, and if this is 
expensive, no job/stage can be scheduled during this call. We really should move 
this to a separate thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect

2022-11-04 Thread GitBox


hvanhovell commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1014270879


##
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[Response]) exte
   responseObserver.onNext(response.build())
 }
 
-responseObserver.onNext(sendMetricsToResponse(clientId, rows))
+responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
+responseObserver.onCompleted()
+  }
+
+  def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit 
= {
+val spark = dataframe.sparkSession
+val schema = dataframe.schema
+// TODO: control the batch size instead of max records
+val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
+val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+
+val rows = dataframe.queryExecution.executedPlan.execute()

Review Comment:
   You need to wrap this and all the code below in 
SQLExecution.withExecutionId(..) or use Dataset.withAction otherwise you break 
the UI and a bunch of other things.
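
   A hedged sketch of what that wrapping could look like, reusing the names already in the diff (`batches`, `processPartition`, `resultHandler`); `SQLExecution.withNewExecutionId` is the tracked-execution entry point in the Spark code base, and the job name string here is only illustrative:

    import org.apache.spark.sql.execution.SQLExecution

    SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) {
      // runs the collect under a tracked SQL execution so the UI and listeners see it
      spark.sparkContext.runJob(batches, processPartition, resultHandler)
    }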



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect

2022-11-04 Thread GitBox


hvanhovell commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1014270879


##
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[Response]) exte
   responseObserver.onNext(response.build())
 }
 
-responseObserver.onNext(sendMetricsToResponse(clientId, rows))
+responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
+responseObserver.onCompleted()
+  }
+
+  def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit 
= {
+val spark = dataframe.sparkSession
+val schema = dataframe.schema
+// TODO: control the batch size instead of max records
+val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
+val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+
+val rows = dataframe.queryExecution.executedPlan.execute()

Review Comment:
   You need to wrap this in SQLExecution.withExecutionId(..) or use 
Dataset.withAction otherwise you break the UI and a bunch of other things.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #38468: [SPARK-41005][CONNECT][PYTHON] Arrow-based collect

2022-11-04 Thread GitBox


hvanhovell commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1014269647


##
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##
@@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[Response]) exte
   responseObserver.onNext(response.build())
 }
 
-responseObserver.onNext(sendMetricsToResponse(clientId, rows))
+responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
+responseObserver.onCompleted()
+  }
+
+  def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit 
= {
+val spark = dataframe.sparkSession
+val schema = dataframe.schema
+// TODO: control the batch size instead of max records
+val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
+val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+
+val rows = dataframe.queryExecution.executedPlan.execute()
+var numBatches = 0L
+
+if (rows.getNumPartitions > 0) {
+  val batches = rows.mapPartitionsInternal { iter =>
+ArrowConverters
+  .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
+  }
+
+  val obj = new Object
+
+  val processPartition = (iter: Iterator[(Array[Byte], Long, Long)]) => 
iter.toArray

Review Comment:
   This breaks sorted results. A higher partition can complete earlier than 
lower ones, thus breaking the order. That is why the snippet I posted buffered 
the partitions in the handler, while the main thread scanned over them 1 by 1.
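
   A rough sketch of that buffering approach (assumed names, not the actual snippet): the result handler stores each partition's batches by index, and a consumer loop, running after the job has been submitted asynchronously, emits them strictly in partition order.

    val numPartitions = rows.getNumPartitions
    val buffered = new Array[Array[(Array[Byte], Long, Long)]](numPartitions)
    val lock = new Object

    // called by the scheduler as partitions finish, possibly out of order
    val resultHandler = (partitionId: Int, taskResult: Array[(Array[Byte], Long, Long)]) =>
      lock.synchronized {
        buffered(partitionId) = taskResult
        lock.notifyAll()
      }

    // consumer side: wait for partition i before emitting it, so the client
    // always receives batches in partition order even if i+1 finished first
    for (i <- 0 until numPartitions) {
      lock.synchronized { while (buffered(i) == null) lock.wait() }
      buffered(i).foreach { case (bytes, count, size) =>
        // build and send the proto.Response for this batch here
      }
    }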



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SandishKumarHN commented on a diff in pull request #37972: [SPARK-40654][SQL] Protobuf support for Spark - from_protobuf AND to_protobuf

2022-11-04 Thread GitBox


SandishKumarHN commented on code in PR #37972:
URL: https://github.com/apache/spark/pull/37972#discussion_r1014260137


##
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.protobuf
+
+import com.google.protobuf.{ByteString, DynamicMessage, Message}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{RandomDataGenerator, Row}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, 
NoopFilters, OrderedFilters, StructFilters}
+import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, 
GenericInternalRow, Literal}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.protobuf.utils.{ProtobufUtils, SchemaConverters}
+import org.apache.spark.sql.sources.{EqualTo, Not}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class ProtobufCatalystDataConversionSuite
+extends SparkFunSuite
+with SharedSparkSession
+with ExpressionEvalHelper {
+
+  private def checkResult(
+  data: Literal,
+  descFilePath: String,
+  messageName: String,
+  expected: Any): Unit = {
+checkEvaluation(
+  ProtobufDataToCatalyst(
+CatalystDataToProtobuf(data, descFilePath, messageName),
+descFilePath,
+messageName,
+Map.empty),
+  prepareExpectedResult(expected))
+  }
+
+  protected def checkUnsupportedRead(
+  data: Literal,
+  descFilePath: String,
+  actualSchema: String,
+  badSchema: String): Unit = {
+
+val binary = CatalystDataToProtobuf(data, descFilePath, actualSchema)
+
+intercept[Exception] {
+  ProtobufDataToCatalyst(binary, descFilePath, badSchema, Map("mode" -> 
"FAILFAST")).eval()
+}
+
+val expected = {
+  val expectedSchema = ProtobufUtils.buildDescriptor(descFilePath, 
badSchema)
+  SchemaConverters.toSqlType(expectedSchema).dataType match {
+case st: StructType =>
+  Row.fromSeq((0 until st.length).map { _ =>
+null
+  })
+case _ => null
+  }
+}
+
+checkEvaluation(
+  ProtobufDataToCatalyst(binary, descFilePath, badSchema, Map("mode" -> 
"PERMISSIVE")),
+  expected)
+  }
+
+  protected def prepareExpectedResult(expected: Any): Any = expected match {
+// Spark byte and short both map to Protobuf int
+case b: Byte => b.toInt
+case s: Short => s.toInt
+case row: GenericInternalRow => 
InternalRow.fromSeq(row.values.map(prepareExpectedResult))
+case array: GenericArrayData => new 
GenericArrayData(array.array.map(prepareExpectedResult))
+case map: MapData =>
+  val keys = new GenericArrayData(
+
map.keyArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult))
+  val values = new GenericArrayData(
+
map.valueArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult))
+  new ArrayBasedMapData(keys, values)
+case other => other
+  }
+
+  private val testingTypes = Seq(
+StructType(StructField("int32_type", IntegerType, nullable = true) :: Nil),
+StructType(StructField("double_type", DoubleType, nullable = true) :: Nil),
+StructType(StructField("float_type", FloatType, nullable = true) :: Nil),
+StructType(StructField("bytes_type", BinaryType, nullable = true) :: Nil),
+StructType(StructField("string_type", StringType, nullable = true) :: Nil))
+
+  private val catalystTypesToProtoMessages: Map[DataType, String] = Map(
+IntegerType -> "IntegerMsg",
+DoubleType -> "DoubleMsg",
+FloatType -> "FloatMsg",
+BinaryType -> "BytesMsg",
+StringType -> "StringMsg")
+
+  testingTypes.foreach { dt =>
+val seed = 1 + scala.util.Random.nextInt((1024 - 1) + 1)

Review Comment:
   @MaxGekk thank you, I will look into it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

[GitHub] [spark] swamirishi commented on pull request #38377: [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path

2022-11-04 Thread GitBox


swamirishi commented on PR #38377:
URL: https://github.com/apache/spark/pull/38377#issuecomment-1303865771

   > Two points:
   > 
   > * spark.driver.log.dfsDir is typically expected to be a path to hdfs - so 
resolving it relative to current working directory does not make sense
   > * If `rootDir` is referencing local filesystem, it will get resolved 
relative to local fs by the `fileSystem` for `rootDir`
   
   This can be done using makeQualified API instead of resolvePath.
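
   A small hedged sketch of the difference (the path and variable names are illustrative): `makeQualified` only attaches the scheme and authority of the directory's own FileSystem locally, whereas `resolvePath` typically goes to the NameNode and expects the path to already exist.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    val rootDir = new Path("hdfs://nn:8020/logs/driver")   // e.g. the spark.driver.log.dfsDir value
    val fs = rootDir.getFileSystem(new Configuration())
    val qualified = fs.makeQualified(rootDir)               // no RPC; works even if the dir does not exist yet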


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] FouadApp closed pull request #38512: WIP: [SPARK-38564] Support read hive table from subdirectory source

2022-11-04 Thread GitBox


FouadApp closed pull request #38512: WIP: [SPARK-38564] Support read hive table 
from subdirectory source
URL: https://github.com/apache/spark/pull/38512


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] FouadApp opened a new pull request, #38512: WIP: [SPARK-38564] Support read hive table from subdirectory source

2022-11-04 Thread GitBox


FouadApp opened a new pull request, #38512:
URL: https://github.com/apache/spark/pull/38512

   
   ### What changes were proposed in this pull request?
   This adds support for reading the source files of a partitioned Hive table when 
they are nested in subdirectories.
   
   ### Why are the changes needed?
   When using the Spark engine to read a partitioned Hive table with 
subdirectories, the source files in the subdirectories cannot be read.
   
   
   ### Does this PR introduce _any_ user-facing change?
   no
   
   ### How was this patch tested?
   new test


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on pull request #38498: [SPARK-40769][CORE][SQL] Migrate type check failures of aggregate expressions onto error classes

2022-11-04 Thread GitBox


LuciferYang commented on PR #38498:
URL: https://github.com/apache/spark/pull/38498#issuecomment-1303797442

   GA passed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] FouadApp commented on pull request #32679: [SPARK-28098][SQL]Support read hive table while LeafDir had multi-level paths

2022-11-04 Thread GitBox


FouadApp commented on PR #32679:
URL: https://github.com/apache/spark/pull/32679#issuecomment-1303780636

   I have the same problem:
   
   With the TEZ engine writing data in the presence of union all:
   
   part_date=/HIVE_UNION_SUBDIR_1/part_000 (parquet)
   part_date=/HIVE_UNION_SUBDIR_2
   part_date=/HIVE_UNION_SUBDIR_x
   
   when I run a query on this data 
   df = spark.sql("select * from table")
   df.count()  ---> 0
   
   Spark cannot read the subdir !
   
   I have a solution, but it is not recommended:
   spark.conf.set("mapred.input.dir.recursive", "true")
   spark.conf.set("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
   spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false") # This 
param is not recommended in Spark


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] FouadApp commented on pull request #32679: [SPARK-28098][SQL]Support read hive table while LeafDir had multi-level paths

2022-11-04 Thread GitBox


FouadApp commented on PR #32679:
URL: https://github.com/apache/spark/pull/32679#issuecomment-1303772792

   > Any chance of this getting picked up again? I saw it was merged in a fork: 
[lyft#40](https://github.com/lyft/spark/pull/40) but it would be great to have 
it upstream
   
   But it's not on the official repo (apache/spark)!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan opened a new pull request, #38511: [WIP][SPARK-41017][SQL] Do not push Filter through reference-only Project

2022-11-04 Thread GitBox


cloud-fan opened a new pull request, #38511:
URL: https://github.com/apache/spark/pull/38511

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon closed pull request #38462: [SPARK-40533] [CONNECT] [PYTHON] Support most built-in literal types for Python in Spark Connect

2022-11-04 Thread GitBox


HyukjinKwon closed pull request #38462: [SPARK-40533] [CONNECT] [PYTHON] 
Support most built-in literal types for Python in Spark Connect
URL: https://github.com/apache/spark/pull/38462


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #38462: [SPARK-40533] [CONNECT] [PYTHON] Support most built-in literal types for Python in Spark Connect

2022-11-04 Thread GitBox


HyukjinKwon commented on PR #38462:
URL: https://github.com/apache/spark/pull/38462#issuecomment-1303686102

   Merged to master.
   
   Let's address complete types in a followup.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon closed pull request #38485: [SPARK-41001] [CONNECT] [PYTHON] Implementing Connection String for Python Client

2022-11-04 Thread GitBox


HyukjinKwon closed pull request #38485: [SPARK-41001] [CONNECT] [PYTHON] 
Implementing Connection String for Python Client
URL: https://github.com/apache/spark/pull/38485


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dwsmith1983 commented on pull request #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar

2022-11-04 Thread GitBox


dwsmith1983 commented on PR #38510:
URL: https://github.com/apache/spark/pull/38510#issuecomment-1303640935

   @itholic I was going over another topic and made some updates on SQL 
performance tuning as well. I added a screenshot of the markdown. Is this how you 
want it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dwsmith1983 opened a new pull request, #38510: [MINOR][DOC] revisions for spark sql performance tuning to improve readability and grammar

2022-11-04 Thread GitBox


dwsmith1983 opened a new pull request, #38510:
URL: https://github.com/apache/spark/pull/38510

   
   
   ### What changes were proposed in this pull request?
   
   I made some small grammar fixes related to dependent clauses followed by 
independent clauses, starting a sentence with an introductory phrase, using the 
plural form when "are" is present in the sentence, and other small fixes to 
improve readability.
   
   https://spark.apache.org/docs/latest/sql-performance-tuning.html 
   
   Screenshots of the rendered page:
   https://user-images.githubusercontent.com/7563201/18862-d9418bc1-2fcd-4eff-be8e-af412add6946.png
   https://user-images.githubusercontent.com/7563201/18871-b5629ec6-9a9a-4f5a-96ce-0f90bd3e97b3.png
   https://user-images.githubusercontent.com/7563201/18877-c7f1ac95-618d-4c6d-a5ed-f84e927bb5b9.png
   
   
   ### Why are the changes needed?
   
   These changes improve the readability of the Spark documentation for new 
users or those studying up.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, these changes impact the Spark documentation.
   
   
   ### How was this patch tested?
   
   No tests were created, as these changes were solely in markdown.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srielau commented on a diff in pull request #38490: [SPARK-41009][SQL] Rename the error class `_LEGACY_ERROR_TEMP_1070` to `LOCATION_ALREADY_EXISTS`

2022-11-04 Thread GitBox


srielau commented on code in PR #38490:
URL: https://github.com/apache/spark/pull/38490#discussion_r1014078710


##
core/src/main/resources/error/error-classes.json:
##
@@ -668,6 +668,24 @@
   }
 }
   },
+  "LOCATION_ALREADY_EXISTS" : {
+"message" : [
+  "Cannot create the location  because it already exists.",
+  "Choose a different path or remove the existing location."
+],
+"subClass" : {

Review Comment:
   Is this level of differentiation necessary?
   Aside: the sub-error class text partially repeats what the general text says.
   To make this message useful, I think we should explain how/that the table's 
directory name (?) is derived from the table name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1013973661


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> None:
+"""Validate model predictions against the expected pandas_udf 
return_type."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+if not all(v.shape == (num_input_rows,) for v in preds.values()):
+raise ValueError("Prediction results for StructType fields 
must be scalars.")
+elif isinstance(preds, list) and isinstance(preds[0], dict):
+# rows of dictionaries
+predNames = list(preds[0].keys())
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+else:
+raise ValueError(
+"Prediction results for StructType must be a dictionary or "
+"a list of dictionary, got: {}".format(type(preds))
+)
+
+# check column names
+if len(predNames) != len(fieldNames) or not all(
+[predNames[i] == fieldNames[i] for i in range(len(fieldNames))]
+):
+raise ValueError(
+"Prediction result columns did not match expected return_type "
+"columns: expected {}, got: {}".format(fieldNames, predNames)
+)
+elif isinstance(return_type, ArrayType):
+if isinstance(preds, np.ndarray):
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+else:
+raise ValueError("Prediction results for ArrayType must be an 
ndarray.")
+else:
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+
+
+def predict_batch_udf(
+predict_batch_fn: Callable[
+[],
+Callable[
+[np.ndarray | List[np.ndarray]],
+np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, 
np.dtype]],
+],
+],
+*,
+return_type: DataType,
+batch_size: int,
+input_tensor_shapes: List[List[int] | None] | Mapping[int, List[int]] | 
None = None,
+) -> UserDefinedFunctionLike:
+"""Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+
+This will handle:
+- conversion of the Spark DataFrame to numpy arrays.
+- batching of the inputs sent to the model predict() function.
+- caching of the model and prediction function on the executors.
+
+This assumes that the `predict_batch_fn` encapsulates all of the necessary 
dependencies for
+running the model or the Spark executor environment already 

[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1013972337


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> None:
+"""Validate model predictions against the expected pandas_udf 
return_type."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+if not all(v.shape == (num_input_rows,) for v in preds.values()):

Review Comment:
   I suggest we support the case of a StructType containing a field of 
ArrayType; this is a common case.
   Correspondingly, if we support this, let's add a similar check to 
https://github.com/apache/spark/pull/37734/files#r1013971238
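
   A minimal sketch of how such a check might look, assuming the prediction 
result is a dict mapping struct field names to numpy arrays and that 
array-valued fields come back as 2-D arrays (the helper name and shapes are 
illustrative, not code from this PR):

   import numpy as np

   # Hypothetical helper, not the PR's code: accept both scalar- and
   # array-valued fields inside a StructType result.
   def _validate_struct_with_arrays(preds: dict, num_input_rows: int) -> None:
       for name, values in preds.items():
           arr = np.asarray(values)
           if arr.shape[0] != num_input_rows:
               raise ValueError(
                   "Field {} must have one prediction per input row.".format(name))
           # Scalar fields come back 1-D; ArrayType fields come back 2-D
           # (num_input_rows x array_length).
           if arr.ndim not in (1, 2):
               raise ValueError(
                   "Field {} must be scalar- or array-valued, got ndim={}.".format(
                       name, arr.ndim))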



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1013971238


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> None:
+"""Validate model predictions against the expected pandas_udf 
return_type."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+if not all(v.shape == (num_input_rows,) for v in preds.values()):
+raise ValueError("Prediction results for StructType fields 
must be scalars.")
+elif isinstance(preds, list) and isinstance(preds[0], dict):
+# rows of dictionaries
+predNames = list(preds[0].keys())
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+else:
+raise ValueError(
+"Prediction results for StructType must be a dictionary or "
+"a list of dictionary, got: {}".format(type(preds))
+)
+
+# check column names
+if len(predNames) != len(fieldNames) or not all(
+[predNames[i] == fieldNames[i] for i in range(len(fieldNames))]
+):
+raise ValueError(
+"Prediction result columns did not match expected return_type "
+"columns: expected {}, got: {}".format(fieldNames, predNames)
+)
+elif isinstance(return_type, ArrayType):
+if isinstance(preds, np.ndarray):
+if len(preds) != num_input_rows:

Review Comment:
   We need to add an additional check: `len(preds.shape) == 2`
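
   A minimal sketch of the extra check, assuming `preds` is the numpy array 
returned for an ArrayType result and `num_input_rows` is the input batch size 
(illustrative only, not code from this PR):

   import numpy as np

   def _check_array_result(preds: np.ndarray, num_input_rows: int) -> None:
       # An ArrayType column should come back as a 2-D array:
       # one row per input record, one column per array element.
       if len(preds.shape) != 2:
           raise ValueError(
               "Prediction results for ArrayType must be two-dimensional, "
               "got shape {}.".format(preds.shape))
       if len(preds) != num_input_rows:
           raise ValueError("Prediction results must have same length as input data.")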



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gaoyajun02 commented on a diff in pull request #38333: [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

2022-11-04 Thread GitBox


gaoyajun02 commented on code in PR #38333:
URL: https://github.com/apache/spark/pull/38333#discussion_r1013969510


##
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##
@@ -794,7 +794,15 @@ final class ShuffleBlockFetcherIterator(
 // since the last call.
 val msg = s"Received a zero-size buffer for block $blockId from 
$address " +
   s"(expectedApproxSize = $size, 
isNetworkReqDone=$isNetworkReqDone)"
-throwFetchFailedException(blockId, mapIndex, address, new 
IOException(msg))
+if (blockId.isShuffleChunk) {
+  logWarning(msg)
+  
pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)

Review Comment:
   ok, updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-11-04 Thread GitBox


WeichenXu123 commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1013960675


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,542 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if isinstance(data, pd.DataFrame):
+index = 0
+data_size = len(data)
+while index < data_size:
+yield data.iloc[index : index + batch_size]
+index += batch_size
+else:
+# convert (tuple of) pd.Series into pd.DataFrame
+if isinstance(data, pd.Series):
+df = pd.concat((data,), axis=1)
+else:  # isinstance(data, Tuple[pd.Series]):
+df = pd.concat(data, axis=1)
+
+index = 0
+data_size = len(df)
+while index < data_size:
+yield df.iloc[index : index + batch_size]
+index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+if isinstance(data, pd.Series):
+return data.dtype == np.object_ and isinstance(data.iloc[0], 
(np.ndarray, list))
+elif isinstance(data, pd.DataFrame):
+return any(data.dtypes == np.object_) and any(
+[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+)
+else:
+raise ValueError(
+"Unexpected data type: {}, expected pd.Series or 
pd.DataFrame.".format(type(data))
+)
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> 
bool:
+"""Check if input Series/DataFrame/Tuple contains any tensor-valued 
columns."""
+if isinstance(data, (pd.Series, pd.DataFrame)):
+return _is_tensor_col(data)
+else:  # isinstance(data, Tuple):
+return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate(
+preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+num_input_rows: int,
+return_type: DataType,
+) -> None:
+"""Validate model predictions against the expected pandas_udf 
return_type."""
+if isinstance(return_type, StructType):
+struct_rtype: StructType = return_type
+fieldNames = struct_rtype.names
+if isinstance(preds, dict):
+# dictionary of columns
+predNames = list(preds.keys())
+if not all(v.shape == (num_input_rows,) for v in preds.values()):
+raise ValueError("Prediction results for StructType fields 
must be scalars.")
+elif isinstance(preds, list) and isinstance(preds[0], dict):
+# rows of dictionaries
+predNames = list(preds[0].keys())
+if len(preds) != num_input_rows:
+raise ValueError("Prediction results must have same length as 
input data.")
+else:
+raise ValueError(
+"Prediction results for StructType must be a dictionary or "
+"a list of dictionary, got: {}".format(type(preds))
+)
+
+# check column names
+if len(predNames) != len(fieldNames) or not all(
+[predNames[i] == fieldNames[i] for i in range(len(fieldNames))]
+):

Review Comment:
   The check here forces the returned dict to have the same key order as the 
struct type field list. I think that is unnecessary (note that in some cases 
the dict key order is undefined); we can simply check 
`set(predNames) == set(fieldNames)` instead.
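
   A minimal sketch of the order-insensitive comparison, assuming `predNames` 
and `fieldNames` are the lists already built in `_validate` (illustrative 
only, not code from this PR):

   def _check_field_names(predNames: list, fieldNames: list) -> None:
       # Compare as sets so the model may return the struct fields in any order.
       if set(predNames) != set(fieldNames):
           raise ValueError(
               "Prediction result columns did not match expected return_type "
               "columns: expected {}, got: {}".format(fieldNames, predNames))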



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


