[jira] [Created] (SPARK-48717) Python foreachBatch streaming query cannot be stopped gracefully after pin thread mode is enabled and is running spark queries
Wei Liu created SPARK-48717: --- Summary: Python foreachBatch streaming query cannot be stopped gracefully after pin thread mode is enabled and is running spark queries Key: SPARK-48717 URL: https://issues.apache.org/jira/browse/SPARK-48717 Project: Spark Issue Type: New Feature Components: PySpark, SS Affects Versions: 4.0.0 Reporter: Wei Liu Follow-up of https://issues.apache.org/jira/browse/SPARK-39218. That fix only considered the case where InterruptedException is thrown while time.sleep(10) is interrupted. But when a Spark query is executing: {code:java} def func(batch_df, batch_id): batch_df.sparkSession.range(1000).write.saveAsTable("oops") print(batch_df.count()) {code} the actual error would be: {code:java} py4j.protocol.Py4JJavaError: An error occurred while calling o2141502.saveAsTable. : org.apache.spark.SparkException: Job aborted. ... at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:262) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:262) *Caused by: java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1000)* at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1308) {code} We should also handle this scenario. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
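The handling added for SPARK-39218 matches a bare InterruptedException; the trace above shows the interrupt can also surface wrapped inside a Py4JJavaError's cause chain when a Spark query is running. A minimal, illustrative Scala sketch of the kind of check this implies (not Spark's actual code): walk the cause chain and treat any InterruptedException as a graceful stop.
{code:scala}
import scala.annotation.tailrec

// Returns true if the exception, or anything in its cause chain, is an InterruptedException.
@tailrec
def causedByInterrupt(t: Throwable): Boolean = t match {
  case null                    => false
  case _: InterruptedException => true
  case other                   => causedByInterrupt(other.getCause)
}
{code}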
[jira] [Created] (SPARK-48676) Structured Logging Framework Scala Style Migration [Part 2]
Amanda Liu created SPARK-48676: -- Summary: Structured Logging Framework Scala Style Migration [Part 2] Key: SPARK-48676 URL: https://issues.apache.org/jira/browse/SPARK-48676 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-48632) Remove unused LogKeys
[ https://issues.apache.org/jira/browse/SPARK-48632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu resolved SPARK-48632. Resolution: Not A Problem > Remove unused LogKeys > - > > Key: SPARK-48632 > URL: https://issues.apache.org/jira/browse/SPARK-48632 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 4.0.0 >Reporter: Amanda Liu >Priority: Major > Labels: pull-request-available > > Remove unused LogKey objects to clean up LogKey.scala -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48632) Remove unused LogKeys
Amanda Liu created SPARK-48632: -- Summary: Remove unused LogKeys Key: SPARK-48632 URL: https://issues.apache.org/jira/browse/SPARK-48632 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu Remove unused LogKey objects to clean up LogKey.scala -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48628) Add task peak on/off heap execution memory metrics
Ziqi Liu created SPARK-48628: Summary: Add task peak on/off heap execution memory metrics Key: SPARK-48628 URL: https://issues.apache.org/jira/browse/SPARK-48628 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 4.0.0 Reporter: Ziqi Liu Currently there are no task-level on/off-heap execution memory metrics. There is a [peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114] metric, but its semantics are a bit confusing: it only covers the execution memory used by shuffle/join/aggregate/sort, which is accumulated in specific operators. We can easily maintain the task-level peak memory in TaskMemoryManager, assuming *acquireExecutionMemory* is the single narrow waist for acquiring execution memory. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
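A minimal sketch of the proposed bookkeeping, assuming *acquireExecutionMemory* really is the single entry point; class and method names are illustrative, not Spark's actual TaskMemoryManager API.
{code:scala}
// Track the running total of acquired execution memory and remember its high-water mark.
class PeakMemoryTracker {
  private var current: Long = 0L
  private var peak: Long = 0L

  def acquire(bytes: Long): Unit = synchronized {
    current += bytes
    peak = math.max(peak, current)
  }

  def release(bytes: Long): Unit = synchronized {
    current -= bytes
  }

  def peakBytes: Long = synchronized(peak)
}
{code}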
[jira] [Created] (SPARK-48623) Structured Logging Framework Scala Style Migration
Amanda Liu created SPARK-48623: -- Summary: Structured Logging Framework Scala Style Migration Key: SPARK-48623 URL: https://issues.apache.org/jira/browse/SPARK-48623 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48610) Remove ExplainUtils.processPlan synchronize
Ziqi Liu created SPARK-48610: Summary: Remove ExplainUtils.processPlan synchronize Key: SPARK-48610 URL: https://issues.apache.org/jira/browse/SPARK-48610 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 4.0.0 Reporter: Ziqi Liu [https://github.com/apache/spark/pull/45282] introduced synchronization in `ExplainUtils.processPlan` to avoid a race condition when multiple queries refer to the same cached plan. The granularity of the lock is too coarse. We can fix the root cause of this concurrency issue by removing the use of the mutable OP_ID_TAG, which conflicts with the immutable nature of SparkPlan. Instead, we can use an auxiliary id map keyed by object identity. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
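A sketch of the proposed approach, with illustrative names: instead of mutating an OP_ID_TAG on shared (possibly cached) plan nodes, keep operator ids in a per-invocation map keyed by object identity.
{code:scala}
import java.util.IdentityHashMap

// Assign ids without touching the plan nodes themselves, so concurrent explain calls on a
// shared cached plan no longer race on a mutable tag.
def assignOperatorIds(operators: Seq[AnyRef]): IdentityHashMap[AnyRef, Integer] = {
  val ids = new IdentityHashMap[AnyRef, Integer]()
  var nextId = 0
  operators.foreach { op =>
    if (!ids.containsKey(op)) {
      ids.put(op, Integer.valueOf(nextId))
      nextId += 1
    }
  }
  ids
}
{code}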
[jira] [Created] (SPARK-48592) Add scala style check for logging message inline variables
Amanda Liu created SPARK-48592: -- Summary: Add scala style check for logging message inline variables Key: SPARK-48592 URL: https://issues.apache.org/jira/browse/SPARK-48592 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 4.0.0 Reporter: Amanda Liu Ban log messages passed to logInfo, logWarning, and logError that contain inline variables not wrapped in {{MDC}}. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
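An illustrative contrast of what such a check would ban versus allow; the exact imports and LogKey names below are assumptions about the structured logging framework, not verified API.
{code:scala}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys

class Example extends Logging {
  def report(taskId: Long): Unit = {
    // Banned by the proposed style check: an inline variable without MDC.
    // logInfo(s"Finished task $taskId")

    // Allowed: the variable is wrapped in MDC with a LogKey.
    logInfo(log"Finished task ${MDC(LogKeys.TASK_ID, taskId)}")
  }
}
{code}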
[jira] [Created] (SPARK-48589) Add option snapshotStartBatchId and snapshotPartitionId to state data source
Yuchen Liu created SPARK-48589: -- Summary: Add option snapshotStartBatchId and snapshotPartitionId to state data source Key: SPARK-48589 URL: https://issues.apache.org/jira/browse/SPARK-48589 Project: Spark Issue Type: New Feature Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Yuchen Liu Define two new options, _snapshotStartBatchId_ and _snapshotPartitionId_, for the existing state reader. Both of them should be provided at the same time. # When there is no snapshot file at that batch (note there is an off-by-one issue between version and batch id), throw an exception. # Otherwise, the reader should rebuild the state by reading delta files only, ignoring all later snapshot files. # Note that if a batchId option is already specified, that batchId is the ending batch id, and reconstruction should stop at that batch. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
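A sketch of how the two options would be used with the existing state reader, assuming a SparkSession named spark; the option names come from this proposal, the path and values are placeholders.
{code:scala}
val stateDf = spark.read
  .format("statestore")
  .option("path", "/tmp/checkpoint")      // checkpoint location of the streaming query
  .option("snapshotStartBatchId", 10)     // rebuild starting from this snapshot
  .option("snapshotPartitionId", 3)       // read only this state partition
  .option("batchId", 12)                  // optional ending batch id
  .load()
{code}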
[jira] [Created] (SPARK-48588) Fine-grained State Data Source
Yuchen Liu created SPARK-48588: -- Summary: Fine-grained State Data Source Key: SPARK-48588 URL: https://issues.apache.org/jira/browse/SPARK-48588 Project: Spark Issue Type: Epic Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Yuchen Liu The current state reader API replays the state store rows from the latest snapshot and any newer delta files. The issue with this mechanism is that sometimes the snapshot files could be wrongly constructed, or users may want to know how the state changes across batches. We need to improve the State Reader so that it can handle a variety of fine-grained requirements, for example reconstructing the state from an arbitrary snapshot, or supporting a CDC mode for state evolution. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48567) Pyspark StreamingQuery lastProgress and friend should return actual StreamingQueryProgress
Wei Liu created SPARK-48567: --- Summary: Pyspark StreamingQuery lastProgress and friend should return actual StreamingQueryProgress Key: SPARK-48567 URL: https://issues.apache.org/jira/browse/SPARK-48567 Project: Spark Issue Type: New Feature Components: PySpark, SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48542) Give snapshotStartBatchId and snapshotPartitionId to the state data source
Yuchen Liu created SPARK-48542: -- Summary: Give snapshotStartBatchId and snapshotPartitionId to the state data source Key: SPARK-48542 URL: https://issues.apache.org/jira/browse/SPARK-48542 Project: Spark Issue Type: New Feature Components: SQL, Structured Streaming Affects Versions: 4.0.0 Environment: This should work for both the HDFS state store and the RocksDB state store. Reporter: Yuchen Liu Right now, to read a version of the state data, the state source finds the first snapshot file before the given version and reconstructs the state using the delta files. In some debugging scenarios, users need more granular control over how the state is reconstructed; for example, they may want to start from a specific snapshot instead of the closest one. One use case is checking whether a snapshot was corrupted after committing. This task introduces two options, {{snapshotStartBatchId}} and {{snapshotPartitionId}}, to the state data source. By specifying them, users can control the starting snapshot batch id and the partition id of the state. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48482) dropDuplicates and dropDuplicatesWithinWatermark should accept varargs
Wei Liu created SPARK-48482: --- Summary: dropDuplicates and dropDuplicatesWithinWatermark should accept varargs Key: SPARK-48482 URL: https://issues.apache.org/jira/browse/SPARK-48482 Project: Spark Issue Type: New Feature Components: PySpark Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48480) StreamingQueryListener thread should not be interruptable
Wei Liu created SPARK-48480: --- Summary: StreamingQueryListener thread should not be interruptable Key: SPARK-48480 URL: https://issues.apache.org/jira/browse/SPARK-48480 Project: Spark Issue Type: New Feature Components: Connect, SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48468) Add LogicalQueryStage interface in catalyst
Ziqi Liu created SPARK-48468: Summary: Add LogicalQueryStage interface in catalyst Key: SPARK-48468 URL: https://issues.apache.org/jira/browse/SPARK-48468 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 4.0.0 Reporter: Ziqi Liu Add `LogicalQueryStage` interface in catalyst so that it's visible in logical rules -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48465) Avoid no-op empty relation propagation in AQE
Ziqi Liu created SPARK-48465: Summary: Avoid no-op empty relation propagation in AQE Key: SPARK-48465 URL: https://issues.apache.org/jira/browse/SPARK-48465 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 4.0.0 Reporter: Ziqi Liu We should avoid no-op empty relation propagation in AQE: if we convert an empty QueryStageExec to an empty relation, it will be wrapped into a new query stage and executed, which produces an empty result and triggers empty relation propagation again. This issue is currently not exposed because AQE will try to reuse the shuffle. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-48447) Check state store provider class before invoking the constructor
[ https://issues.apache.org/jira/browse/SPARK-48447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuchen Liu updated SPARK-48447: --- Priority: Major (was: Minor) > Check state store provider class before invoking the constructor > > > Key: SPARK-48447 > URL: https://issues.apache.org/jira/browse/SPARK-48447 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 4.0.0 >Reporter: Yuchen Liu >Priority: Major > Original Estimate: 24h > Remaining Estimate: 24h > > We should restrict that only classes > [extending|https://github.com/databricks/runtime/blob/1440e77ab54c40981066c22ec759bdafc0683e76/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L73] > {{StateStoreProvider}} can be constructed to prevent customer from > instantiating arbitrary class of objects. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48447) Check state store provider class before invoking the constructor
Yuchen Liu created SPARK-48447: -- Summary: Check state store provider class before invoking the constructor Key: SPARK-48447 URL: https://issues.apache.org/jira/browse/SPARK-48447 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Yuchen Liu We should ensure that only classes [extending|https://github.com/databricks/runtime/blob/1440e77ab54c40981066c22ec759bdafc0683e76/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L73] {{StateStoreProvider}} can be constructed, to prevent users from instantiating arbitrary classes. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
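A sketch of the kind of guard described, assuming the provider class name comes from configuration; Spark itself would use its own reflection utilities rather than Class.forName directly.
{code:scala}
import org.apache.spark.sql.execution.streaming.state.StateStoreProvider

// Verify the configured class really extends StateStoreProvider before constructing it,
// instead of reflectively instantiating whatever class name was supplied.
def createProvider(className: String): StateStoreProvider = {
  val clazz = Class.forName(className)
  require(classOf[StateStoreProvider].isAssignableFrom(clazz),
    s"$className must be a subclass of StateStoreProvider")
  clazz.getDeclaredConstructor().newInstance().asInstanceOf[StateStoreProvider]
}
{code}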
[jira] [Created] (SPARK-48446) Update SS Doc of dropDuplicatesWithinWatermark to use the right syntax
Yuchen Liu created SPARK-48446: -- Summary: Update SS Doc of dropDuplicatesWithinWatermark to use the right syntax Key: SPARK-48446 URL: https://issues.apache.org/jira/browse/SPARK-48446 Project: Spark Issue Type: Documentation Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Yuchen Liu For dropDuplicates, the example on [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#:~:text=)%20%5C%0A%20%20.-,dropDuplicates,-(%22guid%22] is out of date compared with [https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.dropDuplicates.html]. The argument should be a list. The discrepancy is also true for dropDuplicatesWithinWatermark. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-48411) Add E2E test for DropDuplicateWithinWatermark
[ https://issues.apache.org/jira/browse/SPARK-48411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849356#comment-17849356 ] Yuchen Liu commented on SPARK-48411: I will work on this. > Add E2E test for DropDuplicateWithinWatermark > - > > Key: SPARK-48411 > URL: https://issues.apache.org/jira/browse/SPARK-48411 > Project: Spark > Issue Type: New Feature > Components: Connect, SS >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > Currently we do not have a e2e test for DropDuplicateWithinWatermark, we > should add one. We can simply use one of the test written in Scala here (with > the testStream API) and replicate it to python: > [https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103] > > The change should happen in > [https://github.com/apache/spark/blob/eee179135ed21dbdd8b342d053c9eda849e2de77/python/pyspark/sql/tests/streaming/test_streaming.py#L29] > > so we can test it in both connect and non-connect. > > Test with: > ``` > python/run-tests --testnames pyspark.sql.tests.streaming.test_streaming > python/run-tests --testnames > pyspark.sql.tests.connect.streaming.test_parity_streaming > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-48411) Add E2E test for DropDuplicateWithinWatermark
[ https://issues.apache.org/jira/browse/SPARK-48411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849348#comment-17849348 ] Wei Liu commented on SPARK-48411: - Sorry I tagged the wrong Yuchen > Add E2E test for DropDuplicateWithinWatermark > - > > Key: SPARK-48411 > URL: https://issues.apache.org/jira/browse/SPARK-48411 > Project: Spark > Issue Type: New Feature > Components: Connect, SS >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > Currently we do not have a e2e test for DropDuplicateWithinWatermark, we > should add one. We can simply use one of the test written in Scala here (with > the testStream API) and replicate it to python: > [https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103] > > The change should happen in > [https://github.com/apache/spark/blob/eee179135ed21dbdd8b342d053c9eda849e2de77/python/pyspark/sql/tests/streaming/test_streaming.py#L29] > > so we can test it in both connect and non-connect. > > Test with: > ``` > python/run-tests --testnames pyspark.sql.tests.streaming.test_streaming > python/run-tests --testnames > pyspark.sql.tests.connect.streaming.test_parity_streaming > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48411) Add E2E test for DropDuplicateWithinWatermark
Wei Liu created SPARK-48411: --- Summary: Add E2E test for DropDuplicateWithinWatermark Key: SPARK-48411 URL: https://issues.apache.org/jira/browse/SPARK-48411 Project: Spark Issue Type: New Feature Components: Connect, SS Affects Versions: 4.0.0 Reporter: Wei Liu Currently we do not have an e2e test for DropDuplicateWithinWatermark; we should add one. We can simply take one of the tests written in Scala here (with the testStream API) and replicate it in Python: [https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103] The change should happen in [https://github.com/apache/spark/blob/eee179135ed21dbdd8b342d053c9eda849e2de77/python/pyspark/sql/tests/streaming/test_streaming.py#L29] so we can test it in both connect and non-connect mode. Test with: ``` python/run-tests --testnames pyspark.sql.tests.streaming.test_streaming python/run-tests --testnames pyspark.sql.tests.connect.streaming.test_parity_streaming ``` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-48411) Add E2E test for DropDuplicateWithinWatermark
[ https://issues.apache.org/jira/browse/SPARK-48411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849202#comment-17849202 ] Wei Liu commented on SPARK-48411: - [~liuyuchen777] is going to work on this > Add E2E test for DropDuplicateWithinWatermark > - > > Key: SPARK-48411 > URL: https://issues.apache.org/jira/browse/SPARK-48411 > Project: Spark > Issue Type: New Feature > Components: Connect, SS >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > Currently we do not have a e2e test for DropDuplicateWithinWatermark, we > should add one. We can simply use one of the test written in Scala here (with > the testStream API) and replicate it to python: > [https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103] > > The change should happen in > [https://github.com/apache/spark/blob/eee179135ed21dbdd8b342d053c9eda849e2de77/python/pyspark/sql/tests/streaming/test_streaming.py#L29] > > so we can test it in both connect and non-connect. > > Test with: > ``` > python/run-tests --testnames pyspark.sql.tests.streaming.test_streaming > python/run-tests --testnames > pyspark.sql.tests.connect.streaming.test_parity_streaming > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-47946) Nested field's nullable value could be invalid after extracted using GetStructField
[ https://issues.apache.org/jira/browse/SPARK-47946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Linhong Liu resolved SPARK-47946. - Resolution: Not A Problem > Nested field's nullable value could be invalid after extracted using > GetStructField > --- > > Key: SPARK-47946 > URL: https://issues.apache.org/jira/browse/SPARK-47946 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 3.4.2 >Reporter: Junyoung Cho >Priority: Major > > I've got error when append to table using DataFrameWriterV2. > The error was occured in TableOutputResolver.checkNullability. This error > occurs when the data type of the schema is the same, but the order of the > fields is different. > I found that GetStructField.nullable returns unexpected result. > {code:java} > override def nullable: Boolean = child.nullable || > childSchema(ordinal).nullable {code} > Even if nested field has not nullability attribute, it returns true when > parent struct has nullability attribute. > ||Parent nullability||Child nullability||Result|| > |true|true|true| > |{color:#ff}true{color}|{color:#ff}false{color}|{color:#ff}true{color}| > |{color:#172b4d}false{color}|{color:#172b4d}true{color}|{color:#172b4d}true{color}| > |false|false|false| > > I think the logic should be changed to get just child's nullability, because > both of parent and child should be nullable to be considered nullable. > > {code:java} > override def nullable: Boolean = childSchema(ordinal).nullable {code} > > > > I want to check current logic is reasonable, or my suggestion can occur other > side effect. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-47946) Nested field's nullable value could be invalid after extracted using GetStructField
[ https://issues.apache.org/jira/browse/SPARK-47946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846773#comment-17846773 ] Linhong Liu commented on SPARK-47946: - No, it's not an issue. think about this ||key||value (nullable=true)|| |a|{"x": 1, "y": 2}| |b|null| |c|{"x": null, "y": 3}| let's assume `value.y` cannot be null (e.g. nullable = false), and run `select value.y from tbl`, what's the result? and what's the nullability of this column? it should be ||y|| |2| |null| |2| > Nested field's nullable value could be invalid after extracted using > GetStructField > --- > > Key: SPARK-47946 > URL: https://issues.apache.org/jira/browse/SPARK-47946 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 3.4.2 >Reporter: Junyoung Cho >Priority: Major > > I've got error when append to table using DataFrameWriterV2. > The error was occured in TableOutputResolver.checkNullability. This error > occurs when the data type of the schema is the same, but the order of the > fields is different. > I found that GetStructField.nullable returns unexpected result. > {code:java} > override def nullable: Boolean = child.nullable || > childSchema(ordinal).nullable {code} > Even if nested field has not nullability attribute, it returns true when > parent struct has nullability attribute. > ||Parent nullability||Child nullability||Result|| > |true|true|true| > |{color:#ff}true{color}|{color:#ff}false{color}|{color:#ff}true{color}| > |{color:#172b4d}false{color}|{color:#172b4d}true{color}|{color:#172b4d}true{color}| > |false|false|false| > > I think the logic should be changed to get just child's nullability, because > both of parent and child should be nullable to be considered nullable. > > {code:java} > override def nullable: Boolean = childSchema(ordinal).nullable {code} > > > > I want to check current logic is reasonable, or my suggestion can occur other > side effect. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
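A small, self-contained illustration of the point above, assuming a SparkSession named spark: even though y is declared non-nullable inside the struct, extracting value.y from a nullable struct yields a nullable column.
{code:scala}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("key", StringType, nullable = false),
  StructField("value", StructType(Seq(
    StructField("x", IntegerType, nullable = true),
    StructField("y", IntegerType, nullable = false)  // child declared non-nullable
  )), nullable = true)                               // parent struct is nullable
))
val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

// Rows where value itself is null have no y at all, so the extracted column must be nullable.
df.select("value.y").schema.head.nullable  // true
{code}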
[jira] [Created] (SPARK-48241) CSV parsing failure with char/varchar type columns
Jiayi Liu created SPARK-48241: - Summary: CSV parsing failure with char/varchar type columns Key: SPARK-48241 URL: https://issues.apache.org/jira/browse/SPARK-48241 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.1 Reporter: Jiayi Liu Fix For: 4.0.0 CSV table containing char and varchar columns will result in the following error when selecting from the CSV table: {code:java} java.lang.IllegalArgumentException: requirement failed: requiredSchema (struct) should be the subset of dataSchema (struct). at scala.Predef$.require(Predef.scala:281) at org.apache.spark.sql.catalyst.csv.UnivocityParser.(UnivocityParser.scala:56) at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:127) at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155) at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:231) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:293) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125){code} The reason for the error is that the StringType columns in the dataSchema and requiredSchema of UnivocityParser are not consistent. It is due to the metadata contained in the StringType StructField of the dataSchema, which is missing in the requiredSchema. We need to retain the metadata when resolving schema. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
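A minimal reproduction sketch based on the description, assuming a SparkSession named spark; the table and column names are made up.
{code:scala}
spark.sql("CREATE TABLE csv_char_tbl (c CHAR(5), v VARCHAR(10)) USING CSV")
spark.sql("INSERT INTO csv_char_tbl VALUES ('abc', 'defgh')")

// Per the report, reading the table back fails with the requiredSchema/dataSchema mismatch,
// because the char/varchar metadata on the StringType fields is dropped from requiredSchema.
spark.sql("SELECT c, v FROM csv_char_tbl").show()
{code}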
[jira] [Created] (SPARK-48181) Unify StreamingPythonRunner and PythonPlannerRunner
Wei Liu created SPARK-48181: --- Summary: Unify StreamingPythonRunner and PythonPlannerRunner Key: SPARK-48181 URL: https://issues.apache.org/jira/browse/SPARK-48181 Project: Spark Issue Type: New Feature Components: Connect, SS Affects Versions: 4.0.0 Reporter: Wei Liu We should unify the two driver-side Python runners for PySpark. To do this we should move away from StreamingPythonRunner and enhance PythonPlannerRunner with streaming support (a multiple read-write loop). -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48147) Remove all client listeners when local spark session is deleted
Wei Liu created SPARK-48147: --- Summary: Remove all client listeners when local spark session is deleted Key: SPARK-48147 URL: https://issues.apache.org/jira/browse/SPARK-48147 Project: Spark Issue Type: New Feature Components: Connect, PySpark, SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48093) Add config to switch between client side listener and server side listener
Wei Liu created SPARK-48093: --- Summary: Add config to switch between client side listener and server side listener Key: SPARK-48093 URL: https://issues.apache.org/jira/browse/SPARK-48093 Project: Spark Issue Type: New Feature Components: Connect, SS Affects Versions: 3.5.1, 3.5.0, 3.5.2 Reporter: Wei Liu We are moving the implementation of the streaming query listener from the server to the client. For clients already running a client side listener, to prevent regressions, we should add a config that lets users decide which type of listener to use. This is only added to published 3.5.x versions; for 4.0 and upwards we only use the client side listener. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-48002) Add Observed metrics test in PySpark StreamingQueryListeners
Wei Liu created SPARK-48002: --- Summary: Add Observed metrics test in PySpark StreamingQueryListeners Key: SPARK-48002 URL: https://issues.apache.org/jira/browse/SPARK-48002 Project: Spark Issue Type: New Feature Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-47877) Speed up test_parity_listener
Wei Liu created SPARK-47877: --- Summary: Speed up test_parity_listener Key: SPARK-47877 URL: https://issues.apache.org/jira/browse/SPARK-47877 Project: Spark Issue Type: New Feature Components: Connect, SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-47718) .sql() does not recognize watermark defined upstream
[ https://issues.apache.org/jira/browse/SPARK-47718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-47718: Labels: (was: pull-request-available) > .sql() does not recognize watermark defined upstream > > > Key: SPARK-47718 > URL: https://issues.apache.org/jira/browse/SPARK-47718 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 3.5.1 >Reporter: Chloe He >Priority: Major > > I have a data pipeline set up in such a way that it reads data from a Kafka > source, does some transformation on the data using pyspark, then writes the > output into a sink (Kafka, Redis, etc). > > My entire pipeline in written in SQL, so I wish to use the .sql() method to > execute SQL on my streaming source directly. > > However, I'm running into the issue where my watermark is not being > recognized by the downstream query via the .sql() method. > > ``` > Python 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:49:36) > [Clang 16.0.6 ] on darwin > Type "help", "copyright", "credits" or "license" for more information. > >>> import pyspark > >>> print(pyspark.__version__) > 3.5.1 > >>> from pyspark.sql import SparkSession > >>> > >>> session = SparkSession.builder \ > ... .config("spark.jars.packages", > "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")\ > ... .getOrCreate() > >>> from pyspark.sql.functions import col, from_json > >>> from pyspark.sql.types import StructField, StructType, TimestampType, > >>> LongType, DoubleType, IntegerType > >>> schema = StructType( > ... [ > ... StructField('createTime', TimestampType(), True), > ... StructField('orderId', LongType(), True), > ... StructField('payAmount', DoubleType(), True), > ... StructField('payPlatform', IntegerType(), True), > ... StructField('provinceId', IntegerType(), True), > ... ]) > >>> > >>> streaming_df = session.readStream\ > ... .format("kafka")\ > ... .option("kafka.bootstrap.servers", "localhost:9092")\ > ... .option("subscribe", "payment_msg")\ > ... .option("startingOffsets","earliest")\ > ... .load()\ > ... .select(from_json(col("value").cast("string"), > schema).alias("parsed_value"))\ > ... .select("parsed_value.*")\ > ... .withWatermark("createTime", "10 seconds") > >>> > >>> streaming_df.createOrReplaceTempView("streaming_df") > >>> session.sql(""" > ... SELECT > ... window.start, window.end, provinceId, sum(payAmount) as totalPayAmount > ... FROM streaming_df > ... GROUP BY provinceId, window('createTime', '1 hour', '30 minutes') > ... ORDER BY window.start > ... """)\ > ... .writeStream\ > ... .format("kafka") \ > ... .option("checkpointLocation", "checkpoint") \ > ... .option("kafka.bootstrap.servers", "localhost:9092") \ > ... .option("topic", "sink") \ > ... .start() > ``` > > This throws exception > ``` > pyspark.errors.exceptions.captured.AnalysisException: Append output mode not > supported when there are streaming aggregations on streaming > DataFrames/DataSets without watermark; line 6 pos 4; > ``` > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-47722) Wait until RocksDB background work finish before closing
Wei Liu created SPARK-47722: --- Summary: Wait until RocksDB background work finish before closing Key: SPARK-47722 URL: https://issues.apache.org/jira/browse/SPARK-47722 Project: Spark Issue Type: New Feature Components: SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE
[ https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danke Liu updated SPARK-47542: -- Description: When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oralce is DATE. Here is my scenario: first I create a dataframe that reads from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I apply a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: The update_time column in Oracle is of type DATE, which is mapped to Timestamp in Spark (because the precision of DATE in Oracle is second). When I push a filter to Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect: override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d '-mm-dd'} and timestamp literals in SQL statements written // in the format: \{ts '-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "\{ts '" + timestampValue + "'}" case dateValue: Date => "\{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } As a result, the condition "update_time >= \{ts '2024-03-12 06:18:17'} will never hit the index. In my case, as a workaround, I changed the code to this: {color:#cc7832}case {color}timestampValue: Timestamp =>{color:#6a8759}s"{color}{color:#6a8759}to_date({color} {dateFormat.format(timestampValue)} ,'-MM-dd HH:mi:ss')" After this modification, it worked well. was: When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oralce is DATE. Here is my scenario: first I created a dataframe that reads from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I apply a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: The update_time column in Oracle is of type DATE, which is mapped to Timestamp in Spark (because the precision of DATE in Oracle is second). When I push a filter to Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect: override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d '-mm-dd'} and timestamp literals in SQL statements written // in the format: \{ts '-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. 
case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "\{ts '" + timestampValue + "'}" case dateValue: Date => "\{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } As a result, the condition "update_time >= \{ts '2024-03-12 06:18:17'} will never hit the index. In my case, as a workaround, I changed the code to this: {color:#cc7832}case {color}timestampValue: Timestamp =>{color:#6a8759}s"{color}{color:#6a8759}to_date({color}{dateFormat.format(timestampValue)},'-MM-dd HH:mi:ss')" After this
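The workaround quoted above is mangled by JIRA colour markup; the following is a readable standalone sketch of the same idea, not the actual OracleDialect code: render Timestamp literals through Oracle's to_date so the pushed-down predicate stays comparable with a DATE column and can still use its index.
{code:scala}
import java.sql.Timestamp
import java.text.SimpleDateFormat

val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

// Format mask follows the report; Timestamp values become to_date(...) literals.
def compileTimestamp(value: Any): Any = value match {
  case t: Timestamp => s"to_date('${dateFormat.format(t)}', 'yyyy-MM-dd HH:mi:ss')"
  case other        => other
}
{code}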
[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE
[ https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danke Liu updated SPARK-47542: -- Description: When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oralce is DATE. Here is my scenario: first I created a dataframe that reads from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I apply a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: The update_time column in Oracle is of type DATE, which is mapped to Timestamp in Spark (because the precision of DATE in Oracle is second). When I push a filter to Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect: override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d '-mm-dd'} and timestamp literals in SQL statements written // in the format: \{ts '-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "\{ts '" + timestampValue + "'}" case dateValue: Date => "\{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } As a result, the condition "update_time >= \{ts '2024-03-12 06:18:17'} will never hit the index. In my case, as a workaround, I changed the code to this: {color:#cc7832}case {color}timestampValue: Timestamp =>{color:#6a8759}s"{color}{color:#6a8759}to_date({color}{dateFormat.format(timestampValue)},'-MM-dd HH:mi:ss')" After this modification, it worked well. was: When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oralce is DATE. Here is my scenario: first I created a dataframe that reads from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I apply a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: The update_time column in Oracle is of type DATE, which is mapped to Timestamp in Spark (because the precision of DATE in Oracle is second). When I push a filter to Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect: override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d '-mm-dd'} and timestamp literals in SQL statements written // in the format: \{ts '-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. 
case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "\{ts '" + timestampValue + "'}" case dateValue: Date => "\{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } As a result, the condition "update_time >= \{ts '2024-03-12 06:18:17'} will never hit the index. In my case, as a workaround, I changed the code to this: {color:#cc7832}case {color}timestampValue: Timestamp =>{color:#6a8759}s"{color}{color:#6a8759}to_date({color}{\{color:#9876aa}dateFormat.format(timestampValue)},'-MM-dd HH:mi:ss')" After
[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE
[ https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danke Liu updated SPARK-47542: -- Description: When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oralce is DATE. Here is my scenario: first I created a dataframe that reads from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I apply a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: The update_time column in Oracle is of type DATE, which is mapped to Timestamp in Spark (because the precision of DATE in Oracle is second). When I push a filter to Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect: override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d '-mm-dd'} and timestamp literals in SQL statements written // in the format: \{ts '-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "\{ts '" + timestampValue + "'}" case dateValue: Date => "\{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } As a result, the condition "update_time >= \{ts '2024-03-12 06:18:17'} will never hit the index. In my case, as a workaround, I changed the code to this: {color:#cc7832}case {color}timestampValue: Timestamp =>{color:#6a8759}s"{color}{color:#6a8759}to_date({color}{\{color:#9876aa}dateFormat.format(timestampValue)},'-MM-dd HH:mi:ss')" After this modification, it worked well. was: When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oralce is DATE. Here is my scenario: first I created a dataframe that reads from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I apply a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: The update_time column in Oracle is of type DATE, which is mapped to Timestamp in Spark (because the precision of DATE in Oracle is second). When I push a filter to Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect: // class is org.apache.spark.sql.jdbc.OracleDialect override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d '-mm-dd'} and timestamp literals in SQL statements written // in the format: \{ts '-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. 
case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "\{ts '" + timestampValue + "'}" case dateValue: Date => "\{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } As a result, the condition "update_time >= \{ts '2024-03-12 06:18:17'} will never hit the index. In my case, as a workaround, I changed the code to this: {color:#cc7832}case {color}timestampValue: Timestamp
[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE
[ https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danke Liu updated SPARK-47542: -- Description: When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oralce is DATE. Here is my scenario: first I created a dataframe that reads from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I apply a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: The update_time column in Oracle is of type DATE, which is mapped to Timestamp in Spark (because the precision of DATE in Oracle is second). When I push a filter to Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect: // class is org.apache.spark.sql.jdbc.OracleDialect override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d '-mm-dd'} and timestamp literals in SQL statements written // in the format: \{ts '-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "\{ts '" + timestampValue + "'}" case dateValue: Date => "\{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } As a result, the condition "update_time >= \{ts '2024-03-12 06:18:17'} will never hit the index. In my case, as a workaround, I changed the code to this: {color:#cc7832}case {color}timestampValue: Timestamp =>{color:#6a8759}s"{color}{color:#6a8759}to_date({color}{{color:#9876aa}dateFormat.format(timestampValue)},'-MM-dd HH:mi:ss')"{color} After this modification, it worked well. was: When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oralce is DATE. Here is my scenario: first I created a dataframe that reads from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I apply a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: The update_time column in Oracle is of type DATE, which is mapped to Timestamp in Spark (because the precision of DATE in Oracle is second). When I push a filter to Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect: // class is org.apache.spark.sql.jdbc.OracleDialect override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d '-mm-dd'} and timestamp literals in SQL statements written // in the format: \{ts '-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. 
case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "\{ts '" + timestampValue + "'}" case dateValue: Date => "\{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } As a result, the condition "update_time >= \{ts '2024-03-12 06:18:17'} will never hit the index. In my case, as a workaround, I changed the code to this: {color:#cc7832}case {color}timestampValue: Timestamp
[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE
[ https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danke Liu updated SPARK-47542: -- Description: When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oralce is DATE. Here is my scenario: first I created a dataframe that reads from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I apply a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: The update_time column in Oracle is of type DATE, which is mapped to Timestamp in Spark (because the precision of DATE in Oracle is second). When I push a filter to Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect: // class is org.apache.spark.sql.jdbc.OracleDialect override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d '-mm-dd'} and timestamp literals in SQL statements written // in the format: \{ts '-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "\{ts '" + timestampValue + "'}" case dateValue: Date => "\{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } As a result, the condition "update_time >= \{ts '2024-03-12 06:18:17'} will never hit the index. In my case, as a workaround, I changed the code to this: {color:#cc7832}case {color}timestampValue: Timestamp =>{color:#6a8759}s"{color}{color:#6a8759}to_date({color} { {color:#9876aa} dateFormat.format(timestampValue)},'-MM-dd HH:mi:ss')"{color} then it worked well. was: When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oralce is DATE. Here is my scenario: first I created a dataframe that reads from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I apply a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: The update_time column in Oracle is of type DATE, which is mapped to Timestamp in Spark (because the precision of DATE in Oracle is second). When I push a filter to Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect: // class is org.apache.spark.sql.jdbc.OracleDialect override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d '-mm-dd'} and timestamp literals in SQL statements written // in the format: \{ts '-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. 
case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "\{ts '" + timestampValue + "'}" case dateValue: Date => "\{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } and this "update_time >= \{ts '2024-03-12 06:18:17'}" will never hit the index. In my case, as a work around, I just change the code to this: {color:#cc7832}case {color}timestampValue: Timestamp
[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE
[ https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danke Liu updated SPARK-47542: -- Description: When I use Spark's JDBC to pull data from Oracle, it will not hit the index if the pushed filter's type in Oracle is DATE. Here is my scenario: first I created a dataframe that reads from Oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I apply a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', 'yyyy-MM-dd HH:mm:ss') """).count() this will not hit the index on the update_time column. Reason: The update_time column in Oracle is of type DATE, which is mapped to Timestamp in Spark (because the precision of DATE in Oracle is second). When I push a filter to Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect: // class is org.apache.spark.sql.jdbc.OracleDialect override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer's Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "{ts '" + timestampValue + "'}" case dateValue: Date => "{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } and this "update_time >= {ts '2024-03-12 06:18:17'}" will never hit the index. In my case, as a workaround, I just changed the code to this: case timestampValue: Timestamp => s"to_date('${dateFormat.format(timestampValue)}', 'yyyy-MM-dd HH:mi:ss')" then it worked well.
was: When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oracle is DATE. Here is my scenario: first I created a dataframe that reads from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I apply a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', 'yyyy-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: the update_time column in oracle is DATE type, this mapped to spark has become Timestamp (because precision of DATE in oracle is second), and when I pushed a filter to oracle, it will hit the code below: // class is org.apache.spark.sql.jdbc.OracleDialect override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer's Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "{ts '" + timestampValue + "'}" case dateValue: Date => "{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } and this "update_time >= {ts '2024-03-12 06:18:17'}" will never hit the index. In my case, as a workaround, I just changed the code to this: case timestampValue: Timestamp
[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE
[ https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danke Liu updated SPARK-47542: -- Description: When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oracle is DATE. Here is my scenario: first I created a dataframe that reads from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I apply a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', 'yyyy-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: the update_time column in oracle is DATE type, this mapped to spark has become Timestamp (because precision of DATE in oracle is second), and when I pushed a filter to oracle, it will hit the code below: // class is org.apache.spark.sql.jdbc.OracleDialect override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer's Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "{ts '" + timestampValue + "'}" case dateValue: Date => "{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } and this "update_time >= {ts '2024-03-12 06:18:17'}" will never hit the index. In my case, as a workaround, I just changed the code to this: case timestampValue: Timestamp => s"to_date('${dateFormat.format(timestampValue)}', 'yyyy-MM-dd HH:mi:ss')" then it worked well.
was: When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oracle is DATE. Here is my scenario: first I created a dataframe that read from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I pushed a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', 'yyyy-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: the update_time column in oracle is DATE type, this mapped to spark has become Timestamp (because precision of DATE in oracle is second), and when I pushed a filter to oracle, it will hit the code below: // class is org.apache.spark.sql.jdbc.OracleDialect override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer's Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "{ts '" + timestampValue + "'}" case dateValue: Date => "{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } and this "update_time >= {ts '2024-03-12 06:18:17'}" will never hit the index. In my case, as a workaround, I just changed the code to this: case timestampValue: Timestamp
[jira] [Created] (SPARK-47542) spark cannot hit oracle's index when column type is DATE
Danke Liu created SPARK-47542: - Summary: spark cannot hit oracle's index when column type is DATE Key: SPARK-47542 URL: https://issues.apache.org/jira/browse/SPARK-47542 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.2.4 Reporter: Danke Liu When I use spark's jdbc to pull data from oracle, it will not hit the index if the pushed filter's type in oracle is DATE. Here is my scenario: first I created a dataframe that read from oracle: val df = spark.read.format("jdbc"). option("url", url). option("driver", driver). option("user", user). option("password", passwd). option("dbtable", "select * from foobar.tbl1") .load() then I pushed a filter to the dataframe like this: df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', 'yyyy-MM-dd HH:mm:ss') """).count() this will not hit the index on update_time column. Reason: the update_time column in oracle is DATE type, this mapped to spark has become Timestamp (because precision of DATE in oracle is second), and when I pushed a filter to oracle, it will hit the code below: // class is org.apache.spark.sql.jdbc.OracleDialect override def compileValue(value: Any): Any = value match { // The JDBC drivers support date literals in SQL statements written in the // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see // 'Oracle Database JDBC Developer's Guide and Reference, 11g Release 1 (11.1)' // Appendix A Reference Information. case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "{ts '" + timestampValue + "'}" case dateValue: Date => "{d '" + dateValue + "'}" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } and this "update_time >= {ts '2024-03-12 06:18:17'}" will never hit the index. In my case, as a workaround, I just changed the code to this: case timestampValue: Timestamp => s"to_date('${dateFormat.format(timestampValue)}', 'yyyy-MM-dd HH:mi:ss')" then it worked well. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-47332) StreamingPythonRunner don't need redundant logic for starting python process
Wei Liu created SPARK-47332: --- Summary: StreamingPythonRunner don't need redundant logic for starting python process Key: SPARK-47332 URL: https://issues.apache.org/jira/browse/SPARK-47332 Project: Spark Issue Type: New Feature Components: Connect, SS, Structured Streaming Affects Versions: 4.0.0 Reporter: Wei Liu https://github.com/apache/spark/pull/45023#discussion_r1516609093 -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-47292) safeMapToJValue should consider when map is null
Wei Liu created SPARK-47292: --- Summary: safeMapToJValue should consider when map is null Key: SPARK-47292 URL: https://issues.apache.org/jira/browse/SPARK-47292 Project: Spark Issue Type: New Feature Components: Connect, SS Affects Versions: 3.5.1, 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-47277) PySpark util function assertDataFrameEqual should not support streaming DF
Wei Liu created SPARK-47277: --- Summary: PySpark util function assertDataFrameEqual should not support streaming DF Key: SPARK-47277 URL: https://issues.apache.org/jira/browse/SPARK-47277 Project: Spark Issue Type: New Feature Components: Connect, PySpark, SQL, Structured Streaming Affects Versions: 3.5.1, 3.5.0, 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
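The summary above suggests a fail-fast check on DataFrame.isStreaming inside assertDataFrameEqual; the following is a minimal sketch of what such a guard might look like. The helper name, error type, and message are illustrative assumptions, not the actual implementation.
{code:python}
def _check_not_streaming(df, arg_name):
    # A streaming DataFrame has no finite set of rows to collect and compare,
    # so the equality utility should reject it up front with a clear message.
    # A real implementation would likely use PySpark's error framework; a plain
    # ValueError is used here purely for illustration.
    if df.isStreaming:
        raise ValueError(
            f"assertDataFrameEqual does not support streaming DataFrames ({arg_name})"
        )
{code}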
[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string
[ https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-47177: - Description: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. *In short, the plan used to executed and the plan used to explain is not the same instance, thus causing the inconsistency.* I don't have a clear idea how yet * maybe we just a coarse granularity lock in explain? * make innerChildren a function: clone the initial plan, every time checked for whether the original AQE plan is finalized (making the final flag atomic first, of course), if no return the cloned initial plan, if it's finalized, clone the final plan and return that one. But still this won't be able to reflect the AQE plan in real time, in a concurrent situation, but at least we have initial version and final version. A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.filter("key > 10") df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(1) Filter (isnotnull(key#4L) AND (key#4L > 10)) +- TableCacheQueryStage 0 +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)] +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) +- == Initial Plan == Filter (isnotnull(key#4L) AND (key#4L > 10)) +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 10)] +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=24] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10){code} was: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. *In short, the plan used to executed and the plan used to explain is not the same instance, thus causing the inconsistency.* I don't have a clear idea how yet * maybe we just a coarse granularity lock in explain? * make innerChildren a function: clone the initial plan, every time checked for whether the original AQE plan is finalized (making the final flag atomic first, of course), if no return the cloned initial plan, if it's finalized, clone the final plan and return that one. But still this won't be able to reflect the AQE plan in real time, in a concurrent situation, but at least we have initial version and final version. 
A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string
[ https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-47177: - Description: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. *In short, the plan used to executed and the plan used to explain is not the same instance, thus causing the inconsistency.* I don't have a clear idea how yet * maybe we just a coarse granularity lock in explain? * make innerChildren a function: clone the initial plan, every time checked for whether the original AQE plan is finalized (making the final flag atomic first, of course), if no return the cloned initial plan, if it's finalized, clone the final plan and return that one. But still this won't be able to reflect the AQE plan in real time, in a concurrent situation, but at least we have initial version and final version. A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) +- == Initial Plan == HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- Project [(key#27L % 10) AS key2#36L] +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) {code} was: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. *In short, the plan used to executed and the plan used to explain is not the same instance, thus causing the inconsistency.* I don't have a clear idea how yet, maybe we just a coarse granularity lock in explain? 
A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +-
[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string
[ https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-47177: - Description: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. *In short, the plan used to executed and the plan used to explain is not the same instance, thus causing the inconsistency.* I don't have a clear idea how yet, maybe we just a coarse granularity lock in explain? A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) +- == Initial Plan == HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- Project [(key#27L % 10) AS key2#36L] +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) {code} was: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. I don't have a clear idea how yet, maybe we can check whether the AQE plan is finalized(make the final flag atomic first, of course), if not we can return the cloned one, otherwise it's thread-safe to return the final one, since it's immutable. 
A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string
[ https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-47177: - Description: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. I don't have a clear idea how yet, maybe we can check whether the AQE plan is finalized(make the final flag atomic first, of course), if not we can return the cloned one, otherwise it's thread-safe to return the final one, since it's immutable. A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) +- == Initial Plan == HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- Project [(key#27L % 10) AS key2#36L] +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) {code} was: AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. I don't have a clear idea how yet, maybe we can check whether the AQE plan is finalized(make the final flag atomic first, of course), if not we can return the cloned one, otherwise it's thread-safe to return the final one, since it's immutable. 
A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} Row(key2=7, count(key2)=10), Row(key2=3, count(key2)=10), Row(key2=1, count(key2)=10), Row(key2=8, count(key2)=10)] >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L],
[jira] [Created] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string
Ziqi Liu created SPARK-47177: Summary: Cached SQL plan do not display final AQE plan in explain string Key: SPARK-47177 URL: https://issues.apache.org/jira/browse/SPARK-47177 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.5.1, 3.5.0, 3.4.2, 4.0.0, 3.5.2 Reporter: Ziqi Liu AQE plan is expected to display final plan after execution. This is not true for cached SQL plan: it will show the initial plan instead. This behavior change is introduced in [https://github.com/apache/spark/pull/40812] it tried to fix the concurrency issue with cached plan. I don't have a clear idea how yet, maybe we can check whether the AQE plan is finalized(make the final flag atomic first, of course), if not we can return the cloned one, otherwise it's thread-safe to return the final one, since it's immutable. A simple repro: {code:java} d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"}) cached_d2 = d1.cache() df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"}) df.collect() {code} {code:java} Row(key2=7, count(key2)=10), Row(key2=3, count(key2)=10), Row(key2=1, count(key2)=10), Row(key2=8, count(key2)=10)] >>> df.explain() == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83] +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- *(1) Project [(key#27L % 10) AS key2#36L] +- TableCacheQueryStage 0 +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) +- == Initial Plan == HashAggregate(keys=[key2#36L], functions=[count(key2#36L)]) +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)]) +- Project [(key#27L % 10) AS key2#36L] +- InMemoryTableScan [key#27L] +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[key#4L], functions=[count(key#4L)]) +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33] +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)]) +- Project [(id#2L % 100) AS key#4L] +- Range (0, 1000, step=1, splits=10) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-47174) Client Side Listener - Server side implementation
Wei Liu created SPARK-47174: --- Summary: Client Side Listener - Server side implementation Key: SPARK-47174 URL: https://issues.apache.org/jira/browse/SPARK-47174 Project: Spark Issue Type: Improvement Components: Connect, SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-47174) Client Side Listener - Server side implementation
[ https://issues.apache.org/jira/browse/SPARK-47174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820828#comment-17820828 ] Wei Liu commented on SPARK-47174: - im working on this > Client Side Listener - Server side implementation > - > > Key: SPARK-47174 > URL: https://issues.apache.org/jira/browse/SPARK-47174 > Project: Spark > Issue Type: Improvement > Components: Connect, SS >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-47173) fix typo in new streaming query listener explanation
Wei Liu created SPARK-47173: --- Summary: fix typo in new streaming query listener explanation Key: SPARK-47173 URL: https://issues.apache.org/jira/browse/SPARK-47173 Project: Spark Issue Type: Improvement Components: SS, UI Affects Versions: 4.0.0 Reporter: Wei Liu miss spelled flatMapGroupsWithState with flatMapGroupWithState (missed a "s" after group) -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-46995) Allow AQE coalesce final stage in SQL cached plan
[ https://issues.apache.org/jira/browse/SPARK-46995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ziqi Liu updated SPARK-46995: - Component/s: SQL > Allow AQE coalesce final stage in SQL cached plan > - > > Key: SPARK-46995 > URL: https://issues.apache.org/jira/browse/SPARK-46995 > Project: Spark > Issue Type: Improvement > Components: Spark Core, SQL >Affects Versions: 4.0.0 >Reporter: Ziqi Liu >Priority: Major > > [https://github.com/apache/spark/pull/43435] and > [https://github.com/apache/spark/pull/43760] are fixing a correctness issue > which will be triggered when AQE applied on cached query plan, specifically, > when AQE coalescing the final result stage of the cached plan. > > The current semantic of > {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} > ([source > code|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L403-L411]): > * when true, we enable AQE, but disable coalescing final stage > ({*}default{*}) > * when false, we disable AQE > > But let’s revisit the semantic of this config: actually for caller the only > thing that matters is whether we change the output partitioning of the cached > plan. And we should only try to apply AQE if possible. Thus we want to > modify the semantic of > {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} > * when true, we enable AQE and allow coalescing final: this might lead to > perf regression, because it introduce extra shuffle > * when false, we enable AQE, but disable coalescing final stage. *(this is > actually the `true` semantic of old behavior)* > Also, to keep the default behavior unchanged, we might want to flip the > default value of > {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} to `false` > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-46995) Allow AQE coalesce final stage in SQL cached plan
Ziqi Liu created SPARK-46995: Summary: Allow AQE coalesce final stage in SQL cached plan Key: SPARK-46995 URL: https://issues.apache.org/jira/browse/SPARK-46995 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 4.0.0 Reporter: Ziqi Liu [https://github.com/apache/spark/pull/43435] and [https://github.com/apache/spark/pull/43760] are fixing a correctness issue which will be triggered when AQE applied on cached query plan, specifically, when AQE coalescing the final result stage of the cached plan. The current semantic of {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} ([source code|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L403-L411]): * when true, we enable AQE, but disable coalescing final stage ({*}default{*}) * when false, we disable AQE But let’s revisit the semantic of this config: actually for caller the only thing that matters is whether we change the output partitioning of the cached plan. And we should only try to apply AQE if possible. Thus we want to modify the semantic of {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} * when true, we enable AQE and allow coalescing final: this might lead to perf regression, because it introduce extra shuffle * when false, we enable AQE, but disable coalescing final stage. *(this is actually the `true` semantic of old behavior)* Also, to keep the default behavior unchanged, we might want to flip the default value of {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} to `false` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
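For reference, the configuration discussed in SPARK-46995 is an ordinary SQL conf that can be toggled per session; a minimal PySpark sketch (assuming an existing SparkSession named spark, and using the semantics proposed in this ticket):
{code:python}
from pyspark.sql.functions import expr

# Toggle whether AQE may change the output partitioning of cached plans.
# Under the proposal above, "false" would still run AQE on the cached plan
# but keep its final stage un-coalesced.
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")

d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
cached = d1.cache()  # the setting above governs how AQE treats this cached plan
cached.count()
{code}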
[jira] [Created] (SPARK-46910) Eliminate JDK Requirement in PySpark Installation
Amanda Liu created SPARK-46910: -- Summary: Eliminate JDK Requirement in PySpark Installation Key: SPARK-46910 URL: https://issues.apache.org/jira/browse/SPARK-46910 Project: Spark Issue Type: Improvement Components: PySpark Affects Versions: 3.5.0 Reporter: Amanda Liu PySpark requires users to have the correct JDK version (JDK 8+ for Spark<4; JDK 17+ for Spark>=4) installed locally. We can make the Spark installation script install the JDK, so users don't need to do this step manually. Details: 1. When the entry point for a Spark class is invoked, the spark-class script checks if Java is installed in the user environment. 2. If Java is not installed, the user is prompted to select whether they want to install JDK 17. 3. If the user selects yes, JDK 17 is installed (using the [install-jdk library|https://pypi.org/project/install-jdk/]) and JAVA_HOME variable and RUNNER are set appropriately. The Spark build will now work! 4. If the user selects no, we provide them a brief description of how to install JDK manually. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
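A rough sketch of the four-step flow described in SPARK-46910, assuming the install-jdk package from PyPI; the function name and prompt wording are illustrative, not the actual spark-class implementation:
{code:python}
import os
import shutil


def ensure_jdk_17():
    # 1. Check whether Java is already available in the user environment.
    if shutil.which("java") or os.environ.get("JAVA_HOME"):
        return

    # 2. Prompt the user before downloading anything.
    answer = input("Java not found. Install JDK 17 now? [y/N] ").strip().lower()
    if answer != "y":
        # 4. Point the user at manual installation instead.
        print("Please install JDK 17 manually and set JAVA_HOME before running PySpark.")
        return

    # 3. Install JDK 17 via the install-jdk library and export JAVA_HOME
    #    (the library returns the path of the freshly installed JDK).
    import jdk  # provided by the 'install-jdk' package
    os.environ["JAVA_HOME"] = jdk.install("17")
{code}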
[jira] [Created] (SPARK-46873) PySpark spark.streams should not recreate new StreamingQueryManager
Wei Liu created SPARK-46873: --- Summary: PySpark spark.streams should not recreate new StreamingQueryManager Key: SPARK-46873 URL: https://issues.apache.org/jira/browse/SPARK-46873 Project: Spark Issue Type: Task Components: Connect, PySpark, SS Affects Versions: 4.0.0 Reporter: Wei Liu In Scala, there is only one streaming query manager for one spark session: ``` scala> spark.streams val res0: org.apache.spark.sql.streaming.StreamingQueryManager = org.apache.spark.sql.streaming.StreamingQueryManager@46bb8cba scala> spark.streams val res1: org.apache.spark.sql.streaming.StreamingQueryManager = org.apache.spark.sql.streaming.StreamingQueryManager@46bb8cba scala> spark.streams val res2: org.apache.spark.sql.streaming.StreamingQueryManager = org.apache.spark.sql.streaming.StreamingQueryManager@46bb8cba scala> spark.streams val res3: org.apache.spark.sql.streaming.StreamingQueryManager = org.apache.spark.sql.streaming.StreamingQueryManager@46bb8cba ``` In Python, this is currently not the case: each access to spark.streams returns a new StreamingQueryManager: ``` >>> spark.streams >>> spark.streams >>> spark.streams >>> spark.streams ``` Python should align with the Scala behavior. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
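A minimal sketch of how the Python side could mirror the Scala behavior by memoizing the manager on the session object; the class and private attribute names are assumptions made for illustration, and _jsparkSession refers to the JVM-side session as in classic PySpark:
{code:python}
from pyspark.sql.streaming import StreamingQueryManager


class CachedStreamsMixin:
    """Illustrative mixin showing the caching idea for SparkSession.streams."""

    @property
    def streams(self) -> StreamingQueryManager:
        # Build the manager once and reuse it on every access, instead of
        # wrapping the JVM-side manager in a brand-new Python object each time.
        if not hasattr(self, "_cached_streaming_query_manager"):
            self._cached_streaming_query_manager = StreamingQueryManager(
                self._jsparkSession.streams()
            )
        return self._cached_streaming_query_manager
{code}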
[jira] [Closed] (SPARK-44460) Pass user auth credential to Python workers for foreachBatch and listener
[ https://issues.apache.org/jira/browse/SPARK-44460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu closed SPARK-44460. --- > Pass user auth credential to Python workers for foreachBatch and listener > - > > Key: SPARK-44460 > URL: https://issues.apache.org/jira/browse/SPARK-44460 > Project: Spark > Issue Type: Task > Components: Connect, Structured Streaming >Affects Versions: 3.4.1 >Reporter: Raghu Angadi >Priority: Major > > No user specific credentials are sent to Python worker that runs user > functions like foreachBatch() and streaming listener. > We might need to pass in these. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-46627) Streaming UI hover-over shows incorrect value
[ https://issues.apache.org/jira/browse/SPARK-46627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-46627: Attachment: Screenshot 2024-01-08 at 15.06.24.png > Streaming UI hover-over shows incorrect value > - > > Key: SPARK-46627 > URL: https://issues.apache.org/jira/browse/SPARK-46627 > Project: Spark > Issue Type: Task > Components: Structured Streaming, UI, Web UI >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > Attachments: Screenshot 2024-01-08 at 1.55.57 PM.png, Screenshot > 2024-01-08 at 15.06.24.png > > > Running a simple streaming query: > val df = spark.readStream.format("rate").option("rowsPerSecond", > "5000").load() > val q = df.writeStream.format("noop").start() > > The hover-over value is incorrect in the streaming ui (shows 321.00 at > undefined) > > !Screenshot 2024-01-08 at 1.55.57 PM.png! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-46627) Streaming UI hover-over shows incorrect value
[ https://issues.apache.org/jira/browse/SPARK-46627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804513#comment-17804513 ] Wei Liu commented on SPARK-46627: - Also batch percent doesn't add to 100% now: !Screenshot 2024-01-08 at 15.06.24.png! > Streaming UI hover-over shows incorrect value > - > > Key: SPARK-46627 > URL: https://issues.apache.org/jira/browse/SPARK-46627 > Project: Spark > Issue Type: Task > Components: Structured Streaming, UI, Web UI >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > Attachments: Screenshot 2024-01-08 at 1.55.57 PM.png, Screenshot > 2024-01-08 at 15.06.24.png > > > Running a simple streaming query: > val df = spark.readStream.format("rate").option("rowsPerSecond", > "5000").load() > val q = df.writeStream.format("noop").start() > > The hover-over value is incorrect in the streaming ui (shows 321.00 at > undefined) > > !Screenshot 2024-01-08 at 1.55.57 PM.png! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-46627) Streaming UI hover-over shows incorrect value
[ https://issues.apache.org/jira/browse/SPARK-46627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-46627: Attachment: Screenshot 2024-01-08 at 1.55.57 PM.png > Streaming UI hover-over shows incorrect value > - > > Key: SPARK-46627 > URL: https://issues.apache.org/jira/browse/SPARK-46627 > Project: Spark > Issue Type: Task > Components: Structured Streaming, UI, Web UI >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > Attachments: Screenshot 2024-01-08 at 1.55.57 PM.png > > > Running a simple streaming query: > val df = spark.readStream.format("rate").option("rowsPerSecond", > "5000").load() > val q = df.writeStream.format("noop").start() > > The hover-over value is incorrect in the streaming ui: > > !https://files.slack.com/files-tmb/T02727P8HV4-F06CJ83D3JT-b44210f391/image_720.png! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-46627) Streaming UI hover-over shows incorrect value
[ https://issues.apache.org/jira/browse/SPARK-46627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804481#comment-17804481 ] Wei Liu commented on SPARK-46627: - Hi Kent : ) [~yao] I was wondering if you have context in this issue? Thank you so much! > Streaming UI hover-over shows incorrect value > - > > Key: SPARK-46627 > URL: https://issues.apache.org/jira/browse/SPARK-46627 > Project: Spark > Issue Type: Task > Components: Structured Streaming, UI, Web UI >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > Attachments: Screenshot 2024-01-08 at 1.55.57 PM.png > > > Running a simple streaming query: > val df = spark.readStream.format("rate").option("rowsPerSecond", > "5000").load() > val q = df.writeStream.format("noop").start() > > The hover-over value is incorrect in the streaming ui (shows 321.00 at > undefined) > > !Screenshot 2024-01-08 at 1.55.57 PM.png! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-46627) Streaming UI hover-over shows incorrect value
[ https://issues.apache.org/jira/browse/SPARK-46627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-46627: Description: Running a simple streaming query: val df = spark.readStream.format("rate").option("rowsPerSecond", "5000").load() val q = df.writeStream.format("noop").start() The hover-over value is incorrect in the streaming ui (shows 321.00 at undefined) !Screenshot 2024-01-08 at 1.55.57 PM.png! was: Running a simple streaming query: val df = spark.readStream.format("rate").option("rowsPerSecond", "5000").load() val q = df.writeStream.format("noop").start() The hover-over value is incorrect in the streaming ui: !https://files.slack.com/files-tmb/T02727P8HV4-F06CJ83D3JT-b44210f391/image_720.png! > Streaming UI hover-over shows incorrect value > - > > Key: SPARK-46627 > URL: https://issues.apache.org/jira/browse/SPARK-46627 > Project: Spark > Issue Type: Task > Components: Structured Streaming, UI, Web UI >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > Attachments: Screenshot 2024-01-08 at 1.55.57 PM.png > > > Running a simple streaming query: > val df = spark.readStream.format("rate").option("rowsPerSecond", > "5000").load() > val q = df.writeStream.format("noop").start() > > The hover-over value is incorrect in the streaming ui (shows 321.00 at > undefined) > > !Screenshot 2024-01-08 at 1.55.57 PM.png! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-46627) Streaming UI hover-over shows incorrect value
Wei Liu created SPARK-46627: --- Summary: Streaming UI hover-over shows incorrect value Key: SPARK-46627 URL: https://issues.apache.org/jira/browse/SPARK-46627 Project: Spark Issue Type: Task Components: Structured Streaming, UI, Web UI Affects Versions: 4.0.0 Reporter: Wei Liu Running a simple streaming query: val df = spark.readStream.format("rate").option("rowsPerSecond", "5000").load() val q = df.writeStream.format("noop").start() The hover-over value is incorrect in the streaming ui: !https://files.slack.com/files-tmb/T02727P8HV4-F06CJ83D3JT-b44210f391/image_720.png! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-46384) Streaming UI doesn't display graph correctly
[ https://issues.apache.org/jira/browse/SPARK-46384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-46384: Summary: Streaming UI doesn't display graph correctly (was: Streaming UI doesn't show graph) > Streaming UI doesn't display graph correctly > > > Key: SPARK-46384 > URL: https://issues.apache.org/jira/browse/SPARK-46384 > Project: Spark > Issue Type: Task > Components: Structured Streaming, Web UI >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > The Streaming UI is broken currently at spark master. Running a simple query: > ``` > q = spark.readStream.format("rate").load().writeStream.format("memory").queryName("test_wei").start() > ``` > Would make the spark UI shows empty graph for "operation duration": > [screenshot: empty "Operation Duration" graph] > Here is the error: > [screenshot: error shown in the browser] > > I verified the same query runs fine on spark 3.5, as in the following graph. > [screenshot: the same query rendering correctly on Spark 3.5]
> > This should be a problem from the library updates, this could be a potential source of error: [https://github.com/apache/spark/pull/42879] > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-46384) Structured Streaming UI doesn't display graph correctly
[ https://issues.apache.org/jira/browse/SPARK-46384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-46384: Summary: Structured Streaming UI doesn't display graph correctly (was: Streaming UI doesn't display graph correctly) > Structured Streaming UI doesn't display graph correctly > --- > > Key: SPARK-46384 > URL: https://issues.apache.org/jira/browse/SPARK-46384 > Project: Spark > Issue Type: Task > Components: Structured Streaming, Web UI >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > The Streaming UI is broken currently at spark master. Running a simple query: > ``` > q = spark.readStream.format("rate").load().writeStream.format("memory").queryName("test_wei").start() > ``` > Would make the spark UI shows empty graph for "operation duration": > [screenshot: empty "Operation Duration" graph] > Here is the error: > [screenshot: error shown in the browser] > > I verified the same query runs fine on spark 3.5, as in the following graph. > [screenshot: the same query rendering correctly on Spark 3.5]
> > This should be a problem from the library updates, this could be a potential source of error: [https://github.com/apache/spark/pull/42879] > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-46384) Streaming UI doesn't show graph
Wei Liu created SPARK-46384: --- Summary: Streaming UI doesn't show graph Key: SPARK-46384 URL: https://issues.apache.org/jira/browse/SPARK-46384 Project: Spark Issue Type: Task Components: Structured Streaming, Web UI Affects Versions: 4.0.0 Reporter: Wei Liu The Streaming UI is currently broken at Spark master. Running a simple query: ``` q = spark.readStream.format("rate").load().writeStream.format("memory").queryName("test_wei").start() ``` makes the Spark UI show an empty graph for "Operation Duration": [screenshot: empty "Operation Duration" graph] Here is the error: [screenshot: error shown in the browser] I verified the same query runs fine on Spark 3.5, as in the following graph. [screenshot: the same query rendering correctly on Spark 3.5] This is likely a problem from the library updates; this could be a potential source of the error: [https://github.com/apache/spark/pull/42879] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-46279) Support write partition values to data files
fred liu created SPARK-46279: Summary: Support write partition values to data files Key: SPARK-46279 URL: https://issues.apache.org/jira/browse/SPARK-46279 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 4.0.0 Reporter: fred liu Supporting writing partition values to data files would give the flexibility to read Parquet files correctly without relying on the engine to derive partition values from the file path, and it would enable cases where individual Parquet files can be copied and shared. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
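For context on the motivation in SPARK-46279: with the standard writer, partition column values are encoded only in the directory names, not inside the Parquet data files. A small PySpark illustration (the output path is hypothetical, and a running SparkSession named spark is assumed):
{code:python}
from pyspark.sql import functions as F

df = spark.range(100).withColumn("event_date", F.lit("2024-01-01"))

# Partitioned write: 'event_date' is dropped from the data files and appears
# only in the directory name, e.g. /tmp/events/event_date=2024-01-01/part-*.parquet
df.write.mode("overwrite").partitionBy("event_date").parquet("/tmp/events")

# Reading a leaf partition directory directly loses the partition column, which
# is the gap this ticket proposes to close by optionally writing partition
# values into the data files themselves.
single_partition = spark.read.parquet("/tmp/events/event_date=2024-01-01")
single_partition.printSchema()  # only 'id'; no 'event_date' column is recovered
{code}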
[jira] [Created] (SPARK-46250) Deflake test_parity_listener
Wei Liu created SPARK-46250: --- Summary: Deflake test_parity_listener Key: SPARK-46250 URL: https://issues.apache.org/jira/browse/SPARK-46250 Project: Spark Issue Type: Task Components: Connect, SS Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45845) Streaming UI add number of evicted state rows
Wei Liu created SPARK-45845: --- Summary: Streaming UI add number of evicted state rows Key: SPARK-45845 URL: https://issues.apache.org/jira/browse/SPARK-45845 Project: Spark Issue Type: Task Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Wei Liu The UI is missing this chart, and users often confuse "aggregated number of rows dropped by watermark" with this newly added metric -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
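Both numbers involved are already reported in the streaming query progress, so a minimal sketch of the distinction the new chart would surface (assuming a running stateful query `q`; field names are those exposed by StateOperatorProgress):

{code:python}
# Assumes a running stateful streaming query `q` with a watermark.
progress = q.lastProgress
for op in progress["stateOperators"]:
    # rows evicted from the state store after the watermark passed them
    print("evicted state rows:", op["numRowsRemoved"])
    # late input rows discarded before they ever entered state
    print("rows dropped by watermark:", op["numRowsDroppedByWatermark"])
{code}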
[jira] [Updated] (SPARK-45834) Make Pearson correlation calculation more stable
[ https://issues.apache.org/jira/browse/SPARK-45834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiayi Liu updated SPARK-45834: -- Description: Spark uses the formula {{ck / sqrt(xMk * yMk)}} to calculate the Pearson Correlation Coefficient. If {{xMk}} and {{yMk}} are very small, the double-precision product underflows to zero, resulting in a denominator of 0. This leads to an Infinity result in the calculation. For example, when calculating the correlation for the same columns a and b in a table, the result will be Infinity, but the correlation for identical columns should be 1.0 instead. ||a||b|| |1e-200|1e-200| |1e-200|1e-200| |1e-100|1e-100| Modifying the formula to {{ck / sqrt(xMk) / sqrt(yMk)}} solves this issue and improves the stability of the calculation. The benefit of this modification is that it splits the square root of the denominator into two parts: {{sqrt(xMk)}} and {{sqrt(yMk)}}. This helps avoid cases where the product of extremely small values underflows to zero. was: Spark uses the formula {{ck / sqrt(xMk * yMk)}} to calculate the Pearson Correlation Coefficient. If {{xMk}} and {{yMk}} are very small, the double-precision product underflows to zero, resulting in a denominator of 0. This leads to a NaN result in the calculation. For example, when calculating the correlation for the same columns a and b in a table, the result will be Infinity, but the correlation for identical columns should be 1.0 instead. ||a||b|| |1e-200|1e-200| |1e-200|1e-200| |1e-100|1e-100| Modifying the formula to {{ck / sqrt(xMk) / sqrt(yMk)}} solves this issue and improves the stability of the calculation. The benefit of this modification is that it splits the square root of the denominator into two parts: {{sqrt(xMk)}} and {{sqrt(yMk)}}. This helps avoid cases where the product of extremely small values underflows to zero. > Make Pearson correlation calculation more stable > --- > > Key: SPARK-45834 > URL: https://issues.apache.org/jira/browse/SPARK-45834 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Jiayi Liu >Priority: Major > > Spark uses the formula {{ck / sqrt(xMk * yMk)}} to calculate the Pearson > Correlation Coefficient. If {{xMk}} and {{yMk}} are very small, the > double-precision product underflows to zero, resulting in a denominator of 0. > This leads to an Infinity result in the calculation. > For example, when calculating the correlation for the same columns a and b in > a table, the result will be Infinity, but the correlation for identical > columns should be 1.0 instead. > ||a||b|| > |1e-200|1e-200| > |1e-200|1e-200| > |1e-100|1e-100| > Modifying the formula to {{ck / sqrt(xMk) / sqrt(yMk)}} solves this issue and > improves the stability of the calculation. The benefit of this modification > is that it splits the square root of the denominator into two parts: > {{sqrt(xMk)}} and {{sqrt(yMk)}}. This helps avoid cases where the product of > extremely small values underflows to zero. > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
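The instability is easy to reproduce with plain IEEE-754 doubles outside Spark (a standalone illustration, not the Spark code path):

{code:python}
import math

ck, xMk, yMk = 1e-200, 1e-200, 1e-200   # identical tiny columns: the correlation should be 1.0

print(xMk * yMk)                         # 0.0, the product underflows in double precision
print(math.sqrt(xMk) * math.sqrt(yMk))   # 1e-200, taking square roots first avoids the underflow

# Current formula ck / sqrt(xMk * yMk): the denominator becomes sqrt(0.0) == 0.0,
# which Spark's Java/Scala double division turns into Infinity instead of 1.0.
# Proposed formula:
print(ck / math.sqrt(xMk) / math.sqrt(yMk))   # 1.0
{code}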
[jira] [Created] (SPARK-45729) Fix PySpark testing guide links
Amanda Liu created SPARK-45729: -- Summary: Fix PySpark testing guide links Key: SPARK-45729 URL: https://issues.apache.org/jira/browse/SPARK-45729 Project: Spark Issue Type: Sub-task Components: PySpark Affects Versions: 3.5.0 Reporter: Amanda Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45637) Time window aggregation in separate streams followed by stream-stream join not returning results
[ https://issues.apache.org/jira/browse/SPARK-45637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-45637: Description: According to documentation update (SPARK-42591) resulting from SPARK-42376, Spark 3.5.0 should support time-window aggregations in two separate streams followed by stream-stream window join: [https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995] However, I failed to reproduce this example and the query I built doesn't return any results: {code:java} from pyspark.sql.functions import rand from pyspark.sql.functions import expr, window, window_time spark.conf.set("spark.sql.shuffle.partitions", "1") impressions = ( spark .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load() .selectExpr("value AS adId", "timestamp AS impressionTime") ) impressionsWithWatermark = impressions \ .selectExpr("adId AS impressionAdId", "impressionTime") \ .withWatermark("impressionTime", "10 seconds") clicks = ( spark .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load() .where((rand() * 100).cast("integer") < 10) # 10 out of every 100 impressions result in a click .selectExpr("(value - 10) AS adId ", "timestamp AS clickTime") # -10 so that a click with same id as impression is generated later (i.e. delayed data). .where("adId > 0") ) clicksWithWatermark = clicks \ .selectExpr("adId AS clickAdId", "clickTime") \ .withWatermark("clickTime", "10 seconds") clicksWindow = clicksWithWatermark.groupBy( window(clicksWithWatermark.clickTime, "1 minute") ).count() impressionsWindow = impressionsWithWatermark.groupBy( window(impressionsWithWatermark.impressionTime, "1 minute") ).count() clicksAndImpressions = clicksWindow.join(impressionsWindow, "window", "inner") clicksAndImpressions.writeStream \ .format("memory") \ .queryName("clicksAndImpressions") \ .outputMode("append") \ .start() {code} My intuition is that I'm getting no results because to output results of the first stateful operator (time window aggregation), a watermark needs to pass the end timestamp of the window. And once the watermark is after the end timestamp of the window, this window is ignored at the second stateful operator (stream-stream) join because it's behind the watermark. 
Indeed, a small hack done to event time column (adding one minute) between two stateful operators makes it possible to get results: {code:java} clicksWindow2 = clicksWithWatermark.groupBy( window(clicksWithWatermark.clickTime, "1 minute") ).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 MINUTE')).drop("window") impressionsWindow2 = impressionsWithWatermark.groupBy( window(impressionsWithWatermark.impressionTime, "1 minute") ).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 MINUTE')).drop("window") clicksAndImpressions2 = clicksWindow2.join(impressionsWindow2, "window_time", "inner") clicksAndImpressions2.writeStream \ .format("memory") \ .queryName("clicksAndImpressions2") \ .outputMode("append") \ .start() {code}
[jira] [Created] (SPARK-45677) Observe API error logging
Wei Liu created SPARK-45677: --- Summary: Observe API error logging Key: SPARK-45677 URL: https://issues.apache.org/jira/browse/SPARK-45677 Project: Spark Issue Type: Task Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Wei Liu We should tell the user why it's not supported and what to do instead: [https://github.com/apache/spark/blob/536439244593d40bdab88e9d3657f2691d3d33f2/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala#L76] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
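For reference, a rough sketch of the two paths involved (assuming an active SparkSession `spark`; metric names are illustrative): as the linked check suggests, the Observation helper is meant for batch queries, while streaming queries are expected to report observed metrics through a StreamingQueryListener.

{code:python}
from pyspark.sql import Observation
from pyspark.sql.functions import count, lit

# Batch: the Observation helper collects metrics once an action completes.
obs = Observation("batch_metrics")
batch_df = spark.range(100).observe(obs, count(lit(1)).alias("rows"))
batch_df.collect()
print(obs.get)   # e.g. {'rows': 100}

# Streaming: the Observation helper is rejected; use a named observation and read
# event.progress.observedMetrics from a StreamingQueryListener instead.
stream_df = (
    spark.readStream.format("rate").load()
    .observe("stream_metrics", count(lit(1)).alias("rows"))
)
{code}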
[jira] [Updated] (SPARK-45053) Improve python version mismatch logging
[ https://issues.apache.org/jira/browse/SPARK-45053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-45053: Description: Currently the Python version mismatch message is a little confusing: it uses (3, 9) to represent Python version 3.9. This is a minor update to make it more straightforward. > Improve python version mismatch logging > --- > > Key: SPARK-45053 > URL: https://issues.apache.org/jira/browse/SPARK-45053 > Project: Spark > Issue Type: Improvement > Components: PySpark >Affects Versions: 3.4.1 >Reporter: Wei Liu >Priority: Trivial > > Currently the Python version mismatch message is a little confusing: it uses > (3, 9) to represent Python version 3.9. This is a minor update to make it more > straightforward. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
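A standalone illustration of the formatting change being suggested (the message text below is paraphrased, not the actual PySpark worker check):

{code:python}
import sys

version = sys.version_info[:2]

# Current style: printing the raw tuple renders as "(3, 9)", which is easy to misread.
print("Python in worker has different version %s than that in driver" % str(version))

# Clearer style: render it in the familiar dotted form, "3.9".
print("Python in worker has different version %d.%d than that in driver" % version)
{code}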
[jira] [Created] (SPARK-45056) Add process termination tests for Python foreachBatch and StreamingQueryListener
Wei Liu created SPARK-45056: --- Summary: Add process termination tests for Python foreachBatch and StreamingQueryListener Key: SPARK-45056 URL: https://issues.apache.org/jira/browse/SPARK-45056 Project: Spark Issue Type: Task Components: Connect, Structured Streaming Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45053) Improve python version mismatch logging
Wei Liu created SPARK-45053: --- Summary: Improve python version mismatch logging Key: SPARK-45053 URL: https://issues.apache.org/jira/browse/SPARK-45053 Project: Spark Issue Type: Improvement Components: PySpark Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44971) [BUG Fix] PySpark StreamingQueryProgress fromJson
[ https://issues.apache.org/jira/browse/SPARK-44971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-44971: Issue Type: Bug (was: New Feature) > [BUG Fix] PySpark StreamingQueryProgress fromJson > - > > Key: SPARK-44971 > URL: https://issues.apache.org/jira/browse/SPARK-44971 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.5.0, 3.5.1 >Reporter: Wei Liu >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44971) [BUG Fix] PySpark StreamingQueryProgress fromJson
Wei Liu created SPARK-44971: --- Summary: [BUG Fix] PySpark StreamingQueryProgress fromJson Key: SPARK-44971 URL: https://issues.apache.org/jira/browse/SPARK-44971 Project: Spark Issue Type: New Feature Components: Structured Streaming Affects Versions: 3.5.0, 3.5.1 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44930) Deterministic ApplyFunctionExpression should be foldable
[ https://issues.apache.org/jira/browse/SPARK-44930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xianyang Liu updated SPARK-44930: - Description: Currently, ApplyFunctionExpression is unfoldable because it inherits the default value from Expression. However, it should be foldable for a deterministic ApplyFunctionExpression. This could help optimize the usage for V2 UDF applying to constant expressions. (was: Currently, ApplyFunctionExpression is unfoldable because it inherits the default value from Expression. However, it should be foldable for a deterministic ApplyFunctionExpression. This could help optimize the usage for V2 UDF applying on constant expression.) > Deterministic ApplyFunctionExpression should be foldable > > > Key: SPARK-44930 > URL: https://issues.apache.org/jira/browse/SPARK-44930 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.1 >Reporter: Xianyang Liu >Priority: Major > > Currently, ApplyFunctionExpression is unfoldable because it inherits the default > value from Expression. However, it should be foldable for a deterministic > ApplyFunctionExpression. This could help optimize the usage for V2 UDF > applying to constant expressions. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44930) Deterministic ApplyFunctionExpression should be foldable
[ https://issues.apache.org/jira/browse/SPARK-44930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xianyang Liu updated SPARK-44930: - Description: Currently, ApplyFunctionExpression is unfoldable because it inherits the default value from Expression. However, it should be foldable for a deterministic ApplyFunctionExpression. This could help optimize the usage for V2 UDF applying on constant expression. (was: Currently, ApplyFunctionExpression is unfoldable because it inherits the default value from Expression. However, it should be foldable for a deterministic ApplyFunctionExpression. This could help optimize the usage for V2 UDF applying on constant literal.) > Deterministic ApplyFunctionExpression should be foldable > > > Key: SPARK-44930 > URL: https://issues.apache.org/jira/browse/SPARK-44930 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.1 >Reporter: Xianyang Liu >Priority: Major > > Currently, ApplyFunctionExpression is unfoldable because it inherits the default > value from Expression. However, it should be foldable for a deterministic > ApplyFunctionExpression. This could help optimize the usage for V2 UDF > applying on constant expression. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44930) Deterministic ApplyFunctionExpression should be foldable
Xianyang Liu created SPARK-44930: Summary: Deterministic ApplyFunctionExpression should be foldable Key: SPARK-44930 URL: https://issues.apache.org/jira/browse/SPARK-44930 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.1 Reporter: Xianyang Liu Currently, ApplyFunctionExpression is unfoldable because it inherits the default value from Expression. However, it should be foldable for a deterministic ApplyFunctionExpression. This could help optimize the usage for V2 UDF applying on constant literal. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
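A sketch of what foldability buys, using a built-in deterministic function as a stand-in for a V2 UDF (assumes an active SparkSession `spark`):

{code:python}
# Constant folding with a built-in deterministic function: the optimizer evaluates
# `abs(-41) + 1` once at planning time, so the filter compares against the literal 42.
df = spark.range(100).where("id > abs(-41) + 1")
df.explain(True)   # the optimized plan shows a comparison against 42

# SPARK-44930 would allow the same folding for a deterministic V2 UDF invoked only
# with literal arguments, whose ApplyFunctionExpression is currently unfoldable.
{code}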
[jira] [Resolved] (SPARK-44917) PySpark Streaming DataStreamWriter table API
[ https://issues.apache.org/jira/browse/SPARK-44917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu resolved SPARK-44917. - Resolution: Not A Problem > PySpark Streaming DataStreamWriter table API > > > Key: SPARK-44917 > URL: https://issues.apache.org/jira/browse/SPARK-44917 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44917) PySpark Streaming DataStreamWriter table API
Wei Liu created SPARK-44917: --- Summary: PySpark Streaming DataStreamWriter table API Key: SPARK-44917 URL: https://issues.apache.org/jira/browse/SPARK-44917 Project: Spark Issue Type: New Feature Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44913) DS V2 supports push down V2 UDF that has magic method
Xianyang Liu created SPARK-44913: Summary: DS V2 supports push down V2 UDF that has magic method Key: SPARK-44913 URL: https://issues.apache.org/jira/browse/SPARK-44913 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.1 Reporter: Xianyang Liu Right now we only support pushing down a V2 UDF that does not have a magic method, because such a UDF is analyzed into an `ApplyFunctionExpression`, which can be translated and pushed down. However, a V2 UDF that has a magic method is analyzed into `StaticInvoke` or `Invoke`, which cannot be translated into a V2 expression and therefore cannot be pushed down to the data source. Since the magic method is the recommended approach, this PR adds support for pushing down V2 UDFs that have a magic method. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-44460) Pass user auth credential to Python workers for foreachBatch and listener
[ https://issues.apache.org/jira/browse/SPARK-44460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757087#comment-17757087 ] Wei Liu commented on SPARK-44460: - [~rangadi] This seems to be a Databricks internal issue. See the updates in SC-138245 > Pass user auth credential to Python workers for foreachBatch and listener > - > > Key: SPARK-44460 > URL: https://issues.apache.org/jira/browse/SPARK-44460 > Project: Spark > Issue Type: Task > Components: Connect, Structured Streaming >Affects Versions: 3.4.1 >Reporter: Raghu Angadi >Priority: Major > > No user specific credentials are sent to Python worker that runs user > functions like foreachBatch() and streaming listener. > We might need to pass in these. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44839) Better error logging when user accesses spark session in foreachBatch and Listener
Wei Liu created SPARK-44839: --- Summary: Better error logging when user accesses spark session in foreachBatch and Listener Key: SPARK-44839 URL: https://issues.apache.org/jira/browse/SPARK-44839 Project: Spark Issue Type: New Feature Components: Connect, Structured Streaming Affects Versions: 3.5.0, 4.0.0 Reporter: Wei Liu TypeError: cannot pickle '_thread._local' object is raised when the user accesses `spark` inside the function. We need a better error for this. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
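A minimal sketch of the failure mode and the usual workaround (assumes an active SparkSession `spark`; under Spark Connect the user function is pickled and shipped to the server, so capturing the client-side session fails):

{code:python}
# Problematic: `spark` from the enclosing scope is captured in the pickled closure;
# the session holds a non-picklable _thread._local, which produces
# "TypeError: cannot pickle '_thread._local' object".
def bad_func(batch_df, batch_id):
    spark.range(10).show()

# Workaround: obtain the session from the micro-batch DataFrame instead.
def good_func(batch_df, batch_id):
    batch_df.sparkSession.range(10).show()

query = (
    spark.readStream.format("rate").load()
    .writeStream.foreachBatch(good_func)
    .start()
)
{code}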
[jira] [Commented] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect
[ https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17754280#comment-17754280 ] Wei Liu commented on SPARK-44808: - This seems to go against the design principle of Spark Connect: we pass everything to the server side, and the client only keeps IDs to query status. That's the reason why foreachBatch and the Listener are run on the server side. This can be closed. > refreshListener() API on StreamingQueryManager for spark connect > > > Key: SPARK-44808 > URL: https://issues.apache.org/jira/browse/SPARK-44808 > Project: Spark > Issue Type: New Feature > Components: Connect, Structured Streaming >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > I’m thinking of an improvement for connect python listener and foreachBatch. > Currently if you define a variable outside of the function, you can’t > actually see it on client side if it’s touched in the function because the > operation is on server side, e.g. > > > {code:java} > x = 0 > class MyListener(StreamingQueryListener): > def onQueryStarted(e): > x = 100 > self.y = 200 > spark.streams.addListener(MyListener()) > q = spark.readStream.start() > > # x is still 0, self.y is not defined > {code} > > > But for the self.y case, there could be an improvement. > > The server side is capable of pickle serialize the whole listener instance > again, and send back to the client. > > So we could define a new interface on the streaming query manager, maybe > called refreshListener(listener), e.g. use it as > `spark.streams.refreshListener()` > > > {code:java} > def refreshListener(listener: StreamingQueryListener) -> > StreamingQueryListener > # send the listener id with this refresh request, server receives this > request and serializes the listener again, and send back to client, so the > returned new listener contains the updated value self.y > > {code} > > For `foreachBatch`, we could wrap the function to a new class > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect
[ https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu resolved SPARK-44808. - Resolution: Won't Do > refreshListener() API on StreamingQueryManager for spark connect > > > Key: SPARK-44808 > URL: https://issues.apache.org/jira/browse/SPARK-44808 > Project: Spark > Issue Type: New Feature > Components: Connect, Structured Streaming >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > I’m thinking of an improvement for connect python listener and foreachBatch. > Currently if you define a variable outside of the function, you can’t > actually see it on client side if it’s touched in the function because the > operation is on server side, e.g. > > > {code:java} > x = 0 > class MyListener(StreamingQueryListener): > def onQueryStarted(e): > x = 100 > self.y = 200 > spark.streams.addListener(MyListener()) > q = spark.readStream.start() > > # x is still 0, self.y is not defined > {code} > > > But for the self.y case, there could be an improvement. > > The server side is capable of pickle serialize the whole listener instance > again, and send back to the client. > > So we could define a new interface on the streaming query manager, maybe > called refreshListener(listener), e.g. use it as > `spark.streams.refreshListener()` > > > {code:java} > def refreshListener(listener: StreamingQueryListener) -> > StreamingQueryListener > # send the listener id with this refresh request, server receives this > request and serializes the listener again, and send back to client, so the > returned new listener contains the updated value self.y > > {code} > > For `foreachBatch`, we could wrap the function to a new class > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect
[ https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-44808: Description: I’m thinking of an improvement for connect python listener and foreachBatch. Currently if you define a variable outside of the function, you can’t actually see it on client side if it’s touched in the function because the operation is on server side, e.g. {code:java} x = 0 class MyListener(StreamingQueryListener): def onQueryStarted(e): x = 100 self.y = 200 spark.streams.addListener(MyListener()) q = spark.readStream.start() # x is still 0, self.y is not defined {code} But for the self.y case, there could be an improvement. The server side is capable of pickle serialize the whole listener instance again, and send back to the client. So we could define a new interface on the streaming query manager, maybe called refreshListener(listener), e.g. use it as `spark.streams.refreshListener()` {code:java} def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener # send the listener id with this refresh request, server receives this request and serializes the listener again, and send back to client, so the returned new listener contains the updated value self.y {code} For `foreachBatch`, we could wrap the function to a new class was: I’m thinking of an improvement for connect python listener and foreachBatch. Currently if you define a variable outside of the function, you can’t actually see it on client side if it’s touched in the function because the operation is on server side, e.g. {code:java} x = 0 class MyListener(StreamingQueryListener): def onQueryStarted(e): x = 100 self.y = 200 spark.streams.addListener(MyListener()) q = spark.readStream.start() # x is still 0, self.y is not defined {code} But for the self.y case, there could be an improvement. The server side is capable of pickle serialize the whole listener instance again, and send back to the client. So if we define a new interface on the streaming query manager, maybe called refreshListener(listener), e.g. use it as `spark.streams.refreshListener()` {code:java} def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener # send the listener id with this refresh request, server receives this request and serializes the listener again, and send back to client, so the returned new listener contains the updated value self.y {code} For `foreachBatch`, we could wrap the function to a new class > refreshListener() API on StreamingQueryManager for spark connect > > > Key: SPARK-44808 > URL: https://issues.apache.org/jira/browse/SPARK-44808 > Project: Spark > Issue Type: New Feature > Components: Connect, Structured Streaming >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > I’m thinking of an improvement for connect python listener and foreachBatch. > Currently if you define a variable outside of the function, you can’t > actually see it on client side if it’s touched in the function because the > operation is on server side, e.g. > > > {code:java} > x = 0 > class MyListener(StreamingQueryListener): > def onQueryStarted(e): > x = 100 > self.y = 200 > spark.streams.addListener(MyListener()) > q = spark.readStream.start() > > # x is still 0, self.y is not defined > {code} > > > But for the self.y case, there could be an improvement. > > The server side is capable of pickle serialize the whole listener instance > again, and send back to the client. 
> > So we could define a new interface on the streaming query manager, maybe > called refreshListener(listener), e.g. use it as > `spark.streams.refreshListener()` > > > {code:java} > def refreshListener(listener: StreamingQueryListener) -> > StreamingQueryListener > # send the listener id with this refresh request, server receives this > request and serializes the listener again, and send back to client, so the > returned new listener contains the updated value self.y > > {code} > > For `foreachBatch`, we could wrap the function to a new class > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect
[ https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-44808: Description: I’m thinking of an improvement for connect python listener and foreachBatch. Currently if you define a variable outside of the function, you can’t actually see it on client side if it’s touched in the function because the operation is on server side, e.g. {code:java} x = 0 class MyListener(StreamingQueryListener): def onQueryStarted(e): x = 100 self.y = 200 spark.streams.addListener(MyListener()) q = spark.readStream.start() # x is still 0, self.y is not defined {code} But for the self.y case, there could be an improvement. The server side is capable of pickle serialize the whole listener instance again, and send back to the client. So if we define a new interface on the streaming query manager, maybe called refreshListener(listener), e.g. use it as `spark.streams.refreshListener()` {code:java} def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener # send the listener id with this refresh request, server receives this request and serializes the listener again, and send back to client, so the returned new listener contains the updated value self.y {code} For `foreachBatch`, we could wrap the function to a new class was: I’m thinking of an improvement for connect python listener and foreachBatch. Currently if you define a variable outside of the function, you can’t actually see it on client side if it’s touched in the function because the operation is on server side, e.g. {code:java} x = 0 class MyListener(StreamingQueryListener): def onQueryStarted(e): x = 100 self.y = 200 spark.streams.addListener(MyListener()) q = spark.readStream.start() # x is still 0, self.y is not defined {code} But for the self.y case, there could be an improvement. The server side is capable of pickle serialize the whole listener instance again, and send back to the client. So if we define a new interface on the streaming query manager, maybe called refreshListener(listener), e.g. use it as `spark.streams.refreshListener()` {code:java} def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener # send the listener id with this refresh request, server receives this request and serializes the listener again, and send back to client, so the returned new listener contains the updated value self.y {code} For `foreachBatch`, we might could wrap the function to a new class, like the listener case > refreshListener() API on StreamingQueryManager for spark connect > > > Key: SPARK-44808 > URL: https://issues.apache.org/jira/browse/SPARK-44808 > Project: Spark > Issue Type: New Feature > Components: Connect, Structured Streaming >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > I’m thinking of an improvement for connect python listener and foreachBatch. > Currently if you define a variable outside of the function, you can’t > actually see it on client side if it’s touched in the function because the > operation is on server side, e.g. > > > {code:java} > x = 0 > class MyListener(StreamingQueryListener): > def onQueryStarted(e): > x = 100 > self.y = 200 > spark.streams.addListener(MyListener()) > q = spark.readStream.start() > > # x is still 0, self.y is not defined > {code} > > > But for the self.y case, there could be an improvement. > > The server side is capable of pickle serialize the whole listener instance > again, and send back to the client. 
> > So if we define a new interface on the streaming query manager, maybe called > refreshListener(listener), e.g. use it as `spark.streams.refreshListener()` > > > {code:java} > def refreshListener(listener: StreamingQueryListener) -> > StreamingQueryListener > # send the listener id with this refresh request, server receives this > request and serializes the listener again, and send back to client, so the > returned new listener contains the updated value self.y > > {code} > > For `foreachBatch`, we could wrap the function to a new class > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect
[ https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-44808: Description: I’m thinking of an improvement for connect python listener and foreachBatch. Currently if you define a variable outside of the function, you can’t actually see it on client side if it’s touched in the function because the operation is on server side, e.g. {code:java} x = 0 class MyListener(StreamingQueryListener): def onQueryStarted(e): x = 100 self.y = 200 spark.streams.addListener(MyListener()) q = spark.readStream.start() # x is still 0, self.y is not defined {code} But for the self.y case, there could be an improvement. The server side is capable of pickle serialize the whole listener instance again, and send back to the client. So if we define a new interface on the streaming query manager, maybe called refreshListener(listener), e.g. use it as `spark.streams.refreshListener()` {code:java} def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener # send the listener id with this refresh request, server receives this request and serializes the listener again, and send back to client, so the returned new listener contains the updated value self.y {code} For `foreachBatch`, we might could wrap the function to a new class, like the listener case was: I’m thinking of an improvement for connect python listener and foreachBatch. Currently if you define a variable outside of the function, you can’t actually see it on client side if it’s touched in the function because the operation is on server side, e.g. {code:java} x = 0 class MyListener(StreamingQueryListener): def onQueryStarted(e): x = 100 self.y = 200 spark.streams.addListener(MyListener()) q = spark.readStream.start() # x is still 0, self.y is not defined {code} But for the self.y case, there could be an improvement. The server side is capable of pickle serialize the whole listener instance again, and send back to the client. So if we define a new interface on the streaming query manager, maybe called refreshListener(listener), e.g. use it as `spark.streams.refreshListener()` {code:java} def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener # send the listener id with this refresh request, server receives this request and serializes the listener again, and send back to client, so the returned new listener contains the updated value self.y {code} > refreshListener() API on StreamingQueryManager for spark connect > > > Key: SPARK-44808 > URL: https://issues.apache.org/jira/browse/SPARK-44808 > Project: Spark > Issue Type: New Feature > Components: Connect, Structured Streaming >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > I’m thinking of an improvement for connect python listener and foreachBatch. > Currently if you define a variable outside of the function, you can’t > actually see it on client side if it’s touched in the function because the > operation is on server side, e.g. > > > {code:java} > x = 0 > class MyListener(StreamingQueryListener): > def onQueryStarted(e): > x = 100 > self.y = 200 > spark.streams.addListener(MyListener()) > q = spark.readStream.start() > > # x is still 0, self.y is not defined > {code} > > > But for the self.y case, there could be an improvement. > > The server side is capable of pickle serialize the whole listener instance > again, and send back to the client. > > So if we define a new interface on the streaming query manager, maybe called > refreshListener(listener), e.g. 
use it as `spark.streams.refreshListener()` > > > {code:java} > def refreshListener(listener: StreamingQueryListener) -> > StreamingQueryListener > # send the listener id with this refresh request, server receives this > request and serializes the listener again, and send back to client, so the > returned new listener contains the updated value self.y > > {code} > > For `foreachBatch`, we might could wrap the function to a new class, like the > listener case > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect
[ https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-44808: Description: I’m thinking of an improvement for connect python listener and foreachBatch. Currently if you define a variable outside of the function, you can’t actually see it on client side if it’s touched in the function because the operation is on server side, e.g. {code:java} x = 0 class MyListener(StreamingQueryListener): def onQueryStarted(e): x = 100 self.y = 200 spark.streams.addListener(MyListener()) q = spark.readStream.start() # x is still 0, self.y is not defined {code} But for the self.y case, there could be an improvement. The server side is capable of pickle serialize the whole listener instance again, and send back to the client. So if we define a new interface on the streaming query manager, maybe called refreshListener(listener), e.g. use it as `spark.streams.refreshListener()` {code:java} def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener # send the listener id with this refresh request, server receives this request and serializes the listener again, and send back to client, so the returned new listener contains the updated value self.y {code} was: I’m thinking of an improvement for python listener and foreachBatch. Currently if you define a variable outside of the function, you can’t actually see it on client side if it’s touched in the function because the operation is on server side, e.g. {code:java} x = 0 class MyListener(StreamingQueryListener): def onQueryStarted(e): x = 100 self.y = 200 spark.streams.addListener(MyListener()) q = spark.readStream.start() # x is still 0, self.y is not defined {code} But for the self.y case, there could be an improvement. The server side is capable of pickle serialize the whole listener instance again, and send back to the client. So if we define a new interface on the streaming query manager, maybe called refreshListener(listener), e.g. use it as `spark.streams.refreshListener()` {code:java} def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener # send the listener id with this refresh request, server receives this request and serializes the listener again, and send back to client, so the returned new listener contains the updated value self.y {code} > refreshListener() API on StreamingQueryManager for spark connect > > > Key: SPARK-44808 > URL: https://issues.apache.org/jira/browse/SPARK-44808 > Project: Spark > Issue Type: New Feature > Components: Connect, Structured Streaming >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > I’m thinking of an improvement for connect python listener and foreachBatch. > Currently if you define a variable outside of the function, you can’t > actually see it on client side if it’s touched in the function because the > operation is on server side, e.g. > > > {code:java} > x = 0 > class MyListener(StreamingQueryListener): > def onQueryStarted(e): > x = 100 > self.y = 200 > spark.streams.addListener(MyListener()) > q = spark.readStream.start() > > # x is still 0, self.y is not defined > {code} > > > But for the self.y case, there could be an improvement. > > The server side is capable of pickle serialize the whole listener instance > again, and send back to the client. > > So if we define a new interface on the streaming query manager, maybe called > refreshListener(listener), e.g. 
use it as `spark.streams.refreshListener()` > > > {code:java} > def refreshListener(listener: StreamingQueryListener) -> > StreamingQueryListener > # send the listener id with this refresh request, server receives this > request and serializes the listener again, and send back to client, so the > returned new listener contains the updated value self.y > > {code} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect
Wei Liu created SPARK-44808: --- Summary: refreshListener() API on StreamingQueryManager for spark connect Key: SPARK-44808 URL: https://issues.apache.org/jira/browse/SPARK-44808 Project: Spark Issue Type: New Feature Components: Connect, Structured Streaming Affects Versions: 4.0.0 Reporter: Wei Liu I’m thinking of an improvement for python listener and foreachBatch. Currently if you define a variable outside of the function, you can’t actually see it on client side if it’s touched in the function because the operation is on server side, e.g. ``` x = 0 class MyListener(StreamingQueryListener): def onQueryStarted(e): x = 100 self.y = 200 spark.streams.addListener(MyListener()) q = spark.readStream.start() # x is still 0, self.y is not defined ``` But for the self.y case, there could be an improvement. The server side is capable of pickle serialize the whole listener instance again, and send back to the client. So if we define a new interface on the streaming query manager, maybe called refreshListener(listener), e.g. use it as `spark.streams.refreshListener()` ``` def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener # send the listener id with this refresh request, server receives this request and serializes the listener again, and send back to client, so the returned new listener contains the updated value self.y ``` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect
[ https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Liu updated SPARK-44808: Description: I’m thinking of an improvement for python listener and foreachBatch. Currently if you define a variable outside of the function, you can’t actually see it on client side if it’s touched in the function because the operation is on server side, e.g. {code:java} x = 0 class MyListener(StreamingQueryListener): def onQueryStarted(e): x = 100 self.y = 200 spark.streams.addListener(MyListener()) q = spark.readStream.start() # x is still 0, self.y is not defined {code} But for the self.y case, there could be an improvement. The server side is capable of pickle serialize the whole listener instance again, and send back to the client. So if we define a new interface on the streaming query manager, maybe called refreshListener(listener), e.g. use it as `spark.streams.refreshListener()` {code:java} def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener # send the listener id with this refresh request, server receives this request and serializes the listener again, and send back to client, so the returned new listener contains the updated value self.y {code} was: I’m thinking of an improvement for python listener and foreachBatch. Currently if you define a variable outside of the function, you can’t actually see it on client side if it’s touched in the function because the operation is on server side, e.g. ``` x = 0 class MyListener(StreamingQueryListener): def onQueryStarted(e): x = 100 self.y = 200 spark.streams.addListener(MyListener()) q = spark.readStream.start() # x is still 0, self.y is not defined ``` But for the self.y case, there could be an improvement. The server side is capable of pickle serialize the whole listener instance again, and send back to the client. So if we define a new interface on the streaming query manager, maybe called refreshListener(listener), e.g. use it as `spark.streams.refreshListener()` ``` def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener # send the listener id with this refresh request, server receives this request and serializes the listener again, and send back to client, so the returned new listener contains the updated value self.y ``` > refreshListener() API on StreamingQueryManager for spark connect > > > Key: SPARK-44808 > URL: https://issues.apache.org/jira/browse/SPARK-44808 > Project: Spark > Issue Type: New Feature > Components: Connect, Structured Streaming >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > I’m thinking of an improvement for python listener and foreachBatch. > Currently if you define a variable outside of the function, you can’t > actually see it on client side if it’s touched in the function because the > operation is on server side, e.g. > > > {code:java} > x = 0 > class MyListener(StreamingQueryListener): > def onQueryStarted(e): > x = 100 > self.y = 200 > spark.streams.addListener(MyListener()) > q = spark.readStream.start() > > # x is still 0, self.y is not defined > {code} > > > But for the self.y case, there could be an improvement. > > The server side is capable of pickle serialize the whole listener instance > again, and send back to the client. > > So if we define a new interface on the streaming query manager, maybe called > refreshListener(listener), e.g. 
use it as `spark.streams.refreshListener()` > > > {code:java} > def refreshListener(listener: StreamingQueryListener) -> > StreamingQueryListener > # send the listener id with this refresh request, server receives this > request and serializes the listener again, and send back to client, so the > returned new listener contains the updated value self.y > > {code} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44764) Streaming process improvement
Wei Liu created SPARK-44764: --- Summary: Streaming process improvement Key: SPARK-44764 URL: https://issues.apache.org/jira/browse/SPARK-44764 Project: Spark Issue Type: New Feature Components: Connect, Structured Streaming Affects Versions: 4.0.0 Reporter: Wei Liu # Deduplicate or remove StreamingPythonRunner if possible; it is very similar to the existing PythonRunner # Use the DAEMON mode for starting a new process -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44712) Migrate test_timedelta_ops assert_eq to use assertDataFrameEqual
[ https://issues.apache.org/jira/browse/SPARK-44712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu updated SPARK-44712: --- Description: Migrate assert_eq to assertDataFrameEqual in this file: [python/pyspark/pandas/tests/data_type_ops/test_timedelta_ops.py|https://github.com/databricks/runtime/blob/f579860299b16f6614f70c7cf2509cd89816d363/python/pyspark/pandas/tests/data_type_ops/test_timedelta_ops.py#L176] (was: Migrate assert_eq to assertDataFrameEqual in this file: [python/pyspark/pandas/tests/indexes/test_reset_index.py|https://github.com/apache/spark/blob/42e5daddf3ba16ff7d08e82e51cd8924cc56e180/python/pyspark/pandas/tests/indexes/test_reset_index.py#L46]) > Migrate test_timedelta_ops assert_eq to use assertDataFrameEqual > - > > Key: SPARK-44712 > URL: https://issues.apache.org/jira/browse/SPARK-44712 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 3.5.0 >Reporter: Amanda Liu >Priority: Major > > Migrate assert_eq to assertDataFrameEqual in this file: > [python/pyspark/pandas/tests/data_type_ops/test_timedelta_ops.py|https://github.com/databricks/runtime/blob/f579860299b16f6614f70c7cf2509cd89816d363/python/pyspark/pandas/tests/data_type_ops/test_timedelta_ops.py#L176] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
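For reference, the shape of the migration across these sub-tasks (a simplified sketch with toy data; assumes an active SparkSession `spark`; the real tests compare pandas-on-Spark results rather than plain DataFrames):

{code:python}
from pyspark.testing import assertDataFrameEqual

# Before (pandas test utils helper):
#     self.assert_eq(actual, expected)

# After (public testing API introduced in Spark 3.5):
actual = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
expected = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
assertDataFrameEqual(actual, expected)
{code}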
[jira] [Created] (SPARK-44712) Migrate test_timedelta_ops assert_eq to use assertDataFrameEqual
Amanda Liu created SPARK-44712: -- Summary: Migrate test_timedelta_ops assert_eq to use assertDataFrameEqual Key: SPARK-44712 URL: https://issues.apache.org/jira/browse/SPARK-44712 Project: Spark Issue Type: Sub-task Components: PySpark Affects Versions: 3.5.0 Reporter: Amanda Liu Migrate assert_eq to assertDataFrameEqual in this file: [python/pyspark/pandas/tests/indexes/test_reset_index.py|https://github.com/apache/spark/blob/42e5daddf3ba16ff7d08e82e51cd8924cc56e180/python/pyspark/pandas/tests/indexes/test_reset_index.py#L46] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44711) Migrate test_series_conversion assert_eq to use assertDataFrameEqual
[ https://issues.apache.org/jira/browse/SPARK-44711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu updated SPARK-44711: --- Description: Migrate assert_eq to assertDataFrameEqual in this file: [python/pyspark/pandas/tests/test_series_conversion.py|https://github.com/databricks/runtime/blob/d162bd182de8bcea180d874027edb86ae8fc60e5/python/pyspark/pandas/tests/test_series_conversion.py#L63] was:Migrate assert_eq to assertDataFrameEqual in this file: [python/pyspark/pandas/tests/indexes/test_reset_index.py|https://github.com/apache/spark/blob/42e5daddf3ba16ff7d08e82e51cd8924cc56e180/python/pyspark/pandas/tests/indexes/test_reset_index.py#L46] > Migrate test_series_conversion assert_eq to use assertDataFrameEqual > > > Key: SPARK-44711 > URL: https://issues.apache.org/jira/browse/SPARK-44711 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 3.5.0 >Reporter: Amanda Liu >Priority: Major > > Migrate assert_eq to assertDataFrameEqual in this file: > [python/pyspark/pandas/tests/test_series_conversion.py|https://github.com/databricks/runtime/blob/d162bd182de8bcea180d874027edb86ae8fc60e5/python/pyspark/pandas/tests/test_series_conversion.py#L63] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44711) Migrate test_series_conversion assert_eq to use assertDataFrameEqual
Amanda Liu created SPARK-44711: -- Summary: Migrate test_series_conversion assert_eq to use assertDataFrameEqual Key: SPARK-44711 URL: https://issues.apache.org/jira/browse/SPARK-44711 Project: Spark Issue Type: Sub-task Components: PySpark Affects Versions: 3.5.0 Reporter: Amanda Liu Migrate assert_eq to assertDataFrameEqual in this file: [python/pyspark/pandas/tests/indexes/test_reset_index.py|https://github.com/apache/spark/blob/42e5daddf3ba16ff7d08e82e51cd8924cc56e180/python/pyspark/pandas/tests/indexes/test_reset_index.py#L46] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44708) Migrate test_reset_index assert_eq to use assertDataFrameEqual
Amanda Liu created SPARK-44708: -- Summary: Migrate test_reset_index assert_eq to use assertDataFrameEqual Key: SPARK-44708 URL: https://issues.apache.org/jira/browse/SPARK-44708 Project: Spark Issue Type: Sub-task Components: PySpark Affects Versions: 3.5.0 Reporter: Amanda Liu Migrate assert_eq to assertDataFrameEqual in this file: [python/pyspark/pandas/tests/indexes/test_reset_index.py|https://github.com/apache/spark/blob/42e5daddf3ba16ff7d08e82e51cd8924cc56e180/python/pyspark/pandas/tests/indexes/test_reset_index.py#L46] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org