[GitHub] [spark] itholic opened a new pull request, #40910: [SPARK-43234][CONNECT][PYTHON] Migrate `ValueError` from Connect DataFrame into error class
itholic opened a new pull request, #40910: URL: https://github.com/apache/spark/pull/40910 ### What changes were proposed in this pull request? This PR proposes to migrate `ValueError` in the Spark Connect DataFrame to `PySparkValueError`. ### Why are the changes needed? To improve the errors from Spark Connect. ### Does this PR introduce _any_ user-facing change? No, it's an error-message improvement. ### How was this patch tested? The existing tests should pass. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
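The error-class pattern this migration targets can be sketched in plain Python. This is a minimal illustration of the idea, not PySpark's actual implementation: the registry key and message text below are made up for the example.

```python
# Illustrative error-class registry; real PySpark error classes and messages differ.
ERROR_CLASSES = {
    "CANNOT_BE_EMPTY": "At least one {item} must be given.",
}

class PySparkValueErrorSketch(ValueError):
    """Sketch of a ValueError carrying a machine-readable error class."""
    def __init__(self, error_class, message_parameters):
        self.error_class = error_class
        self.message_parameters = message_parameters
        # Render the human-readable message from the class template.
        super().__init__(ERROR_CLASSES[error_class].format(**message_parameters))

try:
    raise PySparkValueErrorSketch("CANNOT_BE_EMPTY", {"item": "column"})
except PySparkValueErrorSketch as e:
    assert e.error_class == "CANNOT_BE_EMPTY"
    assert "column" in str(e)
```

Callers can then match on `error_class` programmatically instead of parsing the message string, which is the main benefit of the migration.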
[GitHub] [spark] puneetguptanitj opened a new pull request, #40909: [SPARK-42411] [Kubernetes] Add support for istio with strict mtls
puneetguptanitj opened a new pull request, #40909: URL: https://github.com/apache/spark/pull/40909 ### What changes were proposed in this pull request? The following describes the changes made; all changes are behind respective configuration properties. 1. Followed the same model as the driver to create SVC records for executors as well. The lifecycle of the SVC record is tied to the executor lifecycle. While registering with drivers, executors now supply their SVC hostname. **Controlled by a new configuration (added as part of this PR): `spark.kubernetes.executor.service`** ![exec_service](https://user-images.githubusercontent.com/3784871/233761856-f135c726-9c90-4a44-bcac-84ce97f09b9d.png) 2. Allowed drivers and executors to bind to all IPs. **Controlled by the existing properties `spark.driver.bindAddress` and `spark.executor.bindAddress`. This PR makes `0.0.0.0` a permissible value** ![bind_address](https://user-images.githubusercontent.com/3784871/233761913-f763a0f0-bccf-4743-871c-f982b93cf7ba.png) 3. Added support for providing: 1. a pre-start script, run before the driver/executor JVM starts. This script can do any setup, e.g. waiting for the istio-proxy sidecar to be up. 2. a post-stop script, run after the driver/executor JVM completes. This script can do any cleanup; for example, in our case it makes a REST call to shut down the sidecar. These scripts are not part of the PR because the onus of providing any specialized cleanup lies with the client; in our case it is provided by Proton. **Controlled by new configurations (added as part of this PR): `spark.kubernetes.pre.start.script` and `spark.kubernetes.post.stop.script`, which when set will be executed before and after the driver/executor JVM** ![sidecar_termination](https://user-images.githubusercontent.com/3784871/233762111-9251aa14-87a7-4339-8549-45b4ae1e06dc.png) ### Why are the changes needed? 
Spark allows using Kubernetes as the resource scheduler, but off the shelf it does not work with a Kubernetes cluster using an Istio service mesh in strict mTLS mode, because: 1. For Istio to work, it needs to know the network identity of all possible network paths. Currently a network identity (through a K8s service record) is created only for the driver pod, not for executors. 2. Istio adds an istio-proxy sidecar to every pod, and this sidecar handles all pod-to-pod networking. However, the sidecar binds to the Pod IP and then sends ingress traffic to localhost (if PILOT_ENABLE_INBOUND_PASSTHROUGH is set to false). Therefore, for ingress traffic to correctly reach application processes (like the driver and executor JVMs), the processes need to bind to all IPs and not just the Pod IP; otherwise, traffic routed to localhost by the sidecar would not reach them. Off the shelf, Spark allows drivers and executors to bind only to the Pod IP and therefore does not work with Istio. 3. Unlike the Istio sidecar, the driver/executor containers in the pod can finish, in which case the pod enters the NotReady state while the sidecar continues to run. Therefore, once the driver/executor containers are done, they need to signal the Istio sidecar to terminate as well. ### Does this PR introduce *any* user-facing change? Yes, it adds configs that can be used to run on a K8s cluster using an Istio service mesh with strict mTLS. ### How was this patch tested? - Added new unit tests - Tested on a strict mTLS Istio Kubernetes cluster.
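Taken together, the settings described in this PR might be combined as below. The property names are those proposed in the PR; the boolean value and the script paths are illustrative assumptions, not documented defaults:

```
spark.kubernetes.executor.service=true
spark.driver.bindAddress=0.0.0.0
spark.executor.bindAddress=0.0.0.0
spark.kubernetes.pre.start.script=/opt/spark/scripts/wait-for-istio.sh
spark.kubernetes.post.stop.script=/opt/spark/scripts/stop-sidecar.sh
```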
[GitHub] [spark] Hisoka-X commented on a diff in pull request #40908: [SPARK-42750] Support Insert By Name statement
Hisoka-X commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1174282609 ## sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala: ## @@ -122,6 +125,16 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { } } + test("insert with column list - by name") { Review Comment: I will add docs and more test cases later. Before doing this, I want to make sure my idea has no problems.
[GitHub] [spark] Hisoka-X opened a new pull request, #40908: [SPARK-42750] Support Insert By Name statement
Hisoka-X opened a new pull request, #40908: URL: https://github.com/apache/spark/pull/40908 ### What changes were proposed in this pull request? In some use cases, users have incoming dataframes with fixed column names which might differ from the canonical order. Currently there's no way to handle this easily through the INSERT INTO API - the user has to make sure the columns are in the right order, as they would when inserting a tuple. We should add an optional BY NAME clause, such that `INSERT INTO tgt BY NAME` takes each column of the source query and inserts it into the column in `tgt` which has the same name according to the configured `resolver` logic. ### Why are the changes needed? Adds the new feature `INSERT INTO BY NAME`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added new tests.
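The by-name alignment semantics described above can be illustrated without Spark. This is a plain-Python sketch of the idea (a case-sensitive resolver is assumed; Spark's configurable `resolver` may also match case-insensitively):

```python
def align_by_name(row, source_cols, target_cols):
    """Reorder a source row so each value lands in the target column with the same name."""
    by_name = dict(zip(source_cols, row))
    return tuple(by_name[c] for c in target_cols)

# The source query produces columns (b, a); the target table declares (a, b).
# Positional insert would swap the values; by-name insert aligns them.
print(align_by_name((2, 1), ["b", "a"], ["a", "b"]))  # (1, 2)
```

This is exactly the reordering the proposed BY NAME clause would perform inside the analyzer, sparing users from permuting their SELECT list by hand.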
[GitHub] [spark] Hisoka-X commented on pull request #40865: [SPARK-43156][SQL] Fix `COUNT(*) is null` bug in correlated scalar subquery
Hisoka-X commented on PR #40865: URL: https://github.com/apache/spark/pull/40865#issuecomment-1518501790 kindly ping @cloud-fan . All CI passed.
[GitHub] [spark] cloud-fan commented on a diff in pull request #40885: [SPARK-43226] Define extractors for file-constant metadata
cloud-fan commented on code in PR #40885: URL: https://github.com/apache/spark/pull/40885#discussion_r1174277575 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala: ## @@ -203,6 +203,21 @@ trait FileFormat { * method. Technically, a file format could choose to suppress them, but that is not recommended. */ def metadataSchemaFields: Seq[StructField] = FileFormat.BASE_METADATA_FIELDS + + /** + * The extractors to use when deriving file-constant metadata columns for this file format. + * + * By default, the value of a file-constant metadata column is obtained by looking up the column's + * name in the file's metadata column value map. However, implementations can override this method + * in order to provide an extractor that has access to the entire [[PartitionedFile]] when + * deriving the column's value. + * + * NOTE: Extractors are lazy, invoked only if the query actually selects their column at runtime. + * + * See also [[FileFormat.getFileConstantMetadataColumnValue]]. + */ + def fileConstantMetadataExtractors: Map[String, PartitionedFile => Any] = Review Comment: I have two questions about the API design: 1. File source implementations now need to define constant metadata columns in both `metadataSchemaFields` and `fileConstantMetadataExtractors`. Shall we have a single API to define constant metadata columns? The API should define the column name, type, and extractor. 2. Should the extractor return a `Literal` instead of `Any`?
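The extractor pattern under discussion, a map from column name to a per-file function that is only invoked when the column is actually selected, can be sketched in plain Python. The class and column names below are stand-ins for illustration; the real API is Scala and part of Spark's `FileFormat`:

```python
class PartitionedFileStub:
    """Hypothetical stand-in for Spark's PartitionedFile."""
    def __init__(self, path, size):
        self.path = path
        self.size = size

# Extractors: column name -> function of the whole file.
# Lazy by construction: a function only runs if its column is looked up.
extractors = {
    "_metadata.file_path": lambda f: f.path,
    "_metadata.file_size": lambda f: f.size,
}

def constant_metadata_value(column, f):
    # Only the selected column's extractor is invoked.
    return extractors[column](f)

f = PartitionedFileStub("/data/part-0.parquet", 1024)
print(constant_metadata_value("_metadata.file_size", f))  # 1024
```

cloud-fan's first question amounts to asking whether the name/type pair (here implicit) and the lambda should be declared together in one registration call rather than in two parallel collections.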
[GitHub] [spark] LuciferYang commented on a diff in pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition
LuciferYang commented on code in PR #40628: URL: https://github.com/apache/spark/pull/40628#discussion_r1174270420 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala: ## @@ -128,4 +130,72 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession { .collect() assert(result.sorted.toSeq === Seq(23, 25, 25, 27)) } + + test("Dataset foreach") { Review Comment: @zhenlineo Sorry for forgetting to notify you. After reverting https://github.com/apache/spark/commit/09a43531d30346bb7c8d213822513dc35c70f82e, everything is OK now.
[GitHub] [spark] LuciferYang commented on pull request #40901: [SPARK-43195][BUILD][FOLLOWUP] Fix mima check for Scala 2.13
LuciferYang commented on PR #40901: URL: https://github.com/apache/spark/pull/40901#issuecomment-1518489585 thanks @HyukjinKwon
[GitHub] [spark] bogao007 commented on pull request #40834: [SPARK-43046] [SS] [Connect] Implemented Python API dropDuplicatesWithinWatermark for Spark Connect
bogao007 commented on PR #40834: URL: https://github.com/apache/spark/pull/40834#issuecomment-1518455976 > @bogao007 what's your JIRA id? I need to assign you in the JIRA ticket. I think this might be my JIRA id `62cbecffa94a6f9c0efe1622`, let me know if it doesn't work.
[GitHub] [spark] HyukjinKwon closed pull request #40725: [SPARK-43082][CONNECT][PYTHON] Arrow-optimized Python UDFs in Spark Connect
HyukjinKwon closed pull request #40725: [SPARK-43082][CONNECT][PYTHON] Arrow-optimized Python UDFs in Spark Connect URL: https://github.com/apache/spark/pull/40725
[GitHub] [spark] HyukjinKwon commented on pull request #40725: [SPARK-43082][CONNECT][PYTHON] Arrow-optimized Python UDFs in Spark Connect
HyukjinKwon commented on PR #40725: URL: https://github.com/apache/spark/pull/40725#issuecomment-1518443157 Merged to master.
[GitHub] [spark] HyukjinKwon closed pull request #40901: [SPARK-43195][BUILD][FOLLOWUP] Fix mima check for Scala 2.13
HyukjinKwon closed pull request #40901: [SPARK-43195][BUILD][FOLLOWUP] Fix mima check for Scala 2.13 URL: https://github.com/apache/spark/pull/40901
[GitHub] [spark] HyukjinKwon commented on pull request #40901: [SPARK-43195][BUILD][FOLLOWUP] Fix mima check for Scala 2.13
HyukjinKwon commented on PR #40901: URL: https://github.com/apache/spark/pull/40901#issuecomment-1518441760 Merged to master.
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40907: [PYTHON] Implement `__dir__()` in `pyspark.sql.dataframe.DataFrame` to include columns
HyukjinKwon commented on code in PR #40907: URL: https://github.com/apache/spark/pull/40907#discussion_r1174235481 ## python/pyspark/sql/dataframe.py: ## @@ -3008,6 +3008,25 @@ def __getattr__(self, name: str) -> Column: jc = self._jdf.apply(name) return Column(jc) +def __dir__(self): +""" +Examples + Review Comment: It would be better to have some explanation for the example here.
[GitHub] [spark] HyukjinKwon commented on pull request #40907: [PYTHON] Implement `__dir__()` in `pyspark.sql.dataframe.DataFrame` to include columns
HyukjinKwon commented on PR #40907: URL: https://github.com/apache/spark/pull/40907#issuecomment-1518441484 Mind filing a JIRA please?
[GitHub] [spark] HyukjinKwon commented on pull request #40834: [SPARK-43046] [SS] [Connect] Implemented Python API dropDuplicatesWithinWatermark for Spark Connect
HyukjinKwon commented on PR #40834: URL: https://github.com/apache/spark/pull/40834#issuecomment-1518440835 @bogao007 what's your JIRA id? I need to assign you in the JIRA ticket.
[GitHub] [spark] HyukjinKwon closed pull request #40834: [SPARK-43046] [SS] [Connect] Implemented Python API dropDuplicatesWithinWatermark for Spark Connect
HyukjinKwon closed pull request #40834: [SPARK-43046] [SS] [Connect] Implemented Python API dropDuplicatesWithinWatermark for Spark Connect URL: https://github.com/apache/spark/pull/40834
[GitHub] [spark] HyukjinKwon commented on pull request #40834: [SPARK-43046] [SS] [Connect] Implemented Python API dropDuplicatesWithinWatermark for Spark Connect
HyukjinKwon commented on PR #40834: URL: https://github.com/apache/spark/pull/40834#issuecomment-1518440451 Merged to master.
[GitHub] [spark] github-actions[bot] closed pull request #39312: [SPARK-41788][SQL] Move InsertIntoStatement to basicLogicalOperators
github-actions[bot] closed pull request #39312: [SPARK-41788][SQL] Move InsertIntoStatement to basicLogicalOperators URL: https://github.com/apache/spark/pull/39312
[GitHub] [spark] github-actions[bot] commented on pull request #39481: [MINOR][SQL] Update the import order of scala package in class `SpecificParquetRecordReaderBase`
github-actions[bot] commented on PR #39481: URL: https://github.com/apache/spark/pull/39481#issuecomment-1518439502 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] wangyum closed pull request #40838: [SPARK-43174][SQL] Fix SparkSQLCLIDriver completer
wangyum closed pull request #40838: [SPARK-43174][SQL] Fix SparkSQLCLIDriver completer URL: https://github.com/apache/spark/pull/40838
[GitHub] [spark] wangyum commented on pull request #40838: [SPARK-43174][SQL] Fix SparkSQLCLIDriver completer
wangyum commented on PR #40838: URL: https://github.com/apache/spark/pull/40838#issuecomment-1518438939 Merged to master.
[GitHub] [spark] alexanderwu-db opened a new pull request, #40907: [PYTHON] Implement `__dir__()` in `pyspark.sql.dataframe.DataFrame` to include columns
alexanderwu-db opened a new pull request, #40907: URL: https://github.com/apache/spark/pull/40907 ### What changes were proposed in this pull request? Override the parent `__dir__()` method on the Python `DataFrame` class to include column names. The main benefit is that any autocomplete engine that uses `dir()` to generate suggestions (e.g. the IPython kernel, Databricks Notebooks) will suggest column names on the completion `df.|`. ### Why are the changes needed? To keep `__dir__()` consistent with `__getattr__()`, so this is arguably a bug fix. Increases productivity for anyone who uses an autocomplete engine on PySpark code. Example of column attribute completion coming for free after this change: https://user-images.githubusercontent.com/84545946/233747057-56b2589d-d075-4d13-8349-ac5142c38c62.mov ### Does this PR introduce _any_ user-facing change? It will change the output of `dir(df)`. If the user chooses to use the private method `df.__dir__()`, they will also notice an output and docstring difference there. ### How was this patch tested? New doctest with three assertions. Output where I only ran this test: ![pyspark test passed](https://user-images.githubusercontent.com/84545946/233744674-b59191a7-08bf-4f3e-a491-945e687727b0.png) To test it in a notebook:

```python
from pyspark.sql.dataframe import DataFrame

class DataFrameWithColAttrs(DataFrame):
    def __init__(self, df):
        super().__init__(df._jdf, df._sql_ctx if df._sql_ctx else df._session)

    def __dir__(self):
        attrs = super().__dir__()
        attrs.extend(attr for attr in self.columns if attr not in attrs)
        return attrs
```
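The `__dir__` override pattern in this PR can be demonstrated without Spark at all. The `Frame` class below is a hypothetical toy stand-in for `DataFrame`, used only to show the mechanics:

```python
class Frame:
    """Toy stand-in for DataFrame (illustration only, not PySpark code)."""
    def __init__(self, columns):
        self.columns = columns

    def __dir__(self):
        # Merge the regular attributes with the column names, skipping duplicates,
        # so dir()-based autocomplete engines will suggest columns too.
        attrs = super().__dir__()
        attrs.extend(c for c in self.columns if c not in attrs)
        return attrs

f = Frame(["age", "name"])
assert "age" in dir(f) and "name" in dir(f)
```

Note that `dir()` sorts and deduplicates whatever `__dir__()` returns, so the extend-without-duplicates step only keeps the returned list tidy; correctness does not depend on it.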
[GitHub] [spark] WweiL commented on pull request #40906: [SPARK-43134] [CONNECT] [SS] JVM client StreamingQuery exception() API
WweiL commented on PR #40906: URL: https://github.com/apache/spark/pull/40906#issuecomment-1518400027 @rangadi @pengzhon-db
[GitHub] [spark] WweiL opened a new pull request, #40906: [SPARK-43134] [CONNECT] [SS] JVM client StreamingQuery exception() API
WweiL opened a new pull request, #40906: URL: https://github.com/apache/spark/pull/40906 ### What changes were proposed in this pull request? Add a StreamingQuery `exception()` API for the JVM client. ### Why are the changes needed? Development of SS Connect. ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? Manual test:

```
Spark session available as 'spark'.
(Spark ASCII-art banner)

@ val q = spark.readStream.format("rate").load().writeStream.option("checkpointLocation", "/home/wei.liu/ckpt").toTable("my_table")
q: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.streaming.RemoteStreamingQuery@772f3a3f

@ q.exception
res1: Option[org.apache.spark.sql.streaming.StreamingQueryException] = None

@ q.stop()
```
[GitHub] [spark] amaliujia commented on pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions
amaliujia commented on PR #40796: URL: https://github.com/apache/spark/pull/40796#issuecomment-1518390479 Overall this looks reasonable to me. I only have questions about the proto validation on the server side.
[GitHub] [spark] amaliujia commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions
amaliujia commented on code in PR #40796: URL: https://github.com/apache/spark/pull/40796#discussion_r1174198988 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala: ## @@ -545,34 +540,94 @@ class SparkConnectPlanner(val session: SparkSession) { private def transformTypedMapPartitions( fun: proto.CommonInlineUserDefinedFunction, child: LogicalPlan): LogicalPlan = { -val udf = fun.getScalarScalaUdf -val udfPacket = - Utils.deserialize[UdfPacket]( -udf.getPayload.toByteArray, -SparkConnectArtifactManager.classLoaderWithArtifacts) -assert(udfPacket.inputEncoders.size == 1) -val iEnc = ExpressionEncoder(udfPacket.inputEncoders.head) -val rEnc = ExpressionEncoder(udfPacket.outputEncoder) +val udf = unpackUdf(fun) +assert(udf.inputEncoders.size == 1) +val iEnc = ExpressionEncoder(udf.inputEncoders.head) +val rEnc = ExpressionEncoder(udf.outputEncoder) val deserializer = UnresolvedDeserializer(iEnc.deserializer) val deserialized = DeserializeToObject(deserializer, generateObjAttr(iEnc), child) val mapped = MapPartitions( - udfPacket.function.asInstanceOf[Iterator[Any] => Iterator[Any]], + udf.function.asInstanceOf[Iterator[Any] => Iterator[Any]], generateObjAttr(rEnc), deserialized) SerializeFromObject(rEnc.namedExpressions, mapped) } private def transformGroupMap(rel: proto.GroupMap): LogicalPlan = { -val pythonUdf = transformPythonUDF(rel.getFunc) -val cols = - rel.getGroupingExpressionsList.asScala.toSeq.map(expr => Column(transformExpression(expr))) +val commonUdf = rel.getFunc +commonUdf.getFunctionCase match { + case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF => +transformTypedGroupMap(rel, commonUdf) -Dataset - .ofRows(session, transformRelation(rel.getInput)) - .groupBy(cols: _*) - .flatMapGroupsInPandas(pythonUdf) - .logicalPlan + case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF => +val pythonUdf = transformPythonUDF(commonUdf) +val cols = + 
rel.getGroupingExpressionsList.asScala.toSeq.map(expr => +Column(transformExpression(expr))) + +Dataset + .ofRows(session, transformRelation(rel.getInput)) + .groupBy(cols: _*) + .flatMapGroupsInPandas(pythonUdf) + .logicalPlan + + case _ => +throw InvalidPlanInput( + s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported") +} + } + + private def transformTypedGroupMap( + rel: GroupMap, + commonUdf: CommonInlineUserDefinedFunction): LogicalPlan = { +// Compute grouping key +val logicalPlan = transformRelation(rel.getInput) +val udf = unpackUdf(commonUdf) +assert(rel.getGroupingExpressionsCount == 1) +val groupFunc = rel.getGroupingExpressionsList.asScala.toSeq + .map(expr => unpackUdf(expr.getCommonInlineUserDefinedFunction)) + .head + +assert(groupFunc.inputEncoders.size == 1) +val vEnc = ExpressionEncoder(groupFunc.inputEncoders.head) +val kEnc = ExpressionEncoder(groupFunc.outputEncoder) +val uEnc = ExpressionEncoder(udf.outputEncoder) +assert(udf.inputEncoders.nonEmpty) +// ukEnc != kEnc if user has called kvDS.keyAs +val ukEnc = ExpressionEncoder(udf.inputEncoders.head) + +val withGroupingKey = new AppendColumns( + groupFunc.function.asInstanceOf[Any => Any], + vEnc.clsTag.runtimeClass, + vEnc.schema, + UnresolvedDeserializer(vEnc.deserializer), + kEnc.namedExpressions, + logicalPlan) + +// Compute sort order +val sortExprs = + rel.getSortingExpressionsList.asScala.toSeq.map(expr => transformExpression(expr)) +val sortOrder: Seq[SortOrder] = sortExprs.map { + case expr: SortOrder => expr + case expr: Expression => SortOrder(expr, Ascending) Review Comment: Who will be in charge of checking non-supported expr? 
[GitHub] [spark] amaliujia commented on a diff in pull request #40834: [SPARK-43046] [SS] [Connect] Implemented Python API dropDuplicatesWithinWatermark for Spark Connect
amaliujia commented on code in PR #40834: URL: https://github.com/apache/spark/pull/40834#discussion_r1174195181 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala: ## @@ -750,7 +751,8 @@ class SparkConnectPlanner(val session: SparkSession) { } cols } - Deduplicate(groupCols, queryExecution.analyzed) + if (rel.getWithinWatermark) DeduplicateWithinWatermark(groupCols, queryExecution.analyzed) Review Comment: If there is anything wrong, I think deprecating a field is easier than deprecating a new relation type. Probably starting from this by adding a new flag is a good beginning. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
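The single-relation-plus-flag design endorsed in this thread can be sketched as follows. This is a minimal Python sketch with made-up stand-in types; the real Connect planner dispatches on protobuf relations and builds Catalyst logical plans, not the simplified classes shown here.

```python
from dataclasses import dataclass
from typing import List

# Hypothetical stand-in for the Connect Deduplicate relation message,
# which carries one extra boolean field rather than being a new relation.
@dataclass
class DeduplicateRel:
    column_names: List[str]
    within_watermark: bool

def transform_deduplicate(rel: DeduplicateRel) -> str:
    # The planner branches on the flag to pick the logical plan node,
    # so no second relation type is needed.
    if rel.within_watermark:
        return f"DeduplicateWithinWatermark({', '.join(rel.column_names)})"
    return f"Deduplicate({', '.join(rel.column_names)})"

print(transform_deduplicate(DeduplicateRel(["id"], within_watermark=True)))
# prints: DeduplicateWithinWatermark(id)
```

If the flag approach ever needs to be walked back, deprecating one boolean field is a smaller protocol change than deprecating a whole relation type, which is the point amaliujia makes above.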
[GitHub] [spark] WweiL commented on pull request #40887: [SPARK-43144] Scala Client DataStreamReader table() API
WweiL commented on PR #40887: URL: https://github.com/apache/spark/pull/40887#issuecomment-1518384473 @HyukjinKwon Can you merge this when you get a chance? Thank you!
[GitHub] [spark] anishshri-db commented on pull request #40905: [SPARK-43233] [SS] Add logging for Kafka Batch Reading for topic partition, offset range and task ID
anishshri-db commented on PR #40905: URL: https://github.com/apache/spark/pull/40905#issuecomment-1518359441 @HeartSaVioR - please take a look and merge after builds pass, thx !
[GitHub] [spark] anishshri-db commented on pull request #40905: [SPARK-43233] [SS] Add logging for Kafka Batch Reading for topic partition, offset range and task ID
anishshri-db commented on PR #40905: URL: https://github.com/apache/spark/pull/40905#issuecomment-1518358413 @siying - you might need to enable github actions for the tests to run
[GitHub] [spark] siying opened a new pull request, #40905: [SPARK-43233] [SS] Add logging for Kafka Batch Reading for topic partition, offset range and task ID
siying opened a new pull request, #40905: URL: https://github.com/apache/spark/pull/40905 ### What changes were proposed in this pull request? We add logging when creating the batch reader, with task ID, topic, partition and offset range included. The log line looks like the following: 23/04/18 22:35:38 INFO KafkaBatchReaderFactory: Creating Kafka reader partitionId=1 partition=StreamingDustTest-KafkaToKafkaTopic-4ccf8662-c3ca-4f3b-871e-1853c0e61765-source-2 fromOffset=0 untilOffset=3 queryId=b5b806c3-ebf3-432e-a9a7-d882d474c0f5 batchId=0 taskId=1 ### Why are the changes needed? Right now, for structured streaming from Kafka, it's hard to find which task is handling which topic/partition and offset range. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Ran KafkaMicroBatchV2SourceSuite and watched that the logging output contains the information needed. Also did a small cluster test and observed the logs.
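The log line quoted in the PR description is plain string assembly. A hedged sketch of the formatting follows; the field names match the example above, but the function and its values are hypothetical illustrations, not Spark's actual `KafkaBatchReaderFactory` code.

```python
# Hypothetical sketch: build the reader-creation log line with the fields
# the PR adds (task ID, topic/partition, offset range, query and batch IDs).
def kafka_reader_log_line(partition_id: int, partition: str,
                          from_offset: int, until_offset: int,
                          query_id: str, batch_id: int, task_id: int) -> str:
    return (f"Creating Kafka reader partitionId={partition_id} "
            f"partition={partition} fromOffset={from_offset} "
            f"untilOffset={until_offset} queryId={query_id} "
            f"batchId={batch_id} taskId={task_id}")

# Example values (made up) mirroring the shape of the line in the description.
print(kafka_reader_log_line(1, "my-topic-2", 0, 3, "b5b806c3", 0, 1))
```

Having all of these fields on one line is what makes it greppable: a single search for `taskId=1` recovers the topic/partition and offset range that task processed.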
[GitHub] [spark] rangadi commented on a diff in pull request #40834: [SPARK-43046] [SS] [Connect] Implemented Python API dropDuplicatesWithinWatermark for Spark Connect
rangadi commented on code in PR #40834: URL: https://github.com/apache/spark/pull/40834#discussion_r1174175904 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala: ## @@ -750,7 +751,8 @@ class SparkConnectPlanner(val session: SparkSession) { } cols } - Deduplicate(groupCols, queryExecution.analyzed) + if (rel.getWithinWatermark) DeduplicateWithinWatermark(groupCols, queryExecution.analyzed) Review Comment: @HyukjinKwon thanks. This is much simpler code wise. 1:1 for logical plans is not strictly required, I hope.
[GitHub] [spark] pengzhon-db opened a new pull request, #40904: [WIP][POC] foreachbatch spark connect
pengzhon-db opened a new pull request, #40904: URL: https://github.com/apache/spark/pull/40904 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] ueshin commented on a diff in pull request #40782: [SPARK-42669][CONNECT] Short circuit local relation RPCs
ueshin commented on code in PR #40782: URL: https://github.com/apache/spark/pull/40782#discussion_r1174158327 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala: ## @@ -80,7 +80,10 @@ private[sql] class SparkResult[T]( } while (reader.loadNextBatch()) { val rowCount = root.getRowCount -assert(root.getRowCount == response.getArrowBatch.getRowCount) // HUH! +assert( + response.getIsLocalBuilt || +root.getRowCount == response.getArrowBatch.getRowCount Review Comment: I guess adding the row count as optional to `LocalRelation` message is more useful. The server can check if the expected number of rows are provided, and we can reuse it here to set the row count.
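ueshin's suggestion — an optional row count on the `LocalRelation` message that is verified only when present — can be sketched as follows. The helper name and shape are hypothetical, not the actual Connect server code.

```python
from typing import Optional

def verify_row_count(expected: Optional[int], actual: int) -> None:
    # Hypothetical sketch: if the client supplied an expected row count,
    # check it against the rows actually decoded from the Arrow batch;
    # if it is absent (None), skip the check entirely.
    if expected is not None and expected != actual:
        raise ValueError(f"expected {expected} rows but decoded {actual}")

verify_row_count(3, 3)    # ok: counts match
verify_row_count(None, 5) # ok: no expectation supplied, nothing to check
```

Making the field optional keeps old clients working while letting new clients opt in to the stricter check.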
[GitHub] [spark] sweisdb opened a new pull request, #40903: [WIP][SPARK-NNNNN] Updating AES-CBC support to not use OpenSSL's KDF
sweisdb opened a new pull request, #40903: URL: https://github.com/apache/spark/pull/40903 ### What changes were proposed in this pull request? The `aes_encrypt` support for CBC mode currently uses a key derivation function from OpenSSL's EVP_BytesToKey to generate an initialization vector. This is not typical. This KDF is intended to be used with a passphrase and is discouraged from being used in general. This change updates `aes_encrypt` to generate a random initialization vector and prepend it to the ciphertext. This is identical to how the existing GCM mode works. ### Why are the changes needed? We want the ciphertext output to be similar across different modes. We don't want CBC to use a rarely-used, out-of-date KDF to derive an initialization vector. Rather, we will generate a random vector. ### Does this PR introduce _any_ user-facing change? Not immediately. AES CBC support has landed, but is in development. ### How was this patch tested? A new unit test in `DataFrameFunctionsSuite` was added to test both GCM and CBC modes. Also, a new standalone unit test suite was added in `ExpressionImplUtilsSuite` to test all the modes and various key lengths.
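The IV handling the PR describes — a fresh random IV generated at encrypt time, prepended to the ciphertext, and split back off the front by the decryptor — can be sketched as follows. A toy XOR stream stands in for AES/CBC so the example stays dependency-free; the point is the ciphertext framing (no KDF, IV travels with the message), not the cipher itself.

```python
import os
from itertools import cycle

IV_LEN = 16

def toy_cipher(key: bytes, iv: bytes, data: bytes) -> bytes:
    # Toy XOR stream keyed by key+iv: a stand-in for AES/CBC used only
    # to illustrate the framing. XOR is its own inverse, so the same
    # function both encrypts and decrypts.
    stream = cycle(key + iv)
    return bytes(b ^ k for b, k in zip(data, stream))

def encrypt(key: bytes, plaintext: bytes) -> bytes:
    # Fresh random IV per message, prepended to the ciphertext -- the
    # layout the PR describes for CBC, mirroring the existing GCM mode.
    iv = os.urandom(IV_LEN)
    return iv + toy_cipher(key, iv, plaintext)

def decrypt(key: bytes, data: bytes) -> bytes:
    # The decryptor splits the IV back off the front; no KDF involved.
    iv, ciphertext = data[:IV_LEN], data[IV_LEN:]
    return toy_cipher(key, iv, ciphertext)

key = b"0123456789abcdef"
assert decrypt(key, encrypt(key, b"hello")) == b"hello"
```

Because the IV is random per call, encrypting the same plaintext twice yields different ciphertexts, which is exactly the property EVP_BytesToKey-style passphrase derivation does not give you.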
[GitHub] [spark] rshkv commented on pull request #40794: [SPARK-43142] Fix DSL expressions on attributes with special characters
rshkv commented on PR #40794: URL: https://github.com/apache/spark/pull/40794#issuecomment-1518291912 @cloud-fan, maybe let's consider multi-part attribute references as fine or at least separate from this? What do you think? I opened another PR just changing `DslAttr.attr` to not re-parse and break on special characters #40902.
[GitHub] [spark] xinrong-meng commented on pull request #40864: [WIP] Nested DataType compatibility in Arrow-optimized Python UDF and Pandas UDF
xinrong-meng commented on PR #40864: URL: https://github.com/apache/spark/pull/40864#issuecomment-1518288506 On second thought, we'd better not touch Pandas UDF, to preserve backward compatibility. Let me close the PR and start a new prototype.
[GitHub] [spark] xinrong-meng closed pull request #40864: [WIP] Nested DataType compatibility in Arrow-optimized Python UDF and Pandas UDF
xinrong-meng closed pull request #40864: [WIP] Nested DataType compatibility in Arrow-optimized Python UDF and Pandas UDF URL: https://github.com/apache/spark/pull/40864
[GitHub] [spark] rshkv commented on pull request #40794: [SPARK-43142] Fix DSL expressions on attributes with special characters
rshkv commented on PR #40794: URL: https://github.com/apache/spark/pull/40794#issuecomment-1518283716 Damn, thank you for reverting guys. Unsure why GA didn't test the last commit.
[GitHub] [spark] sadikovi commented on pull request #40699: [SPARK-43063][SQL] `df.show` handle null should print NULL instead of null
sadikovi commented on PR #40699: URL: https://github.com/apache/spark/pull/40699#issuecomment-1518270247 To be honest, I don't understand why spark-sql shell is expected to be consistent with spark-shell or pyspark shell. Can someone elaborate? I can see making spark-sql shell consistent with Presto/Trino/MySQL/Postgres, etc. but I don't understand why Scala REPL should be consistent with SQL terminal in terms of displaying results - they serve different purposes. I do support having a consistent visual behaviour for NULLs/nulls, just as long as it does not break other features like Cast or `collect.toString`. Maybe we could simply add a conversion method to display values in a DataFrame in whatever format we need when calling `.show` instead of changing Cast. In fact, we can refactor it into a separate class and reuse it in spark-sql and spark-shell.
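sadikovi's idea of a display-only conversion — render nulls as `NULL` when building the `show()` table, without touching Cast or `collect()` semantics — can be sketched as a small formatter. The helper name is hypothetical; Spark's actual rendering lives in the `Dataset.show` string-building path.

```python
def show_cell(value) -> str:
    # Hypothetical display-only formatter: only the rendered table sees
    # "NULL"; the underlying values and their string casts are untouched.
    return "NULL" if value is None else str(value)

row = ["a", None, 42]
print("|".join(show_cell(v) for v in row))
# prints: a|NULL|42
```

Because the substitution happens only at render time, a shared helper like this could be reused by both the spark-sql shell and `.show` output without changing Cast behaviour, which is the refactor the comment suggests.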
[GitHub] [spark] woj-i commented on pull request #40821: [SPARK-43152][spark-structured-streaming] Parametrisable output metadata path (_spark_metadata)
woj-i commented on PR #40821: URL: https://github.com/apache/spark/pull/40821#issuecomment-1518209816 Surprisingly, after committing naming improvements (no logic changes) the build failed. I think it's not related to my change. It happened at [Run / Build modules: streaming, sql-kafka-0-10, streaming-kafka-0-10, mllib-local, mllib, yarn, mesos, kubernetes, hadoop-cloud, spark-ganglia-lgpl, connect, protobuf](https://github.com/woj-i/spark/actions/runs/4762682139/jobs/8474612155#logs) Can you please help me with fixing the build? I extracted the logs that most likely caused the failure:
```
2023-04-21T08:58:47.7065800Z [info] UserDefinedFunctionE2ETestSuite:
2023-04-21T08:58:47.9937562Z [info] - Dataset typed filter (225 milliseconds)
2023-04-21T08:58:48.0428439Z [info] - Dataset typed filter - java (48 milliseconds)
2023-04-21T08:58:48.1508396Z [info] - Dataset typed map (98 milliseconds)
2023-04-21T08:58:48.2447937Z [info] - filter with condition (92 milliseconds)
2023-04-21T08:58:48.3442084Z [info] - filter with col(*) (98 milliseconds)
2023-04-21T08:58:48.4067957Z [info] - Dataset typed map - java (55 milliseconds)
2023-04-21T08:58:48.4836489Z [info] - Dataset typed flat map (74 milliseconds)
2023-04-21T08:58:48.5347903Z [info] - Dataset typed flat map - java (51 milliseconds)
2023-04-21T08:58:48.8260858Z [info] - Dataset typed map partition (288 milliseconds)
2023-04-21T08:58:49.0958006Z [info] - Dataset typed map partition - java (270 milliseconds)
2023-04-21T08:58:49.1921226Z [info] - Dataset foreach (92 milliseconds)
2023-04-21T08:58:49.2289858Z [info] - Dataset foreach - java (34 milliseconds)
2023-04-21T08:58:49.3390396Z [info] - Dataset foreachPartition (109 milliseconds)
2023-04-21T08:58:49.4353655Z [info] - Dataset foreachPartition - java (99 milliseconds)
2023-04-21T08:58:49.4971434Z [info] - Dataset foreach: change not visible to client (65 milliseconds)
2023-04-21T08:58:49.5121205Z [info] ReplE2ESuite:
2023-04-21T08:58:53.2505694Z sh: 1: cannot open /dev/tty: No such device or address
2023-04-21T08:58:53.2692727Z sh: 1: cannot open /dev/tty: No such device or address
2023-04-21T08:58:53.3262467Z sh: 1: cannot open /dev/tty: No such device or address
(the "cannot open /dev/tty" line repeats until the log is cut off)
```
[GitHub] [spark] zhenlineo commented on a diff in pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition
zhenlineo commented on code in PR #40628: URL: https://github.com/apache/spark/pull/40628#discussion_r1174040639 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala: ## @@ -128,4 +130,72 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession { .collect() assert(result.sorted.toSeq === Seq(23, 25, 25, 27)) } + + test("Dataset foreach") { Review Comment: With the latest master, I've been running the test locally 100 times and cannot repro this error. Do we have some way to see how flaky this test is? We can certainly mute these tests if they continue to be flaky. My local runs indicate it is not flaky.
[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40885: [SPARK-43226] Define extractors for file-constant metadata
ryan-johnson-databricks commented on code in PR #40885: URL: https://github.com/apache/spark/pull/40885#discussion_r1174017427 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala: ## @@ -203,6 +203,21 @@ trait FileFormat { * method. Technically, a file format could choose suppress them, but that is not recommended. */ def metadataSchemaFields: Seq[StructField] = FileFormat.BASE_METADATA_FIELDS + + /** + * The extractors to use when deriving file-constant metadata columns for this file format. + * + * A scanner must derive each file-constant metadata field's value from each [[PartitionedFile]] + * it processes. By default, the value is obtained by a direct lookup of the column's name on + * [[PartitionedFile.otherConstantMetadataColumnValues]] (see + * [[FileFormat.getFileConstantMetadataColumnValue]]). However, implementations can override this + * method in order to provide more sophisticated lazy extractors (e.g. in case the column value is + * complicated or expensive to compute). Review Comment: That wording was intentional... the simple way is usually the right way and thus should be the default. If something special is going on, I'd prefer it to be highly visible. Put another way -- if the column _can_ be populated by simply pulling its value from the map, why should we encourage the use of an extractor? Especially when the map is the only part of `PartitionedFile` whose content a `FileIndex` has any real control over? An alternative would be to require all constant metadata columns to provide extractors, and define a default extractor that just goes to the map... but this seems like extra boilerplate for no real benefit? (I originally wanted to just embed the extractor in the attribute definition directly, but attribute/struct metadata cannot store code and IMO that's a wise design decision we shouldn't change)
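The default-lookup-versus-override behavior described in the diff can be sketched with hypothetical types; these are illustrations of the pattern, not the actual `FileFormat`/`PartitionedFile` API.

```python
from typing import Any, Callable, Dict

# Hypothetical stand-ins: a file's constant-metadata map and an extractor
# that derives one column's value from it.
MetadataMap = Dict[str, Any]
Extractor = Callable[[MetadataMap], Any]

def default_extractor(name: str) -> Extractor:
    # The default: a plain lookup of the column's name in the map.
    return lambda metadata: metadata[name]

# An overriding extractor that derives its value instead of looking it up,
# the "more sophisticated lazy extractor" case mentioned in the doc comment.
size_in_kb: Extractor = lambda metadata: metadata["file_size"] // 1024

meta: MetadataMap = {"file_name": "part-0001.parquet", "file_size": 4096}
print(default_extractor("file_name")(meta))  # prints: part-0001.parquet
print(size_in_kb(meta))                      # prints: 4
```

The review comment's position maps onto this sketch directly: if a column can be served by `default_extractor`, requiring every column to declare one would be boilerplate, so only the overriding case should be spelled out.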
[GitHub] [spark] ryan-johnson-databricks commented on pull request #40885: [SPARK-43226] Define extractors for file-constant metadata
ryan-johnson-databricks commented on PR #40885: URL: https://github.com/apache/spark/pull/40885#issuecomment-1518150945 FYI the [tests that failed](https://github.com/ryan-johnson-databricks/spark/actions/runs/4765599580/jobs/8471553389) are broken upstream -- they also fail on the version of spark/master this PR is currently based on.
[GitHub] [spark] jchen5 commented on pull request #40865: [SPARK-43156][SQL] Fix `COUNT(*) is null` bug in correlated scalar subquery
jchen5 commented on PR #40865: URL: https://github.com/apache/spark/pull/40865#issuecomment-1518139762 I checked the case of `any_value(false)` in a debugger and it works because resultWithZeroTups is NULL there, so that explains why it works - because there's an aggregation value around false, and not just constant false. This fix seems to work with the cases I've thought of (results match SQL standard semantics and postgres). I think this PR is mergeable. I'll plan to add some more test coverage for related cases in a future PR.
[GitHub] [spark] sunchao commented on pull request #40847: [SPARK-43185][BUILD] Inline `hadoop-client` related properties in `pom.xml`
sunchao commented on PR #40847: URL: https://github.com/apache/spark/pull/40847#issuecomment-1518134382 > So if there is a way to build and test Hadoop 3.0/3.1 successfully before this pr, but it is lost after this pr, I think we should stop this work because Apache Spark has not previously stated on any occasion that it no longer supports Hadoop 3.0/3.1, right ? Yes, I think that's probably a sensible thing to do. > @xkrogen @sunchao @pan3793 Can you give a command that can be used to build & test with Hadoop 3.0/3.1? I want to manually check it, thanks ~ You can check this JIRA for the command to build: https://issues.apache.org/jira/browse/SPARK-37994
[GitHub] [spark] sunchao commented on pull request #40900: [SPARK-43196][YARN][FOLLOWUP] Remove unnecessary Hadoop version check
sunchao commented on PR #40900: URL: https://github.com/apache/spark/pull/40900#issuecomment-1518132719 Merged to master, thanks!
[GitHub] [spark] sunchao closed pull request #40900: [SPARK-43196][YARN][FOLLOWUP] Remove unnecessary Hadoop version check
sunchao closed pull request #40900: [SPARK-43196][YARN][FOLLOWUP] Remove unnecessary Hadoop version check URL: https://github.com/apache/spark/pull/40900
[GitHub] [spark] mridulm closed pull request #40843: [SPARK-43179][SHUFFLE] Allowing apps to control whether their metadata gets saved in the db by the External Shuffle Service
mridulm closed pull request #40843: [SPARK-43179][SHUFFLE] Allowing apps to control whether their metadata gets saved in the db by the External Shuffle Service URL: https://github.com/apache/spark/pull/40843
[GitHub] [spark] mridulm commented on pull request #40843: [SPARK-43179][SHUFFLE] Allowing apps to control whether their metadata gets saved in the db by the External Shuffle Service
mridulm commented on PR #40843: URL: https://github.com/apache/spark/pull/40843#issuecomment-1518125522 Thanks for fixing this @otterc ! Thanks for the reviews @tgravescs, @zhouyejoe :-)
[GitHub] [spark] WweiL commented on pull request #40887: [SPARK-43144] Scala Client DataStreamReader table() API
WweiL commented on PR #40887: URL: https://github.com/apache/spark/pull/40887#issuecomment-1518097080

> You probably also need to generate the golden file for `ProtoToParsedPlanTestSuite`. There are instructions documented in that suite.

Ah I see there is also a bin file. I did run this but my local dev env is broken so I ran this on a devbox... and scp'd the file back... will also scp that one
[GitHub] [spark] LuciferYang commented on pull request #40901: [SPARK-43195][BUILD][FOLLOWUP] Fix mima check for Scala 2.13
LuciferYang commented on PR #40901: URL: https://github.com/apache/spark/pull/40901#issuecomment-1518017291 cc @pan3793 @sunchao @HyukjinKwon
[GitHub] [spark] LuciferYang opened a new pull request, #40901: [SPARK-43195][FOLLOWUP] Fix mima check for Scala 2.13
LuciferYang opened a new pull request, #40901: URL: https://github.com/apache/spark/pull/40901

### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
[GitHub] [spark] srowen commented on pull request #40893: [SPARK-43225][BUILD][SQL] Remove jackson-core-asl and jackson-mapper-asl from pre-built distribution
srowen commented on PR #40893: URL: https://github.com/apache/spark/pull/40893#issuecomment-1517997626 Is this possible now that Hadoop 2 support is gone? Just checking what the implications of this change are. Are the Hive.get changes needed, or can we batch those changes with reverting the Hive <2.3.9 support? I also don't know what the implication of that is.
[GitHub] [spark] LuciferYang commented on pull request #40675: [SPARK-42657][CONNECT] Support to find and transfer client-side REPL classfiles to server as artifacts
LuciferYang commented on PR #40675: URL: https://github.com/apache/spark/pull/40675#issuecomment-1517995794

@vicennial I found that `ReplE2ESuite` always fails in the Java 17 GA daily test:
- https://github.com/apache/spark/actions/runs/4726264540/jobs/8385681548
- https://github.com/apache/spark/actions/runs/4737365554/jobs/8410097712
- https://github.com/apache/spark/actions/runs/4748319019/jobs/8434392414
- https://github.com/apache/spark/actions/runs/4759278349/jobs/8458399201

(screenshot: https://user-images.githubusercontent.com/1475305/233674106-5cf0c4cf-ed4f-4d75-be42-3b7c39dc2936.png)
[GitHub] [spark] cloud-fan commented on pull request #40794: [SPARK-43142] Fix DSL expressions on attributes with special characters
cloud-fan commented on PR #40794: URL: https://github.com/apache/spark/pull/40794#issuecomment-1517960380 Let's revert first. It seems GA wrongly reported green for this PR.
[GitHub] [spark] wangyum commented on pull request #40794: [SPARK-43142] Fix DSL expressions on attributes with special characters
wangyum commented on PR #40794: URL: https://github.com/apache/spark/pull/40794#issuecomment-1517960382 Reverted: https://github.com/apache/spark/commit/3523d83ac472b330bb86a442365c0a15f7e53f8c.
[GitHub] [spark] huaxingao closed pull request #40889: [SPARK-41660][SQL][3.3] Only propagate metadata columns if they are used
huaxingao closed pull request #40889: [SPARK-41660][SQL][3.3] Only propagate metadata columns if they are used URL: https://github.com/apache/spark/pull/40889
[GitHub] [spark] huaxingao commented on pull request #40889: [SPARK-41660][SQL][3.3] Only propagate metadata columns if they are used
huaxingao commented on PR #40889: URL: https://github.com/apache/spark/pull/40889#issuecomment-1517948030 Merged to branch-3.3. Thank you all for reviewing!
[GitHub] [spark] LuciferYang commented on pull request #40794: [SPARK-43142] Fix DSL expressions on attributes with special characters
LuciferYang commented on PR #40794: URL: https://github.com/apache/spark/pull/40794#issuecomment-1517927714

> https://github.com/apache/spark/actions/runs/4765094614/jobs/8470442826
>
> (screenshot: https://user-images.githubusercontent.com/1475305/233662686-1bfb0633-bbd6-4c4a-a9b9-ecdd8e2f0ffc.png)
>
> @cloud-fan many tests failed after this one, should we revert this one first?

also ping @HyukjinKwon
[GitHub] [spark] LuciferYang commented on pull request #40794: [SPARK-43142] Fix DSL expressions on attributes with special characters
LuciferYang commented on PR #40794: URL: https://github.com/apache/spark/pull/40794#issuecomment-1517926432

https://github.com/apache/spark/actions/runs/4765094614/jobs/8470442826

(screenshot: https://user-images.githubusercontent.com/1475305/233662686-1bfb0633-bbd6-4c4a-a9b9-ecdd8e2f0ffc.png)

@cloud-fan many tests failed after this one, should we revert this one first?
[GitHub] [spark] johanl-db commented on a diff in pull request #40885: [SPARK-43226] Define extractors for file-constant metadata
johanl-db commented on code in PR #40885: URL: https://github.com/apache/spark/pull/40885#discussion_r1173803115

In sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:

@@ -203,6 +203,21 @@ trait FileFormat {
   * method. Technically, a file format could choose to suppress them, but that is not recommended.
   */
  def metadataSchemaFields: Seq[StructField] = FileFormat.BASE_METADATA_FIELDS
+
+  /**
+   * The extractors to use when deriving file-constant metadata columns for this file format.
+   *
+   * A scanner must derive each file-constant metadata field's value from each [[PartitionedFile]]
+   * it processes. By default, the value is obtained by a direct lookup of the column's name on
+   * [[PartitionedFile.otherConstantMetadataColumnValues]] (see
+   * [[FileFormat.getFileConstantMetadataColumnValue]]). However, implementations can override this
+   * method in order to provide more sophisticated lazy extractors (e.g. in case the column value is
+   * complicated or expensive to compute).

Review Comment: The way I read it is that using `otherConstantMetadataColumnValues` is the default and an extractor should only be used for more advanced use cases. I think we rather want to encourage using extractors whenever possible and only fall back to manually providing the values when necessary.
[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40885: [SPARK-43226] Define extractors for file-constant metadata
ryan-johnson-databricks commented on code in PR #40885: URL: https://github.com/apache/spark/pull/40885#discussion_r1173799650 (on the same `metadataExtractors` doc comment in FileFormat.scala quoted above)

Review Comment: Reworded, PTAL
[GitHub] [spark] wangyum commented on pull request #40616: [SPARK-42991][SQL] Disable string type +/- interval in ANSI mode
wangyum commented on PR #40616: URL: https://github.com/apache/spark/pull/40616#issuecomment-1517869092 @gengliangwang I have updated the description.
[GitHub] [spark] pan3793 commented on pull request #40900: [SPARK-43196][YARN][FOLLOWUP] Remove unnecessary Hadoop version check
pan3793 commented on PR #40900: URL: https://github.com/apache/spark/pull/40900#issuecomment-1517850737 @sunchao @LuciferYang
[GitHub] [spark] pan3793 opened a new pull request, #40900: [SPARK-43196][YARN][FOLLOWUP] Remove unnecessary Hadoop version check
pan3793 opened a new pull request, #40900: URL: https://github.com/apache/spark/pull/40900

### What changes were proposed in this pull request?
It's not necessary to check for Hadoop version 2.9+ or 3.0+ now.

### Why are the changes needed?
Simplify code and docs.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.
[GitHub] [spark] tgravescs commented on pull request #40843: [SPARK-43179][SHUFFLE] Allowing apps to control whether their metadata gets saved in the db by the External Shuffle Service
tgravescs commented on PR #40843: URL: https://github.com/apache/spark/pull/40843#issuecomment-1517841735 lgtm
[GitHub] [spark] Hisoka-X commented on a diff in pull request #40865: [SPARK-43156][SQL] Fix `COUNT(*) is null` bug in correlated scalar subquery
Hisoka-X commented on code in PR #40865: URL: https://github.com/apache/spark/pull/40865#discussion_r1173768209

In sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-count-bug.sql.out:

@@ -86,14 +86,14 @@ from l
 -- !query schema
 struct
 -- !query output
-1  2.0  NULL
-1  2.0  NULL
+1  2.0  false
+1  2.0  false

Review Comment: Done
[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40885: [SPARK-43226] Define extractors for file-constant metadata
ryan-johnson-databricks commented on code in PR #40885: URL: https://github.com/apache/spark/pull/40885#discussion_r1173755343

In sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:

@@ -241,47 +256,74 @@ object FileFormat {
    FileSourceConstantMetadataStructField(FILE_BLOCK_LENGTH, LongType, nullable = false),
    FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, TimestampType, nullable = false))

+  /**
+   * All [[BASE_METADATA_FIELDS]] require custom extractors because they are derived directly from
+   * fields of the [[PartitionedFile]], and do not have entries in the file's metadata map.
+   */
+  val BASE_METADATA_EXTRACTORS: Map[String, PartitionedFile => Any] = Map(
+    FILE_PATH -> { pf: PartitionedFile => pf.toPath.toString },
+    FILE_NAME -> { pf: PartitionedFile => pf.toPath.getName },
+    FILE_SIZE -> { pf: PartitionedFile => pf.fileSize },
+    FILE_BLOCK_START -> { pf: PartitionedFile => pf.start },
+    FILE_BLOCK_LENGTH -> { pf: PartitionedFile => pf.length },
+    // The modificationTime from the file has millisecond granularity, but the TimestampType for
+    // `file_modification_time` has microsecond granularity.
+    FILE_MODIFICATION_TIME -> { pf: PartitionedFile => pf.modificationTime * 1000 }
+  )
+
+  /**
+   * Extracts the [[Literal]] value of a file-constant metadata column from a [[PartitionedFile]].
+   *
+   * If an extractor is available, use it. Otherwise, attempt to fetch the value directly from the
+   * file's metadata map, returning null if not found.
+   *
+   * Raw values (including null) are automatically converted to literals as a courtesy.
+   */
+  def getFileConstantMetadataColumnValue(
+      name: String,
+      file: PartitionedFile,
+      metadataExtractors: Map[String, PartitionedFile => Any]): Literal = {
+    val extractor = metadataExtractors.get(name).getOrElse {
+      (_: PartitionedFile).otherConstantMetadataColumnValues.get(name).orNull

Review Comment: Yup. I already came to the same conclusion and fixed it on the OSS side. This side will get fixed when I rebase on the OSS sync (if not sooner).
[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40885: [SPARK-43226] Define extractors for file-constant metadata
ryan-johnson-databricks commented on code in PR #40885: URL: https://github.com/apache/spark/pull/40885#discussion_r1173759666 (on the same `getFileConstantMetadataColumnValue` hunk in FileFormat.scala quoted above)

Review Comment: Agreed. I think it's already fixed locally but I didn't push yet in case there were other comments.
[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40885: [SPARK-43226] Define extractors for file-constant metadata
ryan-johnson-databricks commented on code in PR #40885: URL: https://github.com/apache/spark/pull/40885#discussion_r1173758526 (on the same `metadataExtractors` doc comment in FileFormat.scala quoted above)

Review Comment: I thought I _did_ describe it explicitly:
1. If you provide an extractor, the extractor has access to all state in the `PartitionedFile` (including the column value map) and can do any computations it needs to.
2. Otherwise, the column's value is fetched from the column value map.

Was it not clear [enough] in the comment that the extractor has access to the entire `PartitionedFile`?
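The extractor-vs-map precedence being discussed in this thread can be modeled in isolation. Below is a minimal, self-contained Scala sketch of that lookup rule; `PartitionedFileStub`, `ExtractorSketch`, and `columnValue` are hypothetical stand-ins for illustration, not the real Spark classes or the PR's actual API:

```scala
// Stand-in for Spark's PartitionedFile: a few intrinsic fields plus a
// free-form metadata map (modeled on otherConstantMetadataColumnValues).
case class PartitionedFileStub(
    path: String,
    fileSize: Long,
    otherConstantMetadataColumnValues: Map[String, Any])

object ExtractorSketch {
  // Extractors keyed by metadata column name, derived from intrinsic fields.
  val baseExtractors: Map[String, PartitionedFileStub => Any] = Map(
    "file_path" -> ((pf: PartitionedFileStub) => pf.path),
    "file_size" -> ((pf: PartitionedFileStub) => pf.fileSize)
  )

  // The precedence rule under discussion: use a registered extractor if one
  // exists; otherwise fall back to a direct lookup in the file's metadata
  // map, yielding null when the name is absent.
  def columnValue(
      name: String,
      file: PartitionedFileStub,
      extractors: Map[String, PartitionedFileStub => Any]): Any =
    extractors.get(name) match {
      case Some(extract) => extract(file)
      case None => file.otherConstantMetadataColumnValues.get(name).orNull
    }
}
```

Note that an extractor receives the whole stub, so it can combine intrinsic fields and map entries however it likes, which is the flexibility the review comments argue for.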
[GitHub] [spark] jchen5 commented on a diff in pull request #40865: [SPARK-43156][SQL] Fix `COUNT(*) is null` bug in correlated scalar subquery
jchen5 commented on code in PR #40865: URL: https://github.com/apache/spark/pull/40865#discussion_r1173756531 (on the same scalar-subquery-count-bug.sql.out hunk quoted above)

Review Comment: Yes, the old result here was wrong and the new result is correct. @Hisoka-X There's a comment about it in the input file, please update it to remove the "but this case is wrong due to bug SPARK-43156" part
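For context, the semantics at issue are the classic "COUNT bug": a correlated scalar subquery computing `COUNT(*)` must return 0, not NULL, for outer rows with no matching inner rows, so `COUNT(*) IS NULL` is false for every outer row. A tiny self-contained model of that rule (illustrative only, not Spark's implementation):

```scala
object CountBugModel {
  // For one outer key, count the matching inner rows, as a correlated
  // COUNT(*) subquery would. COUNT over an empty match set is 0, never NULL,
  // which is why "count(*) IS NULL" evaluates to false in the golden file.
  def correlatedCount(outerKey: Int, inner: Seq[Int]): Long =
    inner.count(_ == outerKey).toLong
}
```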
[GitHub] [spark] ted-jenks commented on pull request #39907: [SPARK-42359][SQL] Support row skipping when reading CSV files
ted-jenks commented on PR #39907: URL: https://github.com/apache/spark/pull/39907#issuecomment-1517806003 @HyukjinKwon I have done more work on this, please let me know what you think!
[GitHub] [spark] juliuszsompolski commented on pull request #40899: [MINOR][CONNECT] Fix missing stats for SQL Command
juliuszsompolski commented on PR #40899: URL: https://github.com/apache/spark/pull/40899#issuecomment-1517798535 The original PR was merged to 3.4, so this bugfix should also go to branch-3.4.
[GitHub] [spark] advancedxy commented on pull request #37417: [SPARK-33782][K8S][CORE]Place spark.files, spark.jars and spark.files under the current working directory on the driver in K8S cluster mod
advancedxy commented on PR #37417: URL: https://github.com/apache/spark/pull/37417#issuecomment-1517797912 @pralabhkumar thanks for your work. I noticed a similar issue when running Spark applications on K8S; it's a helpful feature. However, this PR might introduce some inefficiency by downloading files/jars twice when running in k8s-cluster mode.

```scala
if (deployMode == CLIENT) {
  // jars are downloaded once
  localJars = Option(args.jars).map {
    downloadFileList(_, targetDir, sparkConf, hadoopConf)
  }.orNull
  // py files are downloaded once
  localPyFiles = Option(args.pyFiles).map {
    downloadFileList(_, targetDir, sparkConf, hadoopConf)
  }.orNull
  if (isKubernetesClusterModeDriver) {
    def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false): String = {
      ...
    }
    val filesLocalFiles = Option(args.files).map {
      downloadResourcesToCurrentDirectory(_)
    }.orNull
    // jars are downloaded again
    val jarsLocalJars = Option(args.jars).map {
      downloadResourcesToCurrentDirectory(_)
    }.orNull
    val archiveLocalFiles = Option(args.archives).map {
      downloadResourcesToCurrentDirectory(_, true)
    }.orNull
    // py files are downloaded again
    val pyLocalFiles = Option(args.pyFiles).map {
      downloadResourcesToCurrentDirectory(_)
    }.orNull
  }
}
```

Would you mind creating a follow-up PR to address this issue? @pralabhkumar

Also, there's another catch when running Spark on K8S with --files/--archives: these files/archives are already downloaded here, but because they are passed on as args.files and args.archives, the Spark context would copy them (and/or untar them) again when constructing the context. See the relevant code: https://github.com/apache/spark/blob/d407a42090d7c027050be7ee723f7e3d8f686ed7/core/src/main/scala/org/apache/spark/SparkContext.scala#L440-L443 and https://github.com/apache/spark/blob/d407a42090d7c027050be7ee723f7e3d8f686ed7/core/src/main/scala/org/apache/spark/SparkContext.scala#L524-L544

cc @Ngone51 @holdenk
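The double download described in the comment above could, in principle, be avoided by memoizing on the resource URI so each file is fetched at most once per submission. A hypothetical Python sketch of that idea (the `fetch` callback and class name are illustrative, not Spark's actual helpers):

```python
class ResourceDownloader:
    """Downloads each URI at most once, returning the cached local path
    on repeated requests (as when jars/pyFiles are resolved twice)."""

    def __init__(self, fetch):
        self.fetch = fetch          # function: uri -> local path
        self.cache = {}             # uri -> local path already fetched
        self.fetch_count = 0

    def download(self, uri):
        if uri not in self.cache:
            self.fetch_count += 1
            self.cache[uri] = self.fetch(uri)
        return self.cache[uri]

# Simulate client-mode resolution followed by k8s-cluster-mode resolution.
dl = ResourceDownloader(fetch=lambda uri: "/tmp/" + uri.rsplit("/", 1)[-1])
jars = ["s3://bucket/app.jar", "s3://bucket/dep.jar"]
first = [dl.download(u) for u in jars]    # actually fetched
second = [dl.download(u) for u in jars]   # served from cache
print(dl.fetch_count)  # -> 2, not 4
```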
[GitHub] [spark] peter-toth commented on pull request #40266: [SPARK-42660][SQL] Infer filters for Join produced by IN and EXISTS clause (RewritePredicateSubquery rule)
peter-toth commented on PR #40266: URL: https://github.com/apache/spark/pull/40266#issuecomment-1517796434 @mskapilks, do you have any update on this? I can take over this PR and investigate the idea further if you don't have time for it.
[GitHub] [spark] cloud-fan closed pull request #40794: [SPARK-43142] Fix DSL expressions on attributes with special characters
cloud-fan closed pull request #40794: [SPARK-43142] Fix DSL expressions on attributes with special characters URL: https://github.com/apache/spark/pull/40794
[GitHub] [spark] cloud-fan commented on pull request #40794: [SPARK-43142] Fix DSL expressions on attributes with special characters
cloud-fan commented on PR #40794: URL: https://github.com/apache/spark/pull/40794#issuecomment-1517793940 thanks, merging to master!
[GitHub] [spark] cloud-fan commented on a diff in pull request #40865: [SPARK-43156][SQL] Fix `COUNT(*) is null` bug in correlated scalar subquery
cloud-fan commented on code in PR #40865: URL: https://github.com/apache/spark/pull/40865#discussion_r1173730596

## sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-count-bug.sql.out:
```diff
@@ -86,14 +86,14 @@ from l
 -- !query schema
 struct
 -- !query output
-1  2.0  NULL
-1  2.0  NULL
+1  2.0  false
+1  2.0  false
```

Review Comment: @jchen5 was the old result wrong?
[GitHub] [spark] grundprinzip opened a new pull request, #40899: [MINOR][CONNECT] Fix missing stats for SQL Command
grundprinzip opened a new pull request, #40899: URL: https://github.com/apache/spark/pull/40899

### What changes were proposed in this pull request?
This patch fixes a minor issue in the code where for SQL Commands the plan metrics are not sent to the client. In addition, it renames a method to make clear that the method does not actually send anything but only creates the response object.

### Why are the changes needed?
Clarity

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.
[GitHub] [spark] grundprinzip commented on a diff in pull request #40160: [SPARK-41725][CONNECT] Eager Execution of DF.sql()
grundprinzip commented on code in PR #40160: URL: https://github.com/apache/spark/pull/40160#discussion_r1173683435

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
```diff
@@ -1450,10 +1458,79 @@ class SparkConnectPlanner(val session: SparkSession) {
         handleWriteOperationV2(command.getWriteOperationV2)
       case proto.Command.CommandTypeCase.EXTENSION =>
         handleCommandPlugin(command.getExtension)
+      case proto.Command.CommandTypeCase.SQL_COMMAND =>
+        handleSqlCommand(command.getSqlCommand, clientId, responseObserver)
       case _ => throw new UnsupportedOperationException(s"$command not supported.")
     }
   }
 
+  def handleSqlCommand(
+      getSqlCommand: SqlCommand,
+      clientId: String,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    // Eagerly execute commands of the provided SQL string.
+    val df = session.sql(getSqlCommand.getSql, getSqlCommand.getArgsMap)
+    // Check if commands have been executed.
+    val isCommand = df.queryExecution.commandExecuted.isInstanceOf[CommandResult]
+    val rows = df.logicalPlan match {
+      case lr: LocalRelation => lr.data
+      case cr: CommandResult => cr.rows
+      case _ => Seq.empty
+    }
+
+    // Convert the results to Arrow.
+    val schema = df.schema
+    val maxRecordsPerBatch = session.sessionState.conf.arrowMaxRecordsPerBatch
+    val maxBatchSize = (SparkEnv.get.conf.get(CONNECT_GRPC_ARROW_MAX_BATCH_SIZE) * 0.7).toLong
+    val timeZoneId = session.sessionState.conf.sessionLocalTimeZone
+
+    // Convert the data.
+    val bytes = if (rows.isEmpty) {
+      ArrowConverters.createEmptyArrowBatch(schema, timeZoneId)
+    } else {
+      val batches = ArrowConverters.toBatchWithSchemaIterator(
+        rows.iterator,
+        schema,
+        maxRecordsPerBatch,
+        maxBatchSize,
+        timeZoneId)
+      assert(batches.size == 1)
+      batches.next()
+    }
+
+    // To avoid explicit handling of the result on the client, we build the expected input
+    // of the relation on the server. The client has to simply forward the result.
+    val result = SqlCommandResult.newBuilder()
+    if (isCommand) {
+      result.setRelation(
+        proto.Relation
+          .newBuilder()
+          .setLocalRelation(
+            proto.LocalRelation
+              .newBuilder()
+              .setData(ByteString.copyFrom(bytes))))
+    } else {
+      result.setRelation(
+        proto.Relation
+          .newBuilder()
+          .setSql(
+            proto.SQL
+              .newBuilder()
+              .setQuery(getSqlCommand.getSql)
+              .putAllArgs(getSqlCommand.getArgsMap)))
+    }
+    // Exactly one SQL Command Result Batch
+    responseObserver.onNext(
+      ExecutePlanResponse
+        .newBuilder()
+        .setClientId(clientId)
+        .setSqlCommandResult(result)
+        .build())
+
+    // Send Metrics
+    SparkConnectStreamHandler.sendMetricsToResponse(clientId, df)
```

Review Comment: Thanks
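For readers following along, the handler above closes each Arrow batch on two limits at once: a record cap (`arrowMaxRecordsPerBatch`) and roughly 70% of the configured max batch size in bytes. A simplified Python sketch of that dual-limit batching policy (the `size_of` estimator is a stand-in for Arrow's buffer accounting, not Spark's actual code):

```python
def batch_rows(rows, max_records, max_bytes, size_of=len):
    """Split `rows` into batches, closing a batch when adding the next row
    would exceed either the record limit or the (estimated) byte limit."""
    batches, current, current_bytes = [], [], 0
    for row in rows:
        row_bytes = size_of(row)
        if current and (len(current) >= max_records or
                        current_bytes + row_bytes > max_bytes):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(row)
        current_bytes += row_bytes
    if current:
        batches.append(current)
    return batches

# Seven rows of 10 "bytes" each, record cap of 3, generous byte budget.
rows = ["x" * 10] * 7
print([len(b) for b in batch_rows(rows, max_records=3, max_bytes=1000)])
# -> [3, 3, 1]
```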
[GitHub] [spark] wangyum commented on pull request #40897: [SPARK-43228][SQL] Join keys also match PartitioningCollection in CoalesceBucketsInJoin
wangyum commented on PR #40897: URL: https://github.com/apache/spark/pull/40897#issuecomment-1517707460 cc @cloud-fan
[GitHub] [spark] rshkv commented on a diff in pull request #40794: [SPARK-43142] Fix DSL expressions on attributes with special characters
rshkv commented on code in PR #40794: URL: https://github.com/apache/spark/pull/40794#discussion_r1173669012

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala:
```diff
@@ -271,7 +271,7 @@ package object dsl {
     override def expr: Expression = Literal(s)
     def attr: UnresolvedAttribute = analysis.UnresolvedAttribute(s)
   }
-  implicit class DslAttr(attr: UnresolvedAttribute) extends ImplicitAttribute {
+  implicit class DslAttr(override val attr: UnresolvedAttribute) extends ImplicitAttribute {
     def s: String = attr.name
```

Review Comment: Nevermind, column names created with `$"dotted.col"` were still being parsed as multi-part. Fixed that in 95741fa.

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala:
```diff
@@ -271,14 +271,17 @@ package object dsl {
     override def expr: Expression = Literal(s)
     def attr: UnresolvedAttribute = analysis.UnresolvedAttribute(s)
   }
-  implicit class DslAttr(attr: UnresolvedAttribute) extends ImplicitAttribute {
-    def s: String = attr.name
+  implicit class DslAttr(override val attr: UnresolvedAttribute) extends ImplicitAttribute {
+    def s: String = {
+      assert(attr.nameParts.length == 1, "attribute must have single name part")
+      attr.nameParts.head
+    }
   }
 
   abstract class ImplicitAttribute extends ImplicitOperators {
     def s: String
     def expr: UnresolvedAttribute = attr
-    def attr: UnresolvedAttribute = analysis.UnresolvedAttribute(s)
+    def attr: UnresolvedAttribute = analysis.UnresolvedAttribute(Seq(s))
```

Review Comment: Done in 83b6336. Had to update `StringToAttributeConversionHelper.$` as well in 95741fa.
[GitHub] [spark] Hisoka-X commented on a diff in pull request #40865: [SPARK-43156][SQL] Fix `COUNT(*) is null` bug in correlated scalar subquery
Hisoka-X commented on code in PR #40865: URL: https://github.com/apache/spark/pull/40865#discussion_r1173671566

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
```diff
@@ -599,10 +600,32 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelpe
       if (Utils.isTesting) {
         assert(mayHaveCountBug.isDefined)
       }
+
+      def queryOutputFoldable(list: Seq[NamedExpression]): Boolean = {
+        trimAliases(list.filter(p => p.exprId.equals(query.output.head.exprId)).head).foldable
+      }
+
+      lazy val resultFoldable = {
+        query match {
+          case Project(expressions, _) =>
+            queryOutputFoldable(expressions)
+          case Aggregate(_, expressions, _) =>
+            queryOutputFoldable(expressions)
+          case _ =>
+            false
+        }
+      }
+
       if (resultWithZeroTups.isEmpty) {
         // CASE 1: Subquery guaranteed not to have the COUNT bug because it evaluates to NULL
         // with zero tuples.
         planWithoutCountBug
+      } else if (resultFoldable) {
```

Review Comment: Yes, I added `mayHaveCountBug.getOrElse(false)` to the check.
[GitHub] [spark] jchen5 commented on pull request #40865: [SPARK-43156][SQL] Fix `COUNT(*) is null` bug in correlated scalar subquery
jchen5 commented on PR #40865: URL: https://github.com/apache/spark/pull/40865#issuecomment-1517695970

> Depends on what the correct result of `select *, (select any_value(false) as result from t1 where t0.a = t1.c) from t0` is?

Yes, this should return null on empty data. I will try to find another test case to check the potential issue I mentioned. Maybe something like `select *, (select false as result from t1 where t0.a = t1.c) from t0 limit 1` (the correct answer for that is also null).
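For context, the "COUNT bug" in this thread is the classic decorrelation hazard: over an empty correlated group, `COUNT(*)` must yield 0, while a non-aggregate (or `any_value`-style) scalar subquery must yield NULL, and a naive outer-join rewrite can conflate the two. A small pure-Python simulation of the expected answers, with plain lists of dicts standing in for tables `t0` and `t1`:

```python
t0 = [{"a": 1}, {"a": 2}]
t1 = [{"c": 1}]  # no row of t1 matches a == 2

def scalar_count(outer):
    # select count(*) from t1 where t0.a = t1.c  -> 0 on empty, never NULL
    return sum(1 for r in t1 if r["c"] == outer["a"])

def scalar_value(outer):
    # select false as result from t1 where t0.a = t1.c  -> NULL on empty
    matches = [False for r in t1 if r["c"] == outer["a"]]
    return matches[0] if matches else None

print([(r["a"], scalar_count(r), scalar_value(r)) for r in t0])
# -> [(1, 1, False), (2, 0, None)]
```

The second row shows the distinction the rewrite must preserve: the count subquery returns 0, the value subquery returns NULL (`None`).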
[GitHub] [spark] jchen5 commented on a diff in pull request #40865: [SPARK-43156][SQL] Fix `COUNT(*) is null` bug in correlated scalar subquery
jchen5 commented on code in PR #40865: URL: https://github.com/apache/spark/pull/40865#discussion_r1173668725

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
```diff
@@ -599,10 +600,32 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelpe
       if (Utils.isTesting) {
         assert(mayHaveCountBug.isDefined)
       }
+
+      def queryOutputFoldable(list: Seq[NamedExpression]): Boolean = {
+        trimAliases(list.filter(p => p.exprId.equals(query.output.head.exprId)).head).foldable
+      }
+
+      lazy val resultFoldable = {
+        query match {
+          case Project(expressions, _) =>
+            queryOutputFoldable(expressions)
+          case Aggregate(_, expressions, _) =>
+            queryOutputFoldable(expressions)
+          case _ =>
+            false
+        }
+      }
+
       if (resultWithZeroTups.isEmpty) {
         // CASE 1: Subquery guaranteed not to have the COUNT bug because it evaluates to NULL
         // with zero tuples.
         planWithoutCountBug
+      } else if (resultFoldable) {
```

Review Comment: That answer does look right; let me debug this code to figure out why it's right and whether I can find a counterexample. Your check needs to go after the check immediately below it to be correct (that's the cause of the test result changes Allison commented on).
[GitHub] [spark] rshkv commented on a diff in pull request #40794: [SPARK-43142] Fix DSL expressions on attributes with special characters
rshkv commented on code in PR #40794: URL: https://github.com/apache/spark/pull/40794#discussion_r1173564024

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala:
```diff
@@ -271,7 +271,7 @@ package object dsl {
     override def expr: Expression = Literal(s)
     def attr: UnresolvedAttribute = analysis.UnresolvedAttribute(s)
   }
-  implicit class DslAttr(attr: UnresolvedAttribute) extends ImplicitAttribute {
+  implicit class DslAttr(override val attr: UnresolvedAttribute) extends ImplicitAttribute {
     def s: String = attr.name
```

Review Comment: ~I'm afraid this breaks existing tests, e.g.:~ (see comment below)

```scala
class DataSourceV2StrategySuite extends PlanTest with SharedSparkSession {
  val attrInts = Seq(
    $"cint".int,
    $"c.int".int,
```

~fails on~

```
java.lang.AssertionError: assertion failed: attribute must have single name part
  at scala.Predef$.assert(Predef.scala:223)
  at org.apache.spark.sql.catalyst.dsl.package$ExpressionConversions$DslAttr.s(package.scala:276)
  at org.apache.spark.sql.catalyst.dsl.package$ExpressionConversions$ImplicitAttribute.int(package.scala:296)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceV2StrategySuite.<init>(DataSourceV2StrategySuite.scala:32)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at java.lang.Class.newInstance(Class.java:442)
  at org.scalatest.tools.Runner$.genSuiteConfig(Runner.scala:1403)
```
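For background on the discussion above: Spark treats unquoted dots in an attribute name as separators between name parts, while backticks quote a part so it may itself contain dots. A simplified Python sketch of that splitting rule (not Spark's full `UnresolvedAttribute.parseAttributeName`, which also handles escaped backticks and rejects malformed names):

```python
def parse_attribute_name(name):
    """Split 'a.b' into ['a', 'b'] but keep '`a.b`' as a single part ['a.b']."""
    parts, buf, in_quotes = [], [], False
    for ch in name:
        if ch == "`":
            in_quotes = not in_quotes   # toggle quoted region
        elif ch == "." and not in_quotes:
            parts.append("".join(buf))  # unquoted dot ends a part
            buf = []
        else:
            buf.append(ch)
    parts.append("".join(buf))
    return parts

print(parse_attribute_name("c.int"))    # -> ['c', 'int']
print(parse_attribute_name("`c.int`"))  # -> ['c.int']
```

This is the distinction the `$"c.int".int` test above trips over: without quoting, the DSL string is parsed as the two-part name `c`.`int` rather than one column literally named `c.int`.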
[GitHub] [spark] Hisoka-X commented on a diff in pull request #40865: [SPARK-43156][SQL] Fix `COUNT(*) is null` bug in correlated scalar subquery
Hisoka-X commented on code in PR #40865: URL: https://github.com/apache/spark/pull/40865#discussion_r1173667469

## sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-count-bug.sql.out:
```diff
@@ -106,14 +106,14 @@ from l
 -- !query schema
 struct
 -- !query output
-1  2.0  NULL
```

Review Comment: Thanks for the reminder. I fixed this bug and added a test to cover it. I will update the `sql.out` later.
[GitHub] [spark] bjornjorgensen commented on pull request #40878: [SPARK-42780][BUILD] Upgrade `Tink` to 1.9.0
bjornjorgensen commented on PR #40878: URL: https://github.com/apache/spark/pull/40878#issuecomment-1517673103 @LuciferYang Thank you
[GitHub] [spark] NarekDW commented on pull request #39719: [SPARK-42169] [SQL] Implement code generation for to_csv function (StructsToCsv)
NarekDW commented on PR #39719: URL: https://github.com/apache/spark/pull/39719#issuecomment-1517635600 @jaceklaskowski thank you for the review. @MaxGekk just a reminder.
[GitHub] [spark] NarekDW commented on a diff in pull request #39719: [SPARK-42169] [SQL] Implement code generation for to_csv function (StructsToCsv)
NarekDW commented on code in PR #39719: URL: https://github.com/apache/spark/pull/39719#discussion_r1173620399

## sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala:
```diff
@@ -574,4 +575,11 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       $"csv", schema_of_csv("1,2\n2"), Map.empty[String, String].asJava))
     checkAnswer(actual, Row(Row(1, "2\n2")))
   }
+
+  test("StructsToCsv should not generate codes beyond 64KB") {
+    val range = Range.inclusive(1, 5000)
+    val struct = CreateStruct.create(range.map(Literal(_)))
```

Review Comment: done
[GitHub] [spark] LuciferYang commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala
LuciferYang commented on code in PR #40892: URL: https://github.com/apache/spark/pull/40892#discussion_r1173602327

## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
```diff
@@ -17,6 +17,297 @@
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.{JsonSetter, Nulls}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  private[sql] def copy(
+      newNumRowsUpdated: Long,
+      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
+    new StateOperatorProgress(
+      operatorName = operatorName,
+      numRowsTotal = numRowsTotal,
+      numRowsUpdated = newNumRowsUpdated,
+      allUpdatesTimeMs = allUpdatesTimeMs,
+      numRowsRemoved = numRowsRemoved,
+      allRemovalsTimeMs = allRemovalsTimeMs,
+      commitTimeMs = commitTimeMs,
+      memoryUsedBytes = memoryUsedBytes,
+      numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
+      numShufflePartitions = numShufflePartitions,
+      numStateStoreInstances = numStateStoreInstances,
+      customMetrics = customMetrics)
+
+  private[sql] def jsonValue: JValue = {
+    ("operatorName" -> JString(operatorName)) ~
+      ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
+      ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
+      ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
+      ("commitTimeMs" -> JInt(commitTimeMs)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+      ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
+      ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
+      ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
+      ("customMetrics" -> {
+        if (!customMetrics.isEmpty) {
+          val keys = customMetrics.keySet.asScala.toSeq.sorted
+          keys.map { k => k -> JInt(customMetrics.get(k).toLong): JObject }.reduce(_ ~ _)
+        } else {
+          JNothing
+        }
+      })
+  }
+
+  override def toString: String = prettyJson
+}
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] during a trigger.
+ * Each event relates to processing done for a single trigger of the streaming query. Events are
+ * emitted even when no new data is available to be processed.
+ *
+ * @param id
+ *   A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId
+ *   A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name
+ *   User-specified name of the query, null if not specified.
+ * @param timestamp
+ *   Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
+ * @param batchId
+ *   A unique id for the current batch of data being processed. Note that in the case of retries
+ *   after a failure a given batchId my be executed more than once. Similarly, when there is no
+ *   data to be processed, the batchId will not be incremented.
+ * @param batchDuration
+ *   The process duration of each batch.
+ * @param durationMs
+ *   The amount of time taken to perform various operations in milliseconds.
+ * @param eventTime
+ *   Statistics of event time seen in this batch. It may contain the following keys:
+ *   {{{
+ *     "max" ->
```
[GitHub] [spark] LuciferYang commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala
LuciferYang commented on code in PR #40892: URL: https://github.com/apache/spark/pull/40892#discussion_r1173598454

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:

```
@@ -2182,7 +2182,7 @@ class SparkConnectPlanner(val session: SparkSession) {
     respBuilder.setRecentProgress(
       StreamingQueryCommandResult.RecentProgressResult
         .newBuilder()
-        .addAllRecentProgressJson(progressReports.map(_.json).asJava)
+        .addAllRecentProgressJson(progressReports.map(StreamingQueryProgress.jsonString).asJava)
```

Review Comment: The recentProgressJson should look like

```
{
  "id" : "33ac26f4-1c39-46ce-b798-f3d2a21211d4",
  "runId" : "849c2c9a-b9f8-446f-9180-259a60fd888c",
  "name" : "myName",
  "timestamp" : "2016-12-05T20:54:20.827Z",
  "batchId" : 2,
  "batchDuration" : 0,
  "durationMs" : {
    "total" : 0
  },
  ...
  "observedMetrics" : {
    "event1" : {
      "values" : [ 1, 3.0 ],
      "schema" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "c1",
          "type" : "long",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "c2",
          "type" : "double",
          "nullable" : true,
          "metadata" : { }
        } ]
      }
    },
    "event2" : {
      "values" : [ 1, "hello", "world" ],
      "schema" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "rc",
          "type" : "long",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "min_q",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "max_q",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      }
    }
  }
}
```

then we can rebuild `observedMetrics` on the connect client side
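The reviewer's point is that the progress JSON carries enough information, `values` plus the field names in `schema`, for the client to reconstruct `observedMetrics` rows. A small Python sketch of that rebuild step (the function name and the trimmed JSON fragment are illustrative, not Spark Connect's actual client code):

```python
import json

# A trimmed-down progress JSON fragment shaped like the example in the
# review comment above (only "event1" is kept for brevity).
progress_json = """
{
  "observedMetrics" : {
    "event1" : {
      "values" : [ 1, 3.0 ],
      "schema" : {
        "type" : "struct",
        "fields" : [
          { "name" : "c1", "type" : "long",   "nullable" : true, "metadata" : { } },
          { "name" : "c2", "type" : "double", "nullable" : true, "metadata" : { } }
        ]
      }
    }
  }
}
"""

def rebuild_observed_metrics(progress: dict) -> dict:
    """Pair each metric's values with the field names from its schema,
    yielding an event -> {column: value} mapping."""
    rebuilt = {}
    for event, payload in progress.get("observedMetrics", {}).items():
        names = [f["name"] for f in payload["schema"]["fields"]]
        rebuilt[event] = dict(zip(names, payload["values"]))
    return rebuilt

print(rebuild_observed_metrics(json.loads(progress_json)))
# → {'event1': {'c1': 1, 'c2': 3.0}}
```

Because `values` and `fields` are positionally aligned in the JSON, a simple zip is all the client needs; the per-field `type` entries would let a fuller implementation also coerce each value to its declared type.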
[GitHub] [spark] peter-toth commented on a diff in pull request #40856: [SPARK-43199][SQL] Make InlineCTE idempotent
peter-toth commented on code in PR #40856: URL: https://github.com/apache/spark/pull/40856#discussion_r1173587974

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala:

```
@@ -68,50 +69,91 @@ case class InlineCTE(alwaysInline: Boolean = false) extends Rule[LogicalPlan] {
     cteDef.child.exists(_.expressions.exists(_.isInstanceOf[OuterReference]))
   }

+  /**
+   * Accumulates all the CTEs from a plan into a special map.
+   *
+   * @param plan The plan to collect the CTEs from
+   * @param cteMap A mutable map that accumulates the CTEs and their reference information by CTE
+   *               ids. The value of the map is a tuple whose elements are:
+   *               - The CTE definition
+   *               - The number of incoming references to the CTE. This includes references from
+   *                 outer CTEs and regular places.
```

Review Comment: fixed in https://github.com/apache/spark/pull/40856/commits/806c2de2b7db9b960c591a849dc43be7f65b468b
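The doc comment above describes accumulating, per CTE id, the definition together with its incoming reference count by walking the plan. A simplified Python sketch of that accumulation (plan nodes modeled as plain dicts; this is an illustration of the idea, not Spark's actual `InlineCTE` traversal):

```python
def collect_ctes(node: dict, cte_map: dict) -> dict:
    """Accumulate CTE definitions and reference counts into cte_map,
    keyed by CTE id, as (definition, refCount) pairs.

    Assumes each CTE's definition node is visited before any node that
    references it, as is the case when definitions precede references
    in the plan's child order."""
    kind = node.get("kind")
    if kind == "CTEDef":
        # Register the definition with zero references seen so far.
        cte_map[node["id"]] = (node, 0)
    elif kind == "CTERef":
        # Bump the incoming-reference count for the referenced CTE.
        definition, count = cte_map[node["id"]]
        cte_map[node["id"]] = (definition, count + 1)
    for child in node.get("children", []):
        collect_ctes(child, cte_map)
    return cte_map

# Toy plan: one CTE definition referenced twice, once under a Project.
plan = {
    "kind": "WithCTE",
    "children": [
        {"kind": "CTEDef", "id": 1, "children": []},
        {"kind": "Project", "children": [{"kind": "CTERef", "id": 1, "children": []}]},
        {"kind": "CTERef", "id": 1, "children": []},
    ],
}
print(collect_ctes(plan, {})[1][1])
# → 2
```

A count like this is what lets the rule decide whether a CTE is referenced once (safe to inline) or multiple times (inlining could duplicate work), which is central to making the rewrite idempotent.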